You implement real-time notifications using change streams: when an order status changes, send an email to the customer. Your listener: `db.orders.watch([{$match: {operationType: "update"}}], {fullDocument: "updateLookup"}).on("change", (change) => sendEmail(change.fullDocument.customerId))`. After 2 weeks, customers report missing emails. You check the change stream listener and find it has been disconnected from MongoDB for 6 hours. Emails that should have been sent during that window were lost. Design a resilient change stream consumer.
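Change streams are not a durable queue: the consumer reads live from the oplog, and nothing on the server records which events your listener has already handled. After a disconnect, a plain `watch()` call starts from the current time, so everything that happened during the outage is skipped. The events are not necessarily gone, though. The oplog is a capped collection whose time window depends on its configured size and your write volume (24-48 hours is assumed here; from MongoDB 4.4 you can also set a floor with `oplogMinRetentionHours`). If the listener was down for 6 hours and the oplog still covers that window, you can resume from a saved resume token and replay the missed events. Once the token's position has rolled off the oplog, the stream cannot resume and those events are unrecoverable from the change stream.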
Resilience strategy: (1) Store resume tokens: after each event is processed, save its resume token to a "watermark" collection: `db.change_stream_state.updateOne({_id: "orders"}, {$set: {resumeToken: change._id}}, {upsert: true})`. If the listener crashes, restart from the saved token. Because the token is saved after the side effect, a crash between the two replays the last event, so delivery is at-least-once; (2) Use exponential backoff for reconnection: if the MongoDB connection drops, retry with increasing delays instead of giving up, and alert if the outage approaches the oplog window; (3) Make side effects idempotent: use an idempotency key (orderId + change timestamp, for example `change.clusterTime`) and check whether that email was already sent before sending again; (4) Persist events to a queue: forward all change stream events to a message queue (Kafka, RabbitMQ, SQS). The queue retains events and decouples email delivery from MongoDB.
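Example with resume token persistence (Node.js driver style; `db.orders` stands for the collection handle, and the calls must be awaited): `const state = await db.change_stream_state.findOne({_id: "orders"}); const options = {fullDocument: "updateLookup", ...(state?.resumeToken && {resumeAfter: state.resumeToken})}; const stream = db.orders.watch([...], options); for await (const change of stream) { await sendEmail(...); await db.change_stream_state.updateOne({_id: "orders"}, {$set: {resumeToken: change._id}}, {upsert: true}); }`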
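Token persistence and reconnect backoff can be combined into one loop. The following is a minimal sketch, not a driver API: `consumeWithResume`, `openStream`, `loadToken`, `saveToken`, and `handle` are hypothetical names, and it assumes `openStream(token)` returns an async iterable of change events (as change streams in recent Node.js driver versions do).

```javascript
// Hypothetical helper; every name here is illustrative, not part of the MongoDB driver.
// openStream(token) is assumed to wrap something like:
//   orders.watch(pipeline, token ? { resumeAfter: token } : {})
async function consumeWithResume({
  openStream,
  loadToken,
  saveToken,
  handle,
  maxRetries = 5,
  baseDelayMs = 100,
  sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
}) {
  let failures = 0;
  while (true) {
    // Always restart from the last checkpoint, never from "now".
    const token = await loadToken();
    try {
      for await (const change of openStream(token)) {
        await handle(change);        // side effect first ...
        await saveToken(change._id); // ... then checkpoint (at-least-once delivery)
        failures = 0;                // a processed event resets the backoff
      }
      return; // the stream ended cleanly
    } catch (err) {
      failures += 1;
      if (failures > maxRetries) throw err;
      // Exponential backoff, capped at 30 seconds.
      await sleep(Math.min(baseDelayMs * 2 ** (failures - 1), 30000));
    }
  }
}
```

Because the checkpoint is written after `handle` returns, a crash between the two steps replays one event on restart, which is why the email step still needs an idempotency key.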
Follow-up: If MongoDB oplog is only 24-48 hours but your listener can be down for multiple days, how would you guarantee event processing?
You build a feature: when a product inventory drops below 10 units, trigger a webhook to reorder from supplier. You implement with change stream: `db.products.watch([{$match: {"operationType": "update", "fullDocument.inventory": {$lt: 10}}}])`. You test: update inventory from 15 to 5, event fires, webhook works. However, in production, you see webhooks firing multiple times for the same inventory update. Some updates trigger 5+ webhooks. Why?
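A change stream emits exactly one event per matching oplog entry; replication does not duplicate events, and a single update is not written to the oplog several times. What you are seeing is several distinct events that all satisfy the filter. The `$match` tests the state of the document (`fullDocument.inventory` below 10), not the transition across the threshold, so once a product is below 10 units every later update to it matches, whether it is another sale, a price change, or a timestamp touch. With `fullDocument: "updateLookup"` the effect is stronger: `fullDocument` is the current version of the document at lookup time, not the version right after that particular update, so earlier updates in a burst can also appear to be below the threshold.
Other contributors: the listener restarted and resumed from an old resume token (or from none), replaying events it had already handled; more than one instance of the listener is running; or the webhook client retries on timeouts even though the supplier already processed the call. If 5 webhooks fire for what looks like 1 update, log `change._id` for each call: identical IDs mean replay or multiple listeners, different IDs mean separate updates that each matched.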
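Prevention: (1) Add deduplication logic: each change event has a unique `_id` (the resume token, which is an object), so key on its string form and skip repeats: `if (processedIds.has(change._id._data)) return;`. An in-memory set is lost on restart, so persist the IDs if replays must be caught; (2) Implement webhook idempotency: the supplier reorder system should accept idempotency keys (productId + timestamp) and not create duplicate orders; (3) Match the transition instead of grouping: `$group` is not allowed in a change stream pipeline (only stages such as $match, $project, $addFields, $set, $unset, $replaceRoot, $replaceWith, and $redact are). Instead, match only updates that actually change inventory, `db.products.watch([{$match: {operationType: "update", "updateDescription.updatedFields.inventory": {$lt: 10}}}])`, and where pre-images are enabled (MongoDB 6.0+, `fullDocumentBeforeChange`) fire only when the previous value was 10 or more; (4) Add cooldown: if the reorder webhook just fired, ignore subsequent events for 1 hour on the same product ID.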
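Deduplication and threshold-crossing detection can live in one small filter in front of the webhook call. This is a sketch under assumptions: `createReorderFilter` and `shouldReorder` are hypothetical names, the stream is assumed to be opened with both `fullDocument` and `fullDocumentBeforeChange` populated (pre-images, MongoDB 6.0+), and the in-memory set stands in for a durable store.

```javascript
// Hypothetical helper: decides whether a change event should trigger a reorder.
function createReorderFilter(threshold = 10) {
  // In-memory only; a production system would persist processed event IDs.
  const seen = new Set();

  return function shouldReorder(change) {
    // change._id is the resume token object; _data is its string form.
    const eventId = change._id._data;
    if (seen.has(eventId)) return false; // replayed event
    seen.add(eventId);

    const before = change.fullDocumentBeforeChange?.inventory;
    const after = change.fullDocument?.inventory;
    if (typeof before !== "number" || typeof after !== "number") return false;

    // Fire only when this update crosses the threshold downward.
    return before >= threshold && after < threshold;
  };
}

const shouldReorder = createReorderFilter(10);
```

With this in place, an update from 15 to 5 triggers one webhook, while a later update from 5 to 3 or a replay of the first event triggers none.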
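Verify: log `change._id._data` and `change.documentKey._id` in the listener before each webhook call, then compare webhook calls against inventory updates. Repeated event IDs point to replays or multiple listeners; distinct event IDs for one product point to a filter that matches the low-inventory state rather than the threshold crossing.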
Follow-up: Design an idempotent webhook system that can safely receive duplicate calls for the same inventory change.
You scale your change stream consumer to 2 replicas for high availability. Both replicas watch the same orders collection with identical pipelines. Immediately, both replicas send duplicate emails—each customer gets 2 emails per order change. How do you coordinate multiple consumers without duplicates?
Change streams fire for all listeners independently. If 2 replicas both listen to the same collection with same pipeline, they both see the same events and both process them—duplicates.
Solutions: (1) Single consumer with failover: only 1 replica actively listens, the others are standby. If the active one dies, a standby takes over. Use a distributed lock (MongoDB-based or Redis) that can only be taken when it is free, expired, or already yours: `db.locks.updateOne({_id: "orders_consumer", $or: [{holder: "replica-1"}, {expiry: {$lt: new Date()}}]}, {$set: {holder: "replica-1", expiry: new Date(Date.now() + 60000)}}, {upsert: true})`. If another replica holds an unexpired lock, the filter matches nothing and the upsert fails with a duplicate key error (code 11000), which means "stay on standby". Only the holder processes events; (2) Partitioned consumers: each replica listens to a subset of the data using $match: replica-1 watches `{"fullDocument.userId": {$mod: [2, 0]}}`, replica-2 watches `{"fullDocument.userId": {$mod: [2, 1]}}`. Each event goes to exactly one consumer, but each partition still needs its own failover; (3) Use a message queue as buffer: a single change stream producer pushes all events to a Kafka/RabbitMQ topic. Multiple consumer replicas form a consumer group (Kafka) in which each event is delivered to one member, and Kafka handles the coordination. The producer itself must still run as a single active instance; (4) Idempotent processing: both replicas process all events, but idempotency keys make duplicate processing harmless (the second attempt finds the email already recorded as sent and skips it).
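Simplest for your case: the distributed lock pattern. Replica-1 acquires the lock, renews it well before expiry (for example every 20s), and processes events. Replica-2 polls for the lock. If Replica-1 fails, its lock expires and Replica-2 takes over, resuming from the saved resume token so no events are skipped. Keep the lock expiry short (60s) so failover is fast, and keep email sending idempotent, because a stalled holder can briefly overlap with its successor.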
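The lock pattern can be sketched as a lease that is acquired and renewed with the same call. This is an illustration under assumptions: `leaseRequest` and `tryAcquire` are hypothetical names, and `locks` is assumed to be a collection handle whose `updateOne` resolves to an object with `matchedCount` and `upsertedCount`, as the Node.js driver's does.

```javascript
// Builds the filter/update pair for taking or renewing a lease.
function leaseRequest(lockId, holder, now, ttlMs) {
  return {
    // Matches only if the lease is already ours or has expired.
    filter: { _id: lockId, $or: [{ holder }, { expiry: { $lt: now } }] },
    update: { $set: { holder, expiry: new Date(now.getTime() + ttlMs) } },
    options: { upsert: true },
  };
}

// Returns true if this process holds the lease after the call.
async function tryAcquire(locks, lockId, holder, ttlMs, now = new Date()) {
  const { filter, update, options } = leaseRequest(lockId, holder, now, ttlMs);
  try {
    const res = await locks.updateOne(filter, update, options);
    return res.matchedCount === 1 || res.upsertedCount === 1;
  } catch (err) {
    // Duplicate key: the lock document exists but is held by someone else
    // and has not expired, so the upsert tried to insert a second _id.
    if (err.code === 11000) return false;
    throw err;
  }
}
```

The active replica would call `tryAcquire` on a timer shorter than the TTL and stop consuming as soon as it returns false; a crashed holder simply stops renewing, so the lease frees itself without manual cleanup.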
Follow-up: Design a distributed lock system on MongoDB that handles process crashes without deadlocking.
Your change stream consumer processes events but some events contain the full document (fullDocument: {...}) while others have fullDocument: null. You're trying to extract order status from fullDocument: `const status = change.fullDocument.status`. This crashes with "Cannot read property 'status' of null" for some events. Why does fullDocument become null?
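fullDocument is missing or null in several cases: (1) delete operations: the event carries only documentKey, and the fullDocument field is absent; (2) update operations when the stream was opened without `fullDocument: "updateLookup"`: only updateDescription is included; (3) update operations with updateLookup when the document was deleted between the update and the lookup: fullDocument is explicitly null; (4) drop, rename, dropDatabase, and invalidate events, which have no document at all. Document size is not a cause: an event that would exceed the 16MB BSON limit makes the change stream fail with an error rather than arrive with a null fullDocument.
For order status tracking, the likely culprits are delete events (customers deleting their orders, or batch cleanup) and updates to orders that were deleted moments later. The literal null in the error message points to the second case; for a delete event the field is missing entirely, so the error would mention undefined.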
Fix: (1) Check the operation type before accessing fullDocument: `if (change.operationType === "delete") { logDelete(change.documentKey); return; } const status = change.fullDocument?.status;`; (2) For updates, read the changed fields from updateDescription instead of the full document: `const statusUpdate = change.updateDescription.updatedFields.status;`. This is undefined when the update did not touch status; (3) For deletes that need the full data, you can't query the database (the event arrives after the delete). Use soft deletes (set isDeleted: true instead of deleting), or on MongoDB 6.0+ enable pre-images on the collection and open the stream with `fullDocumentBeforeChange`.
Best practice: Always check operationType before accessing fullDocument. Use defensive coding: `const status = change.fullDocument?.status ?? "unknown"`.
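The checks above fit into one small extraction function. This is a sketch with a hypothetical name (`extractStatus`); it prefers the value written by the event itself and falls back to the looked-up document only when the update did not touch `status`.

```javascript
// Hypothetical helper: returns the order status carried by a change event,
// or null when the event has none (deletes, drops, missing documents).
function extractStatus(change) {
  switch (change.operationType) {
    case "insert":
    case "replace":
      return change.fullDocument?.status ?? null;
    case "update":
      // updatedFields holds the value this update wrote; fullDocument (if
      // requested via updateLookup) may be null or reflect a later state.
      return (
        change.updateDescription?.updatedFields?.status ??
        change.fullDocument?.status ??
        null
      );
    default:
      // delete, drop, rename, invalidate, ...: no document to read from.
      return null;
  }
}
```

Callers can then treat null as "nothing to do" instead of guarding every property access.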
Follow-up: Design a schema and change stream strategy for tracking order deletions and recovering deleted order data.
You use change streams to sync orders from MongoDB to Elasticsearch: watch orders collection, on each change, index into ES. However, you notice the indices in ES are often out of sync with MongoDB: ES shows 5M documents but MongoDB shows 5.1M. You run a full re-index nightly which takes 3 hours. Design a sync system that maintains consistency with <1 hour gap.
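Several things cause the divergence: (1) Throughput lag: the consumer reads an event at time T, processes it, and sends it to ES at T+100ms while new events keep arriving. If ES indexing is the slow side (say 1s per unbatched request) and MongoDB takes 1000 writes/sec, the backlog grows continuously and after a day the lag is measured in hours; (2) Connection issues: if the change stream disconnects and restarts without a resume token, the events in between are never indexed, which is the usual source of a permanent gap such as 5M vs 5.1M; (3) ES failures: if ES is down or rejects requests, updates queue up, or are dropped if the consumer does not retry; (4) Resume token expiry: if the lag exceeds the oplog window (48 hours in this scenario), the oplog entries are gone and the change stream can't resume.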
Consistency strategy: (1) Dual write: on each order change, the application writes to both MongoDB and ES. No transaction spans the two systems, so this can't be atomic: if one write fails, the data diverges, and you need retry and repair logic anyway; (2) Event log: all order changes go to an immutable event log (Kafka topic). A MongoDB consumer writes to MongoDB, an ES consumer writes to ES. Both read the same log, so they are eventually consistent and converge to the same state; (3) Change stream with periodic full-sync: run the change stream for real-time sync (minutes of lag) and a reconciliation job hourly to catch edge cases. Accept a 1-hour maximum gap; (4) Increase change stream consumer throughput: parallelize processing, batch ES updates, and use the bulk API to cut per-document overhead.
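For your case with <1 hour SLA: use an event log (Kafka). The application publishes order changes to a Kafka topic. The MongoDB consumer and the ES consumer both read from the topic and process independently. If the ES consumer falls behind, nothing is lost: it continues from its committed offset and catches up once its throughput exceeds the write rate, so monitor consumer lag and alert well before it approaches an hour.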
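Whichever transport feeds the ES consumer, batching is what closes most of the throughput gap. The sketch below turns a batch of change events into the action/document pairs that the Elasticsearch bulk API expects; `toBulkLines` and the index name `orders` are assumptions for illustration, and sending the lines (serialized as newline-delimited JSON) is left to whichever client you use.

```javascript
// Hypothetical helper: converts change events into Elasticsearch bulk lines.
// Later events for the same document replace earlier ones within the batch.
function toBulkLines(changes, index = "orders") {
  const latest = new Map();
  for (const change of changes) {
    latest.set(String(change.documentKey._id), change);
  }

  const lines = [];
  for (const [id, change] of latest) {
    if (change.operationType === "delete") {
      lines.push({ delete: { _index: index, _id: id } });
    } else if (change.fullDocument) {
      // _id is a metadata field in ES and must not appear in the source body.
      const { _id, ...source } = change.fullDocument;
      lines.push({ index: { _index: index, _id: id } }, source);
    }
    // Events without a document (e.g. update without updateLookup) are skipped.
  }
  return lines;
}
```

Collapsing to the last event per document keeps the batch small under bursts, and propagating deletes addresses one common source of count mismatches between the two stores.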
Follow-up: If you must use MongoDB change streams (no Kafka), how would you design a backpressure mechanism to keep ES in sync?
Your change stream listener uses aggregation pipeline for filtering: `db.users.watch([{$match: {operationType: "update", "fullDocument.role": "admin"}}])`. This filters at MongoDB server, reducing network traffic. However, you notice the change stream sometimes misses admin role changes: you update a user's role to admin but the change stream doesn't fire. You check and see the event in MongoDB oplog but didn't receive it. Why?
By default, update events do not include fullDocument at all; they carry only updateDescription (the changed and removed fields). A `$match` on `fullDocument.role` therefore never matches an update unless the stream is opened with `{fullDocument: "updateLookup"}`, which is why the entry is in the oplog but never reaches you.
Even with updateLookup the filter is unreliable: fullDocument is the current majority-committed version of the document at lookup time, not a snapshot taken right after that update. If the role is set to admin and then changed again (or the user is deleted) before the lookup, `fullDocument.role` no longer equals "admin" and the event is filtered out.
Better filtering: filter on updateDescription instead of fullDocument for updates: `db.users.watch([{$match: {operationType: "update", "updateDescription.updatedFields.role": "admin"}}])`. This catches updates that change the role field to admin.
However, there are subtleties. `"updateDescription.updatedFields.role": "admin"` is already an exact-value match, so no `$eq` expression is needed. But updatedFields contains only the new value, so you learn that the role became admin, not what it was before (that requires pre-images, MongoDB 6.0+). The match also misses replace operations (`operationType: "replace"`, for example from `replaceOne`), which carry a fullDocument but no updateDescription.
Full fix: `db.users.watch([{$match: {$or: [{operationType: "update", "updateDescription.updatedFields.role": "admin"}, {operationType: {$in: ["insert", "replace"]}, "fullDocument.role": "admin"}]}}])`. This catches inserts, replacements, and updates that result in the admin role.
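To keep the server-side filter honest, it can help to hold the pipeline next to a client-side predicate with the same logic and unit-test the predicate against sample events. Both names below (`adminGrantPipeline`, `isAdminGrant`) are hypothetical, and the predicate is a mirror written by hand, not something the driver derives from the pipeline.

```javascript
// Pipeline to pass to watch(): matches events that leave role === "admin".
const adminGrantPipeline = [
  {
    $match: {
      $or: [
        { operationType: "update", "updateDescription.updatedFields.role": "admin" },
        { operationType: { $in: ["insert", "replace"] }, "fullDocument.role": "admin" },
      ],
    },
  },
];

// Hand-written mirror of the pipeline, usable in unit tests or as a
// second check inside the listener.
function isAdminGrant(change) {
  if (change.operationType === "update") {
    return change.updateDescription?.updatedFields?.role === "admin";
  }
  if (change.operationType === "insert" || change.operationType === "replace") {
    return change.fullDocument?.role === "admin";
  }
  return false;
}
```

Note that neither version looks at `fullDocument` for updates, so the result does not depend on whether `updateLookup` is enabled or on what happened to the document afterwards.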
Follow-up: Design a change stream aggregation pipeline that reliably tracks all role changes (assign, revoke, elevate) for an audit log.
You implement a cache invalidation system using change streams: when a product document changes, invalidate Redis cache for that product. Your change stream processes ~1000 events/sec. Redis cache operations take 1ms each. After scaling to 10K events/sec, you notice cache invalidation is falling behind and cache hits are stale. Application queries to get product info sometimes get outdated cached values. Design a high-throughput invalidation system.
Throughput bottleneck: a single-threaded listener that spends 1ms per invalidation tops out at 1,000 invalidations/sec. At 10K events/sec, each second of incoming events needs 10 seconds of work, so the backlog grows by roughly 9,000 events every second and never drains. While an invalidation waits in that backlog, the application keeps serving (and re-populating) the stale cache entry.
High-throughput design: (1) Parallelize invalidation: use a worker pool (10 workers), each processing events concurrently: `Promise.all([worker1(), worker2(), ...])`. With 10 workers, 10K events/sec becomes 1,000 events/sec per worker, which is exactly the 1ms-per-operation limit, so add headroom with more workers or batching. Deletes are idempotent, so ordering between workers is not a concern; (2) Batch Redis operations: instead of invalidating 1 key at a time, collect 100 keys and send them in one round trip with a Redis pipeline: `redis.pipeline().del(key1).del(key2)...exec()` (or a single DEL with many keys). A batch of 100 keys might take 5ms instead of 100ms individually; (3) Use a message queue: the change stream publishes to a Kafka topic instead of invalidating directly. Multiple Redis workers consume from the topic and process in parallel, which decouples MongoDB from Redis speed; (4) TTL-based cache: set the product cache TTL to 30 seconds and don't invalidate actively. Simplest, but the staleness window is up to 30 seconds; (5) Use async fire-and-forget: enqueue the invalidation to a background job and return immediately from the change stream listener, without waiting for Redis confirmation. A failed invalidation then goes unnoticed, so pair this with a TTL.
For your case: switch to message queue (Kafka) to decouple change stream from Redis. Multiple Redis workers pull from queue and invalidate in parallel.
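Whether the events arrive from the change stream directly or from a queue worker, batching the deletes is what removes the per-key round trip. A minimal sketch, assuming a hypothetical `createInvalidationBatcher` helper, a `product:<id>` key scheme, and a caller-supplied `deleteKeys(keys)` function (for example one that issues a single multi-key DEL through your Redis client):

```javascript
// Hypothetical helper: coalesces cache invalidations into batches.
// deleteKeys(keys) is supplied by the caller and performs the actual delete.
function createInvalidationBatcher({ deleteKeys, maxBatch = 100, maxWaitMs = 10 }) {
  let pending = new Set(); // a Set also dedups repeated updates to one product
  let timer = null;

  async function flush() {
    if (timer) {
      clearTimeout(timer);
      timer = null;
    }
    if (pending.size === 0) return;
    const keys = [...pending];
    pending = new Set(); // swap first so new events go into the next batch
    await deleteKeys(keys);
  }

  function add(productId) {
    pending.add(`product:${productId}`);
    if (pending.size >= maxBatch) return flush(); // size-triggered flush
    if (!timer) {
      // Time-triggered flush bounds staleness when traffic is low.
      timer = setTimeout(() => flush().catch(console.error), maxWaitMs);
    }
    return Promise.resolve();
  }

  return { add, flush };
}
```

With `maxBatch` at 100 and `maxWaitMs` at 10, worst-case added staleness is about 10ms, while hot products that change many times within a batch cost only one delete.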
Follow-up: Design a cache invalidation system that also handles cache warming on restart without thundering herd.
You use change streams for database audit logging: every change to sensitive data (users, payments) is logged to audit collection. Your change stream listener logs changes and writes to audit collection. However, you've noticed circular logging: the audit collection write itself generates a change event, which generates another audit entry, creating an infinite loop. After a few hours, audit collection grows to 100GB. How would you prevent this?
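A change stream opened on a single source collection (for example `db.orders.watch()`) never sees writes to other collections, so writing audit entries elsewhere cannot loop. The loop appears when the stream's scope includes the audit collection: a database-level `db.watch()` or cluster-level stream, or a listener that watches the audit collection itself. Then you get a feedback loop: change on orders -> audit entry written -> change on audit -> audit entry for the audit change -> and so on without end.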
Prevention: (1) Use a separate collection for audit and scope the stream to the source: watch the orders collection, write to the audit_orders collection. This breaks the loop because audit_orders changes never appear in the orders change stream; (2) If you need a database-level stream, filter out the audit namespace in the aggregation pipeline: `db.watch([{$match: {"ns.coll": {$ne: "audit"}}}])`. operationType can't be used for this, because it is set by the server (insert, update, delete, ...) and can't carry a custom value like "audit"; (3) Use a separate database: watch orders in production_db, write audit entries to audit_db. A collection-level or database-level stream on production_db won't see audit_db changes (a cluster-level stream would); (4) Guard in the listener: as a second line of defense, skip any event whose `ns.coll` is an audit collection. A thread-local flag doesn't work here, because change events arrive asynchronously and may be handled in a different process than the one that wrote the audit entry.
Best practice: use separate audit collection for each source collection. Orders changes -> orders_audit, Users changes -> users_audit. No shared collection, no loops. At query time, join results if needed.
Verify fix: start change stream, make single order change, check audit_orders receives exactly 1 entry. Wait 5 minutes, confirm audit collection count doesn't grow spontaneously.
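The namespace filter and the listener-side guard can be sketched together. `auditPipeline` and `toAuditEntry` are hypothetical names, the `<source>_audit` naming follows the convention above, and `wallTime` is only present on change events from MongoDB 6.0 onward, so the sketch tolerates its absence.

```javascript
// Server-side filter for a database-level stream: drop audit collections.
function auditPipeline(auditCollections) {
  return [{ $match: { "ns.coll": { $nin: auditCollections } } }];
}

// Listener-side guard plus mapping to an audit record.
// Returns null for events that must not be audited.
function toAuditEntry(change, auditCollections) {
  const source = change.ns?.coll ?? null;
  if (source === null || auditCollections.includes(source)) {
    return null; // never audit the audit log itself
  }
  return {
    target: `${source}_audit`, // e.g. orders -> orders_audit
    operation: change.operationType,
    documentId: change.documentKey?._id ?? null,
    changedFields: change.updateDescription?.updatedFields ?? null,
    at: change.wallTime ?? null, // assumption: available on MongoDB 6.0+
  };
}
```

Running the verification above against this guard should show exactly one `orders_audit` entry per order change and no entries whose source is an audit collection.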
Follow-up: Design a comprehensive audit logging system on MongoDB that captures all changes without circular dependencies or performance impact.