Kafka Interview Questions

Kafka Connect Architecture and Pipelines

questions
Scroll to track progress

Kafka Connect Architecture & Pipelines

Your Kafka Connect sink connector is 6 hours behind (pulling from Kafka at 3 PM, writing to warehouse at 9 PM). Producers are healthy, source topic has 0 lag. Where's the bottleneck?

6-hour lag = connector is slow. Diagnosis: 1) Check connector status: curl http://localhost:8083/connectors/warehouse-sink/status. If state is RUNNING but connector_status is PAUSED, it's halted (check logs for why). 2) Check task status: curl http://localhost:8083/connectors/warehouse-sink/tasks/0/status. If task is FAILED, check error message. 3) Monitor task progress: curl http://localhost:8083/connectors/warehouse-sink | jq '.config' to see batch size. If batch.size=100 (small batches), it's slow. Set higher (e.g., 5000). 4) Check throughput: inspect source topic offset vs warehouse write timestamp. If offset advances 1000 msg/sec but warehouse writes only 100/sec, sink is 10x slower than source. Causes: 1) Warehouse database is slow (long transaction time). Measure: run query on warehouse to check write latency. 2) Connector is serializing inefficiently. Check if transformation rules are expensive (regex, joins). 3) Network latency between Kafka and warehouse. Measure: mtr warehouse-host to see packet loss/latency. 4) Connector has too few tasks. Set tasks.max=5 instead of 1 (parallelism). 5) Warehouse has constraint (unique key enforcement, foreign key checks). Debug: query warehouse logs. Solutions: 1) Batch inserts: use batch.size=5000, batch.bytes=1000000 instead of 1 at a time. 2) Disable constraints temporarily: drop foreign key checks on warehouse during bulk load, re-enable after. 3) Scale connector tasks: add tasks to Connect cluster. 4) Use optimized sink (e.g., bulk copy vs insert). 5) Pre-aggregate/transform in Kafka (use ksqlDB or Flink) before sinking to warehouse.

Follow-up: If batch.size=5000 helps briefly but lag grows again, what's the likely issue?

Your source connector crashes every 8 hours. It reads from an external database with 1M rows. Each restart re-reads from offset 0. How do you fix this?

Connector crashing = likely memory leak or timeout. 8-hour cycle = batch job on external DB every 8 hours (e.g., nightly vacuum, backup). This stalls DB connections, connector times out. Solutions: 1) Increase timeout: set database.timeout.ms=60000 (from default 30s). Allows longer-running queries. 2) Implement incremental CDC (Change Data Capture) instead of full scan: instead of reading 1M rows every restart, only read changed rows (incremental). Use query.mode=incremental_mode and specify query.incremental.column=updated_at. Only rows with updated_at > last_sync_time are fetched. Faster, lighter on DB. 3) Add state management: store last read timestamp in Kafka (offset). On restart, resume from that timestamp, not offset 0. Set state.store=kafka. 4) Use JDBC source connector v2 (supports incremental query). Configure: query_mode=incremental, incremental_column=id, table=users. 5) Memory leak investigation: Monitor JVM heap: jmap -heap | grep 'Heap'. If heap grows over 8 hours and doesn't stabilize, there's a leak. Upgrade connector version or check custom transformation plugins. 6) External DB maintenance window: schedule DB vacuum outside 8-hour cycle. Communicate with DB team to avoid nightly peaks. To verify connector state: curl http://localhost:8083/connectors/source-db/status shows tasks, offsets, lag. If offset keeps resetting to 0, state isn't persisting. Check if state topic (connect-offsets) is healthy: kafka-topics --describe --topic connect-offsets.

Follow-up: If incremental mode uses updated_at timestamp and DB clock is off by 2 hours, what messages are missed?

Your Kafka Connect cluster has 3 workers. You deploy a new sink connector with tasks.max=10. Only 7 tasks start, 3 are UNASSIGNED. Why?

Unassigned tasks = more tasks than workers. With 3 workers and 10 tasks requested, Kafka Connect rebalances and assigns 3-4 tasks per worker. But if one worker is busy or crashed, tasks get stuck unassigned. Diagnosis: 1) Check worker status: curl http://localhost:8083/workers. If only 2 workers healthy, 3rd worker is down. 2) Check task failures: curl http://localhost:8083/connectors/sink-connector/tasks/7/status. If status is FAILED, read error. Common: out of memory, connection error to sink. 3) Rebalance status: curl http://localhost:8083/connectors/sink-connector | jq '.name, .tasks'. If rebalance is in-progress, wait 30s. Solutions: 1) Add worker: restart dead worker. systemctl restart confluent-kafka-connect on that host. New worker joins cluster, unassigned tasks get distributed. 2) Reduce tasks: set tasks.max=3 (1 per worker). Trade-off: lower parallelism. 3) Investigate worker crash: check logs: grep ERROR /var/log/confluent/connect/connect.log. If OOM, increase JVM heap: CONNECT_HEAP_SIZE=2G. 4) Add more workers: scale cluster to 5 workers, tasks distribute evenly. To monitor task assignment: run curl http://localhost:8083/connectors/sink-connector/status | jq '.tasks[].state' and count states. Healthy = all RUNNING. Unassigned tasks are stuck (not processing). This is bad—lag grows. Priority: fix immediately.

Follow-up: If 3 of 10 tasks are permanently UNASSIGNED, how many partitions from source topic are not being processed?

