
Best Practices

This guide covers production-ready best practices for using the NestJS Kafka Client effectively and safely.

Message Design

Use Meaningful Keys

// ✅ Good: Use meaningful keys for partitioning
await this.kafkaClient.send('orders', {
key: order.customerId, // Groups orders by customer
value: JSON.stringify(order),
});

// ❌ Bad: Random or no keys
await this.kafkaClient.send('orders', {
key: Math.random().toString(), // Defeats partitioning
value: JSON.stringify(order),
});

Include Idempotency Keys

// ✅ Good: Always include idempotency keys for critical operations
await this.kafkaClient.send('payments', {
key: payment.orderId,
value: JSON.stringify(payment),
headers: {
'idempotency-key': payment.transactionId,
'timestamp': new Date().toISOString(),
'version': '1.0',
},
});

Message Schema Evolution

// ✅ Good: Design for schema evolution
interface OrderEventV1 {
version: '1.0';
orderId: string;
customerId: string;
amount: number;
// New fields should be optional so older consumers can still parse the message
currency?: string;
}

interface OrderEventV2 {
version: '2.0';
orderId: string;
customerId: string;
amount: number;
currency: string; // Now required
// Always maintain backward compatibility with earlier event versions
items?: OrderItem[];
}

Consumer Design Patterns

Batch Processing for High Throughput

// ✅ Good: Use batch processing for high-volume topics
@Consumer('analytics-events', {
batch: true,
batchSize: 500,
batchTimeout: 5000,
groupByKey: true,
})
export class AnalyticsConsumer {
async handleBatch(messages: KafkaMessage[]) {
// Process in chunks to avoid memory issues
const chunks = this.chunkArray(messages, 100);

for (const chunk of chunks) {
await this.processChunk(chunk);
}
}
}

Single Message Processing for Low Latency

// ✅ Good: Use single message processing for real-time requirements
@Consumer('fraud-alerts', {
batch: false, // Process immediately
maxConcurrency: 10,
})
export class FraudAlertConsumer {
async handleMessage(message: KafkaMessage) {
const alert = JSON.parse(message.value.toString());

// Immediate processing for time-sensitive alerts
await this.processFraudAlert(alert);
}
}

Error Handling Strategy

// ✅ Good: Comprehensive error handling
@Consumer('orders', {
dlq: {
topic: 'orders-dlq',
maxRetries: 3,
retryDelay: (attempt) => Math.min(1000 * Math.pow(2, attempt), 30000),
shouldRetry: (error, message, attempt) => {
// Don't retry validation errors
if (error.name === 'ValidationError') return false;

// Don't retry after 3 attempts for business logic errors
if (error.name === 'BusinessLogicError' && attempt >= 3) return false;

// Retry transient errors
return error.name === 'TransientError';
},
},
})
export class OrderConsumer {
private readonly logger = new Logger(OrderConsumer.name);

async handleMessage(message: KafkaMessage) {
try {
const order = this.validateOrder(message);
await this.processOrder(order);
} catch (error) {
this.logger.error(`Order processing failed: ${error.message}`, {
orderId: message.headers['order-id'],
error: error.stack,
});
throw error; // Let DLQ handle it
}
}

private validateOrder(message: KafkaMessage): Order {
// Throw ValidationError for invalid messages
// These won't be retried
}
}

Performance Optimization

Batch Size Tuning

// ✅ Good: Tune batch sizes based on message size and processing time
@Consumer('small-messages', {
batch: true,
batchSize: 1000, // Larger batches for small messages
batchTimeout: 2000,
})
export class SmallMessageConsumer {}

@Consumer('large-messages', {
batch: true,
batchSize: 10, // Smaller batches for large messages
batchTimeout: 10000,
})
export class LargeMessageConsumer {}

Memory Management

// ✅ Good: Process large batches in chunks
@Consumer('large-volume-topic', {
batch: true,
batchSize: 1000,
})
export class MemoryEfficientConsumer {
async handleBatch(messages: KafkaMessage[]) {
// Process in smaller chunks to manage memory
const chunkSize = 50;

for (let i = 0; i < messages.length; i += chunkSize) {
const chunk = messages.slice(i, i + chunkSize);
await this.processChunk(chunk);

// Optional: Force garbage collection for very large batches
// (global.gc is only defined when Node is started with --expose-gc;
// the optional call below is a no-op otherwise)
if (i % 500 === 0) {
global.gc?.();
}
}
}
}

Connection Optimization

