# Confluent Kafka Connect Skill
Expert knowledge of Kafka Connect for building data pipelines with source and sink connectors.
## What I Know

### Connector Types

**Source Connectors (External System → Kafka):**
- JDBC Source: Databases → Kafka
- Debezium: CDC (MySQL, PostgreSQL, MongoDB) → Kafka
- S3 Source: AWS S3 files → Kafka
- File Source: Local files → Kafka

**Sink Connectors (Kafka → External System):**
- JDBC Sink: Kafka → Databases
- Elasticsearch Sink: Kafka → Elasticsearch
- S3 Sink: Kafka → AWS S3
- HDFS Sink: Kafka → Hadoop HDFS

**Single Message Transforms (SMTs):**
- Field operations: Insert, Mask, Replace, TimestampConverter
- Routing: RegexRouter, TimestampRouter
- Filtering: Filter, Predicates
## When to Use This Skill

Activate me when you need help with:
- Connector setup ("Configure JDBC connector")
- CDC patterns ("Debezium MySQL CDC")
- Data pipelines ("Stream database changes to Kafka")
- SMT transforms ("Mask sensitive fields")
- Connector troubleshooting ("Connector task failed")
## Common Patterns

### Pattern 1: JDBC Source (Database → Kafka)

Use Case: Poll database tables and stream new or updated rows into Kafka.

Configuration:

```json
{
  "name": "jdbc-source-users",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-",
    "table.whitelist": "users,orders",
    "poll.interval.ms": "5000"
  }
}
```
Modes:
- incrementing: Track new rows by an auto-increment ID (captures inserts only)
- timestamp: Track new and updated rows by a timestamp column
- timestamp+incrementing: Both (most reliable; see the sketch below)
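A minimal sketch of the `timestamp+incrementing` mode, assuming the table has both an auto-increment `id` and an `updated_at` timestamp column (both column names are illustrative):

```json
{
  "name": "jdbc-source-users-ts-inc",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "table.whitelist": "users",
    "topic.prefix": "postgres-"
  }
}
```

With this mode the connector can pick up updated rows (via the timestamp) without missing or re-reading rows that share the same timestamp (via the ID).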
### Pattern 2: Debezium CDC (MySQL → Kafka)

Use Case: Capture every database change (INSERT/UPDATE/DELETE) from the MySQL binlog.

Configuration (Debezium 1.x property names; Debezium 2.x renames `database.server.name` to `topic.prefix` and `database.history.*` to `schema.history.internal.*`):

```json
{
  "name": "debezium-mysql-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1",
    "database.server.name": "mysql",
    "database.include.list": "mydb",
    "table.include.list": "mydb.users,mydb.orders",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.mydb"
  }
}
```

Output Format (Debezium Envelope), where `op` is `c` = CREATE, `u` = UPDATE, `d` = DELETE, `r` = READ (snapshot):

```json
{
  "before": null,
  "after": {
    "id": 1,
    "name": "John Doe",
    "email": "john@example.com"
  },
  "source": {
    "version": "1.9.0",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1620000000000,
    "snapshot": "false",
    "db": "mydb",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 12345,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1620000000000
}
```
### Pattern 3: JDBC Sink (Kafka → Database)

Use Case: Write Kafka events to PostgreSQL.

Configuration:

```json
{
  "name": "jdbc-sink-enriched-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "3",
    "topics": "enriched-orders",
    "connection.url": "jdbc:postgresql://localhost:5432/analytics",
    "connection.user": "postgres",
    "connection.password": "password",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "order_id",
    "table.name.format": "orders_${topic}"
  }
}
```
Insert Modes:
- insert: Append only (fails on duplicate keys)
- update: Update only (requires a primary key)
- upsert: INSERT or UPDATE (recommended; see the tombstone-handling sketch below)
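When the source emits tombstones (null values) for deletes, for example a Debezium pipeline, the sink can propagate them as DELETEs, provided the primary key travels in the record key. A sketch of the relevant options, assuming the key holds an `id` field:

```json
{
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "pk.fields": "id",
  "delete.enabled": "true"
}
```

`delete.enabled` requires `pk.mode=record_key`, since a tombstone has no value from which to read the key.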
### Pattern 4: S3 Sink (Kafka → AWS S3)

Use Case: Archive Kafka topics to S3.

Configuration:

```json
{
  "name": "s3-sink-events",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "user-events,order-events",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-kafka-archive",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "rotate.interval.ms": "60000",
    "rotate.schedule.interval.ms": "3600000",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "US",
    "timestamp.extractor": "Record"
  }
}
```

Note: `partition.duration.ms` is required by the `TimeBasedPartitioner` and was added here for a valid configuration.

Partitioning (S3 folder structure):

```
s3://my-kafka-archive/
  topics/user-events/year=2025/month=01/day=15/hour=10/
    user-events+0+0000000000.json
    user-events+0+0000001000.json
  topics/order-events/year=2025/month=01/day=15/hour=10/
    order-events+0+0000000000.json
```
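Time-based partitioning is not the only layout; to group objects by a record field instead, the same storage partitioners include a `FieldPartitioner`. A sketch, assuming an illustrative `country` field in each record:

```json
{
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "partition.field.name": "country"
}
```

This yields paths like `topics/user-events/country=US/…` instead of the time-based folders shown above.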
### Pattern 5: Elasticsearch Sink (Kafka → Elasticsearch)

Use Case: Index Kafka events for search.

Configuration:

```json
{
  "name": "elasticsearch-sink-logs",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "application-logs",
    "connection.url": "http://localhost:9200",
    "connection.username": "elastic",
    "connection.password": "password",
    "key.ignore": "true",
    "schema.ignore": "true",
    "type.name": "_doc",
    "index.write.wait_for_active_shards": "1"
  }
}
```
## Single Message Transforms (SMTs)

### Transform 1: Mask Sensitive Fields

Use Case: Hide email/phone values in Kafka topics.

Configuration:

```json
{
  "transforms": "maskEmail",
  "transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskEmail.fields": "email,phone"
}
```

Before:

```json
{"id": 1, "name": "John", "email": "john@example.com", "phone": "555-1234"}
```

After (string fields are masked with an empty string):

```json
{"id": 1, "name": "John", "email": "", "phone": ""}
```
### Transform 2: Add Timestamp

Use Case: Add a processing timestamp to all messages.

Configuration:

```json
{
  "transforms": "insertTimestamp",
  "transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertTimestamp.timestamp.field": "processed_at"
}
```
### Transform 3: Route Records to Another Topic

Use Case: Redirect records from the `orders` topic to an `orders-high-value` topic. Note that the built-in predicates (TopicNameMatches, HasHeaderKey, RecordIsTombstone) match on record metadata only; routing on a field value requires a custom predicate or a stream-processing step.

Configuration:

```json
{
  "transforms": "routeByValue",
  "transforms.routeByValue.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeByValue.regex": "(.*)",
  "transforms.routeByValue.replacement": "$1-high-value",
  "transforms.routeByValue.predicate": "isHighValue",
  "predicates": "isHighValue",
  "predicates.isHighValue.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isHighValue.pattern": "orders"
}
```
### Transform 4: Flatten Nested JSON

Use Case: Flatten nested structures for a JDBC sink.

Configuration:

```json
{
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_"
}
```

Before:

```json
{
  "user": {
    "id": 1,
    "profile": {
      "name": "John",
      "email": "john@example.com"
    }
  }
}
```

After:

```json
{
  "user_id": 1,
  "user_profile_name": "John",
  "user_profile_email": "john@example.com"
}
```
## Best Practices

### 1. Use Idempotent Connectors

✅ DO: use upsert mode in the JDBC sink so restarts and retries do not create duplicates:

```json
{
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "id"
}
```

❌ DON'T: use plain insert mode, which either fails on duplicate keys or writes duplicate rows when offsets are replayed after a restart:

```json
{
  "insert.mode": "insert"
}
```

### 2. Monitor Connector Status

```bash
# Check connector status
curl http://localhost:8083/connectors/jdbc-source-users/status

# Check task status
curl http://localhost:8083/connectors/jdbc-source-users/tasks/0/status
```

### 3. Use Schema Registry

✅ DO: use the Avro converter with Schema Registry instead of schemaless JSON:

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
```

### 4. Configure Error Handling

Dead letter queues apply to sink connectors only:

```json
{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
  "errors.deadletterqueue.context.headers.enable": "true"
}
```
## Connector Management

### Deploy Connector

```bash
# Create connector via REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @jdbc-source.json

# Update connector (the /config endpoint takes only the "config" object,
# without the top-level "name"/"config" wrapper)
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/config \
  -H "Content-Type: application/json" \
  -d @jdbc-source.json
```

### Monitor Connectors

```bash
# List all connectors
curl http://localhost:8083/connectors

# Get connector info
curl http://localhost:8083/connectors/jdbc-source-users

# Get connector status
curl http://localhost:8083/connectors/jdbc-source-users/status

# Get connector tasks
curl http://localhost:8083/connectors/jdbc-source-users/tasks
```

### Pause/Resume Connectors

```bash
# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/pause

# Resume connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/resume

# Restart connector
curl -X POST http://localhost:8083/connectors/jdbc-source-users/restart

# Restart task
curl -X POST http://localhost:8083/connectors/jdbc-source-users/tasks/0/restart
```
## Common Issues & Solutions

### Issue 1: Connector Task Failed

Symptoms: Task state = FAILED

Solutions:
- Check the Connect worker logs: `docker logs connect-worker`
- Validate the configuration: `curl -X PUT -H "Content-Type: application/json" -d @<config>.json http://localhost:8083/connector-plugins/<class>/config/validate`
- Restart the task: `curl -X POST .../tasks/0/restart`

### Issue 2: Schema Evolution Error

Error: Incompatible schema detected

Solution: Enable auto-evolution on the JDBC sink (note that `auto.evolve` only adds new columns; it does not drop columns or change existing types):

```json
{
  "auto.create": "true",
  "auto.evolve": "true"
}
```

### Issue 3: JDBC Connection Pool Exhausted

Error: Could not get JDBC connection

Solution: Configure connection retries and backoff, and reduce `tasks.max` if the database cannot handle that many concurrent connections:

```json
{
  "connection.attempts": "3",
  "connection.backoff.ms": "10000"
}
```
## References

- Kafka Connect Documentation: https://kafka.apache.org/documentation/#connect
- Confluent Hub: https://www.confluent.io/hub/
- Debezium Documentation: https://debezium.io/documentation/
- Transform Reference: https://kafka.apache.org/documentation/#connect_transforms

Invoke me when you need Kafka Connect, connectors, CDC, or data pipeline expertise!