Kafka Interview Questions

Cross-Cluster Replication with MirrorMaker 2

questions
Scroll to track progress

You need to replicate Kafka from US cluster to EU cluster with <1 minute lag. Currently 2M msg/sec, 50 topics. Design MirrorMaker2 setup, sizing, and lag SLO.

MirrorMaker2 (MM2) architecture: Connector-based replication. Runs as distributed workers (Kafka Connect cluster). Fetches from source cluster, writes to target cluster. Replicates topics, partitions, messages, offsets.

Sizing for 2M msg/sec: (1) Calculate throughput per topic: 2M / 50 = 40K msg/sec per topic. (2) Each MM2 worker can handle 200-500K msg/sec (depends on message size, network bandwidth, worker CPU/memory). For 2M, need 4-10 workers. Start with 5. (3) Each worker runs connector tasks: configure tasks.max=4 per worker. 5 workers × 4 tasks = 20 tasks total, distributed across 50 topics.

Configuration: Set up MM2 connector in source → target direction:

clusters = us, eu

us.bootstrap.servers = us-broker-1:9092, ...

eu.bootstrap.servers = eu-broker-1:9092, ...

source.consumer.fetch.max.bytes = 52428800 (50MB for large messages)

target.producer.linger.ms = 1000 (batch for throughput)

target.producer.batch.size = 1000000 (1MB batches)

Lag SLO (<1 minute): Monitor kafka.connect:type=connector-task-metrics,connector=mirrormaker,task=X,name=source-record-lag metric. Set up alerting if lag > 60 seconds. If lag exceeds SLO, scale workers up or reduce batch size (trade-off: higher latency but lower throughput).

Production example at LinkedIn: They replicate US → Europe with 1.4T msg/day. Use 50 MM2 workers. Lag stays <15 seconds via careful batching and network optimization.

Follow-up: If a message takes 30 seconds to replicate from US to EU (network latency), how do you distinguish true lag from network propagation time? Can you adjust SLO accordingly?

MirrorMaker2 crashes mid-replication. Some messages were fetched but not yet written to EU cluster. How does MM2 recover without duplicating messages or losing them?

MM2 recovery mechanism: MM2 uses Kafka Connect's offset management. Offsets are stored in a special `__connect-offsets` topic on the target cluster (EU). When MM2 restarts: (1) Reads committed offsets from `__connect-offsets`; (2) Fetches from source cluster starting from last committed offset + 1; (3) Writes to target cluster; (4) Commits offsets.

Exactly-once semantics: If enabled, MM2 uses transactions. (1) Fetch batch from US; (2) Begin transaction on EU; (3) Write messages; (4) Write offset to `__connect-offsets` within transaction; (5) Commit transaction. If crash occurs before step 5, transaction rolls back, offsets don't commit, retry from last committed offset. No duplicates.

Configuration: Set exactly.once.support=required in MM2 connector config. Also ensure target.producer.acks=all and target.producer.enable.idempotence=true.

Risk of data loss: If messages are written to EU but offset commit fails, MM2 might re-fetch and re-write on restart (duplicates), OR skip messages (data loss). To prevent: use transactions (ensures atomicity) and verify `__connect-offsets` topic has min replicas >= 2 (not single replica).

Verification after recovery: Compare message counts: kafka-run-class.sh kafka.tools.OffsetChecker --broker-list us-broker:9092 --group mirrormaker-cluster --topic events (last offset on US) vs same on EU. Should match (within few messages for in-flight).

Follow-up: If __connect-offsets topic is corrupted or deleted, can MM2 recover? Should you backup offsets separately?

MM2 is replicating topic "orders". A new partition is added to source (US) cluster. How does MM2 detect and replicate the new partition to target (EU) cluster?

Auto-detection: MM2 runs periodic sync jobs (default every 60 seconds) that check source cluster metadata. When a new partition is detected, MM2 automatically creates it in target cluster with same config (partition count, replication factor, etc.).