Your sink connector writes to S3. Under normal load (100K msg/sec), 1 hour of data = 360M messages. S3 upload completes. But when load spikes to 500K msg/sec, sink lags 2 hours. What's happening?

Under spike, sink can't keep up with source rate. At 100K msg/sec, sink has time to batch and upload. At 500K msg/sec, batches form faster than S3 can accept. S3 throttling or network saturation occurs. Diagnosis: 1) Check S3 API limits: S3 has PUT rate limit (3,500 puts/sec per prefix). If sink writes 1 PUT per batch, and batches form every 100ms, that's 10 PUTs/sec (fine). But if batches form every 10ms (at high load), that's 100 PUTs/sec (still fine). Issue might be elsewhere. 2) Check network bandwidth: S3 upload speed depends on network. If 500K msg/sec with 1KB messages = 500MB/sec needed. If network is 1Gbps (125MB/sec), bottleneck is network, not S3. Upload can only do 125MB/sec max. 3) Monitor connector task progress: curl http://localhost:8083/connectors/s3-sink/metrics | jq '.metrics'. Look for put_count, bytes_written. If put_count is low, batching isn't happening (increase batch.size). 4) Check S3 object count: if sink creates 1 file per batch, and batch size is 1000, at 500K msg/sec you get 500 files/sec. S3 directory listing becomes slow. Solutions: 1) Increase batch.size: set to 50000 (instead of 1000). Each batch = 1 S3 object. Reduces object count and upload frequency. 2) Increase batch.bytes: set to 10MB. Wait until batch reaches 10MB before uploading (higher throughput, higher latency). 3) Partition data in S3: group by time and partition. s3.path.format=year={{year}}/month={{month}}/day={{day}}/partition={{partition}}. Reduces directory listing overhead. 4) Use multipart upload: S3 SDK automatically uses multipart for large uploads. Faster for large objects. 5) Add more sink tasks: set tasks.max=10. Each task writes to different S3 prefix, parallelizing uploads.

Follow-up: If you increase batch.size to 50000 but lag still grows, what's the next bottleneck?

You use Kafka Connect Single Message Transforms (SMTs) to rename fields: order_id → orderId, customer_id → customerId. Under load, CPU on Connect worker hits 100%. Is SMT the bottleneck?

Possibly. Each message passes through SMTs. Simple rename (regex replace) should be fast (~1μs per message). But if you have 10 SMT rules, each expensive (e.g., database lookup for translation), that adds up. At 100K msg/sec, 100K × 1ms (per SMT) = 100s of CPU per second (exceeds 1 core). Solutions: 1) Benchmark SMT: create standalone test. for i in {1..100000}; do apply_smt; done and measure time. If < 10μs per message, SMT is fine. If > 100μs, optimize. 2) Pre-process in Kafka: move SMT logic to producer or ksqlDB. Example: producer renames fields before sending to Kafka. Consumer reads already-renamed data. Connect just passes through (no SMT overhead). 3) Simplify SMT: instead of 10 rules, combine into 1 rule (if possible). 4) Implement SMT in Java (compiled) instead of expression language (interpreted). Register custom SMT plugin. 5) Scale Connect workers: if 1 worker at 100% CPU can handle 100K msg/sec, add 2nd worker. Tasks rebalance, each handles 50K msg/sec (50% CPU each). To measure SMT impact: disable SMTs, measure CPU. Re-enable, measure again. Difference = SMT overhead. If overhead is > 50% of total CPU, invest in optimization. Better: use ksqlDB for complex transformations. ksqlDB is optimized for stream processing (C++ backend). Example: CREATE STREAM orders_renamed AS SELECT order_id AS orderId, customer_id AS customerId FROM orders;. ksqlDB handles 500K+ msg/sec with low latency (10ms, vs Connect's 100ms+).

Follow-up: If you have 10 SMT rules and each adds 10μs latency, what's the end-to-end latency at 100K msg/sec?

Your source connector reads from database incrementally (updated_at > last_sync). During a bulk import (1M rows inserted at once), all rows have same timestamp (import_time). Which rows does incremental connector skip?

Connector might miss rows. If bulk import inserts 1M rows at timestamp T (e.g., 3:00 PM), and last_sync was T-1 hour, connector fetches all 1M rows (all have updated_at > T-1hour). It processes them and updates last_sync = T. If a second bulk import happens at timestamp T (same time), those rows also have updated_at = T. Connector compares: updated_at >= T is false (no rows > T). Those rows are skipped if they're inserted after cursor. This is a race condition. Solutions: 1) Use microsecond precision: change updated_at to timestamp(6) or UUID-based versioning. This ensures uniqueness even within same millisecond. 2) Add row ID: sort by (updated_at, id DESC). Connector remembers both updated_at and id of last processed row. On restart: fetch rows where (updated_at > T) OR (updated_at = T AND id > last_id). Catches rows inserted at same timestamp but different ID. 3) Use WAL (Write-Ahead Log): CDC-aware databases (PostgreSQL, MySQL 5.7+) have transaction logs. Connector reads from WAL position, captures all changes atomically. No timestamp precision needed. 4) Deduplicate in Kafka: use idempotent producer or deduplication on Kafka side. If row is inserted twice, it appears twice in Kafka. Consumer handles dedup (idempotency key = database row ID). This is acceptable if downstream can tolerate dupes. To test: manually insert 1M rows with same timestamp, run connector, verify count matches. If count is < 1M, you have a problem.

Follow-up: If you switch to WAL-based CDC, does connector throughput increase or decrease?

Want to go deeper?