Kafka Connect: Moving Data Into Topics Without Writing a Producer

One of the recurring costs in a Kafka-based pipeline is writing the glue code. You have a source — a database, an API, a file drop — and you need to get its data into a topic. Writing a producer from scratch means handling connection management, offset tracking, error recovery, and schema serialization. For every source. Every time.

Kafka Connect is the answer to that problem. It is a framework — included with Kafka since version 0.9 — for running configurable, restartable, monitored connectors between Kafka and external systems. Source connectors move data into Kafka. Sink connectors move data out. The connectors themselves are plugins; you configure them with JSON or properties files rather than writing code.

How Connect Works

Connect runs as a separate process (or cluster of processes) alongside your Kafka brokers. You deploy connector plugins, which are JAR files, and then configure connector instances through the REST API or, in standalone mode, through configuration files passed on the command line. The Connect workers split each connector's work into tasks, track offsets, and in distributed mode redistribute tasks when a worker joins or fails. You do not write a producer; you describe what you want to connect.

# Start Kafka Connect in standalone mode (single worker, local development)
bin/connect-standalone.sh config/connect-standalone.properties \
    config/connect-jdbc-source.properties

# Or distributed mode for production (multiple workers, high availability)
bin/connect-distributed.sh config/connect-distributed.properties
# Then configure connectors via REST:
# POST http://localhost:8083/connectors
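In distributed mode, connector definitions like the JSON documents in this article are submitted to a worker's REST API. The sketch below uses only Python's standard library and assumes a worker on the default localhost:8083; build_create_request is a hypothetical helper name. It builds the POST /connectors request without sending it, so the payload can be inspected before it is pointed at a real worker.

```python
import json
import urllib.request

# Assumption: a Connect worker's REST listener on the default localhost:8083.
CONNECT_URL = "http://localhost:8083"


def build_create_request(name, config):
    """Build (but do not send) a POST /connectors request.

    build_create_request is a hypothetical helper; the body shape
    {"name": ..., "config": {...}} matches the connector JSON in this article.
    """
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_create_request(
        "file-drop-sensor-source",
        {
            "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
            "file": "/data/incoming/sensor_readings.jsonl",
            "topic": "sensor_readings_raw",
            "tasks.max": "1",
        },
    )
    print(req.get_method(), req.full_url)
    # To submit it against a running worker:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status, resp.read().decode("utf-8"))
```

Once a connector exists, GET /connectors/<name>/status on the same worker reports the state of the connector and its tasks.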

JDBC Source Connector: Pulling From a Database

The JDBC source connector reads from relational databases through their JDBC drivers (SQL Server, PostgreSQL, MySQL, Oracle, and others) and writes each row as a Kafka message. You configure which tables to read (or a custom query), a topic prefix, the polling interval, and the incrementing or timestamp column used for watermarking.

{
  "name": "sqlserver-orders-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://sql-server-01:1433;database=OrdersDB",
    "connection.user": "kafka_reader",
    "connection.password": "${file:/opt/kafka/secrets.properties:db.password}",
    "mode": "incrementing",
    "incrementing.column.name": "OrderId",
    "table.whitelist": "dbo.Orders",
    "topic.prefix": "sqlserver.ordersdb.",
    "poll.interval.ms": "30000",
    "batch.max.rows": "5000"
  }
}

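This configuration polls dbo.Orders every 30 seconds for new rows (identified by an incrementing OrderId) and writes them to a topic named by topic.prefix followed by the table name. The connector records the highest OrderId it has seen in Connect's offset storage (an internal Kafka topic in distributed mode, a local file in standalone mode), so it survives restarts without re-reading the entire table. The ${file:...} value for connection.password is resolved at runtime by Kafka's FileConfigProvider, which must be enabled on the worker with config.providers=file and config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider.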
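To make the watermarking concrete, here is a minimal sketch of incrementing-mode polling. It uses Python's built-in sqlite3 in place of SQL Server and a plain dict in place of Connect's offset storage; poll_incrementing, the table contents, and the dict are hypothetical stand-ins, not the connector's actual code. Each poll asks only for rows whose OrderId is above the last one recorded, capped at a batch size loosely in the spirit of batch.max.rows.

```python
import sqlite3


def poll_incrementing(conn, offsets, batch_max_rows=5000):
    """One incrementing-mode poll (hypothetical helper, not the connector's code).

    Fetches rows whose OrderId is above the stored watermark, lowest first,
    and advances the watermark to the highest OrderId returned.
    """
    last_id = offsets.get("OrderId", -1)  # assumption: start below any real id
    rows = conn.execute(
        "SELECT OrderId, Customer FROM Orders "
        "WHERE OrderId > ? ORDER BY OrderId LIMIT ?",
        (last_id, batch_max_rows),
    ).fetchall()
    if rows:
        offsets["OrderId"] = rows[-1][0]  # the real connector would commit this offset
    return rows


# In-memory stand-in for dbo.Orders; the dict stands in for offset storage.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Orders (OrderId INTEGER PRIMARY KEY, Customer TEXT)")
conn.executemany("INSERT INTO Orders VALUES (?, ?)", [(1, "alice"), (2, "bob")])
offsets = {}

print(poll_incrementing(conn, offsets))  # [(1, 'alice'), (2, 'bob')]

conn.execute("INSERT INTO Orders VALUES (3, 'carol')")
conn.execute("UPDATE Orders SET Customer = 'bobby' WHERE OrderId = 2")
print(poll_incrementing(conn, offsets))  # [(3, 'carol')]: the update to row 2 is missed
```

The second poll shows the limitation of this mode: an in-place update never raises OrderId, so an incrementing watermark never sees it.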

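For tables that receive updates as well as inserts, use "mode": "timestamp+incrementing" and set "timestamp.column.name" to an UpdatedAt column that the application (or a trigger) sets on every insert and update. This catches both new and modified rows, though it still cannot detect deletes: a deleted row simply stops matching the query. For delete detection you need change data capture, for example SQL Server's built-in CDC read by a CDC connector such as Debezium's SQL Server connector.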
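The sketch below approximates timestamp+incrementing polling, again using Python's built-in sqlite3 as a stand-in for SQL Server. A row counts as new if its UpdatedAt is later than the stored timestamp, or equal to it with a higher OrderId; the stored offset is the (UpdatedAt, OrderId) pair of the last row emitted. poll_ts_inc is a hypothetical helper, the now upper bound is a simplified assumption about how the connector holds back very recent rows, and UpdatedAt is an integer rather than a datetime only to keep the example short.

```python
import sqlite3


def poll_ts_inc(conn, offsets, now):
    """One timestamp+incrementing poll (hypothetical helper, not the connector's code)."""
    last_ts = offsets.get("UpdatedAt", -1)  # assumption: start before any real timestamp
    last_id = offsets.get("OrderId", -1)
    rows = conn.execute(
        "SELECT OrderId, Status, UpdatedAt FROM Orders "
        "WHERE UpdatedAt < ? "  # rows stamped at or after `now` wait for a later poll
        "AND ((UpdatedAt = ? AND OrderId > ?) OR UpdatedAt > ?) "
        "ORDER BY UpdatedAt, OrderId",
        (now, last_ts, last_id, last_ts),
    ).fetchall()
    if rows:
        # The new watermark is the (timestamp, id) pair of the last row returned.
        offsets["OrderId"], offsets["UpdatedAt"] = rows[-1][0], rows[-1][2]
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Orders (OrderId INTEGER PRIMARY KEY, Status TEXT, UpdatedAt INTEGER)")
conn.executemany("INSERT INTO Orders VALUES (?, ?, ?)", [(1, "new", 100), (2, "new", 100)])
offsets = {}  # stands in for Connect's offset storage

print(poll_ts_inc(conn, offsets, now=200))  # [(1, 'new', 100), (2, 'new', 100)]

# An update bumps UpdatedAt, so the modified row is emitted again.
conn.execute("UPDATE Orders SET Status = 'shipped', UpdatedAt = 210 WHERE OrderId = 1")
print(poll_ts_inc(conn, offsets, now=300))  # [(1, 'shipped', 210)]

# A delete leaves no row behind to match, so nothing is emitted.
conn.execute("DELETE FROM Orders WHERE OrderId = 2")
print(poll_ts_inc(conn, offsets, now=400))  # []
```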

File Source Connector: Pulling From a File Drop

{
  "name": "file-drop-sensor-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/data/incoming/sensor_readings.jsonl",
    "topic": "sensor_readings_raw",
    "tasks.max": "1"
  }
}

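The file connector reads lines from a single named file and writes each line as a message. It is primitive compared to the JDBC connector: there is no schema inference, and the only state it keeps is its read position within that one file, so it will not pick up new files dropped into a directory. Apache Kafka ships it as an example connector, and since Kafka 3.2 it is no longer on the worker's default classpath, so the connect-file JAR has to be added explicitly (for example via plugin.path). Treat it as a prototyping tool, or as a simple bridge for systems that append to a file as their output mechanism.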
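The file connector's core idea, remembering how far into the file it has read, fits in a few lines. The sketch below assumes the saved position is a byte offset and that lines are UTF-8; read_new_lines is a hypothetical helper, not part of Kafka. Each call resumes from the saved offset and returns only complete lines, so a line the writer has not finished is not emitted early.

```python
import os
import tempfile


def read_new_lines(path, position):
    """Return complete lines written after `position` and the new byte offset.

    Hypothetical helper: a trailing line without a newline is left for the next call.
    """
    lines = []
    with open(path, "rb") as f:
        f.seek(position)
        for raw in f:
            if not raw.endswith(b"\n"):
                break  # partial line: the writer has not finished it yet
            lines.append(raw.rstrip(b"\r\n").decode("utf-8"))
            position += len(raw)
    return lines, position


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "sensor_readings.jsonl")
    with open(path, "wb") as f:
        f.write(b'{"sensor": 1, "value": 20.5}\n{"sensor": 2, "val')
    lines, pos = read_new_lines(path, 0)
    print(lines)  # ['{"sensor": 1, "value": 20.5}']
    with open(path, "ab") as f:
        f.write(b'ue": 19.0}\n')
    lines, pos = read_new_lines(path, pos)
    print(lines)  # ['{"sensor": 2, "value": 19.0}']
```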

What Connect Does Not Do

Connect is not a transformation engine. It does offer basic Single Message Transforms (SMTs), such as field renaming, filtering, and timestamp-based topic routing, but anything beyond simple per-record field manipulation belongs in a downstream consumer, not in the connector configuration. The connector's job is to move data reliably; transformation is a separate concern.

This is consistent with the raw zone philosophy: the connector lands data as-is (or with minimal shaping for serialization purposes), and the transformation work happens later in a job that can be independently deployed and re-run.
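For a sense of the scale SMTs are intended for, a field rename with Connect's built-in ReplaceField transform takes three extra entries in a connector's config. The sketch below holds those entries in a Python dict (the alias rename and the field names are illustrative) and pairs them with a small pure-Python imitation of the rename applied to one schemaless record value; parse_renames and apply_renames are hypothetical helpers, not Kafka's implementation.

```python
# SMT entries to merge into a connector's "config" object.
# The alias "rename" and the field names are illustrative.
RENAME_SMT = {
    "transforms": "rename",
    "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.rename.renames": "OrderId:order_id,CustomerId:customer_id",
}


def parse_renames(spec):
    """Hypothetical helper: turn 'old:new,old2:new2' into {'old': 'new', 'old2': 'new2'}."""
    pairs = (item.split(":", 1) for item in spec.split(",") if item)
    return {old.strip(): new.strip() for old, new in pairs}


def apply_renames(value, renames):
    """Hypothetical imitation of a per-record rename on a dict value; other fields pass through."""
    return {renames.get(key, key): v for key, v in value.items()}


record_value = {"OrderId": 42, "CustomerId": 7, "Total": 19.99}
renames = parse_renames(RENAME_SMT["transforms.rename.renames"])
print(apply_renames(record_value, renames))
# {'order_id': 42, 'customer_id': 7, 'Total': 19.99}
```

Each record is handled on its own, with no state and no lookups; anything that needs joins, enrichment, or aggregation is outside that shape.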

If there is a connector for your data source, use it. The ecosystem is large: Debezium for CDC, Confluent's JDBC connector, S3 and ADLS sink connectors, Elasticsearch sink connectors. Writing a producer from scratch is sometimes necessary, but the connector ecosystem already covers most common cases.