Configuration: sync.topic.configs.enabled=true allows MM2 to periodically check and update target topic configs to match source. sync.topic.configs.interval.seconds=60 sets sync frequency.

Replication of new partition: After partition is created on target, MM2 starts fetching from new partition on source starting from offset 0. Connector tasks are automatically rebalanced to cover the new partition.

Lag during partition addition: New partition starts at offset 0 on both source and target, so no backlog. However, if existing replication is already busy (high lag), adding a new partition might increase lag further (more work for same workers). Monitor lag metric and scale if needed.

Edge case: If source adds partition 10, but target cluster is full (storage limit reached), MM2 fails to create partition on target. Error is logged, but replication continues for existing partitions. The new partition remains un-replicated until storage is freed.

Production best practice: When scaling topic partitions, pre-allocate space on target cluster. Monitor MM2 sync job logs for errors.

Follow-up: If you add 100 partitions to a topic with 2M msg/sec, how long does MM2 take to detect and start replicating all new partitions? Does this introduce ordering issues?

MM2 is replicating topic "users" from US to EU. A consumer in EU wants to consume "users" but it's a replicated topic. How do you distinguish between replicated messages and local writes? Can you prevent EU consumers from accidentally consuming stale replicated data?

Replicated topics: MM2 replicates the topic as-is, including all messages and metadata. A consumer in EU consuming "users" gets all messages (originating from US). Messages look identical to original (same offset, timestamp, key, value). There's no flag marking them as "replicated".

Design pattern 1 - Topic naming: Replicate to "users.us" instead of "users". This clearly indicates the topic origin. EU consumers explicitly choose to consume replicated data: subscribe("users.us").

Design pattern 2 - Namespace the replica: MM2 supports topic renaming via SourceAndTarget SMT (Single Message Transform). Rename "users" → "users-replica". Prevents accidental consumption.

transforms=renameTopics

transforms.renameTopics.type=org.apache.kafka.connect.transforms.RegexRouter

transforms.renameTopics.regex=([a-z-]*)

transforms.renameTopics.replacement=\$1-replica

Design pattern 3 - Active-active replication: If EU also produces to "users", you have data from both US and EU. Use a custom header or schema field to mark origin: { user_id: 123, name: "Alice", _origin: "us" }. Consumer can choose to process only local writes or merge both sources.

Production at Stripe: They use pattern 1 + pattern 3. Replicated topics are "users-us", "users-eu". Each region also produces "users" (local writes). Consumers explicitly subscribe to both, merge based on business logic.

Follow-up: If you have bi-directional replication (US ↔ EU), how do you prevent infinite replication loops? What's the replication cycle detection mechanism?

Network partition: US and EU clusters are isolated for 15 minutes. MM2 workers are in US (one of the clusters). During partition, MM2 can't reach EU. After network heals, how do you resync without duplicating messages?

During partition: MM2 workers are blocked trying to write to EU. Connector enters error state (retries exponentially). Messages accumulate in source cluster; no commits to `__connect-offsets` on EU (because EU is unreachable). Source cluster buffers retain all unprocessed messages.

After network heals: MM2 resumes fetching from source. However, offsets on EU's `__connect-offsets` are still at the last committed point (15 min ago). MM2 re-fetches and re-writes messages from source starting at that offset. If messages are idempotent (same key+value), duplication is prevented by broker dedup (if idempotence enabled). If not idempotent, duplicates occur.

Prevention: (1) Enable idempotence: target.producer.enable.idempotence=true. Broker deduplicates based on (PID, sequence). (2) Use transactions: exactly.once.support=required. Offsets and messages are committed atomically. (3) Monitor lag: set up alerting if lag > 60s. When partition heals, lag will spike. Auto-scale workers or trigger manual investigation.

Detection: Check MM2 connector status: curl localhost:8083/connectors/mirrormaker/status. If state is "FAILED", network partition likely occurred.

