Building Your First Kafka Producer and Consumer in Python

The conceptual pieces make sense: topics, partitions, offsets, consumer groups. Now you need to actually run something. This post walks through a minimal but realistic producer and consumer using the kafka-python library. I am assuming you have a Kafka broker running somewhere (local or remote) and Python 3.6 or later available. The examples use f-strings, so they will not run on Python 2.7. The kafka-python library installs with pip install kafka-python.

Producer: Writing Events to a Topic

The producer is the simpler side. You create a KafkaProducer, call send(), and flush. The only meaningful decision at this stage is your serializer — Kafka messages are bytes, so you need to tell the producer how to turn your Python objects into bytes.
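Before involving a broker, it can help to see what the serializers do on their own. The sketch below pulls out the same two lambdas used in the producer configuration in this post and runs them against a sample event; the variable names (event, key_bytes, value_bytes) are only for illustration.

```python
import json

# The same lambdas the producer in this post is configured with,
# pulled out so they can be exercised without a broker.
key_serializer = lambda k: k.encode('utf-8') if k else None
value_serializer = lambda v: json.dumps(v).encode('utf-8')

event = {'order_id': 1001, 'customer_id': 42, 'total': 129.99}

key_bytes = key_serializer(str(event['order_id']))
value_bytes = value_serializer(event)

print(type(key_bytes).__name__, key_bytes)
print(type(value_bytes).__name__, value_bytes)

# Round trip: decoding the bytes gives back the original dict.
assert json.loads(value_bytes.decode('utf-8')) == event
# A missing key stays None, in which case the producer chooses the partition itself.
assert key_serializer(None) is None
```

The full producer below wires these same two lambdas into KafkaProducer.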

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka-broker-01:9092'],
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',           # broker acks only after all in-sync replicas have the record
    retries=3             # retry transient failures
)

def publish_order_event(order_id, customer_id, total):
    future = producer.send(
        topic='order_placed',
        key=str(order_id),       # partition key: all events for this order go to same partition
        value={
            'order_id': order_id,
            'customer_id': customer_id,
            'total': total,
            'event_type': 'order_placed'
        }
    )
    # Block until the broker acknowledges (optional — remove for async fire-and-forget)
    record_metadata = future.get(timeout=10)
    return record_metadata.partition, record_metadata.offset

partition, offset = publish_order_event(order_id=1001, customer_id=42, total=129.99)
print(f"Written to partition={partition} offset={offset}")

producer.flush()
producer.close()

With acks='all', the partition leader waits for all in-sync replicas to confirm receipt before it acknowledges the write. This is the safest option: if the leader fails after the ack, your message is not lost as long as another in-sync replica survives. The tradeoff is latency. For a high-throughput analytics feed where occasional loss is acceptable, acks=1 (leader-only confirmation) is a reasonable choice. For anything that needs durability, use acks='all'.
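One way to keep that decision explicit is to derive the producer settings from the durability requirement rather than hard-coding them. This is a minimal sketch: producer_settings and its durable flag are hypothetical names, and the retries value simply mirrors the example above.

```python
def producer_settings(durable):
    """Return KafkaProducer keyword arguments for the two cases discussed above.

    This helper and its `durable` flag are hypothetical, for illustration only.
    """
    if durable:
        # Leader acknowledges only after all in-sync replicas have the record.
        return {'acks': 'all', 'retries': 3}
    # Leader-only acknowledgement: lower latency, small window for loss.
    return {'acks': 1, 'retries': 3}


orders_settings = producer_settings(durable=True)
analytics_settings = producer_settings(durable=False)
print(orders_settings)
print(analytics_settings)

# With a broker available, these would be passed straight through, e.g.:
# producer = KafkaProducer(bootstrap_servers=['kafka-broker-01:9092'], **orders_settings)
```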

Consumer: Reading Events From a Topic

The consumer side has a few more moving parts. You need to decide on offset management and what happens when your process restarts.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'order_placed',
    bootstrap_servers=['kafka-broker-01:9092'],
    group_id='order_raw_landing',      # consumer group — tracks offsets independently
    auto_offset_reset='earliest',      # first run: start from beginning of topic
    enable_auto_commit=False,          # commit offsets manually after confirmed write
    value_deserializer=lambda b: b     # keep raw bytes — we'll parse downstream
)

def write_to_raw_zone(partition, offset, raw_bytes):
    # Write to your raw store (file, database, etc.)
    print(f"  Landed partition={partition} offset={offset} ({len(raw_bytes)} bytes)")

try:
    for message in consumer:
        write_to_raw_zone(message.partition, message.offset, message.value)
        # Commit offset only after the write succeeds
        consumer.commit()
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

The value_deserializer=lambda b: b line is deliberate — we are not parsing the JSON here. The raw landing job stores bytes. Deserialization happens in a downstream stage. If you let the consumer parse the JSON and the schema changes, you have a crash in your most critical job — the one that keeps the raw record. Do not mix concerns.
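To make the separation concrete, here is a sketch of what a downstream parsing stage might look like. It is an assumption about how such a stage could be written, not part of the landing job: parse_raw_event and the required field list are hypothetical. The point is that a renamed field or truncated payload is reported as an error value instead of raising inside the job that holds the raw record.

```python
import json

# Hypothetical field list, taken from the order_placed event in this post.
REQUIRED_FIELDS = ('order_id', 'customer_id', 'total')


def parse_raw_event(raw_bytes):
    """Hypothetical downstream step. Returns (event, error); exactly one is None."""
    try:
        event = json.loads(raw_bytes.decode('utf-8'))
    except (UnicodeDecodeError, ValueError) as exc:
        # json.JSONDecodeError is a subclass of ValueError.
        return None, f"unparseable payload: {exc}"
    if not isinstance(event, dict):
        return None, "payload is not a JSON object"
    missing = [name for name in REQUIRED_FIELDS if name not in event]
    if missing:
        return None, f"missing fields: {missing}"
    return event, None


good = b'{"order_id": 1001, "customer_id": 42, "total": 129.99, "event_type": "order_placed"}'
renamed = b'{"id": 1001, "customer_id": 42, "total": 129.99}'   # schema change upstream
broken = b'{"order_id": 1001,'                                  # truncated payload

for raw in (good, renamed, broken):
    event, error = parse_raw_event(raw)
    print('ok' if error is None else 'quarantined', event or error)
```

Because the raw bytes are already landed, the two failing payloads can be reprocessed later once the parser is updated.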

What to Test Before Going to Production

Three things worth validating before you trust this in a real pipeline:

  1. Restart behavior — kill the consumer mid-run without committing, restart it. Confirm it re-reads the uncommitted messages. This verifies your offset management is correct.
  2. Multi-partition behavior — if your topic has multiple partitions, verify your consumer is receiving from all of them, not just partition 0.
  3. Consumer group isolation — start a second consumer with a different group_id pointing at the same topic. Confirm it starts from its own offset and does not interfere with the first consumer.
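The first check is easier to reason about if you see the expected outcome on a toy model first. The sketch below is an in-memory simulation only: FakePartitionLog and run_consumer are hypothetical stand-ins, not kafka-python APIs. It mimics the write-then-commit loop from the consumer above, "crashes" between the write and the commit, and shows that the restart re-reads the uncommitted record. Checks 2 and 3 need a live broker and are not covered here.

```python
class FakePartitionLog:
    """In-memory stand-in for one partition plus one group's committed offset."""

    def __init__(self, records):
        self.records = list(records)
        self.committed = 0   # next offset the group reads after a restart

    def read_from_committed(self):
        # Yield (offset, value) pairs starting at the committed offset.
        for offset in range(self.committed, len(self.records)):
            yield offset, self.records[offset]

    def commit(self, next_offset):
        self.committed = next_offset


def run_consumer(log, raw_zone, crash_before_commit_at=None):
    """Write, then commit. Optionally stop between the two to imitate a kill."""
    for offset, value in log.read_from_committed():
        raw_zone[offset] = value            # keyed by offset, so a replay overwrites
        if offset == crash_before_commit_at:
            return 'crashed'                # write happened, commit did not
        log.commit(offset + 1)
    return 'done'


log = FakePartitionLog([b'a', b'b', b'c', b'd'])
raw_zone = {}

first = run_consumer(log, raw_zone, crash_before_commit_at=2)
committed_after_crash = log.committed
print(first, committed_after_crash)           # crashed 2

second = run_consumer(log, raw_zone)          # restart: offset 2 is delivered again
print(second, log.committed, sorted(raw_zone))  # done 4 [0, 1, 2, 3]
```

Offset 2 is written twice across the two runs. Keying the raw store by partition and offset, as the toy raw_zone does by offset, is one way to make that replay harmless.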

None of this requires a production cluster. A local single-broker setup with a test topic is enough to verify the behavior before you commit to a deployment pattern. Next up: the micro-batch model — when you do not need to keep a consumer running 24/7 and what that means for your cloud bill. I am here to help if you hit any snags with the setup.
