Your PostgreSQL database hit write limits: single-shard handles 1K writes/sec but you need 10K writes/sec. Schema: users table (user_id, name, email, created_at). Design a sharding strategy that supports 10K writes/sec with no duplicate user_ids.
Single-shard write bottleneck: WAL serialization, index maintenance, lock contention. Need 10x throughput = 10 shards minimum. Sharding strategy: **Range-based sharding on user_id**. (1) Shard 0: user_id 0-99,999,999, (2) Shard 1: user_id 100M-199,999,999, ... (10) Shard 9: user_id 900M-999,999,999. New user registration: generate user_id = rand(0, 1B), then route by range: shard_id = floor(user_id / 100M). Routing by user_id % 10 would be hash sharding and would contradict the range map, so use one scheme only. Insert into the corresponding PostgreSQL instance. Duplicate prevention: each user_id maps to exactly one shard, so a PRIMARY KEY on user_id in every shard makes IDs globally unique. Random generation can still collide, so on a unique violation the app retries with a fresh ID. Write throughput: 10 shards × 1K writes/sec = 10K writes/sec, assuming writes are spread evenly. Latency per write: 5-10ms, unchanged from a single node, because each write touches exactly one shard. Advantages: (a) Even distribution if IDs are uniformly generated. (b) Scaling: add an 11th shard (shard 10, IDs 1B+) and extend the range that new IDs are drawn from. (c) Hot-shard problem minimized (IDs uniformly distributed). Disadvantages: (a) Queries that do not filter on user_id (e.g., "find users by email") require scatter-gather across all 10 shards. Email query latency = slowest of the 10 parallel shard queries plus aggregation ≈ 100-200ms (vs 5-10ms on a single shard). (b) Resharding is expensive (rows must be moved when ranges are split or reassigned). Implementation: (1) Use a library/ORM hook for shard routing (e.g., Spring's AbstractRoutingDataSource, or a Django database router). (2) Shard lookup: CREATE TABLE shard_map (shard_id INT, min_id BIGINT, max_id BIGINT). At app startup, load map into cache. (3) On write: app finds the shard_map row with min_id <= user_id <= max_id, looks up that shard's connection string, executes INSERT. (4) On reads: if query has a user_id filter, use the same range lookup to select the shard (single-shard query, 5-10ms). If query has no shard key (e.g., WHERE name = "Alice"), scatter-gather (100-200ms). Cost: 10 PostgreSQL instances × $500/month = $5K/month. Latency: writes 5-10ms, reads 5-10ms (with shard key) or 100-200ms (without). Recommendation: **Range-based sharding on user_id** for this workload. Alternatively: **consistent hashing** for easier resharding (adding a shard moves only about 1/N of the keys instead of remapping most user_ids), but requires a more sophisticated client library.
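The range lookup and the duplicate-ID retry can be sketched in a few lines of Python. This is a minimal illustration, not a production router: `InMemoryShard` is a hypothetical stand-in for a PostgreSQL instance with `PRIMARY KEY (user_id)`, and the names `build_shard_map`, `shard_for`, and `register_user` are made up for this example.

```python
import bisect
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class ShardRange:
    shard_id: int
    min_id: int  # inclusive lower bound
    max_id: int  # inclusive upper bound


def build_shard_map(num_shards=10, ids_per_shard=100_000_000):
    """Mirror of the shard_map table: one contiguous ID range per shard."""
    return [
        ShardRange(i, i * ids_per_shard, (i + 1) * ids_per_shard - 1)
        for i in range(num_shards)
    ]


def shard_for(user_id, shard_map):
    """Range lookup: find the row with min_id <= user_id <= max_id."""
    starts = [r.min_id for r in shard_map]
    idx = bisect.bisect_right(starts, user_id) - 1
    if idx < 0 or user_id > shard_map[idx].max_id:
        raise KeyError(f"no shard owns user_id {user_id}")
    return shard_map[idx].shard_id


class InMemoryShard:
    """Hypothetical stand-in for one PostgreSQL instance with PRIMARY KEY (user_id)."""

    def __init__(self):
        self.rows = {}

    def insert(self, user_id, name, email):
        if user_id in self.rows:  # what a unique violation would signal
            return False
        self.rows[user_id] = (name, email)
        return True


def register_user(name, email, shard_map, shards, rng, max_attempts=5):
    """Draw a random ID, route it by range, and retry if the ID is taken."""
    id_space = shard_map[-1].max_id + 1
    for _ in range(max_attempts):
        user_id = rng.randrange(id_space)
        if shards[shard_for(user_id, shard_map)].insert(user_id, name, email):
            return user_id
    raise RuntimeError("could not allocate a unique user_id")


shard_map = build_shard_map()
shards = [InMemoryShard() for _ in shard_map]
new_id = register_user("Alice", "alice@example.com", shard_map, shards, random.Random(42))
print(new_id, "stored on shard", shard_for(new_id, shard_map))
```

Because a given ID always lands on the same shard, the per-shard primary key is enough to reject duplicates without any cross-shard coordination.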
Follow-up: You add an 11th shard (shard 10) for new user_ids >= 1B. Old users (IDs 0-1B) stay on shards 0-9. After 1 year, the new shard takes 10% of traffic and shards 0-9 take 9% each. Load is imbalanced. How do you rebalance without downtime?
You implement multi-master replication: master A (US) writes, replicates async to master B (EU). Simultaneously, master B writes, replicates to master A. Network partition for 5 minutes. Both masters accept writes during partition. A: user_email = "alice@example.com", B: user_email = "alice@example.com" but different user. After partition heals, you see conflicting writes. How do you resolve?
Multi-master replication with async replication causes write conflicts during a network partition. Both masters continue accepting writes independently, and when the partition heals the replication streams A → B and B → A collide. Example conflict: A has user_id=1, email="alice_a@...", B has user_id=1, email="alice_b@...". Same row, conflicting values. (The question's case, two different users registering the same email, typically surfaces the same way: as a unique-key violation when the streams merge.) Resolution strategies: (1) **Last-write-wins (LWW)**: use the write timestamp. A: write timestamp 1000ms, B: write timestamp 1050ms. LWW selects B's email. Net result: user_id=1, email="alice_b@...". Risk: A's write is silently lost, and the outcome depends on how well the two masters' clocks are synchronized. If Alice_a paid for the account, payment data becomes mismatched. (2) **Application conflict resolution**: detect the conflict in the replication stream and alert the application. Application logic: user_id=1 exists on both masters with different emails. Look up the user: which email was actually registered? Fetch from an external source of truth (audit log, email verification DB). Update the losing master to match. (3) **Conflict-free replicated data types (CRDT)**: model the field as an LWW register, i.e. last-write-wins with additional metadata. Each write includes (timestamp, server_id, user_id). On conflict, compare {1000ms, A, 1} vs {1050ms, B, 1}. The higher timestamp wins (1050 > 1000), so B's value wins; server_id breaks ties when timestamps are equal. Resolution is deterministic: every replica picks the same winner. (4) **Vector clocks**: each write includes a vector clock. Starting from {A: 5, B: 3}, a write on A produces {A: 6, B: 3} and a write on B produces {A: 5, B: 4}. Neither clock dominates the other (A's is higher in one component, B's in the other), so the writes are concurrent. Conflict detected; resolution is left to the application or an operator. (5) **Dual-write mode with quorum**: require writes to be acknowledged by both A and B before succeeding. During the partition, writes to A fail (can't reach B) and writes to B fail (can't reach A). No conflicts. Cost: strong consistency, but a partition causes unavailability (CP tradeoff). Best approach for multi-master: **(1) LWW + external audit log**. LWW resolves conflicts deterministically. The external audit log (immutable, separate from the DB) records both writes (Alice_a and Alice_b), so the application can replay and audit conflicts.
Cost: simple (no manual resolution), audit trail for compliance. Risk: some data loss (values silently overwritten by LWW). Mitigate: alert whenever a conflict is detected and investigate within hours. Recommended for high-availability scenarios that accept eventual consistency.
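A short Python sketch of two of the strategies above: LWW with a server_id tie-break that records the discarded write in an audit log, and a vector clock comparison that reports concurrent writes. The `Write` type, the function names, and the example.com addresses are hypothetical, and the audit log here is just an in-memory list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Write:
    user_id: int
    email: str
    timestamp_ms: int
    server_id: str


def resolve_lww(a, b, audit_log):
    """Keep the write with the higher (timestamp, server_id); log both sides."""
    winner = max(a, b, key=lambda w: (w.timestamp_ms, w.server_id))
    loser = b if winner is a else a
    audit_log.append({"user_id": winner.user_id, "kept": winner, "discarded": loser})
    return winner


def compare_clocks(x, y):
    """Return 'before', 'after', 'equal', or 'concurrent' for two vector clocks."""
    keys = set(x) | set(y)
    x_le_y = all(x.get(k, 0) <= y.get(k, 0) for k in keys)
    y_le_x = all(y.get(k, 0) <= x.get(k, 0) for k in keys)
    if x_le_y and y_le_x:
        return "equal"
    if x_le_y:
        return "before"
    if y_le_x:
        return "after"
    return "concurrent"


audit_log = []
a = Write(1, "alice_a@example.com", 1000, "A")
b = Write(1, "alice_b@example.com", 1050, "B")
winner = resolve_lww(a, b, audit_log)
print(winner.email, "| discarded:", audit_log[0]["discarded"].email)
print(compare_clocks({"A": 6, "B": 3}, {"A": 5, "B": 4}))
```

The audit entry is what makes LWW tolerable: the losing write is gone from the database but still recoverable from the log.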
Follow-up: If the conflicting write is a financial balance (user.balance = 100 vs 200), does LWW still work, or is external audit log insufficient?
A distributed database uses read-heavy replication: 1 primary (writes only), 3 read replicas (reads only). Async replication lag: ~100ms. A user updates their profile, then immediately reads it back. Sees old data (updated field is missing). This is a "read-after-write" consistency violation. Design a solution.
Read-after-write consistency is critical for user-facing applications. Violation: user updates → write goes to primary → read hits a replica that has not yet applied the write → stale data. Solutions: (1) **Read-write splitting with session affinity**: after a write, pin the user's session to the primary for the next N seconds, with N comfortably above the replication lag (~100ms here). Reads also go to the primary during this window. Latency: those reads take ~10-50ms (primary has more load), but consistency is guaranteed. Cost: load shift to primary. (2) **Causal consistency with version tokens**: on write, the primary returns a version token (e.g., {table: users, version: 1001}). Client caches the token. On read, client passes the token to the replica. Replica checks: "have I replicated past version 1001?" If yes, return data. If no, wait for replication (blocking read, timeout 1 second). Latency: the check adds <1ms if the replica has caught up, up to the 1-second timeout if not. (3) **Read from primary on causal dependency**: similar to (2), but explicit. App passes "causally_depends_on: version_token" in the read request; the replica serves the read only if it satisfies the dependency, otherwise the read goes to the primary. Cost: requires app awareness of tokens. (4) **Replication lag monitoring + read steering**: measure replication lag continuously. If lag > 50ms, route the read to the primary; otherwise route to a replica. Example mix: 30% of reads → primary, 70% → replicas. Latency: reads hit a replica 70% of the time (~5ms), the primary 30% (~10ms). Only the reads sent to the primary are guaranteed fresh; a replica under the threshold can still be up to 50ms behind, so this narrows the stale window rather than closing it. Cost: variable latency. (5) **Hybrid: write-through cache + read-aside**: the write updates the primary and the cache synchronously (updating the cache first and the primary asynchronously would be write-behind, which risks losing the write). Read hits the cache first (and so sees the user's own write), falls back to the primary on a miss. Latency: read <1ms (cache hit), 10-50ms (cache miss). Cost: cache invalidation complexity. Recommended: **(2) causal consistency with version tokens**. Implementation: (a) On write, the primary returns the commit's LSN (log sequence number) to the client. (b) Client caches LSN in session. (c) On next read, client passes LSN to replica.
Replica waits for its replay position to reach the LSN (usually <100ms). (d) If the wait exceeds the timeout (1 second), read from primary. Cost: low latency (replica reads), guaranteed consistency (version tracking). Database support: MySQL 5.7+ (with GTIDs: return the transaction's GTID after the write and call WAIT_FOR_EXECUTED_GTID_SET() on the replica), PostgreSQL 11+ (can build with application logic: read pg_current_wal_lsn() on the primary after commit and compare it with pg_last_wal_replay_lsn() on the replica).
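Steps (a) through (d) can be sketched as a small in-memory simulation. Everything here is an assumption for illustration: `Node` and `Primary` are toy objects rather than database clients, the LSN is a simple counter, and the wait is a polling loop instead of a server-side blocking call.

```python
import time


class Node:
    """Toy replica: stores data plus the last LSN it has applied."""

    def __init__(self, name):
        self.name = name
        self.applied_lsn = 0
        self.data = {}

    def apply(self, lsn, key, value):
        self.data[key] = value
        self.applied_lsn = lsn


class Primary(Node):
    def write(self, key, value):
        """Apply the write locally and return its LSN as the version token."""
        lsn = self.applied_lsn + 1
        self.apply(lsn, key, value)
        return lsn


def read_after_write(key, token_lsn, replica, primary, timeout_s=1.0, poll_s=0.01):
    """Serve from the replica once it has reached token_lsn, else fall back."""
    deadline = time.monotonic() + timeout_s
    while True:
        if replica.applied_lsn >= token_lsn:
            return replica.data.get(key), replica.name
        if time.monotonic() >= deadline:
            return primary.data.get(key), primary.name
        time.sleep(poll_s)


primary, replica = Primary("primary"), Node("replica-1")
token = primary.write("profile:42", "new bio")

# Replica has not applied the write yet: the read times out and uses the primary.
print(read_after_write("profile:42", token, replica, primary, timeout_s=0.05))

# Once replication catches up, the same token is satisfied by the replica.
replica.apply(token, "profile:42", "new bio")
print(read_after_write("profile:42", token, replica, primary))
```

The token only needs to travel with the session that made the write; other users' reads can keep going to replicas without waiting.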
Follow-up: Version token is 64-bit LSN. If LSN overflows (reaches max), what happens to consistency guarantees?
A master-slave setup (master in US, slave in EU, slave in APAC) uses synchronous replication: writes require ACK from all slaves before returning success. The EU slave's network connectivity degrades: its replication round-trip takes 500ms instead of 100ms. Write latency spikes from 150ms to 550ms (50ms local + max(500ms EU, 100ms APAC), since the master waits for both ACKs in parallel). Throughput drops about 4x (client timeouts increase). How do you recover without losing synchronous replication?
Synchronous replication with a slow replica blocks all writes: the EU slave alone sets global write latency. Solutions: (1) **Semi-synchronous replication**: require the ACK of only 1 slave (master + 1 slave = 2 of 3 copies), not both. Write latency: 50ms local + min(100ms APAC, 500ms EU) = 150ms, back to the healthy baseline. Worst case, if APAC slows down too, is 50ms plus the faster of the two round trips. Throughput recovers. Cost: lose one replica's sync guarantee, but 2 copies are safe. (2) **Switchable consistency**: use an adaptive quorum. Under normal conditions wait for every ACK (3 of 3 copies). If one slave is slow (>200ms), skip it and commit once the fastest 2 of 3 copies (master + 1 slave) have the write. Write latency while degraded: 50 + 100 = 150ms. Cost: code complexity, need to detect and skip slow slaves. (3) **Prioritized replication**: prioritize APAC (fast) over EU (slow). Quorum: master + APAC = 2/3. EU replicates async. Write latency: 50 + 100 = 150ms. EU is eventually consistent (100-500ms lag). Cost: EU users see slightly stale data. (4) **Parallel replication**: send the write to the master and replicate to all slaves in parallel, with the master returning success immediately (before slave ACKs). This is async replication, not synchronous. Risk: if the master crashes before EU/APAC ACK, writes are lost. Cost: lose the durability guarantee. (5) **Isolate slow slave**: detect that the EU slave is slow and temporarily remove it from the replication quorum. Quorum: master + APAC = 2/3. Meanwhile, debug the EU network issue (increase bandwidth, route via a different ISP, etc.). Once EU recovers, add it back to the quorum. Write latency during isolation: 50 + 100 = 150ms. Cost: manual intervention needed. Recommended: **(1) semi-synchronous replication** for production. Require 2/3 ACKs (master + fastest slave). Write latency: 50ms local + 100ms US-APAC round trip = 150ms in the normal case. If one slave is slow, writes aren't blocked; the EU slave still replicates asynchronously and catches up. Cost: lower durability guarantee (1 replica async), but acceptable for most use cases.
For extreme durability: **(2) switchable consistency** (all replicas synchronous while healthy, slow replicas skipped dynamically).
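The latency arithmetic for the different ACK policies can be checked with a small helper. It assumes the master sends to all slaves in parallel and commits once the required number of ACKs has arrived, and it uses the question's figures (50ms local commit, 100ms and 500ms round trips); `write_latency_ms` is a name invented for this sketch.

```python
def write_latency_ms(local_ms, replica_rtts_ms, acks_required):
    """Commit latency when the master waits for the k fastest replica ACKs.

    Assumes replication fans out in parallel, so the wait is set by the
    k-th fastest round trip rather than by the sum of round trips.
    """
    if not 0 <= acks_required <= len(replica_rtts_ms):
        raise ValueError("acks_required must be between 0 and the replica count")
    if acks_required == 0:  # fully asynchronous
        return local_ms
    return local_ms + sorted(replica_rtts_ms.values())[acks_required - 1]


healthy = {"EU": 100, "APAC": 100}
degraded = {"EU": 500, "APAC": 100}

for label, rtts in (("healthy", healthy), ("degraded", degraded)):
    print(
        label,
        "| all ACKs:", write_latency_ms(50, rtts, 2), "ms",
        "| 1 ACK:", write_latency_ms(50, rtts, 1), "ms",
    )
```

With one required ACK the degraded EU link drops out of the critical path entirely, which is why semi-synchronous replication restores the healthy latency.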
Follow-up: EU slave eventually replicates async, but 30 seconds later EU DB crashes. Writes that were sent before EU crash are lost (EU replica didn't catch up). Is this acceptable?
A MongoDB replica set (primary, secondary, arbiter) in single data center loses power. All replicas go down. You have backups stored on S3 (daily backup, latest is 12 hours old). RTO = 1 hour (restore from backup), RPO = 24 hours (acceptable data loss is 24 hours). Design recovery steps and estimate time to recover.
Single data center failure with backup recovery. Targets: RTO = 1 hour, RPO = 24 hours. The latest backup is 12 hours old, so the RPO is met. Recovery steps: (1) **Detect failure**: monitoring alerts on replica set heartbeat timeout (30 seconds). (2) **Declare RTO window**: "OK, we have 1 hour to restore." (3) **Fetch backup from S3**: latest backup = 12 hours old, size = 500GB. Download at 100 Mbps (an assumed constrained link): 500GB × 8 bits/byte ÷ 100 Mbps = 40K seconds ≈ 11 hours. Too slow! Mitigation: (a) Use S3 Cross-Region Replication to pre-copy the backup to a nearby region and download from there at 5-10 Gbps: 400-800 seconds ≈ 7-13 minutes. (b) Or use AWS DataSync (faster transfer, parallel) ≈ 10-30 minutes. Budget 20 minutes for this step. (4) **Restore to new MongoDB instance**: spin up a new EC2 instance, attach a volume, restore MongoDB from backup. Restore time: 500GB @ 1GB/sec (disk throughput) = 500 seconds ≈ 8 minutes. (5) **Verify data**: sanity checks (collections exist, index count, document count). ~5 minutes. (6) **Update DNS**: point the app to the new MongoDB instance (or update connection strings). ~5 minutes. Total recovery time: 0.5 min (detect) + 20 min (fetch backup) + 8 min (restore) + 5 min (verify) + 5 min (DNS) = 38.5 minutes. That is inside the 1-hour RTO with about 20 minutes of margin, but only if the fast transfer path works, and it excludes instance provisioning and human decision time. Improvements: (1) **Pre-stage new MongoDB instance** during backup transfer. Spin up the instance while the backup downloads, so provisioning adds nothing to the critical path. Download (20 min) and restore (8 min) still run back to back = 28 minutes, so the total stays at 0.5 + 28 + 5 + 5 = 38.5 minutes. (2) **Use cross-region replica set**: keep 1 replica in a different data center (replication lag ~100ms). If the primary DC fails, promote the secondary in the other DC. RTO: ~30 seconds. RPO: <1 second. Cost: double infrastructure. (3) **Use S3 backup + incremental backup**: daily full backup (500GB, 12 hours old) + hourly incremental backups (each 50GB). Latest incremental: 1 hour old. Restore full + apply incremental: 500GB + 50GB = 550GB, assuming the latest incremental is cumulative since the full backup. That is about 10% more transfer and restore time (≈ 22 min + 9 min), for a total of ≈ 42 minutes.
Improvement to RTO: none; the gain is RPO, which drops from 12 hours to about 1 hour. Most robust approach for RTO = 1 hour: **(2) cross-region replica set**, because it does not depend on transfer speed. Cost: $10K/month extra (extra MongoDB instance in a different DC). RTO: 30 seconds. RPO: <1 second. Alternative if budget limited: **(1) + (3)** (pre-stage instance + incremental backups). RTO: roughly 40 minutes (inside the target, with the caveats above). Cost: ~$2K/month extra.
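Working the step durations through with explicit units avoids mixing seconds and minutes when totalling the recovery time. The sketch below uses the figures listed in the steps (500GB backup, 100 Mbps versus 5-10 Gbps links, a 20-minute fetch budget); `transfer_time` and the step names are just labels for this example.

```python
from datetime import timedelta


def transfer_time(size_gb, link_mbps):
    """Time to move size_gb gigabytes over a link of link_mbps megabits/second."""
    return timedelta(seconds=size_gb * 8000 / link_mbps)


slow_download = transfer_time(500, 100)        # constrained 100 Mbps link
fast_download_min = transfer_time(500, 10_000)  # 10 Gbps
fast_download_max = transfer_time(500, 5_000)   # 5 Gbps

steps = {
    "detect": timedelta(seconds=30),
    "fetch backup": timedelta(minutes=20),
    "restore": timedelta(minutes=8),
    "verify": timedelta(minutes=5),
    "update DNS": timedelta(minutes=5),
}
total = sum(steps.values(), timedelta())
rto = timedelta(hours=1)

print("100 Mbps download:", slow_download)
print("5-10 Gbps download:", fast_download_min, "to", fast_download_max)
print("total:", total, "| within RTO:", total <= rto, "| margin:", rto - total)
```

The detection step contributes 30 seconds, not 30 minutes, so the sequential total is 38.5 minutes and the plan fits the 1-hour target as long as the fast download path is available.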
Follow-up: Cross-region replica is now primary. Original primary DC recovers and comes back online. How do you prevent split-brain (both claiming to be primary)?
A Cassandra cluster (RF=3, nodes A, B, C) uses eventual consistency. A client writes value = 100 to all 3 nodes. Nodes A and B acknowledge, node C is slow (network lag 5 seconds). Client sees "success" (2/3 quorum). Then client reads immediately from node C (slow). Sees stale data (value = old). Later, node C replicates and gets value = 100. Walk through consistency guarantees and improvements.
Cassandra quorum writes ensure durability (2 of 3 nodes have the data), but not read freshness on their own. Write: quorum = floor(RF/2) + 1 = 2 nodes confirm. Client sees success, but node C has not applied the write yet (network lag). A later read served by C alone returns stale data. Root cause: **the read consistency level is too low to overlap the write quorum**. Reading from a single node is CL=ONE, and R + W = 1 + 2 = 3 is not greater than RF = 3. Cassandra consistency level trade-off: (1) **Write quorum + read quorum**: write to 2 nodes, read from 2 nodes. Configuration: write_consistency = QUORUM (2/3), read_consistency = QUORUM (2/3). Overlap is guaranteed because 2 + 2 > 3: any 2 nodes the read contacts include A or B, and the coordinator returns the value with the newest timestamp. The example breaks only because the read hit C alone. Read latency: set by the 2 fastest replicas (~100ms), so slow C does not matter. Raising reads to ALL (3/3) is unnecessary here: it would wait on C (5 seconds) and fail whenever any replica is down. (2) **Strong consistency variant**: use lightweight transactions (Cassandra LWT). Write: INSERT ... IF NOT EXISTS. Cassandra uses Paxos consensus: propose to all 3 replicas, a majority must agree. Write latency: ~100ms or more (several round trips to a quorum). Reads at SERIAL consistency are guaranteed to see the latest committed value (Paxos ensures only one winner). Cost: at least 2x write latency (Paxos overhead) vs a plain quorum write. (3) **Read repair + hinted handoff**: serve the read, then asynchronously repair the node that missed the write (C). When C recovers, the latest value is replicated to it. Consistency: eventual (C has stale data for a few seconds). (4) **Monotonic reads with vector clocks**: client reads include a vector clock. Read from C sees old data + vector clock {A: 1, B: 1, C: 0}. Next read: client passes vector clock {A: 1, B: 1, C: 0} and requests only nodes with vector >= client vector. If C hasn't replicated yet, C is excluded from the read. Read from A/B (consistent). Latency: 100ms vs 5 seconds. Cost: vector clock bookkeeping, which lives in the client because Cassandra itself versions cells with timestamps rather than vector clocks. Recommended fix for this scenario: **(1) QUORUM writes + QUORUM reads**. Reads take ~100ms and always see the acknowledged write. Use **(2) lightweight transactions** when the requirement is linearizable compare-and-set rather than fresh reads.
LWT writes go through Paxos (several round trips, ≈ 100ms or more) and SERIAL reads see the latest value. Cost: at least double the write latency, but guaranteed consistency. Alternative: **(3) low-consistency reads + read repair** where eventual consistency is acceptable. Read latency stays low and C is repaired asynchronously. Stale data window: <10 seconds.
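The overlap rule R + W > RF can be checked by brute force on the three-node example. This sketch assumes each replica holds a (timestamp, value) pair and that a read returns the newest value among the replicas it contacts; the old value 0 and the helper names are placeholders, and no Cassandra driver is involved.

```python
from itertools import combinations


def quorum(rf):
    """Quorum size for a replication factor: floor(RF / 2) + 1."""
    return rf // 2 + 1


def read(replicas, contacted):
    """Return the value with the newest timestamp among the contacted replicas."""
    _ts, value = max((replicas[n] for n in contacted), key=lambda cell: cell[0])
    return value


# (timestamp, value) per replica: A and B acknowledged the write, C has not.
replicas = {"A": (2, 100), "B": (2, 100), "C": (1, 0)}
rf = len(replicas)
w = quorum(rf)

print("read at ONE from C:", read(replicas, ["C"]))
for r in (1, quorum(rf)):
    results = {read(replicas, nodes) for nodes in combinations(replicas, r)}
    print(f"R={r}, W={w}, R+W>RF: {r + w > rf}, possible results: {sorted(results)}")
```

With R = 1 a read may return either value depending on which node answers; with R = 2 every pair of nodes contains A or B, so the stale value can never win.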
Follow-up: Cassandra LWT (Paxos) fails if 2 nodes crash simultaneously (only C is up). Quorum can't be reached. Writes block. Is this acceptable?
A PostgreSQL database uses streaming replication (primary → standby, WAL streaming in real-time). Standby is 5 seconds behind primary (replication lag). A failover happens: standby is promoted to new primary. The old primary crashed 10 seconds later and can't be recovered. Did you lose any writes?
Replication lag during failover causes potential data loss. Scenario: (1) Write T1 at primary: "INSERT user = Alice" at UTC 12:00:00. (2) Write T2 at primary: "UPDATE balance = 100" at UTC 12:00:02. (3) Standby receives T1 immediately, but T2 takes 5 seconds to replicate (standby is 5 seconds behind). (4) At UTC 12:00:04, standby has only T1 (not T2). (5) Failover happens at UTC 12:00:05. Standby is promoted to primary with T1 but not T2. T2 is lost. (6) Old primary crashes at UTC 12:00:15, 10 seconds after the failover. T2 cannot be recovered (disk corrupted). Net data loss: T2 (balance update). Prevention strategies: (1) **Synchronous replication**: write T1 to primary + standby (quorum). Standby ACKs, then the write returns success. No acknowledged write is missing from the standby, so failover loses no data. Cost: write latency = network round-trip to standby (~50-100ms vs near-instant with async). Throughput impact: each commit waits for that round trip, so per-connection throughput drops unless concurrency increases. (2) **Cascading replication**: primary → standby → tertiary, so that a third copy exists before failover. In PostgreSQL a cascaded standby replicates asynchronously from its upstream, so for a true commit quorum attach both standbys directly to the primary and require an ACK from at least one. Failover is safer because 2 copies exist. Cost: 3 replicas overhead. (3) **Replication heartbeat + failover guards**: monitor replication lag. If lag > threshold (e.g., 10 seconds), pause accepting new writes. Planned failover occurs only when the standby is caught up. Cost: writes block during high lag, lower availability. (4) **Asynchronous replication + backup**: keep replication async (low latency), but take frequent backups (every 1 minute) to S3. If the standby is promoted mid-lag and the old primary crashes, you still have a 1-minute-old backup. Data loss: <1 minute of writes. RPO = 1 minute. Cost: backup overhead, slower recovery. Recommended: **(1) synchronous replication** for strong durability. Configure: list the standby in synchronous_standby_names and keep synchronous_commit = on (PostgreSQL); synchronous_commit = on is the default and has no synchronous effect while synchronous_standby_names is empty. Write latency increases to 50-100ms, but zero data loss on failover. Cost: manageable for most use cases. Alternative if latency-sensitive: **(2) cascading replication + tertiary**, configured as a quorum of two standbys, e.g. synchronous_standby_names = 'ANY 1 (s1, s2)' (standby names are placeholders). 3 copies ensure failover safety.
Latency = ~50ms (the faster of the 2 standbys has to ACK). Trade-off: infrastructure cost vs safety.
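The T1/T2 timeline can be replayed in a few lines to show what each replication mode loses at failover. Times are seconds after 12:00:00; the `Txn` type and the outcome labels are illustrative, and the model only distinguishes whether the standby had a transaction by the moment of promotion.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Txn:
    name: str
    committed_at: int   # seconds after 12:00:00, commit on the primary
    replicated_at: int  # seconds after 12:00:00, arrival on the standby


def failover_outcome(txns, failover_at, synchronous):
    """Classify each transaction by what the promoted standby ends up with."""
    outcome = {}
    for t in txns:
        if t.replicated_at <= failover_at:
            outcome[t.name] = "survives"
        elif synchronous:
            # The primary withholds success until the standby ACKs,
            # so the client was never told this write succeeded.
            outcome[t.name] = "never acknowledged"
        else:
            outcome[t.name] = "acknowledged, then lost"
    return outcome


txns = [Txn("T1", committed_at=0, replicated_at=0),
        Txn("T2", committed_at=2, replicated_at=7)]

print("async:", failover_outcome(txns, failover_at=5, synchronous=False))
print("sync: ", failover_outcome(txns, failover_at=5, synchronous=True))
```

In both modes the promoted standby lacks T2; the difference is whether the application was ever told that T2 had succeeded.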
Follow-up: With synchronous replication, the standby's network is slow (200ms round-trip). Write latency = 250ms (primary local + 200ms standby). The timeout is 5 seconds, so writes don't time out, but throughput drops. How do you optimize without losing sync safety?
A social network uses sharded PostgreSQL (100 shards by user_id). A user's friends are distributed across shards (friend graph spans multiple shards). Query: "show me friends of user X" requires hitting multiple shards and aggregating results. Latency is 500ms (bad UX). Design a replication strategy to reduce query latency without changing the sharding key.
Cross-shard friend graph query is slow because it requires scatter-gather across shards. User X's friends are split: 20% on shard 1, 30% on shard 2, 50% on shard 3. Querying all 3 in parallel costs max(3 shard latencies) plus aggregation; querying them sequentially costs the sum. Either way the observed total is 500ms. Root cause: **sharding key (user_id) doesn't align with query pattern (friend relationships cross shards)**. Solutions: (1) **Denormalization with replica**: replicate user X's entire friend list to a dedicated cache/replica table. On shard 1 (where user X lives), store precomputed friend_list = {friend_ids: [1, 5, 10, 15, ...]}. Query: SELECT friend_list FROM users WHERE user_id = X (single shard) → ~10ms. Cost: write amplification (update friend_list when a friendship changes, cascading updates across shards). (2) **Dimensional tables**: create a separate "friendships" table with primary key (user_id, friend_id), sharded by user_id so that all of X's edges live on one shard. Query: SELECT * FROM friendships WHERE user_id = X LIMIT 100. Wide-column stores such as Cassandra/Bigtable serve this as a single-partition range scan. Latency: ~50-100ms (single shard lookup + range scan). Cost: additional table, index maintenance. (3) **Fan-out read with caching**: read-through cache for friend lists. Query checks the cache first. Cache miss → scatter-gather across the 3 shards, then aggregate and cache the result for 1 hour. Latency: cache hit ~5ms, miss ~500ms. At a ~90% hit rate, average latency = 0.9 × 5 + 0.1 × 500 ≈ 55ms. Cost: cache maintenance, cache invalidation on friendship changes. (4) **Materialized view with async sync**: maintain a materialized view: user_id → friend_ids (denormalized). An async sync job updates the view whenever a friendship changes (polling or event-driven). Query hits the view (single lookup) → ~10ms. Cost: stale data window (eventual consistency, ~1-10 seconds lag). (5) **Partition by geography + replication**: if friends are often geographically co-located, repartition by (geo_region, user_id). Co-locate friends in the same shard. Query only hits the shard containing the user's geo region. Latency: ~50ms (single shard).
Cost: resharding complexity, query changes. Recommended: **(3) read-through cache** for simplicity. Friend lists are read-heavy (queries > updates), so the cache absorbs ~90% of requests. Latency: average ~55ms (acceptable for UX). Cost: ~$5K/month (cache infrastructure). Alternative for strong consistency: **(1) denormalization + write amplification**. Precompute friend lists synchronously. Latency: ~10ms. Cost: writes become slow (cascade to 3+ shards), but reads are fast. Use for real-time feeds where reads are critical.
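A minimal read-through cache with a TTL, plus the hit-rate arithmetic behind the ~55ms average, might look like the sketch below. The shards are plain dictionaries standing in for the PostgreSQL shards, and `ReadThroughCache`, `make_scatter_gather`, and `expected_latency_ms` are names chosen for this example rather than any library's API.

```python
import time


class ReadThroughCache:
    """Cache that loads missing or expired keys through a loader function."""

    def __init__(self, loader, ttl_s=3600.0, clock=time.monotonic):
        self._loader = loader
        self._ttl_s = ttl_s
        self._clock = clock
        self._entries = {}  # key -> (expires_at, value)
        self.hits = 0
        self.misses = 0

    def get(self, key):
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            self.hits += 1
            return entry[1]
        self.misses += 1
        value = self._loader(key)
        self._entries[key] = (now + self._ttl_s, value)
        return value

    def invalidate(self, key):
        """Call when a friendship changes so the next read reloads."""
        self._entries.pop(key, None)


def make_scatter_gather(shards):
    """Build a loader that collects a user's friend IDs from every shard."""
    def load_friends(user_id):
        friends = []
        for shard in shards:
            friends.extend(shard.get(user_id, []))
        return sorted(friends)
    return load_friends


def expected_latency_ms(hit_rate, hit_ms, miss_ms):
    return hit_rate * hit_ms + (1 - hit_rate) * miss_ms


shards = [{42: [1, 5]}, {42: [10]}, {42: [15, 20]}]
cache = ReadThroughCache(make_scatter_gather(shards))
first = cache.get(42)   # miss: scatter-gather across all shards
second = cache.get(42)  # hit: served from the cache
print(first, "| hits:", cache.hits, "| misses:", cache.misses)
print("average latency ≈", round(expected_latency_ms(0.9, 5, 500), 1), "ms")
```

Pairing the TTL with explicit invalidation on friendship changes bounds staleness to one TTL even if an invalidation message is dropped.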
Follow-up: Cache is now 2 days stale (async sync failed silently). User sees outdated friend list. How do you detect and recover?