A consumer application crashes and remains offline for 3 days. When it restarts, it tries to consume from offset 50000 in partition 0, but only offsets 85000+ exist. Explain what happened, why, and how the consumer should handle this.
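The consumer's committed offset fell below the partition's log start offset. Kafka deletes whole log segments once they exceed log.retention.hours (default 168) or log.retention.bytes (default -1, unlimited). With the default 7-day window a 3-day outage would not expire anything, so this topic must have a shorter retention.ms or a size limit. During the downtime, the segments containing offsets 50000-84999 aged out and were deleted. When the consumer resumes from its outdated committed offset, the broker cannot serve that data and reports the offset as out of range.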
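To make the mechanism concrete, here is a minimal sketch of time-based retention as a plain Java model. It is not broker code: the RetentionModel and Segment names are hypothetical, and the 2-day retention used in the usage note is an assumed topic setting chosen to reproduce the scenario.

```java
import java.util.List;

/** Simplified model of time-based retention; illustrative only, not broker code. */
final class RetentionModel {
    /** A closed segment: first offset, first offset of the next segment, newest record timestamp. */
    static final class Segment {
        final long baseOffset;
        final long nextOffset;
        final long maxTimestampMs;

        Segment(long baseOffset, long nextOffset, long maxTimestampMs) {
            this.baseOffset = baseOffset;
            this.nextOffset = nextOffset;
            this.maxTimestampMs = maxTimestampMs;
        }
    }

    /**
     * Returns the log start offset after deleting, oldest first, every segment
     * whose newest record is older than the retention window.
     */
    static long logStartOffset(List<Segment> segments, long nowMs, long retentionMs) {
        long start = segments.isEmpty() ? 0L : segments.get(0).baseOffset;
        for (Segment s : segments) {
            if (nowMs - s.maxTimestampMs > retentionMs) {
                start = s.nextOffset; // the whole segment is removed
            } else {
                break; // deletion only proceeds from the head of the log
            }
        }
        return start;
    }
}
```

With segments covering 0-49999, 50000-84999, and 85000-119999, a clock at day 4, and a 2-day retention, the first two segments expire and the log start offset becomes 85000, which is why a fetch at 50000 fails.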
Handling strategies: (1) Set auto.offset.reset=earliest or latest so the consumer resets automatically when its position is out of range; (2) With auto.offset.reset=none, catch OffsetOutOfRangeException and call seekToBeginning() or seekToEnd() explicitly; (3) Keep topic retention comfortably longer than the longest consumer outage you expect, so committed offsets rarely go stale; (4) Track offset commit timestamps in an external store to detect stale offsets before seeking.
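The reset decision can be sketched as a pure function, independent of the Kafka client. This is a simplified model of how the auto.offset.reset values behave, not the client's implementation; OffsetReset and Policy are hypothetical names.

```java
/** Models the choice a consumer makes when its committed offset may be out of range. */
final class OffsetReset {
    enum Policy { EARLIEST, LATEST, NONE }

    /**
     * @param committed the offset the consumer wants to resume from
     * @param earliest  the partition's log start offset
     * @param end       the offset of the next record to be written
     * @return the offset to start fetching from
     */
    static long resolve(long committed, long earliest, long end, Policy policy) {
        if (committed >= earliest && committed <= end) {
            return committed; // still valid, resume where we left off
        }
        switch (policy) {
            case EARLIEST:
                return earliest; // reprocess everything that is still retained
            case LATEST:
                return end; // skip ahead, accepting the gap
            default:
                // Mirrors auto.offset.reset=none: surface the problem to the caller
                throw new IllegalStateException(
                    "offset " + committed + " out of range [" + earliest + ", " + end + "]");
        }
    }
}
```

For the scenario above, resolve(50000, 85000, 120000, Policy.EARLIEST) yields 85000, while Policy.NONE throws so the application can decide what to do about the lost range.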
Production example: At scale, a monitoring consumer fell 5 days behind due to throughput spike. When restarted, it encountered this scenario. Mitigation: auto-reset to latest, then replay from Cassandra offset table when system stabilized.
Follow-up: How would you design a consumer that detects stale offsets or excessive lag and auto-corrects without losing tracking context? What roles do log.segment.bytes and broker-side log compaction play?
Your production cluster has partition 0 of topic "events" with 10 million offsets spanning 45 days. A consumer needs to replay historical data from day 10 without reprocessing day 1-9. Offsets are stored in Cassandra. Design the offset recovery mechanism.
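Implement a two-tier offset discovery system: (1) Query Cassandra for the consumer's last known offset/timestamp tuple; (2) Use Kafka's offsetsForTimes() API to convert the stored timestamp to the first offset at or after that point; (3) Seek the consumer to that offset.
Code pattern: Call consumer.offsetsForTimes(Map<TopicPartition, Long>) with a millisecond timestamp per partition. The broker resolves it with an O(log n) binary search over the segment's .timeindex file and returns, as an OffsetAndTimestamp, the earliest offset whose timestamp is >= your timestamp. Then call consumer.seek(topicPartition, result.offset()) to resume.
Edge cases: If the timestamp is older than the earliest retained record, the lookup returns the earliest available offset, so days that have already aged out cannot be replayed. If the timestamp is newer than the last record, the returned map holds null for that partition; check for null and seek to the end. Index files live in the broker's data directories (log.dirs, default /tmp/kafka-logs), one <topic>-<partition> directory per partition, as .index and .timeindex files alongside each .log segment. $KAFKA_HOME/logs holds the broker's application logs, not topic data.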
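The lookup semantics can be illustrated with a small stand-alone model. This is a sketch only: it assumes non-decreasing timestamps and searches in-memory arrays rather than a real .timeindex file, and TimeLookup is a hypothetical name. An empty result plays the role of the null entry that offsetsForTimes() returns when no record is new enough.

```java
import java.util.OptionalLong;

/** In-memory stand-in for a timestamp-to-offset lookup; assumes timestamps are sorted. */
final class TimeLookup {
    /**
     * Binary-searches for the first entry whose timestamp is >= targetMs.
     *
     * @return that entry's offset, or empty if every record is older than targetMs
     */
    static OptionalLong firstOffsetAtOrAfter(long[] timestamps, long[] offsets, long targetMs) {
        int lo = 0;
        int hi = timestamps.length; // search the half-open range [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < targetMs) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo == timestamps.length ? OptionalLong.empty() : OptionalLong.of(offsets[lo]);
    }
}
```

A target older than every retained record resolves to the first retained offset, and a target newer than every record resolves to empty, which the caller would translate into a seek to the end.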
Follow-up: What happens if your consumer group's stored offset points to a deleted log segment, and your timestamp-based recovery also points outside retention window? How do you handle this gracefully in production?
Your log segment size is 1GB and you have 50 partitions. During peak hours, you're writing 100K msg/sec. Storage is filling at 500GB/hour. After 24 hours, brokers crash with disk full. Walk through segment lifecycle and recovery.
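Segment lifecycle: (1) Writes are appended to the partition's active segment file through the OS page cache, and its .index and .timeindex sidecar files are filled in as it grows; (2) When the active segment reaches 1GB (log.segment.bytes) or log.roll.ms/log.roll.hours elapses, it is closed and becomes immutable; (3) A new active segment starts for continued writes; (4) The retention check runs every 5 minutes (log.retention.check.interval.ms=300000), deleting closed segments that exceed the age or size limits; (5) If the disk fills before retention frees space, writes fail with an I/O error, the affected log directory is marked offline, and the broker shuts down once no usable log directory remains.
Recovery: Size retention to the disk first. At 500GB/hour, log.retention.hours=24 needs about 12TB plus headroom, which is exactly what filled the disks, so either add storage or lower log.retention.hours, or set log.retention.bytes (enforced per partition) to a target size. Configure log.retention.check.interval.ms=30000 to check more frequently. Monitor the broker disk watermark and auto-scale storage, or reduce the segment size (log.segment.bytes=536870912, i.e. 512MB; the value is in bytes) so segments close and become eligible for deletion sooner.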
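The sizing arithmetic can be sketched as follows. The figures are assumptions drawn from the scenario (500GB/hour ingest, roughly 12TB filled after 24 hours), DiskBudget is a hypothetical helper, and the one-segment headroom reflects the fact that size-based retention only removes whole closed segments.

```java
/** Back-of-the-envelope disk sizing; illustrative arithmetic, not a Kafka API. */
final class DiskBudget {
    /** Hours until the disk is full at a constant ingest rate. */
    static double hoursUntilFull(double freeGb, double ingestGbPerHour) {
        return freeGb / ingestGbPerHour;
    }

    /**
     * Suggests a per-partition retention.bytes so that all partitions together
     * stay within the disk budget. One segment of headroom is reserved per
     * partition because a partition can exceed its limit by up to one segment
     * before the oldest segment becomes deletable.
     */
    static long retentionBytesPerPartition(long diskBudgetBytes, int partitions, long segmentBytes) {
        long perPartition = diskBudgetBytes / partitions - segmentBytes;
        return Math.max(perPartition, 0L);
    }
}
```

With 12000GB free and 500GB/hour of ingest, hoursUntilFull returns 24.0, matching the crash in the scenario. A 500GiB budget across 50 partitions with 1GiB segments gives 9GiB of retention.bytes per partition.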
Production insight: A 2000-broker cluster at LinkedIn processes 1.4 trillion msg/day. They segment aggressively (256MB segments) and run retention checks every 30s to maintain <80% disk usage.
Follow-up: If you reduce segment size from 1GB to 256MB to trigger faster rotation, what's the CPU/IO cost? How does this interact with log compaction and index rebuild time?
A flaky network causes your consumer to pause for 6 minutes. When it resumes, it's at offset 100000. You need to verify that this offset still exists and hasn't been deleted. Write a check.
beginningOffsets(partition) returns the earliest available offset; compare your stored offset against it. If stored_offset < beginning_offset, data was deleted.
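Code:
// Needs java.util.Arrays, java.util.Map, org.apache.kafka.common.TopicPartition
Map<TopicPartition, Long> beginning = consumer.beginningOffsets(Arrays.asList(tp));
Map<TopicPartition, Long> end = consumer.endOffsets(Arrays.asList(tp));
long earliestAvailable = beginning.get(tp); // log start offset
long latestAvailable = end.get(tp); // offset of the next record to be written
if (storedOffset < earliestAvailable) {
    // Data was deleted: seek to the earliest retained offset, or raise an error instead
    consumer.seek(tp, earliestAvailable);
} else if (storedOffset > latestAvailable) {
    // Offset is past the end of the log: seek to the latest position
    consumer.seek(tp, latestAvailable);
}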
Production pattern: Call this at consumer startup to validate state before attempting a fetch. It surfaces data loss explicitly instead of letting the consumer reset silently when the stored offset rotated out during downtime.
Follow-up: How does this validation interact with exactly-once semantics? If you detect an offset is stale, should you rollback the previous transaction or move forward?
Your Kafka cluster has 100 partitions with log segments of 1GB each. A consumer group needs to process all 100 partitions in parallel, but you notice that seeking to random offsets across partitions causes 50% slower throughput than sequential reads. Explain why and optimize.
Sequential reads are fast because the broker streams log data that is usually already in the OS page cache. A seek to an arbitrary offset is different: the broker locates the segment, binary-searches that segment's sparse .index file (one entry roughly every 4096 bytes of log by default), jumps to the indexed file position, and scans forward to the exact record. When the target lies in older, cold data, the index and log pages are not cached, so each seek causes filesystem cache misses and, on spinning disks, disk arm thrashing.
Optimization: (1) Increase index density: set log.index.interval.bytes=2048 to index more frequently (vs default 4096), which shortens the linear scan that follows each index lookup at the cost of larger index files; (2) Tune the OS for the page cache: keep vm.swappiness low (0 or 1) and increase vm.max_map_count so the broker can memory-map the index files of many segments; (3) Parallelize fetches across partitions, not offsets: assign each consumer one partition and read sequentially; (4) Batch seeks: if seeking multiple times in the same segment, cache the lookup result on the client.
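The two-step lookup (binary search in the sparse index, then a forward scan in the log) can be sketched with in-memory arrays. This is an illustrative model rather than the broker's implementation; SparseIndex and its parameters are hypothetical names.

```java
/** Models a sparse offset index: a sorted subset of offsets with their file positions. */
final class SparseIndex {
    /** Returns the index of the last entry whose offset is <= target, or -1 if none. */
    static int floorEntry(long[] indexedOffsets, long target) {
        int lo = 0;
        int hi = indexedOffsets.length - 1;
        int answer = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (indexedOffsets[mid] <= target) {
                answer = mid; // candidate; keep looking for a later one
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return answer;
    }

    /**
     * Returns the file position from which the log must be scanned forward to
     * reach the target offset. With no usable entry, scanning starts at the
     * beginning of the segment (position 0).
     */
    static long scanStartPosition(long[] indexedOffsets, long[] positions, long target) {
        int i = floorEntry(indexedOffsets, target);
        return i < 0 ? 0L : positions[i];
    }
}
```

A denser index places entries closer together, so the forward scan from scanStartPosition to the target record covers fewer bytes. The binary search itself grows by at most one step each time the entry count doubles.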
Metrics: Monitor broker log cleaner lag and index rebuild time. For 100 partitions × 1GB segments, expect 100-200ms worst-case seek time per partition at cold start.
Follow-up: If you set log.index.interval.bytes=1024 to double index density, what's the memory overhead per broker? How does this affect broker heap and GC pause times?
You're implementing a Kafka consumer that must support time-travel replay: given any past timestamp, replay all messages from that point forward. Your cluster runs with log.retention.bytes=10TB per broker (50 brokers). Design the offset lookup strategy.
Kafka stores a timestamp in every message, set by the producer (CreateTime) or by the broker (LogAppendTime) according to log.message.timestamp.type. Brokers maintain .timeindex files that map timestamps to offsets. To replay from any timestamp: call offsetsForTimes() with your target timestamp, which performs an O(log n) binary search across the timeindex file and returns the first offset whose timestamp is >= that timestamp.
Implementation: Build a "snapshot" system: (1) Store (timestamp, offset) tuples every 1 hour to a compacted state topic; (2) At replay time, fetch the last snapshot before your target timestamp; (3) Use offsetsForTimes() for fine-grained offset within that hour window. This amortizes the offset lookup cost.
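The snapshot lookup in step (2) can be sketched with a sorted map. This is a minimal in-memory model: SnapshotIndex is a hypothetical name, and in the design above the entries would be loaded from the compacted state topic rather than recorded locally.

```java
import java.util.Map;
import java.util.TreeMap;

/** Keeps periodic (timestamp, offset) snapshots and finds the one preceding a target time. */
final class SnapshotIndex {
    private final TreeMap<Long, Long> byTimestamp = new TreeMap<>();

    /** Records that the given offset was current at the given time. */
    void record(long timestampMs, long offset) {
        byTimestamp.put(timestampMs, offset);
    }

    /**
     * Returns the offset of the latest snapshot taken at or before targetMs,
     * or -1 if the target predates every snapshot.
     */
    long startingPoint(long targetMs) {
        Map.Entry<Long, Long> entry = byTimestamp.floorEntry(targetMs);
        return entry == null ? -1L : entry.getValue();
    }
}
```

The returned offset is only a coarse lower bound for the replay. The fine-grained position within that hour would still come from offsetsForTimes(), as step (3) describes.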
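Production scale: With 50 brokers × 10TB each = 500TB retention, a timestamp lookup that lands in cold segments forces disk reads on the broker and can take noticeably longer than one served from cache. The snapshot system answers the coarse lookup from the state topic and leaves at most one fine-grained offset lookup per partition per replay request.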
Follow-up: What if the target timestamp is in a deleted segment (before the current earliest offset)? How would you handle this in an audit/compliance scenario where you need to prove data existed?
During a production incident, your team discovers that log segments are being deleted faster than expected. Retention is set to 10TB but you're only at 3TB on-disk. Write a diagnostic query to understand what's deleting segments.
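Check the retention configuration: (1) log.retention.bytes, which is enforced per partition, not per broker; (2) log.retention.hours (default 168 = 7 days); (3) log.retention.minutes and log.retention.ms (ms overrides minutes, which overrides hours); (4) per-topic overrides via retention.bytes and retention.ms on the topic itself.
Diagnostic steps: (1) List topic configs: kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name <topic> --describe; (2) Check broker logs for retention activity: grep server.log for "retention" to see which segments were deleted and whether a time or size limit triggered it; (3) Track the per-partition JMX metrics kafka.log:type=Log,name=LogStartOffset, name=NumLogSegments, and name=Size to see the segment deletion rate; (4) Run kafka-topics.sh --describe to check partition placement and ISR (In-Sync Replicas). Replica lag does not cause earlier deletion, but the output shows how the on-disk total is spread across partitions and brokers.
Common root cause: A topic has a retention.ms override (for example retention.ms=86400000, i.e. 24 hours) that is lower than the global setting. Or the 10TB figure was assumed to be a per-broker cap, while the size limit applies to each partition separately and time-based retention removes segments long before any partition reaches it. A third possibility is producers sending records with old CreateTime timestamps, which makes freshly written segments look expired to time-based retention.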
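The precedence among the time-based settings can be sketched as a small resolver. This is an illustrative model of the documented override order, not broker code; EffectiveRetention is a hypothetical name, and a null argument stands for a setting that is not configured.

```java
/** Resolves which time-based retention setting applies to a topic. */
final class EffectiveRetention {
    /**
     * Precedence, highest first: topic retention.ms, broker log.retention.ms,
     * broker log.retention.minutes, broker log.retention.hours.
     *
     * @return the effective retention in milliseconds
     */
    static long retentionMs(Long topicRetentionMs, Long brokerRetentionMs,
                            Long brokerRetentionMinutes, int brokerRetentionHours) {
        if (topicRetentionMs != null) {
            return topicRetentionMs; // a topic-level override always wins
        }
        if (brokerRetentionMs != null) {
            return brokerRetentionMs;
        }
        if (brokerRetentionMinutes != null) {
            return brokerRetentionMinutes * 60_000L;
        }
        return brokerRetentionHours * 3_600_000L;
    }
}
```

For example, retentionMs(86_400_000L, null, null, 168) returns one day in milliseconds even though the broker default is seven days, which is the kind of mismatch the diagnostic steps are meant to reveal.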
Follow-up: If a replica is severely lagged and blocking deletion, how do you safely remove it without data loss? Should you increase retention temporarily or failover the replica?
Your consumer is fetching 1GB of messages from offset 100000. The broker returns messages with gaps (offsets 100000-100100, then jump to 100500). This seems to violate sequential log reads. What's happening?
This is normal and expected. Kafka offsets are not guaranteed to be contiguous within a partition because: (1) Log compaction removes older versions of keyed messages, leaving only the latest version, which creates offset gaps; (2) Transactions write commit and abort control markers that occupy offsets but are never delivered to the application, and a read_committed consumer also skips records from aborted transactions; (3) After an unclean broker shutdown, recovery truncates any corrupt tail of the log. This normally shortens the log rather than leaving holes, so it is an unlikely cause of gaps in the middle.
Why it's correct: Offsets are logical sequence numbers, not physical positions. They're guaranteed to be monotonically increasing, but not contiguous. Your consumer must handle gaps by checking the offset field of each message rather than assuming sequential offsets.
Code pattern: When consuming, store the last seen offset: for (ConsumerRecord<String, String> record : records) { processMessage(record); lastOffset = record.offset(); } Always use record.offset(), not an incrementing counter, and commit lastOffset + 1 as the next position to read.
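Gap handling can be sketched without the Kafka client by working on the offsets of a fetched batch. GapTracker is a hypothetical helper; in a real consumer the array would be replaced by the record.offset() values seen while iterating.

```java
import java.util.ArrayList;
import java.util.List;

/** Detects non-contiguous offsets in a batch and computes the next position to commit. */
final class GapTracker {
    /** Returns each gap as a two-element array {firstMissing, lastMissing}. */
    static List<long[]> findGaps(long[] offsets) {
        List<long[]> gaps = new ArrayList<>();
        for (int i = 1; i < offsets.length; i++) {
            if (offsets[i] > offsets[i - 1] + 1) {
                gaps.add(new long[] {offsets[i - 1] + 1, offsets[i] - 1});
            }
        }
        return gaps;
    }

    /** The position to commit: one past the last offset actually seen. */
    static long nextPosition(long[] offsets) {
        if (offsets.length == 0) {
            throw new IllegalArgumentException("empty batch");
        }
        return offsets[offsets.length - 1] + 1;
    }
}
```

For a batch ending in 100099, 100100, 100500, 100501, findGaps reports the single gap 100101-100499 and nextPosition returns 100502. Logging the gaps is optional, but the commit position must always be derived from the last offset seen, not from a record count.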
Production impact: A data pipeline that assumed sequential offsets crashed when offset gaps appeared during log compaction. Fixed by iterating offset field instead of assuming +1 increments.
Follow-up: How do offset gaps interact with consumer group rebalancing? If a consumer crashes after processing offset 100500, will the rebalanced consumer replay 100101-100499, or skip them?