// ✅ Good: Optimize connection settings for your environment
KafkaModule.forRoot({
clientId: 'my-app',
brokers: ['kafka-1:9092', 'kafka-2:9092'],

// Optimize for your network conditions
connectionTimeout: 3000,
requestTimeout: 30000,

// Producer optimization
producer: {
maxInFlightRequests: 5, // Balance throughput vs memory
idempotent: true, // Prevent duplicates
compression: 'gzip', // Reduce network usage
},

// Consumer optimization
consumer: {
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytes: 1048576, // 1MB - adjust based on message size
},
})

Monitoring and Observability

Comprehensive Logging

@Consumer('orders')
export class OrderConsumer {
private readonly logger = new Logger(OrderConsumer.name);

async handleMessage(message: KafkaMessage) {
const startTime = Date.now();
const orderId = message.headers['order-id'];

this.logger.log(`Processing order ${orderId}`, {
partition: message.partition,
offset: message.offset,
});

try {
await this.processOrder(message);

const duration = Date.now() - startTime;
this.logger.log(`Order ${orderId} processed successfully in ${duration}ms`);
} catch (error) {
this.logger.error(`Order ${orderId} processing failed`, {
error: error.message,
stack: error.stack,
partition: message.partition,
offset: message.offset,
});
throw error;
}
}
}

Metrics Collection

@Injectable()
export class KafkaMetricsService {
private readonly metrics = new Map<string, number>();

@EventListener('kafka.message.consumed')
onMessageConsumed(event: { topic: string; processingTime: number }) {
this.incrementCounter(`messages.consumed.${event.topic}`);
this.recordHistogram(`processing.time.${event.topic}`, event.processingTime);
}

@EventListener('kafka.error')
onError(event: { topic: string; error: Error }) {
this.incrementCounter(`errors.${event.topic}.${event.error.name}`);
}

@Cron('0 * * * * *') // Every minute
reportMetrics() {
// Send metrics to your monitoring system
this.sendToMonitoring(Object.fromEntries(this.metrics));
this.metrics.clear();
}
}

Health Checks

@Controller('health')
export class HealthController {
constructor(
private kafkaHealth: KafkaHealthIndicator,
private metricsService: KafkaMetricsService,
) {}

@Get('kafka')
async checkKafka() {
const health = await this.kafkaHealth.isHealthy('kafka');
const metrics = this.metricsService.getMetrics();

return {
...health,
metrics: {
messagesPerMinute: metrics.messagesConsumed,
errorRate: metrics.errors / metrics.messagesConsumed,
avgProcessingTime: metrics.avgProcessingTime,
},
};
}
}

Security Best Practices

Authentication and Authorization

// ✅ Good: Use strong authentication
KafkaModule.forRoot({
clientId: 'my-app',
brokers: ['kafka:9092'],

ssl: {
rejectUnauthorized: true,
ca: [fs.readFileSync('/certs/ca-cert.pem', 'utf-8')],
key: fs.readFileSync('/certs/client-key.pem', 'utf-8'),
cert: fs.readFileSync('/certs/client-cert.pem', 'utf-8'),
},

sasl: {
mechanism: 'scram-sha-256', // Use strong SASL mechanism
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD,
},
})

Message Encryption

// ✅ Good: Encrypt sensitive data
@Injectable()
export class SecureOrderService {
constructor(
private kafkaClient: KafkaClient,
private encryptionService: EncryptionService,
) {}

async createOrder(order: Order) {
// Encrypt sensitive fields
const encryptedOrder = {
...order,
customerData: await this.encryptionService.encrypt(order.customerData),
paymentInfo: await this.encryptionService.encrypt(order.paymentInfo),
};

await this.kafkaClient.send('orders', {
key: order.customerId,
value: JSON.stringify(encryptedOrder),
headers: {
'encryption-version': '1.0',
},
});
}
}

Input Validation

// ✅ Good: Always validate input
@Consumer('orders')
export class OrderConsumer {
async handleMessage(message: KafkaMessage) {
// Validate message structure
const order = this.validateAndParseOrder(message);

// Sanitize input
const sanitizedOrder = this.sanitizeOrder(order);

await this.processOrder(sanitizedOrder);
}

private validateAndParseOrder(message: KafkaMessage): Order {
try {
const order = JSON.parse(message.value.toString());

// Use a validation library like Joi or class-validator
const { error, value } = orderSchema.validate(order);
if (error) {
throw new ValidationError(`Invalid order: ${error.message}`);
}

return value;
} catch (error) {
throw new ValidationError(`Failed to parse order: ${error.message}`);
}
}
}

Deployment Best Practices

Environment Configuration

