Python — practical round cheat sheet

Ordered by what you'll actually type. Coding primitives + templates first; Redis & the gateway pattern are reference at the bottom.
Pick the structure first: ends (queue/stack/window) → deque · index the middle → list · priority → heapq · lookup/dedup → dict/set · LRU → OrderedDict.
Have cold: defaultdict · deque · heapq · queue.Queue + threading.Lock · time.monotonic.

1 · Objects & their methods — the API surface you blank on

# --- containers --- d = defaultdict(list) d[k].append(x) # list / int(+=1) / set(.add) / dict(nested) dq = deque(maxlen=None) dq.append(x) dq.appendleft(x) dq.pop() dq.popleft() c = Counter(it) c[k]+=1 c.most_common(n) od = OrderedDict() od.move_to_end(k) od.popitem(last=False) # LRU: False=oldest # --- heapq: a MODULE on a plain list (min-heap) --- h = [] heapq.heappush(h, x) heapq.heappop(h) heapq.heapify(lst) h[0] # max-heap: push -x · ties/unorderable payload: (key, count, item) # --- concurrency --- q = queue.Queue() q.put(x) q.get() # get() BLOCKS when empty q.task_done() q.join() # join blocks till every put is task_done'd lock = threading.Lock() with lock: ... # NOT reentrant (use RLock if you re-lock) ev = threading.Event() ev.set() ev.clear() ev.wait(t) ev.is_set() t = threading.Thread(target=fn, args=(...), daemon=True) t.start() t.join()

2 · Concurrent worker template — crawler / parallel traversal / work queue

seen = {start}; lock = threading.Lock(); q = queue.Queue(); q.put(start) def worker(): while True: item = q.get() if item is None: q.task_done(); return # poison pill try: for nxt in expand(item): # get_links / neighbors / children if keep(nxt): # same-host / filter with lock: # atomic check-AND-add if nxt in seen: continue seen.add(nxt) q.put(nxt) finally: q.task_done() # in finally or join() HANGS ts = [threading.Thread(target=worker) for _ in range(n)] for t in ts: t.start() q.join() # all real work done for _ in ts: q.put(None) # stop workers for t in ts: t.join()

Say it: "thread-safe Queue for work, Lock for the visited set, join() as the done-condition." Only expand()/keep() change per problem. Async twin: asyncio.Queue + worker coroutines + await q.join() — drops the lock (cooperative = atomic between awaits).

3 · Traversal templates

BFS / DFS — one deque

q = deque([start]); seen={start} while q: n = q.popleft() # BFS # n = q.pop() # DFS for m in neighbors(n): if m not in seen: seen.add(m); q.append(m)

append+popleft=BFS · append+pop=DFS. Mark seen on enqueue.

Topological sort (Kahn)

g=defaultdict(list); indeg={t:0 for t in tasks} for a,b in deps: g[a].append(b); indeg[b]+=1 q=deque(t for t in tasks if indeg[t]==0) order=[] while q: t=q.popleft(); order.append(t) for m in g[t]: indeg[m]-=1 if indeg[m]==0: q.append(m) # len(order)!=len(tasks) -> cycle

Process tree — dict[pid]→node + recursion

kill: p = all.pop(pid) for c in list(p.children): kill(c) # recurse; list() — mutating while looping if p.parent: del p.parent.children[pid]

4 · Gotchas that cost points

6 · typing — reference (you read it off the signature)

Optional[X] == X | None Callable == any fn Any == anything list[int] dict[str,int] set[str] # 3.9+ builtin generics, no import dict[str, dict[int, Callable]] # = event -> (sub_id -> fn)

7 · Redis — reference (discussed, not a coding task)

# lists = queues LPUSH k v BLPOP k [timeout] # BLPOP = the blocking park primitive # strings/locks SET k v EX 30 SET k v NX EX 30 INCR k # NX EX = atomic lock w/ expiry # dedup (set) SADD k m # returns 1 if NEW -> distributed check-and-add # sched (zset) ZADD k score m ZPOPMIN k ZREMRANGEBYSCORE k 0 cutoff # sliding window # ttl EXPIRE k 30 TTL k
Pub/sub is the WRONG choice for request/result routing
fire-and-forget, no persistence — if no subscriber is connected the message is lost. Use lists (persist until claimed) or streams (XADD/XREADGROUP, durable + acks).

8 · Gateway ↔ batcher routing — reference

gateway: rid=snowflake(); pending[rid]=Future(); LPUSH work {rid,...}; await Future batcher: run batch; LPUSH results:{instance} {rid,out} # instance = machine_id bits of rid dispatcher (1/proc): while 1: _,m=BLPOP results:{instance}; pending.pop(m.rid).set_result(m.out)

Why: socket fd + Future are process-local → result must route to that exact process. Lists not pub/sub (persist). BLPOP not poll. One blocking conn per process, not per request. Route embedded in the snowflake's machine-id field. TTL keys + client timeout for crashes. Not a Lambda gateway — stateful, connection-affinitized fleet.

9 · Misc one-liners — low priority

time.monotonic() # elapsed / TTL / rate windows (never wall clock) @cache def f(n): ... # functools — recursion -> memoized DP, free hashlib.sha256(b).hexdigest() # content dedup / dup-files os.walk(root) # (dirpath, dirs, files) recursive; os.path.islink to skip urlparse(u).hostname # host from a URL next(iter(d)) # first key (next needs an ITERATOR) self._id += 1 # id generator — just a counter