Your Kafka Streams application processes user events and maintains session state. Processing latency is 30 seconds (end-to-end), but SLA is <1 second. State store is 200GB (RocksDB). 10M events/sec throughput. Diagnose bottleneck and optimize.
Latency breakdown: (1) Event consumed (1ms), (2) State store lookup (X ms), (3) Processing logic (Y ms), (4) State update (Z ms), (5) Result output (1ms). Total: 30s. If state store is 200GB, issue is likely step 2 or 4.
Root cause investigation: (1) Check RocksDB compaction pauses: kafka.streams:type=state-store,name=session-store,operation=compaction-time. If metric spikes to 1-2 seconds periodically, compaction is bottleneck; (2) Check cache hit ratio: kafka.streams:type=cache,name=cache-hit-ratio. If <30%, many cache misses, excessive disk I/O; (3) Check changelog topic lag: kafka.streams:type=stream-processor,name=changelog-lag. If lag is 30s, state updates aren't flushed quickly.
Optimization 1 - Increase cache size: Default RocksDB block cache is small. Increase block.cache.size=1GB (vs default 100MB). This caches hot state in memory, reducing disk I/O. For 200GB state store with 90% working set in cache, reduces latency from 30ms to 5ms per lookup.
Optimization 2 - Tune RocksDB compaction: Set compaction.style=leveled (vs default universal). Leveled compaction is slower during writes but faster during reads. Also set compression.type=snappy to trade CPU for I/O reduction.
Optimization 3 - Increase topology parallelism: Add more Streams instances (8 → 16 instances). Each handles fewer partitions, state store per instance shrinks (100GB → 50GB). Less time scanning state store.
Optimization 4 - Window pruning: If using sessions, old sessions accumulate in state store. Set grace.period=1000 (1 second) to drop late-arriving events and prune state faster. This reduces state store size.
Production example at Uber: Session state was 300GB, 50s latency. Increased cache to 5GB, enabled leveled compaction, added 4 more instances. Result: 100GB per instance, 2s latency (25x improvement).
Follow-up: If you increase cache size to 5GB on each instance, and you have 16 instances, total memory = 80GB. What's the trade-off vs simply adding more instances but keeping cache small?
Your Streams app scales from 5 to 20 instances (rebalancing). During rebalance, state stores are rebuilt. Rebuild time is 5 minutes. During these 5 minutes, the app can't process events (warm-up). Minimize rebuild time.
Rebalance and state store rebuild: When a new Streams instance joins: (1) Old partitions are revoked from old instances; (2) New partitions assigned to new instance; (3) New instance replays changelog topic from beginning to rebuild state store; (4) Replaying 200GB changelog at 10M events/sec takes 20 seconds. But state store must be warm before instance starts processing, so add latency for flushing: 5 minutes total.
Optimization 1 - Restore from changelog fast: Increase parallel restore threads: num.restore.threads=8 (vs default 1). Replay multiple changelog partitions in parallel. Restore time: 5 minutes → 1 minute.
Optimization 2 - State store backup: Backup state store to S3 before rebalance. On new instance, restore from backup (S3 download + import = 30 seconds) instead of replaying changelog. Requires: (a) State store snapshots periodically (1 hour interval); (b) Upload to S3; (c) New instance downloads and restores.
Optimization 3 - Changelog topic retention: Ensure changelog topic has high retention (unlimited, or 90+ days). If changelog is rotated out, new instance can't rebuild from beginning. If changelog is missing, state is lost.
Optimization 4 - Standby replicas: Configure num.standby.replicas=1. Each state store has a standby replica on another instance, kept in sync. On rebalance, standby replica becomes active immediately (no rebuild). Trade-off: 2x state store replicas = 2x memory/disk.
Best practice: Optimization 1 (parallel restore) + Optimization 4 (standby replicas) hybrid. Restore time: <30 seconds. No warm-up lag.
Production at LinkedIn: Kafka Streams at scale (1000+ instances). They use standby replicas + backup snapshots. Rebalance time: 20 seconds (including restore). No SLA violations.
Follow-up: If you have 20 instances with standby replicas, that's 40 state stores in memory (20 active + 20 standby). Total memory: 80 instances × 200GB = 16TB. Is this sustainable, or should you use a different architecture?
Kafka Streams application has a session window aggregation that groups events by user_id. Sessions are 30 minutes (inactivity timeout). A user goes offline for 40 minutes, then comes back. Are their events in the same session or different?
Session window semantics: Session windows are time-bounded inactivity windows. If events are more than 30 minutes apart (inactivity gap >= 30 min), they belong to different sessions.
Scenario: User logs in at 10:00 (session 1 starts). Active until 10:20. Goes offline (40 min gap). Logs back in at 11:00. Since gap = 40 min > 30 min (timeout), this event starts session 2. Sessions are separate.
Grace period: Late-arriving events within grace period can be merged with existing session. Example: Session 1 is 10:00-10:20. At 10:30 (10 min after session end), a late event arrives for the same user. With grace.period=10*60*1000 (10 min grace), this event is merged into session 1 (extended). Without grace, it starts a new session.
Configuration: SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(5)). This creates 30-minute session windows, allows 5-minute grace for late arrivals.
Production implications: (1) Set grace period based on expected lateness of events. 5-10 minutes is typical. (2) Monitor grace period violations: if 20% of events are late beyond grace period, users are experiencing session splits. (3) Increase grace period if tolerable (more memory for extended state), or accept splits if lateness is expected.
Example at Netflix: User session windows for streaming video. 30-minute inactivity timeout. 2-minute grace period. If user pauses video for 40 minutes then resumes, it's a new session (reflects real behavior).
Follow-up: If grace period is 30 minutes (same as inactivity timeout), can sessions never end? What's the upper bound on session store memory?
Your Streams app uses a global table (GlobalKTable) to enrich stream events with user metadata. The global table is 500GB (all users). On Streams instance startup, it waits 10 minutes to fully load the global table. SLA requires <1 minute startup. Optimize.
GlobalKTable loading: GlobalKTable is a replicated table available to all instances. It's backed by a compacted topic. On startup, each instance reads the entire compacted topic from beginning to populate RocksDB state store. 500GB at 100MB/sec network throughput = 5000 seconds = 83 minutes. But with compression + batching, realistic time is 10 minutes.
Optimization 1 - Pre-warm GlobalKTable cache: Before deploying new Streams version, pre-load GlobalKTable on all instances to warm their caches. Then deploy: instances start with warm cache, skip full reload. Startup time: <30 seconds.
Optimization 2 - Lazy loading: Don't load entire GlobalKTable on startup. Load on-demand: when instance needs to enrich a record, if data isn't in cache, fetch from source (database, cache layer). Trade-off: first few requests are slow (query database) but instances start fast. After warm-up period, cache hits are fast.
Optimization 3 - Shard GlobalKTable: Instead of 500GB global table shared by all instances, partition it: shard 1 (A-M users), shard 2 (N-Z users). Each Streams instance loads only its shard (e.g., 100GB). Startup time: 100GB = 1000 seconds = 16 minutes. Still not <1 minute, but better.
Optimization 4 - Cache server in front of GlobalKTable: Deploy Redis or Memcached cluster as cache layer. GlobalKTable populates Redis. Streams instances query Redis (on-demand, <10ms per query). GlobalKTable continues loading in background. Startup: <1 second (only connect to Redis, no data loading).
Best practice: Optimization 4 (Redis cache layer). Streams instances start fast, cache warmth is handled by Redis layer. Requires additional infrastructure but scalable.
Production at Stripe: GlobalKTable for merchant metadata (500GB). They use Redis cache layer + GlobalKTable in background. Streams startup: 2 seconds. Cache hit ratio: 98%. First-miss queries fall back to database (100ms).
Follow-up: If metadata updates frequently (users' attributes change every second), and Redis cache has stale data, enrichment might be incorrect. How do you balance freshness vs startup latency?
Kafka Streams topology has a stream-stream join (orders ⋈ payments). Both streams are partitioned by order_id. You deploy 10 instances. During rebalance, some instances get imbalanced partition assignments: instance 1 has 40 partitions, instance 2 has 2 partitions. Latency and resource usage are uneven. Diagnose and fix.
Partition assignment imbalance: Kafka's default partitioner (range partitioner) can cause skew. If topics have different partition counts or deletion history, assignment might be uneven. Example: orders topic has 100 partitions, payments topic has 50. Joining requires co-partitioning. Kafka Streams rebalances to: instance 1 [orders 0-40, payments 0-20], instance 2 [orders 41-99, payments 21-49]. Imbalance: instance 1 processes 60 partitions, instance 2 processes 58. Worse if message rate differs (orders 10M/sec, payments 1M/sec).
Diagnosis: (1) Monitor partition assignment: enable StreamsBuilderConfigs.TOPOLOGY_OPTIMIZATION=StreamsBuilderConfigs.TOPOLOGY_OPTIMIZATION_BASIC to log assignments; (2) Monitor per-instance CPU/memory usage. If instance 1 is 80% CPU, instance 2 is 10%, imbalance confirmed; (3) Check lag per partition: instance 1's partitions likely lag behind instance 2's.
Fix 1 - Re-partition topics to same partition count: If orders has 100 and payments has 50, rebalance payments to 100 partitions. This ensures even co-partitioning. Trade-off: administrative overhead (recreate topic).
Fix 2 - Custom partitioner: Implement a partitioner that balances by message rate, not just partition count: StreamsBuilderConfigs.PARTITIONER_POLICY=CustomPartitioner. Partitioner sees per-partition metrics (throughput, size) and assigns partitions to instances evenly.
Fix 3 - Manual assignment (RangeAssignor with offset): Use StreamsPartitioner.RangeAssignor with custom offset to skip imbalanced partitions. Advanced, requires tuning.
Fix 4 - Add more instances: Scale from 10 to 20 instances. Each now gets half the partitions (better balance by law of averages). Cost: more memory/CPU, but evens out resource usage.
Production at Confluent: They provide custom partitioner plugins. Most customers use simple fix: ensure co-partitioned topics have same partition count. Automatic assignment then balances evenly.
Follow-up: If you have 100 partitions and 10 instances, that's 10 partitions per instance (ideal). But if 1 partition has 100x more messages (hot partition), latency is skewed. Can you split hot partitions dynamically?
Streams application crashes during a state store compaction (RocksDB). Changelog topic still has the log entries but state store is corrupted. On restart, instance tries to re-rebuild state store from changelog. Rebuild is slow. Can you detect and fix corruption faster?
State store corruption recovery: When instance crashes mid-compaction, RocksDB files (SST, manifest) can be inconsistent. On restart, Streams detects corruption (manifest doesn't match SST files) and triggers full state store rebuild: delete local state store, replay entire changelog topic from beginning. For 200GB state store, this takes 20 minutes.
Optimization 1 - Backup before compaction: Before compaction, snapshot state store to disk/S3. If crash occurs, restore from backup (instead of rebuild from changelog). Restore time: 5 minutes (copy from backup). Trade-off: need backup storage.
Optimization 2 - Disable compaction during peak hours: Set compaction.style=universal with compaction_options_universal.max_compaction_bytes=1GB to trigger compaction only when state store exceeds threshold. Schedule maintenance window (low traffic) for compaction. Reduce crash risk.
Optimization 3 - Validate state store health before compaction: Before compaction, RocksDB can be validated (checksum check). Set options.setParanoidChecks(true). If corruption detected before compaction, fail early and rebuild. Prevents mid-compaction crashes.
Optimization 4 - Use standy replicas: With standby replicas, if instance crashes, standby takes over immediately with warm state store (no rebuild). Original instance can rebuild in background.
Best practice: Use standby replicas + backup snapshots. Crash recovery time: <1 minute (standby takes over). Background rebuild finishes in 20 minutes.
Follow-up: If you have standby replicas and main instance crashes, does the standby immediately become primary? Or is there a lag (election time) before it's promoted?
Kafka Streams topology processes events and writes results to downstream Kafka topic. State store is in sync (transactional updates + outputs). If downstream Kafka write fails mid-transaction, what happens to the state store and offset commit?
Exactly-once semantics in Streams: Streams provides exactly-once guarantees via transactions. (1) Consume from input topic, (2) update state store, (3) produce to output topic, (4) commit input offset. All atomic. If any step fails, all are rolled back.
Failure scenario: (1) Event consumed, (2) state store updated, (3) write to output topic fails (broker unreachable). Streams rolls back: state store changes are reverted (RocksDB transaction rollback), input offset is NOT committed (so next restart re-processes the event).
On restart: Instance re-reads input offset, re-processes event, re-updates state store, re-tries output write. If output write succeeds, state store and offset are committed together. If fails again, cycle repeats. Exactly-once guarantee holds: no duplicates in output, no data loss.
Configuration for exactly-once: processing.guarantee=exactly_once_v2 (newer, recommended over exactly_once). This enables transactional state store updates.
Trade-off: Transactions slow throughput (1000 msg/sec vs 10K msg/sec without exactly-once). For non-critical workloads, use at_least_once (faster, allows duplicates).
Production at Netflix: All critical Streams apps use exactly_once_v2. Payment processing, user state management use this. Non-critical (analytics, metrics) use at_least_once for speed.
Follow-up: If exactly-once is slow (10x throughput hit), but you need 1M msg/sec, should you use at_least_once and handle duplicates downstream, or re-architect?