Python Interview Questions

Context Managers and the with Statement


Production Scenario Interview Questions

Your application uses a database connection pool with context managers: `with pool.get_connection() as conn:`. In production, connections leak after exceptions during rollback. You inspect connection count and it's stuck at 50/50 (max capacity). What's the context manager bug?

The bug: if __exit__ raises an exception (e.g., during rollback), the connection never returns to the pool. __exit__ receives exception info as (exc_type, exc_val, exc_tb); if __exit__ itself raises, the new exception propagates and the original is lost. When __exit__ is responsible for releasing the resource but raises partway through (during rollback), the release never runs and the resource is orphaned—hence the pool stuck at 50/50. Solutions: (1) ensure __exit__ never raises—wrap cleanup code in try/except and log errors, (2) return True from __exit__ only for exceptions you deliberately want to suppress (returning False lets them propagate), (3) explicitly handle rollback failures: `try: conn.rollback() except Exception: logger.error(...)`—don't let cleanup errors escape, (4) use contextlib.closing() for simple resources, (5) test __exit__'s exception paths with unittest.mock to simulate rollback failures. Example fix: `def __exit__(self, *args): try: self.conn.rollback() finally: self.pool.release(self.conn)`. Key insight: __exit__ must be robust—if cleanup code raises partway through, the remaining cleanup is skipped and the resource leaks. Return False to let exceptions propagate (or True to suppress), and put the actual release in a finally block so it runs regardless of exceptions.
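A sketch of the fix described above. The pool API (`acquire()`/`release()`) and `conn.rollback()` are assumptions standing in for the real connection-pool interface:

```python
import logging

logger = logging.getLogger(__name__)

class PooledConnection:
    """Hypothetical wrapper: rolls back on error, always returns the
    connection to the pool, and never lets cleanup errors escape."""

    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    def __enter__(self):
        self.conn = self.pool.acquire()
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is not None:
                try:
                    self.conn.rollback()
                except Exception:
                    # A failed rollback must not orphan the connection.
                    logger.exception("rollback failed")
        finally:
            self.pool.release(self.conn)  # runs no matter what
        return False  # propagate the original exception
```

The two nested try blocks are the point: the inner one contains the rollback failure, the outer `finally` guarantees the release.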

Follow-up: What's the difference between returning True vs. False from __exit__? If __exit__ raises, does the original exception get re-raised? Can you catch and handle exceptions in __exit__ that were raised in __enter__?

Your team implements a timeout context manager: `with timeout(5):` that's supposed to interrupt long-running code. Tests pass, but in production on high-load servers, the timeout fires in the wrong thread or doesn't fire at all. Why?

Context managers don't interrupt code—they only run code at entry (__enter__) and exit (__exit__). A timeout context manager must use signals or cooperative cancellation to actually interrupt execution. Issues: (1) signal.alarm() works only in the main thread and only on Unix—it doesn't work in worker threads or on Windows, (2) threading.Timer runs a callback when the timer fires, but a callback in another thread cannot interrupt running code—you need SIGALRM or cooperative cancellation, (3) if the timeout fires in a different thread, it can't interrupt the main thread's code—you need a cancellation token (e.g., threading.Event) that the worker actually checks. Solutions: (1) for blocking calls, submit the work to a concurrent.futures.ThreadPoolExecutor and call future.result(timeout=...)—note this stops you waiting but doesn't stop the worker thread, (2) for I/O-bound async code, use asyncio.wait_for(coro, timeout=...), (3) implement signal-based timeouts only in the main thread: install a SIGALRM handler that raises an exception, (4) for portable cross-thread cancellation, pass a threading.Event that the worker polls—forcibly raising exceptions in another thread is unsafe—or use trio/asyncio, which have native timeout support, (5) document that timeout context managers don't interrupt CPU-bound code—they work only where operations can be cancelled. Honest implementation: build the timeout on async cancellation (asyncio) or explicit cancellation tokens—don't fake interruption with threads.
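A minimal sketch of the signal-based approach, carrying exactly the limitations listed above—Unix only, main thread only, whole-second resolution, not safely nestable without extra bookkeeping:

```python
import signal
from contextlib import contextmanager

@contextmanager
def timeout(seconds):
    """Raise TimeoutError if the block runs longer than `seconds`.
    SIGALRM-based: Unix only, main thread only, 1-second granularity."""
    def _handler(signum, frame):
        raise TimeoutError(f"timed out after {seconds}s")

    old_handler = signal.signal(signal.SIGALRM, _handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)                        # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)  # restore prior handler
```

Usage: `with timeout(5): do_blocking_io()`. The `finally` block matters—without it a cancelled alarm could fire later inside unrelated code.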

Follow-up: How does signal.SIGALRM interact with thread-local state? Can a signal handler in the main thread interrupt code in worker threads? What's the asyncio-based alternative?

Your microservice logs requests with `with log_context(request_id=req_id):`. Tests pass, but in async code, context values leak between concurrent requests. Thread-local storage isn't working. How do you fix context manager state in async?

The problem: threading.local() is thread-scoped; async code runs many coroutines in one thread, so all coroutines share the same thread-local state, and context values from one request leak into another. Solutions: (1) use the contextvars module (PEP 567): `import contextvars; ctx = contextvars.ContextVar('name')`. ContextVars are task-local—each asyncio task gets an isolated context, (2) replace threading.local with contextvars.ContextVar in async code, (3) use contextvars.copy_context() to snapshot context at task creation, so child tasks inherit the parent context without sharing mutable state, (4) in async context managers, set the variable in __aenter__ and reset it in __aexit__: `async def __aenter__(self): self.token = self.var.set(self.value); return self`, `async def __aexit__(self, *args): self.var.reset(self.token)`, (5) use the Token returned by set() to restore the previous value (not just delete it), which preserves correct nesting. Example: `_request_id = contextvars.ContextVar('request_id')`, then `async with log_context(request_id=req_id):` sets and resets the ContextVar internally. Testing: run concurrent tasks and verify context isolation. ContextVars also work in synchronous, threaded code—each thread and each task sees its own context—so they are a safe replacement for threading.local in mixed codebases.

Follow-up: What's the difference between threading.local and contextvars? Does contextvars work in synchronous code? How do you handle context inheritance across asyncio.create_task()?

Your ORM uses nested context managers for transactions: `with db.transaction(): with db.savepoint():`. After exception in savepoint, the outer transaction rolls back unexpectedly. How do you implement nested transactions correctly?

Nested transactions require savepoint support. A savepoint is a named rollback point within a transaction. Issues: (1) if the savepoint context manager's __exit__ calls rollback() unconditionally, it aborts the entire outer transaction, not just the savepoint, (2) nested context managers must track nesting depth and only commit or roll back at the appropriate level, (3) the SQL is different: a plain ROLLBACK aborts the whole transaction, while ROLLBACK TO SAVEPOINT name undoes only the work since that savepoint. Solutions: (1) use database savepoints (SAVEPOINT name / ROLLBACK TO SAVEPOINT name / RELEASE SAVEPOINT name) rather than trying to nest BEGIN/COMMIT, (2) track transaction state with a stack: entering pushes a savepoint name, exiting pops it and rolls back to that savepoint on error, (3) implement __exit__ to determine rollback scope: if nested, roll back to the savepoint; if top-level, roll back the whole transaction, (4) use contextlib.contextmanager generators to simplify the state tracking, (5) check your database's docs—PostgreSQL, MySQL, and SQLite all support savepoints but differ in the details. Example: `with db.transaction() as t1: with db.savepoint(t1) as s1:` creates named savepoints. Key insight: nested transactions are tricky—use explicit savepoint management (SAVEPOINT) rather than nesting transaction contexts, and test with exception scenarios to verify rollback scope.
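A sketch of generator-based savepoints against SQLite, using generated names so savepoints can nest; the `savepoint()` helper is illustrative, not a real ORM API:

```python
import itertools
import sqlite3
from contextlib import contextmanager

_names = itertools.count()  # unique savepoint names, so nesting works

@contextmanager
def savepoint(conn):
    """SAVEPOINT on entry; RELEASE on success; ROLLBACK TO on error.
    Either way the enclosing transaction stays open and intact."""
    name = f"sp_{next(_names)}"
    conn.execute(f"SAVEPOINT {name}")
    try:
        yield
    except Exception:
        conn.execute(f"ROLLBACK TO SAVEPOINT {name}")
        conn.execute(f"RELEASE SAVEPOINT {name}")  # ROLLBACK TO keeps it alive
        raise
    else:
        conn.execute(f"RELEASE SAVEPOINT {name}")
```

With `isolation_level=None` (manual transaction control in the sqlite3 module), an exception inside `with savepoint(conn):` undoes only that block's writes; the outer BEGIN/COMMIT still commits earlier work.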

Follow-up: How do database savepoints map to context managers? Can you nest savepoints within savepoints? What happens if you try to nest transaction() within transaction()?

Your team uses `with file_handle:` for file I/O. A developer notices files sometimes remain open after the with block exits, especially under heavy load or exceptions. Why don't files close reliably?

With correct use of the with statement, files close reliably via __exit__; leaks come from the edge cases. Issues: (1) if open() or __enter__ raises, __exit__ never runs—any partially acquired resource must be cleaned up by the raising code itself, (2) if close() inside __exit__ raises, that exception escapes and can mask the original one, (3) if a file object is only ever garbage collected, __del__ will eventually close it, but GC timing is unpredictable (especially under load)—never rely on __del__, (4) buffering: written data sits in the buffer until flush or close; __exit__'s close() flushes it. Solutions: (1) ensure __enter__ either succeeds fully or cleans up partial state before re-raising, (2) in custom __exit__ implementations, wrap close() in try/except so cleanup errors don't escape, (3) always use the with statement—never rely on __del__, (4) verify closing behavior: inside the block `assert not f.closed`, after the block `assert f.closed`, (5) audit for lingering file descriptors with psutil in tests, or lsof/fuser on Linux in production. Note: close() flushes the buffer automatically; don't call close() manually inside the with block—the context manager does it on exit.
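A small demonstration that __exit__ both closes and flushes even when the block raises—the behavior the with statement guarantees and __del__ does not:

```python
import os
import tempfile

# Create a scratch file to write into.
fd, path = tempfile.mkstemp()
os.close(fd)

try:
    with open(path, "w") as f:
        f.write("partial data")   # sits in the buffer until close()
        raise RuntimeError("boom")
except RuntimeError:
    pass

assert f.closed                    # __exit__ ran despite the exception

with open(path) as f:
    assert f.read() == "partial data"  # close() flushed the buffer first

os.unlink(path)
```

If the same write had gone through a file object that was merely dropped and left to the garbage collector, the flush could happen arbitrarily late, or at interpreter shutdown.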

Follow-up: Does __del__ always call close() if close() isn't explicitly called? What's the order of execution if both __exit__ and __del__ are defined? How does buffering interact with __exit__?

Your monitoring tool needs to measure function execution time and exceptions with `with timer():`. Different team members implement it 4 ways and get different results under concurrent load. How do you implement a context manager that's thread-safe and async-compatible?

Thread-safety and async compatibility are orthogonal concerns. Issues: (1) if the context manager stores state in instance variables (self.start_time), multiple threads or coroutines sharing one instance will race, (2) threading.local() doesn't help in async code—coroutines on the same thread share the same thread-local state, (3) the value returned by __enter__ must be independent per use of the with statement, not shared across threads. Solutions: (1) for thread safety, create a new instance per with statement—don't share context manager instances, (2) for async, implement __aenter__/__aexit__ instead of (or alongside) __enter__/__exit__, or use contextvars for state, (3) keep state on a per-use instance or in local variables inside a generator-based context manager, never on a shared object, (4) if aggregating metrics across threads, use queue.Queue, a threading.Lock, or contextvars.ContextVar for thread-safe/async-safe state. Example shape: a Timer class whose __enter__ records a start timestamp on self and whose __exit__ computes the duration—safe as long as each with block constructs a fresh Timer(). For async: `async def __aenter__(self): ...` with async/await. Test: create hundreds of threads or tasks, all using the timer context manager, and verify no state corruption.
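A sketch of the per-instance pattern under concurrent threads: timing state lives on each `Timer()` instance, and only the aggregation step is locked:

```python
import threading
import time

class Timer:
    """All timing state lives on a per-use instance; a fresh Timer()
    per with statement means threads never share mutable state."""

    def __enter__(self):
        self.start = time.perf_counter()  # monotonic; right for intervals
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.perf_counter() - self.start
        return False  # never suppress exceptions

results = []
lock = threading.Lock()

def work():
    with Timer() as t:     # new instance per use: no races on self.start
        time.sleep(0.01)
    with lock:             # aggregation is the only shared step
        results.append(t.elapsed)

threads = [threading.Thread(target=work) for _ in range(20)]
for th in threads:
    th.start()
for th in threads:
    th.join()
```

`time.perf_counter()` is used instead of `time.time()` because it is monotonic—wall-clock adjustments can't produce negative durations.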

Follow-up: What's the relationship between __enter__/__exit__ and __aenter__/__aexit__? Can you implement both for a class? How do you test thread-safety of context managers?

Your codebase has dozens of context manager types (connections, transactions, locks, timers). Documentation is sparse. A junior developer uses them in unexpected ways: `ctx = db.transaction(); with ctx:` instead of `with db.transaction():`. State gets corrupted. How do you design context managers that are hard to misuse?

Context managers should be designed so misuse fails loudly. Issues: (1) if an instance can be created and held without with, its state may never be initialized or torn down properly, (2) if __enter__ and __exit__ aren't symmetrical, state leaks, (3) if the same instance can be used in several with statements, you need careful state tracking. Solutions: (1) use factory methods that return a fresh context manager per call: `db.transaction()` hands out a new instance, so a stored reference like `ctx = db.transaction()` can't corrupt shared state, (2) make __enter__/__exit__ check state: track a flag like _entered and raise RuntimeError if __enter__ is called twice or after the instance has been used, (3) document intent in types: annotate returns as typing.ContextManager[T] (or contextlib.AbstractContextManager) and check with mypy, (4) use the contextlib.contextmanager decorator for simple cases—the underlying generator is single-use, so reuse fails automatically, (5) for one-time-use managers, make reuse impossible: raise if the instance is entered a second time. Example defensive pattern: `def __enter__(self): if self._entered: raise RuntimeError(...); self._entered = True; return self`. Run mypy and pylint to catch misuse before runtime; a hint like `def transaction(self) -> ContextManager[Connection]:` signals that the return value is meant to be used in a with statement.
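A sketch of the defensive single-use pattern plus a factory; the class and function names are illustrative, not a real ORM API:

```python
class Transaction:
    """Misuse-resistant, single-use context manager: refuses re-entry
    while active and refuses a second use after exit."""

    def __init__(self):
        self._entered = False
        self._finished = False

    def __enter__(self):
        if self._finished:
            raise RuntimeError("Transaction already used; call transaction() again")
        if self._entered:
            raise RuntimeError("Transaction is not reentrant")
        self._entered = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._finished = True
        return False  # let exceptions propagate

def transaction() -> Transaction:
    """Factory: every call hands out a fresh, unused instance."""
    return Transaction()
```

The junior developer's `ctx = db.transaction(); with ctx:` still works exactly once here; a second `with ctx:` raises immediately instead of silently corrupting state.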

Follow-up: How do you type-hint context managers with return values? Is ContextManager[T] from typing sufficient? How do you enforce single-use context managers at type-check time?

Your testing framework mocks context managers for unit tests: `with mock_db_transaction():`. Tests pass but integration tests fail because mock context manager doesn't properly simulate __enter__/__exit__ behavior. How do you mock context managers correctly?

Mocking context managers means implementing or patching __enter__/__exit__. Issues: (1) a plain unittest.mock.Mock() doesn't support the context manager protocol—magic methods like __enter__/__exit__ aren't configured—so `with mock:` fails; MagicMock does support it, (2) MagicMock preconfigures __exit__ to return False, but __enter__ returns a fresh MagicMock by default, which may not match what your code expects, (3) if your code depends on the __enter__ return value (e.g., a connection object), the mock must return an appropriate object, (4) __exit__'s return value matters: True suppresses exceptions, False propagates—mocks must match these semantics. Solutions: (1) use unittest.mock.MagicMock(), which supports the protocol by default, (2) configure return values: `mock_ctx = MagicMock(); mock_ctx.__enter__.return_value = connection_mock; mock_ctx.__exit__.return_value = False`, (3) use contextlib.contextmanager to build simple test doubles: `@contextlib.contextmanager` over `def mock_transaction(): yield connection_mock`, (4) verify exception handling by asserting __exit__ was called with exception info: `mock_ctx.__exit__.assert_called_once()`, (5) use pytest fixtures with yield to install and tear down mocks cleanly. Key: make the mock's semantics match reality (return values, exception propagation, side effects in __exit__).
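A sketch of configuring MagicMock as a context manager and verifying both the normal and exception paths; `connection_mock` stands in for whatever real object `__enter__` would return:

```python
from unittest.mock import MagicMock

# Stand-ins for a real connection and transaction object.
connection_mock = MagicMock(name="connection")

mock_ctx = MagicMock(name="transaction")
mock_ctx.__enter__.return_value = connection_mock  # what `as conn` binds to
mock_ctx.__exit__.return_value = False             # don't swallow exceptions

# Normal path: __enter__ hands back the connection, __exit__ gets (None,)*3.
with mock_ctx as conn:
    conn.execute("SELECT 1")

mock_ctx.__enter__.assert_called_once()
mock_ctx.__exit__.assert_called_once_with(None, None, None)
connection_mock.execute.assert_called_once_with("SELECT 1")

# Exception path: __exit__ returning False means the exception propagates.
propagated = False
try:
    with mock_ctx:
        raise ValueError("boom")
except ValueError:
    propagated = True

# __exit__ received the real exception info on the second call.
exc_type = mock_ctx.__exit__.call_args.args[0]
```

Asserting on `__exit__.call_args` is the cheap way to prove your code under test would have triggered rollback-style cleanup.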

Follow-up: How does pytest's monkeypatch interact with context manager mocking? Can you use unittest.mock.patch as a context manager to mock other context managers? What's the cleanest way to mock async context managers (__aenter__/__aexit__)?
