The scenario: your Kafka consumer is humming along, processing thousands of messages per hour, when it hits one that does not match the expected schema. The JSON has a field with the wrong type. The Avro deserialization throws an exception. Maybe the message is simply empty bytes.
What happens next depends entirely on how you wrote your error handling. If you did nothing special, the exception propagates, your consumer crashes, it restarts from the last committed offset, and hits the same bad message again. Infinite crash loop. Your pipeline is now down until someone manually intervenes.
The fix is a dead letter queue (DLQ) — a place where bad messages go so they do not block the pipeline. This is not a new idea; MSMQ had DLQs in 1997. The streaming-specific implementation just requires some thought.
The Basic Pattern
Wrap your message processing in a try/except. On success, write to the normal output and commit the offset. On failure, write the message to the dead letter store with enough context to debug later, wait for that write to be acknowledged, and only then commit the offset. The key insight: you commit the offset whether the message succeeds or fails; you just route it differently. Do not let one bad message hold up the entire partition.
from kafka import KafkaConsumer, KafkaProducer
import json
import time

consumer = KafkaConsumer(
    'sensor_readings_raw',
    bootstrap_servers=['kafka-broker-01:9092'],
    group_id='sensor-deserialize-job',
    enable_auto_commit=False
)
dlq_producer = KafkaProducer(
    bootstrap_servers=['kafka-broker-01:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def deserialize_sensor_reading(raw_bytes):
    # Raises on a null payload, malformed JSON, missing fields, or wrong types
    if raw_bytes is None:
        raise ValueError('message has no value')
    data = json.loads(raw_bytes.decode('utf-8'))
    return {
        'sensor_id': str(data['sensor_id']),
        'value': float(data['value']),
        'unit': str(data['unit']),
        'ts_ms': int(data['ts_ms'])
    }

# structured_store is the normal output sink, assumed to be defined elsewhere
for message in consumer:
    try:
        structured = deserialize_sensor_reading(message.value)
        structured_store.write(structured)
        consumer.commit()
    # TypeError covers cases like float(None) or a JSON array instead of an object
    except (KeyError, ValueError, TypeError, json.JSONDecodeError, UnicodeDecodeError) as e:
        # Route to dead letter topic with full diagnostic context
        dlq_record = {
            'source_topic': message.topic,
            'partition': message.partition,
            'offset': message.offset,
            'kafka_timestamp': message.timestamp,
            'failed_at_ms': int(time.time() * 1000),
            'error_type': type(e).__name__,
            'error_message': str(e),
            'raw_payload': (message.value or b'').decode('utf-8', errors='replace')
        }
        dlq_producer.send('sensor_readings_dead_letter', value=dlq_record)
        dlq_producer.flush()  # block until the DLQ write has been sent
        consumer.commit()  # advance offset; do not reprocess this message
What to Put in the Dead Letter Record
The dead letter record needs enough information to reconstruct what happened and, ideally, replay the original message after fixing the deserializer:
- Source topic, partition, and offset — so you can locate the original message in Kafka if it is still within retention
- Kafka timestamp — when the message was produced to the topic
- Failed-at timestamp — when your consumer tried to process it
- Error type and message — the specific exception, not just "error"
- Raw payload — the actual bytes that failed, stored as a string
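Do not store just the error message. Storing the raw payload is what makes the DLQ useful for replay: you can reprocess directly from the DLQ record without going back to the Kafka topic, where the original message may have aged out of retention by the time you investigate. One caveat: decoding the payload to a string with errors='replace' is lossy for bytes that are not valid UTF-8, so if your payloads can be binary (Avro, for example), store them in a lossless encoding such as base64.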
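For payloads that may not be valid UTF-8, a lossless variant of the record could look like the sketch below. The helper names build_dlq_record and recover_payload and the raw_payload_b64 field are illustrative assumptions, not part of the consumer shown earlier.

```python
import base64
import json
import time


def build_dlq_record(topic, partition, offset, kafka_timestamp, raw_bytes, error):
    """Build a JSON-serializable dead letter record (hypothetical helper)."""
    payload = raw_bytes if raw_bytes is not None else b''
    return {
        'source_topic': topic,
        'partition': partition,
        'offset': offset,
        'kafka_timestamp': kafka_timestamp,
        'failed_at_ms': int(time.time() * 1000),
        'error_type': type(error).__name__,
        'error_message': str(error),
        # Base64 keeps every byte intact, including invalid UTF-8 sequences
        'raw_payload_b64': base64.b64encode(payload).decode('ascii'),
    }


def recover_payload(dlq_record):
    """Return the exact original bytes from a record built above."""
    return base64.b64decode(dlq_record['raw_payload_b64'])
```

The trade-off is readability: a base64 field cannot be eyeballed in a console the way a plain string can, so some teams may prefer to store both forms.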
Monitoring Dead Letter Volume
A DLQ that accumulates silently is not a safety valve — it is a hidden failure. Set up monitoring on the DLQ topic or table and alert when the dead letter rate exceeds a threshold. A few dead letters per day might be acceptable (malformed messages, network blips during encoding). A dead letter rate of 10% of your message volume means your deserializer is wrong or the producer changed schemas.
-- Monitoring query if the DLQ is a database table (T-SQL syntax)
SELECT
    source_topic,
    error_type,
    COUNT(*) AS error_count,
    MIN(failed_at_ms) AS first_seen_ms,
    MAX(failed_at_ms) AS last_seen_ms
FROM sensor_readings_dead_letter
-- DATEDIFF_BIG (SQL Server 2016+) is required here: epoch milliseconds overflow the INT returned by DATEDIFF
WHERE failed_at_ms > DATEDIFF_BIG(millisecond, '1970-01-01', DATEADD(hour, -1, GETUTCDATE()))
GROUP BY source_topic, error_type
ORDER BY error_count DESC;
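If the DLQ is a Kafka topic rather than a table, a similar check can run inside the consumer process. The following is a minimal sketch, assuming a rolling window over the most recent messages; the class name, the window size of 1000, and the alerting hook are all illustrative choices, and the 10% threshold simply mirrors the figure mentioned above.

```python
from collections import deque


class DeadLetterRateMonitor:
    """Track the share of dead-lettered messages over the last `window` messages."""

    def __init__(self, window=1000, threshold=0.10):
        self.outcomes = deque(maxlen=window)  # True means dead-lettered
        self.threshold = threshold

    def record(self, dead_lettered):
        self.outcomes.append(bool(dead_lettered))

    def rate(self):
        if not self.outcomes:
            return 0.0
        return sum(self.outcomes) / len(self.outcomes)

    def should_alert(self):
        # Wait for a full window so one early failure does not trigger an alert
        window_full = len(self.outcomes) == self.outcomes.maxlen
        return window_full and self.rate() > self.threshold
```

In the consumer loop you would call record(False) after a successful write and record(True) after a dead letter write, then check should_alert() and hand off to whatever alerting system you already use.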
Replaying Dead Letters
When you have fixed the deserializer, you need to reprocess the dead letter records. The cleanest approach: treat the DLQ as a source and run the fixed deserializer against the raw payloads stored there. This is why storing the raw payload in the DLQ record matters — you can reprocess without going back to the original Kafka topic.
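A replay pass can be sketched as a plain function over DLQ records. This assumes each record carries a raw_payload string as in the consumer example; deserialize_sensor_reading_v2 is a hypothetical stand-in for whatever your fixed deserializer turns out to be (here it merely tolerates a missing unit field).

```python
import json


def deserialize_sensor_reading_v2(raw_bytes):
    """Stand-in for the fixed deserializer; tolerates a missing unit field."""
    data = json.loads(raw_bytes.decode('utf-8'))
    return {
        'sensor_id': str(data['sensor_id']),
        'value': float(data['value']),
        'unit': str(data.get('unit', 'unknown')),
        'ts_ms': int(data['ts_ms']),
    }


def replay_dead_letters(dlq_records, deserialize):
    """Run `deserialize` over stored payloads; return (recovered, still_failing)."""
    recovered, still_failing = [], []
    for record in dlq_records:
        try:
            raw = record['raw_payload'].encode('utf-8')
            recovered.append(deserialize(raw))
        except (KeyError, ValueError, TypeError) as e:
            # Keep the record with the new error instead of dropping it silently
            still_failing.append({
                **record,
                'error_type': type(e).__name__,
                'error_message': str(e),
            })
    return recovered, still_failing
```

Records that still fail come back with the new error attached, so they can be written to the DLQ again rather than lost during replay.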
Never write a DLQ that just stores the error message. Write one that stores enough to recover. The extra 200 bytes per record is trivially cheap compared to the time you save when you need to investigate at 2am.