Skip to main content

Implementing Kafka Integration with NestJS Workflow

This documentation explains how to integrate Apache Kafka with the NestJS Workflow engine to create event-driven workflows that respond to Kafka messages.

Prerequisites

  • A running NestJS application
  • Basic understanding of workflow concepts
  • Access to a Kafka broker

Installation

First, install the NestJS Workflow package:

npm install @jescrich/nestjs-workflow

You'll also need the Kafka client libraries:

npm install kafkajs

Basic Kafka Integration

Step 1: Configure Kafka in Your Workflow Definition

Add a Kafka property to your workflow definition that specifies:

  • Kafka broker addresses
  • Topic-to-event mappings
import { WorkflowDefinition } from '@jescrich/nestjs-workflow';
import { Order, OrderEvent, OrderStatus } from '../models/order.model';

export const orderWorkflowDefinition = (entity: Order): WorkflowDefinition<Order, any, OrderEvent, OrderStatus> => {
return {
// Standard workflow configuration
FinalStates: [OrderStatus.Completed, OrderStatus.Failed],
IdleStates: [OrderStatus.Pending, OrderStatus.Processing, OrderStatus.Completed, OrderStatus.Failed],
Transitions: [
// Your transitions here
],
FailedState: OrderStatus.Failed,

// Kafka configuration
Kafka: {
brokers: 'localhost:9092', // Comma-separated list for multiple brokers
events: [
{ topic: 'orders.submitted', event: OrderEvent.Submit },
{ topic: 'orders.completed', event: OrderEvent.Complete },
{ topic: 'orders.failed', event: OrderEvent.Fail }
]
},

Entity: {
// Entity configuration
new: () => new Order(),
update: async (entity: Order, status: OrderStatus) => {
entity.status = status;
return entity;
},
load: async (urn: string) => {
// Load entity from your database
return yourRepository.findByUrn(urn);
},
status: (entity: Order) => entity.status,
urn: (entity: Order) => entity.urn
}
};
};

order-workflow.definition.ts

Step 2: Register the Workflow Module

Register your workflow with the Kafka configuration in your module:

import { Module } from '@nestjs/common';
import { WorkflowModule } from '@jescrich/nestjs-workflow';
import { orderWorkflowDefinition } from '../workflows/order-workflow.definition';
import { OrderRepository } from '../repositories/order.repository';
import { OrderService } from '../services/order.service';

@Module({
imports: [
WorkflowModule.register({
name: 'orderWorkflow',
definition: orderWorkflowDefinition,
}),
],
providers: [OrderRepository, OrderService],
exports: [OrderService],
})
export class OrderModule {}

order.module.ts

Message Format Requirements

For the workflow engine to process Kafka messages correctly, your messages must include:

  • An entity URN to identify which entity the message applies to
  • Any additional payload data needed for workflow processing

Example Kafka message format:

{
"urn": "urn:order:123",
"price": 150,
"items": ["Item 1", "Item 2"]
}

Advanced Kafka Configuration

For more advanced Kafka configurations, you can provide additional options:

Kafka: {
brokers: 'kafka-broker-1:9092,kafka-broker-2:9092',
clientId: 'order-workflow-service',
groupId: 'order-workflow-consumers',
ssl: true,
sasl: {
mechanism: 'plain',
username: 'your-username',
password: 'your-password'
},
events: [
{ topic: 'orders.submitted', event: OrderEvent.Submit },
{ topic: 'orders.completed', event: OrderEvent.Complete },
{ topic: 'orders.failed', event: OrderEvent.Fail }
]
}

order-workflow.definition.ts

Publishing Events to Kafka

import { Injectable } from '@nestjs/common';
import { WorkflowService } from '@jescrich/nestjs-workflow';
import { Order, OrderEvent, OrderStatus } from '../models/order.model';
import { Kafka } from 'kafkajs';

@Injectable()
export class OrderService {
private kafka: Kafka;
private producer;

constructor(
private readonly workflowService: WorkflowService<Order, any, OrderEvent, OrderStatus>,
) {
// Initialize Kafka producer
this.kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092'],
});
this.producer = this.kafka.producer();
this.producer.connect();
}

async submitOrder(orderId: string) {
// Publish to Kafka
await this.producer.send({
topic: 'orders.submitted',
messages: [
{
value: JSON.stringify({
urn: `urn:order:${orderId}`,
timestamp: new Date().toISOString()
})
},
],
});

// The workflow will automatically process this event when it receives it from Kafka
return { success: true, message: 'Order submission event published' };
}
}

order.service.ts

Error Handling

When integrating with Kafka, implement proper error handling:

// In your workflow definition
Transitions: [
{
from: OrderStatus.Processing,
to: OrderStatus.Failed,
event: OrderEvent.Fail,
actions: [
async (entity: Order, payload: any) => {
// Log the failure
console.error(`Order ${entity.urn} failed processing`, payload);

// You could also publish to a dead letter queue
await kafkaProducer.send({
topic: 'orders.deadletter',
messages: [{ value: JSON.stringify({ urn: entity.urn, error: payload.error }) }],
});

return entity;
}
]
}
]

order-workflow.definition.ts

Testing Kafka Integration

For testing, you can use an in-memory Kafka implementation or mock the Kafka client:

import { Test, TestingModule } from '@nestjs/testing';
import { WorkflowModule } from '@jescrich/nestjs-workflow';
import { orderWorkflowDefinition } from '../workflows/order-workflow.definition';

// Mock Kafka client
jest.mock('kafkajs', () => {
return {
Kafka: jest.fn().mockImplementation(() => {
return {
consumer: jest.fn().mockReturnValue({
connect: jest.fn(),
subscribe: jest.fn(),
run: jest.fn(),
disconnect: jest.fn(),
}),
};
}),
};
});

describe('Order Workflow with Kafka', () => {
let app;

beforeEach(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [
WorkflowModule.register({
name: 'orderWorkflow',
definition: orderWorkflowDefinition,
}),
],
}).compile();

app = moduleFixture.createNestApplication();
await app.init();
});

it('should process Kafka messages correctly', async () => {
// Test implementation
});
});

order-workflow.spec.ts

Best Practices

  • Consumer Groups: Use meaningful consumer group IDs to ensure proper message distribution
  • Error Handling: Implement robust error handling for Kafka connection issues
  • Message Validation: Validate incoming Kafka messages before processing
  • Idempotency: Design your workflow to handle duplicate messages gracefully
  • Monitoring: Set up monitoring for your Kafka consumers to track performance and errors

Conclusion

By integrating Kafka with NestJS Workflow, you can create powerful event-driven workflows that respond to messages from your event streaming platform. This enables building scalable, loosely-coupled systems where workflow state transitions are triggered by events flowing through your Kafka topics.