The conceptual pieces make sense: topics, partitions, offsets, consumer groups. Now you need to actually run something. This post walks through a minimal but realistic producer and consumer using the kafka-python library. I am assuming you have a Kafka broker running somewhere (local or remote) and Python 3.6 or later. The examples use f-strings, so Python 2.7 will not run them as written. The kafka-python library installs with pip install kafka-python.
Producer: Writing Events to a Topic
The producer is the simpler side. You create a KafkaProducer, call send(), and flush. The only meaningful decision at this stage is your serializer — Kafka messages are bytes, so you need to tell the producer how to turn your Python objects into bytes.
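The serializers are plain functions, so you can check them without a broker. Here is a minimal sketch that uses the same key and value encoding as the producer below. `deserialize_value` is a hypothetical helper that stands in for whatever your downstream stage does to turn the bytes back into an object.

```python
import json

# Same logic as the serializers passed to KafkaProducer in this post
def serialize_key(k):
    # Keys become UTF-8 bytes; a missing key stays None
    return k.encode('utf-8') if k else None

def serialize_value(v):
    # Values become JSON text, encoded as UTF-8 bytes
    return json.dumps(v).encode('utf-8')

def deserialize_value(b):
    # Hypothetical inverse, as a downstream stage might apply it
    return json.loads(b.decode('utf-8'))

event = {'order_id': 1001, 'customer_id': 42, 'total': 129.99, 'event_type': 'order_placed'}
key_bytes = serialize_key(str(event['order_id']))
value_bytes = serialize_value(event)

print(key_bytes)                                # b'1001'
print(type(value_bytes).__name__)               # bytes
print(deserialize_value(value_bytes) == event)  # True
```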
```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka-broker-01:9092'],
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # wait for all in-sync replicas to ack before returning
    retries=3    # retry transient failures
)

def publish_order_event(order_id, customer_id, total):
    future = producer.send(
        topic='order_placed',
        key=str(order_id),  # partition key: all events for this order go to the same partition
        value={
            'order_id': order_id,
            'customer_id': customer_id,
            'total': total,
            'event_type': 'order_placed'
        }
    )
    # Block until the broker acknowledges (optional; remove for async fire-and-forget)
    record_metadata = future.get(timeout=10)
    return record_metadata.partition, record_metadata.offset

partition, offset = publish_order_event(order_id=1001, customer_id=42, total=129.99)
print(f"Written to partition={partition} offset={offset}")

producer.flush()
producer.close()
```
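The `key=str(order_id)` comment relies on the default partitioner hashing the key bytes and taking the result modulo the partition count. Both the Java client and kafka-python hash keyed records with murmur2. The sketch below uses `zlib.crc32` as a stand-in so it runs on the standard library alone, which means its partition numbers will not match a real cluster. `choose_partition` is a hypothetical helper. What it does show is the property that matters: the same key always maps to the same partition.

```python
import zlib

def choose_partition(key_bytes, num_partitions):
    # Stand-in for murmur2: any deterministic hash demonstrates the same property
    h = zlib.crc32(key_bytes) & 0x7fffffff  # force a non-negative value
    return h % num_partitions

keys = [str(order_id).encode('utf-8') for order_id in (1001, 1002, 1003, 1001, 1001)]
assignments = [choose_partition(k, 6) for k in keys]

# Every event keyed by order 1001 maps to exactly one partition
partitions_for_1001 = {p for k, p in zip(keys, assignments) if k == b'1001'}
print(len(partitions_for_1001))  # 1
```

Because the mapping depends on the partition count, adding partitions to an existing topic can send a key's new events to a different partition from its older ones. That breaks per-key ordering across the change.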
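acks='all' makes the producer wait until every in-sync replica has the record before the send counts as successful. This is the safest option: once acknowledged, the record survives the failure of any single broker, provided more than one replica was in sync when it was written. That proviso matters. If the in-sync replica set has shrunk to the leader alone, the leader by itself satisfies acks='all'. For that reason, acks='all' is usually paired with min.insync.replicas=2 on a topic with replication factor 3. With that setting, the broker rejects writes instead of accepting them with a single copy. The tradeoff is latency. For a high-throughput analytics feed where occasional loss is acceptable, acks=1 (leader-only confirmation) is a reasonable choice. For anything that needs durability, use acks='all'.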
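Here is a simplified model of that tradeoff. It counts how many brokers are guaranteed to hold a record at the moment the producer sees the acknowledgement. It assumes a broker-side min.insync.replicas setting, which is the minimum number of in-sync replicas that an acks='all' write requires. It also ignores details such as unclean leader election. `copies_at_ack` is a hypothetical helper, not a Kafka API.

```python
def copies_at_ack(acks, isr_size, min_insync_replicas=1):
    """Brokers guaranteed to hold the record when the producer's send succeeds (simplified)."""
    if acks == 0:
        return 0  # producer does not wait, so nothing is guaranteed
    if acks == 1:
        return 1  # leader wrote it locally; followers may not have it yet
    if acks == 'all':
        if isr_size < min_insync_replicas:
            # The broker rejects the write (NotEnoughReplicas) instead of acking it
            raise RuntimeError('not enough in-sync replicas')
        return isr_size  # every current in-sync replica has the record
    raise ValueError(f'unknown acks setting: {acks!r}')

# Replication factor 3, all replicas in sync
print(copies_at_ack(1, isr_size=3))                             # 1
print(copies_at_ack('all', isr_size=3, min_insync_replicas=2))  # 3
# In-sync set shrunk to the leader: with the default of 1,
# acks='all' quietly degrades to a single copy
print(copies_at_ack('all', isr_size=1))                         # 1
```

An acknowledged record survives the loss of `copies_at_ack(...) - 1` brokers. In this model, raising min.insync.replicas is what turns that last case into an error instead of a single-copy write.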
Consumer: Reading Events From a Topic
The consumer side has a few more moving parts. You need to decide on offset management and what happens when your process restarts.
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'order_placed',
    bootstrap_servers=['kafka-broker-01:9092'],
    group_id='order_raw_landing',   # consumer group: tracks offsets independently
    auto_offset_reset='earliest',   # first run: start from beginning of topic
    enable_auto_commit=False,       # commit offsets manually after confirmed write
    value_deserializer=lambda b: b  # keep raw bytes; we'll parse downstream
)

def write_to_raw_zone(partition, offset, raw_bytes):
    # Write to your raw store (file, database, etc.)
    print(f" Landed partition={partition} offset={offset} ({len(raw_bytes)} bytes)")

try:
    for message in consumer:
        write_to_raw_zone(message.partition, message.offset, message.value)
        # Commit offset only after the write succeeds
        consumer.commit()
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
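The following broker-free simulation of a single partition shows why the commit comes after `write_to_raw_zone`. It follows Kafka's convention that a committed offset is the next offset to read. In kafka-python, `consumer.commit()` with no arguments commits the consumer's current position, which is the last consumed offset plus one. `run_consumer` and `crash_after_write` are hypothetical names used only for this illustration.

```python
log = [b'{"order_id": %d}' % i for i in range(5)]  # one partition, offsets 0..4

def run_consumer(committed, landed, crash_after_write=None):
    """Read from the committed offset; optionally die after writing one offset."""
    offset = committed
    while offset < len(log):
        landed.append(offset)          # the raw-zone write succeeded
        if offset == crash_after_write:
            return committed           # process dies before commit() runs
        committed = offset + 1         # commit: next offset to read
        offset += 1
    return committed

landed = []
committed = run_consumer(0, landed, crash_after_write=2)  # crash after landing offset 2
committed = run_consumer(committed, landed)               # restart from the committed offset
print(committed)  # 5
print(landed)     # [0, 1, 2, 2, 3, 4]
```

Offset 2 lands twice. That is at-least-once delivery, so the raw store should tolerate duplicates, for example by treating (partition, offset) as a unique key. Committing after every message also costs a broker round trip per record. Committing every N messages raises throughput in exchange for more re-reads after a crash.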
The value_deserializer=lambda b: b line is deliberate: we are not parsing the JSON here. kafka-python already returns raw bytes when no deserializer is set, so the identity lambda mainly documents intent. The raw landing job stores bytes, and deserialization happens in a downstream stage. If the consumer parses the JSON and the schema changes, or a producer sends a malformed record, the crash lands in your most critical job: the one that keeps the raw record. Do not mix concerns.
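Here is a sketch of that split, using hypothetical `land_raw` and `parse_downstream` functions. The landing step never inspects the payload, so a malformed or reshaped record still lands. Only the downstream parser has to decide what to do with it. This sketch routes failures to a dead-letter list, which is one common choice among several.

```python
import json

raw_zone = []      # stand-in for the raw store
dead_letters = []  # records the downstream parser could not handle

def land_raw(partition, offset, raw_bytes):
    # No parsing: whatever bytes arrived are stored as-is
    raw_zone.append((partition, offset, raw_bytes))

def parse_downstream():
    parsed = []
    for partition, offset, raw_bytes in raw_zone:
        try:
            event = json.loads(raw_bytes.decode('utf-8'))
            parsed.append((event['order_id'], event['total']))
        except (ValueError, KeyError) as exc:  # bad JSON/encoding or missing field
            dead_letters.append((partition, offset, repr(exc)))
    return parsed

land_raw(0, 10, b'{"order_id": 1001, "total": 129.99}')
land_raw(0, 11, b'{"orderId": 1002, "amount": 50.0}')  # schema changed upstream
land_raw(0, 12, b'not json at all')

result = parse_downstream()
print(len(raw_zone))                     # 3
print(result)                            # [(1001, 129.99)]
print([o for _, o, _ in dead_letters])   # [11, 12]
```

All three records reach the raw zone. The two bad ones can be replayed from it once the parser is fixed.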
What to Test Before Going to Production
Three things worth validating before you trust this in a real pipeline:
- Restart behavior — kill the consumer mid-run without committing, restart it. Confirm it re-reads the uncommitted messages. This verifies your offset management is correct.
- Multi-partition behavior — if your topic has multiple partitions, verify your consumer is receiving from all of them, not just partition 0.
- Consumer group isolation — start a second consumer with a different group_id pointing at the same topic. Confirm it starts from its own offset and does not interfere with the first consumer.
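The last two checks come down to bookkeeping that you can reason about without a broker. In this in-memory model, committed offsets are stored per (group_id, partition), which is why two groups never interfere. A group with no committed offset starts at 0, which mirrors auto_offset_reset='earliest'. The coverage helper compares the partitions a consumer actually received from against the full set. In a live test, you could get the full set from kafka-python's `KafkaConsumer.partitions_for_topic()`. The function names and the second group_id, 'order_audit', are hypothetical.

```python
from collections import defaultdict

# Committed offsets keyed by (group_id, partition), the way Kafka scopes them
committed = defaultdict(int)

def consume(group_id, partition, log_end_offset):
    """Read from this group's committed offset to the end of the log, then commit."""
    start = committed[(group_id, partition)]
    read = list(range(start, log_end_offset))
    committed[(group_id, partition)] = log_end_offset
    return read

def missing_partitions(received_from, all_partitions):
    """Partitions the consumer never received a message from."""
    return sorted(set(all_partitions) - set(received_from))

first = consume('order_raw_landing', 0, log_end_offset=4)
second = consume('order_audit', 0, log_end_offset=4)       # separate group, own offset
again = consume('order_raw_landing', 0, log_end_offset=4)  # nothing new for the first group
print(first, second, again)  # [0, 1, 2, 3] [0, 1, 2, 3] []

print(missing_partitions({0, 2}, {0, 1, 2}))  # [1]
```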
None of this requires a production cluster. A local single-broker setup with a test topic is enough to verify the behavior before you commit to a deployment pattern. Next up: the micro-batch model — when you do not need to keep a consumer running 24/7 and what that means for your cloud bill. I am here to help if you hit any snags with the setup.