confluent-kafka-connect
About
This skill provides expert guidance on Kafka Connect for building data pipelines, including configuring source and sink connectors for systems like JDBC, Elasticsearch, and Debezium CDC. It helps developers implement data integration patterns, manage connector configurations, and apply Single Message Transforms (SMTs). Use it for designing, troubleshooting, and optimizing your Kafka Connect deployments.
Quick Install
Claude Code
Recommended:
/plugin add https://github.com/majiayu000/claude-skill-registry
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/confluent-kafka-connect
Copy and paste this command in Claude Code to install this skill.
Documentation
Confluent Kafka Connect Skill
Expert knowledge of Kafka Connect for building data pipelines with source and sink connectors.
What I Know
Connector Types
Source Connectors (External System → Kafka):
- JDBC Source: Databases → Kafka
- Debezium: CDC (MySQL, PostgreSQL, MongoDB) → Kafka
- S3 Source: AWS S3 files → Kafka
- File Source: Local files → Kafka
Sink Connectors (Kafka → External System):
- JDBC Sink: Kafka → Databases
- Elasticsearch Sink: Kafka → Elasticsearch
- S3 Sink: Kafka → AWS S3
- HDFS Sink: Kafka → Hadoop HDFS
Single Message Transforms (SMTs):
- Field operations: Insert, Mask, Replace, TimestampConverter
- Routing: RegexRouter, TimestampRouter
- Filtering: Filter, Predicates
When to Use This Skill
Activate me when you need help with:
- Connector setup ("Configure JDBC connector")
- CDC patterns ("Debezium MySQL CDC")
- Data pipelines ("Stream database changes to Kafka")
- SMT transforms ("Mask sensitive fields")
- Connector troubleshooting ("Connector task failed")
Common Patterns
Pattern 1: JDBC Source (Database → Kafka)
Use Case: Stream database table changes to Kafka
Configuration:
{
"name": "jdbc-source-users",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "postgres",
"connection.password": "password",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "postgres-",
"table.whitelist": "users,orders",
"poll.interval.ms": "5000"
}
}
Modes:
- incrementing: Track by auto-increment ID
- timestamp: Track by timestamp column
- timestamp+incrementing: Both (most reliable)
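To make the incrementing mode concrete, here is a minimal Python sketch of the polling idea, using an in-memory SQLite table in place of PostgreSQL. It is a conceptual illustration only, not the connector's actual implementation; the function name `poll_new_rows`, the `name` column, and the sample rows are assumptions made for this example.

```python
import sqlite3

def poll_new_rows(conn, table, id_column, last_seen_id):
    """Return rows whose id is greater than the last one already published."""
    # Table and column names are trusted constants in this sketch.
    query = (
        f"SELECT {id_column}, name FROM {table} "
        f"WHERE {id_column} > ? ORDER BY {id_column}"
    )
    rows = conn.execute(query, (last_seen_id,)).fetchall()
    # The new offset is the highest id seen so far (unchanged if nothing is new).
    new_offset = rows[-1][0] if rows else last_seen_id
    return rows, new_offset

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)", [(1, "Ann"), (2, "Bob")]
)

# First poll starts from offset 0 and picks up both existing rows.
first_batch, offset = poll_new_rows(conn, "users", "id", 0)

# One new row arrives and one existing row is updated.
conn.execute("INSERT INTO users (id, name) VALUES (3, 'Cy')")
conn.execute("UPDATE users SET name = 'Anna' WHERE id = 1")

# Second poll only sees the row with a higher id; the update is not detected.
second_batch, offset = poll_new_rows(conn, "users", "id", offset)
```

In this sketch the second poll returns only the row with id 3. The update to row 1 goes unnoticed, which is why a mode that also tracks a timestamp column is needed when existing rows change.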
Pattern 2: Debezium CDC (MySQL → Kafka)
Use Case: Capture all database changes (INSERT/UPDATE/DELETE)
Configuration:
{
"name": "debezium-mysql-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1",
"topic.prefix": "mysql",
"database.include.list": "mydb",
"table.include.list": "mydb.users,mydb.orders",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.mydb"
}
}
Output Format (Debezium Envelope):
{
"before": null,
"after": {
"id": 1,
"name": "John Doe",
"email": "john@example.com"
},
"source": {
"version": "1.9.0",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1620000000000,
"snapshot": "false",
"db": "mydb",
"table": "users",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 12345,
"row": 0,
"thread": null,
"query": null
},
"op": "c", // c=CREATE, u=UPDATE, d=DELETE, r=READ
"ts_ms": 1620000000000
}
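A consumer usually needs to branch on the `op` code and pick the right row image. The following Python sketch shows one way to do that for an envelope shaped like the one above; the helper name `describe_change` is hypothetical, and the sample event is trimmed to the fields the helper reads.

```python
OP_NAMES = {"c": "CREATE", "u": "UPDATE", "d": "DELETE", "r": "READ"}

def describe_change(event):
    """Return (operation name, qualified table, row image) for an envelope."""
    op = OP_NAMES.get(event["op"], "UNKNOWN")
    # Deletes carry the last row state in "before"; other operations use "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    table = f'{event["source"]["db"]}.{event["source"]["table"]}'
    return op, table, row

event = {
    "before": None,
    "after": {"id": 1, "name": "John Doe", "email": "john@example.com"},
    "source": {"db": "mydb", "table": "users"},
    "op": "c",
    "ts_ms": 1620000000000,
}
result = describe_change(event)
```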
Pattern 3: JDBC Sink (Kafka → Database)
Use Case: Write Kafka events to PostgreSQL
Configuration:
{
"name": "jdbc-sink-enriched-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "3",
"topics": "enriched-orders",
"connection.url": "jdbc:postgresql://localhost:5432/analytics",
"connection.user": "postgres",
"connection.password": "password",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "order_id",
"table.name.format": "orders_${topic}"
}
}
Insert Modes:
- insert: Append only (fails on duplicate)
- update: Update only (requires PK)
- upsert: INSERT or UPDATE (recommended)
Pattern 4: S3 Sink (Kafka → AWS S3)
Use Case: Archive Kafka topics to S3
Configuration:
{
"name": "s3-sink-events",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "3",
"topics": "user-events,order-events",
"s3.region": "us-east-1",
"s3.bucket.name": "my-kafka-archive",
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "60000",
"rotate.schedule.interval.ms": "3600000",
"timezone": "UTC",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timestamp.extractor": "Record"
}
}
Partitioning (S3 folder structure):
s3://my-kafka-archive/
topics/user-events/year=2025/month=01/day=15/hour=10/
user-events+0+0000000000.json
user-events+0+0000001000.json
topics/order-events/year=2025/month=01/day=15/hour=10/
order-events+0+0000000000.json
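The layout above can be reproduced with a few lines of Python. This is a sketch that mirrors the example listing only: it assumes the `topics` prefix, UTC hourly partitions, and a 10-digit zero-padded start offset as shown, and the function name `s3_object_key` is made up for illustration.

```python
from datetime import datetime, timezone

def s3_object_key(topic, partition, start_offset, ts_ms, extension="json"):
    """Build an object key following the year/month/day/hour layout shown above."""
    ts = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    time_path = ts.strftime("year=%Y/month=%m/day=%d/hour=%H")
    # File name pattern: <topic>+<partition>+<zero-padded start offset>.<ext>
    file_name = f"{topic}+{partition}+{start_offset:010d}.{extension}"
    return f"topics/{topic}/{time_path}/{file_name}"

# A record timestamp of 2025-01-15 10:30 UTC, expressed in epoch milliseconds.
ts_ms = int(datetime(2025, 1, 15, 10, 30, tzinfo=timezone.utc).timestamp() * 1000)
key = s3_object_key("user-events", 0, 1000, ts_ms)
```

A helper like this can be handy when writing downstream jobs that need to predict which prefix to scan for a given hour.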
Pattern 5: Elasticsearch Sink (Kafka → Elasticsearch)
Use Case: Index Kafka events for search
Configuration:
{
"name": "elasticsearch-sink-logs",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "3",
"topics": "application-logs",
"connection.url": "http://localhost:9200",
"connection.username": "elastic",
"connection.password": "password",
"key.ignore": "true",
"schema.ignore": "true",
"type.name": "_doc",
"index.write.wait_for_active_shards": "1"
}
}
Single Message Transforms (SMTs)
Transform 1: Mask Sensitive Fields
Use Case: Hide email/phone in Kafka topics
Configuration:
{
"transforms": "maskEmail",
"transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskEmail.fields": "email,phone"
}
Before:
{"id": 1, "name": "John", "email": "john@example.com", "phone": "555-1234"}
After (MaskField replaces each value with the empty value for its type, so strings become ""):
{"id": 1, "name": "John", "email": "", "phone": ""}
Transform 2: Add Timestamp
Use Case: Add a timestamp field to all messages (timestamp.field copies the Kafka record's timestamp, not the wall-clock processing time)
Configuration:
{
"transforms": "insertTimestamp",
"transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTimestamp.timestamp.field": "processed_at"
}
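Transform 3: Route by Topic Name (Predicate)
Use Case: Send records from the orders topic to orders-high-value. The built-in predicates (TopicNameMatches, HasHeaderKey, RecordIsTombstone) cannot inspect field values, so routing only high-value orders requires a custom SMT or a stream processor upstream.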
Configuration:
{
"transforms": "routeByValue",
"transforms.routeByValue.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByValue.regex": "(.*)",
"transforms.routeByValue.replacement": "$1-high-value",
"transforms.routeByValue.predicate": "isHighValue",
"predicates": "isHighValue",
"predicates.isHighValue.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHighValue.pattern": "orders"
}
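The effect of this configuration can be approximated in Python. The sketch below assumes both the predicate and the router require the whole topic name to match, and it translates the Java-style `$1` group reference into Python's `\1`; the function names are hypothetical, not part of Kafka Connect.

```python
import re

def topic_name_matches(topic, pattern):
    """Predicate: true when the whole topic name matches the pattern."""
    return re.fullmatch(pattern, topic) is not None

def regex_router(topic, regex, replacement):
    """Rename the topic when the regex matches the whole name."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    # Convert Java-style "$1" group references into Python-style "\1".
    python_replacement = re.sub(r"\$(\d+)", r"\\\1", replacement)
    return match.expand(python_replacement)

def route(topic):
    """Apply the router only to records whose topic satisfies the predicate."""
    if topic_name_matches(topic, "orders"):
        return regex_router(topic, "(.*)", "$1-high-value")
    return topic

routed = route("orders")
untouched = route("payments")
```

Note that the decision depends only on the topic name: every record on `orders` is renamed, regardless of its field values.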
Transform 4: Flatten Nested JSON
Use Case: Flatten nested structures for JDBC sink
Configuration:
{
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
Before:
{
"user": {
"id": 1,
"profile": {
"name": "John",
"email": "john@example.com"
}
}
}
After:
{
"user_id": 1,
"user_profile_name": "John",
"user_profile_email": "john@example.com"
}
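For intuition, the same flattening can be written as a short recursive function. This Python sketch works on plain dictionaries and only imitates the transform's output for the example above; `flatten` here is an illustrative helper, not the connector code.

```python
def flatten(record, delimiter="_", prefix=""):
    """Collapse nested dictionaries into one level, joining keys with the delimiter."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested structures, carrying the joined key as prefix.
            flat.update(flatten(value, delimiter, name))
        else:
            flat[name] = value
    return flat

nested = {"user": {"id": 1, "profile": {"name": "John", "email": "john@example.com"}}}
flat = flatten(nested)
```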
Best Practices
1. Use Idempotent Connectors
✅ DO:
// JDBC Sink with upsert mode
{
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "id"
}
❌ DON'T:
// WRONG: insert mode (duplicates on restart!)
{
"insert.mode": "insert"
}
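The difference matters because a sink can receive the same records again after a restart. The following Python sketch simulates that with plain in-memory structures, assuming a target table without a uniqueness constraint in the insert case; the function names and sample rows are invented for illustration.

```python
def replay_with_insert(records):
    """Append every record, as an insert-only sink without a unique key would."""
    table = []
    for record in records:
        table.append(record)
    return table

def replay_with_upsert(records, pk_field):
    """Keep one row per primary key, overwriting on redelivery."""
    table = {}
    for record in records:
        table[record[pk_field]] = record
    return list(table.values())

batch = [{"id": 1, "total": 10}, {"id": 2, "total": 25}]
redelivered = batch + batch  # the same batch delivered twice after a restart

insert_rows = replay_with_insert(redelivered)
upsert_rows = replay_with_upsert(redelivered, "id")
```

With insert the table ends up with four rows, while upsert converges to the original two regardless of how many times the batch is replayed.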
2. Monitor Connector Status
# Check connector status
curl http://localhost:8083/connectors/jdbc-source-users/status
# Check task status
curl http://localhost:8083/connectors/jdbc-source-users/tasks/0/status
3. Use Schema Registry
✅ DO:
{
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
4. Configure Error Handling
{
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
"errors.deadletterqueue.context.headers.enable": "true"
}
Connector Management
Deploy Connector
# Create connector via REST API
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @jdbc-source.json
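# Update connector (PUT expects only the inner "config" object, not the {"name", "config"} wrapper;
# jdbc-source-config.json stands for a file that contains just that object)
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/config \
-H "Content-Type: application/json" \
-d @jdbc-source-config.json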
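The two calls take differently shaped bodies: creating a connector sends the full `{"name", "config"}` document, while updating sends only the configuration map. The Python sketch below builds both requests with the standard library without sending them; the helper names and the trimmed sample definition are assumptions for illustration.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"
HEADERS = {"Content-Type": "application/json"}

def create_request(definition):
    """POST /connectors with the full {"name", "config"} document."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors", data=body, headers=HEADERS, method="POST"
    )

def update_request(definition):
    """PUT /connectors/<name>/config with only the inner config map."""
    body = json.dumps(definition["config"]).encode("utf-8")
    url = f"{CONNECT_URL}/connectors/{definition['name']}/config"
    return urllib.request.Request(url, data=body, headers=HEADERS, method="PUT")

definition = {
    "name": "jdbc-source-users",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
    },
}
post = create_request(definition)
put = update_request(definition)
# urllib.request.urlopen(put) would send the update to a running worker.
```

Keeping a single definition file and deriving both requests from it avoids maintaining two copies of the same settings.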
Monitor Connectors
# List all connectors
curl http://localhost:8083/connectors
# Get connector info
curl http://localhost:8083/connectors/jdbc-source-users
# Get connector status
curl http://localhost:8083/connectors/jdbc-source-users/status
# Get connector tasks
curl http://localhost:8083/connectors/jdbc-source-users/tasks
Pause/Resume Connectors
# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/pause
# Resume connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/resume
# Restart connector
curl -X POST http://localhost:8083/connectors/jdbc-source-users/restart
# Restart task
curl -X POST http://localhost:8083/connectors/jdbc-source-users/tasks/0/restart
Common Issues & Solutions
Issue 1: Connector Task Failed
Symptoms: Task state = FAILED
Solutions:
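- Check connector logs:
docker logs connect-worker
- Validate configuration (the endpoint expects a PUT with the connector config as the JSON body):
curl -X PUT -H "Content-Type: application/json" -d @<config-file> http://localhost:8083/connector-plugins/<class>/config/validate
- Restart task:
curl -X POST .../tasks/0/restart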
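When a connector has several tasks, it helps to find the failed ones from the status response before restarting anything. This Python sketch parses a status document and lists the restart URLs for failed tasks; the sample payload is a simplified, hand-written stand-in for a real response, and `failed_task_restart_urls` is a hypothetical helper.

```python
def failed_task_restart_urls(status, base_url="http://localhost:8083"):
    """Return the restart URL for every task whose state is FAILED."""
    name = status["name"]
    return [
        f"{base_url}/connectors/{name}/tasks/{task['id']}/restart"
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]

# Simplified sample of a status response (values are made up for the example).
status = {
    "name": "jdbc-source-users",
    "connector": {"state": "RUNNING"},
    "tasks": [
        {"id": 0, "state": "RUNNING"},
        {"id": 1, "state": "FAILED", "trace": "sample stack trace"},
    ],
}
urls = failed_task_restart_urls(status)
```

Each returned URL can then be called with a POST, as in the restart command above.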
Issue 2: Schema Evolution Error
Error: Incompatible schema detected
Solution: Enable auto-evolution:
{
"auto.create": "true",
"auto.evolve": "true"
}
Issue 3: JDBC Connection Pool Exhausted
Error: Could not get JDBC connection
Solution: Tune connection retries (these settings control the number of attempts and the backoff between them, not a pool size):
{
"connection.attempts": "3",
"connection.backoff.ms": "10000"
}
References
- Kafka Connect Documentation: https://kafka.apache.org/documentation/#connect
- Confluent Hub: https://www.confluent.io/hub/
- Debezium Documentation: https://debezium.io/documentation/
- Transform Reference: https://kafka.apache.org/documentation/#connect_transforms
Invoke me when you need Kafka Connect, connectors, CDC, or data pipeline expertise!