Recovery procedure: (1) Verify network is healed; (2) Restart MM2 workers: curl -X POST localhost:8083/connectors/mirrormaker/restart; (3) Monitor lag metric until it drops below SLO; (4) If lag doesn't drop after 30 min, scale up workers.

Production at Uber: Network partition lasted 20 minutes. MM2 restarted automatically. Lag spiked to 5 minutes but recovered within 2 hours (1M msg/sec = 120M messages to replay).

Follow-up: If the network partition isolates EU from US for 48 hours and disk retention on source cluster rotates out un-replicated messages, how do you handle message loss? Is it acceptable or should you have prevented this?

You have 3 regions: US, EU, APAC. MM2 replication is US → EU, EU → APAC (daisy-chain). A message is modified in-place in US. How does it propagate to APAC? Is there a lag compounding issue?

Daisy-chain replication: Message flows: US → EU (takes ~5 sec). EU stores it in EU topic. EU → APAC replicates the message (takes ~10 sec due to network). Total: 15 seconds from US to APAC.

If message is modified in-place: Kafka messages are immutable (offset is fixed). If you "modify" by appending a new message with updated value, the new message propagates US → EU → APAC with same 15-second lag. Old message still exists in all regions (offset doesn't change).

Lag compounding: Yes, daisy-chain increases lag multiplicatively. US → EU (5s) + EU → APAC (10s) = 15s total. If you added APAC → US (for backup), you'd have cycles and duplicate replication. Avoid daisy-chain for <1 second SLOs.

Alternative - Multi-source replication: Instead of daisy-chain, replicate from US to both EU and APAC directly (star topology). Each region gets data ~5 seconds later. No compounding lag.

MM2 limitation: MM2 doesn't natively support multi-source replication (multiple concurrent MM2 jobs pulling from same source). You'd need separate MM2 clusters: Cluster A (US → EU), Cluster B (US → APAC). Adds operational complexity.

Production at Confluent Cloud: They use star topology: US produces, EU and APAC replicate from US. For high-priority data, they also do EU → regional backups (hub-and-spoke model). Lag is predictable and measured.

Follow-up: If you need <1 second lag for US → APAC replication (geographic impossibility), how would you re-architect? Should you shard data or use local caches?

MM2 is backed by Kafka Connect. A Connect worker crashes during rebalancing of tasks. Some tasks are orphaned (no worker). Replication stalls. Recover and prevent future orphaning.

Kafka Connect cluster model: Workers are distributed. Connector tasks are assigned to workers. Worker failure triggers task rebalancing. If workers are deleted without graceful shutdown, task assignments are orphaned (assigned to dead worker, not re-assigned).

Recovery: (1) Verify Connect cluster status: curl localhost:8083/connectors. Find stalled connector; (2) Restart all remaining workers to trigger rebalancing: curl -X POST localhost:8083/connectors/restart; (3) Tasks will re-balance to healthy workers. (4) Monitor rebalance time (typically 30-60 seconds).

Prevention: (1) Run Connect cluster with 3+ workers (fault tolerance). If 1 dies, 2 remain, tasks re-balance. (2) Use health check: curl localhost:8083/ every 10 seconds. If worker is unresponsive, remove it from cluster. (3) Graceful shutdown: use SIGTERM (not SIGKILL) to allow workers to flush state before exiting. (4) Set group.id and offset.storage.topic to ensure state is persisted in Kafka (not in-memory).

Monitoring: Track Connect metrics: kafka.connect:type=connector-task-metrics,connector=mirrormaker,task=X,name=batch-size-avg. If metric suddenly drops to 0, task likely died. Alert immediately.

Production at Netflix: They run 10 Connect workers in a Kubernetes cluster. Failed workers are automatically replaced by Kubernetes. Rebalancing is automatic.

Follow-up: If you have 100 tasks and lose 1 worker, rebalancing takes 60 seconds. During this window, which tasks continue replicating? Can you prioritize critical tasks?

Want to go deeper?