Python — practical round cheat sheet
Ordered by what you'll actually type. Coding primitives + templates first; Redis & the gateway pattern are reference at the bottom.
Pick the structure first:
ends (queue/stack/window) → deque ·
index the middle → list ·
priority → heapq ·
lookup/dedup → dict/set ·
LRU → OrderedDict.
Have cold: defaultdict · deque · heapq · queue.Queue + threading.Lock · time.monotonic.
1 · Objects & their methods — the API surface you blank on
# --- containers ---
d = defaultdict(list) d[k].append(x) # list / int(+=1) / set(.add) / dict(nested)
dq = deque(maxlen=None) dq.append(x) dq.appendleft(x) dq.pop() dq.popleft()
c = Counter(it) c[k]+=1 c.most_common(n)
od = OrderedDict() od.move_to_end(k) od.popitem(last=False) # LRU: False=oldest
# --- heapq: a MODULE on a plain list (min-heap) ---
h = [] heapq.heappush(h, x) heapq.heappop(h) heapq.heapify(lst) h[0]
# max-heap: push -x · ties/unorderable payload: (key, count, item)
# --- concurrency ---
q = queue.Queue() q.put(x) q.get() # get() BLOCKS when empty
q.task_done() q.join() # join blocks till every put is task_done'd
lock = threading.Lock() with lock: ... # NOT reentrant (use RLock if you re-lock)
ev = threading.Event() ev.set() ev.clear() ev.wait(t) ev.is_set()
t = threading.Thread(target=fn, args=(...), daemon=True) t.start() t.join()
2 · Concurrent worker template — crawler / parallel traversal / work queue
seen = {start}; lock = threading.Lock(); q = queue.Queue(); q.put(start)
def worker():
while True:
item = q.get()
if item is None: q.task_done(); return # poison pill
try:
for nxt in expand(item): # get_links / neighbors / children
if keep(nxt): # same-host / filter
with lock: # atomic check-AND-add
if nxt in seen: continue
seen.add(nxt)
q.put(nxt)
finally:
q.task_done() # in finally or join() HANGS
ts = [threading.Thread(target=worker) for _ in range(n)]
for t in ts: t.start()
q.join() # all real work done
for _ in ts: q.put(None) # stop workers
for t in ts: t.join()
Say it: "thread-safe Queue for work, Lock for the visited set, join() as the done-condition." Only expand()/keep() change per problem. Async twin: asyncio.Queue + worker coroutines + await q.join() — drops the lock (cooperative = atomic between awaits).
3 · Traversal templates
BFS / DFS — one deque
q = deque([start]); seen={start}
while q:
n = q.popleft() # BFS
# n = q.pop() # DFS
for m in neighbors(n):
if m not in seen:
seen.add(m); q.append(m)
append+popleft=BFS · append+pop=DFS. Mark seen on enqueue.
Topological sort (Kahn)
g=defaultdict(list); indeg={t:0 for t in tasks}
for a,b in deps: g[a].append(b); indeg[b]+=1
q=deque(t for t in tasks if indeg[t]==0)
order=[]
while q:
t=q.popleft(); order.append(t)
for m in g[t]:
indeg[m]-=1
if indeg[m]==0: q.append(m)
# len(order)!=len(tasks) -> cycle
Process tree — dict[pid]→node + recursion
kill: p = all.pop(pid)
for c in list(p.children): kill(c) # recurse; list() — mutating while looping
if p.parent: del p.parent.children[pid]
4 · Gotchas that cost points
task_done() in a finally, once per get() — else join() hangs forever.
- Lock the check-AND-add together (not just the add) — that pair is the race.
- mutable default →
field(default_factory=list), never x: list = [].
defaultdict: reading a missing key creates it — use d.get(k) to just check.
is for None/sentinels only; == for values. list.pop(0) is O(n) — use deque.popleft().
threading.Lock not reentrant → same-thread double-lock deadlocks; use RLock.
6 · typing — reference (you read it off the signature)
Optional[X] == X | None Callable == any fn Any == anything
list[int] dict[str,int] set[str] # 3.9+ builtin generics, no import
dict[str, dict[int, Callable]] # = event -> (sub_id -> fn)
7 · Redis — reference (discussed, not a coding task)
# lists = queues LPUSH k v BLPOP k [timeout] # BLPOP = the blocking park primitive
# strings/locks SET k v EX 30 SET k v NX EX 30 INCR k # NX EX = atomic lock w/ expiry
# dedup (set) SADD k m # returns 1 if NEW -> distributed check-and-add
# sched (zset) ZADD k score m ZPOPMIN k ZREMRANGEBYSCORE k 0 cutoff # sliding window
# ttl EXPIRE k 30 TTL k
Pub/sub is the WRONG choice for request/result routing
fire-and-forget,
no persistence — if no subscriber is connected the message is lost. Use
lists (persist until claimed) or
streams (XADD/XREADGROUP, durable + acks).
8 · Gateway ↔ batcher routing — reference
gateway: rid=snowflake(); pending[rid]=Future(); LPUSH work {rid,...}; await Future
batcher: run batch; LPUSH results:{instance} {rid,out} # instance = machine_id bits of rid
dispatcher (1/proc): while 1: _,m=BLPOP results:{instance}; pending.pop(m.rid).set_result(m.out)
Why: socket fd + Future are process-local → result must route to that exact process. Lists not pub/sub (persist). BLPOP not poll. One blocking conn per process, not per request. Route embedded in the snowflake's machine-id field. TTL keys + client timeout for crashes. Not a Lambda gateway — stateful, connection-affinitized fleet.
9 · Misc one-liners — low priority
time.monotonic() # elapsed / TTL / rate windows (never wall clock)
@cache def f(n): ... # functools — recursion -> memoized DP, free
hashlib.sha256(b).hexdigest() # content dedup / dup-files
os.walk(root) # (dirpath, dirs, files) recursive; os.path.islink to skip
urlparse(u).hostname # host from a URL
next(iter(d)) # first key (next needs an ITERATOR)
self._id += 1 # id generator — just a counter