// ✅ Good: Environment-specific configuration
const getKafkaConfig = (): KafkaModuleOptions => {
const environment = process.env.NODE_ENV;

const baseConfig = {
clientId: process.env.KAFKA_CLIENT_ID,
brokers: process.env.KAFKA_BROKERS.split(','),
};

switch (environment) {
case 'development':
return {
...baseConfig,
logLevel: 'debug',
producer: { allowAutoTopicCreation: true },
};

case 'staging':
return {
...baseConfig,
logLevel: 'info',
ssl: true,
sasl: {
mechanism: 'scram-sha-256',
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD,
},
};

case 'production':
return {
...baseConfig,
logLevel: 'warn',
ssl: {
rejectUnauthorized: true,
ca: [fs.readFileSync('/certs/ca-cert.pem', 'utf-8')],
key: fs.readFileSync('/certs/client-key.pem', 'utf-8'),
cert: fs.readFileSync('/certs/client-cert.pem', 'utf-8'),
},
sasl: {
mechanism: 'scram-sha-256',
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD,
},
producer: {
idempotent: true,
maxInFlightRequests: 5,
},
};

default:
throw new Error(`Unknown environment: ${environment}`);
}
};

Graceful Shutdown

// ✅ Good: Implement proper shutdown handling
@Injectable()
export class AppService implements OnApplicationShutdown {
constructor(private kafkaClient: KafkaClient) {}

async onApplicationShutdown(signal?: string) {
this.logger.log(`Received shutdown signal: ${signal}`);

// Stop accepting new messages
await this.kafkaClient.pause();

// Wait for in-flight messages to complete
await this.waitForInFlightMessages();

// Gracefully disconnect
await this.kafkaClient.disconnect();

this.logger.log('Kafka client shutdown complete');
}

private async waitForInFlightMessages(timeout = 30000): Promise<void> {
const start = Date.now();

while (this.hasInFlightMessages() && Date.now() - start < timeout) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}
}

Testing Best Practices

Unit Testing Consumers

describe('OrderConsumer', () => {
let consumer: OrderConsumer;
let orderService: jest.Mocked<OrderService>;

beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [
OrderConsumer,
{
provide: OrderService,
useValue: {
processOrder: jest.fn(),
},
},
],
}).compile();

consumer = module.get<OrderConsumer>(OrderConsumer);
orderService = module.get(OrderService);
});

it('should process valid order message', async () => {
const message: KafkaMessage = {
key: Buffer.from('customer-123'),
value: Buffer.from(JSON.stringify({ id: 'order-456' })),
headers: {},
partition: 0,
offset: '100',
};

await consumer.handleMessage(message);

expect(orderService.processOrder).toHaveBeenCalledWith({ id: 'order-456' });
});
});

Integration Testing

describe('Kafka Integration', () => {
let app: INestApplication;
let kafkaClient: KafkaClient;

beforeAll(async () => {
const module = await Test.createTestingModule({
imports: [
KafkaModule.forRoot({
clientId: 'test-client',
brokers: ['localhost:9092'],
}),
],
}).compile();

app = module.createNestApplication();
await app.init();

kafkaClient = app.get(KafkaClient);
});

it('should send and receive messages', async () => {
const testMessage = { id: 'test-123' };

// Send message
await kafkaClient.send('test-topic', {
key: 'test-key',
value: JSON.stringify(testMessage),
});

// Verify message was received (implementation depends on your test setup)
// This might involve checking a test consumer or database
});
});

Common Anti-Patterns to Avoid

❌ Don't Block the Event Loop

// ❌ Bad: Synchronous processing
@Consumer('orders')
export class BadOrderConsumer {
handleMessage(message: KafkaMessage) {
// This blocks the event loop
const result = this.heavyComputationSync(message);
return result;
}
}

// ✅ Good: Asynchronous processing
@Consumer('orders')
export class GoodOrderConsumer {
async handleMessage(message: KafkaMessage) {
// Non-blocking async processing
const result = await this.heavyComputationAsync(message);
return result;
}
}

❌ Don't Ignore Errors

// ❌ Bad: Swallowing errors
@Consumer('orders')
export class BadErrorHandling {
async handleMessage(message: KafkaMessage) {
try {
await this.processOrder(message);
} catch (error) {
console.log('Error occurred'); // Error is lost
}
}
}

// ✅ Good: Proper error handling
@Consumer('orders')
export class GoodErrorHandling {
async handleMessage(message: KafkaMessage) {
try {
await this.processOrder(message);
} catch (error) {
this.logger.error('Order processing failed', error);
throw error; // Let the framework handle retries/DLQ
}
}
}

❌ Don't Create Too Many Connections

// ❌ Bad: Creating multiple clients
@Injectable()
export class BadKafkaService {
async sendMessage() {
const client = new KafkaClient(config); // New connection each time
await client.send('topic', message);
}
}

// ✅ Good: Reuse connections
@Injectable()
export class GoodKafkaService {
constructor(private kafkaClient: KafkaClient) {} // Injected singleton

async sendMessage() {
await this.kafkaClient.send('topic', message);
}
}

Next Steps