// From a blocking socket to an event-driven coroutine — no magic, just syscalls
In the previous post, we developed a toy coroutine scheduler using generators and a sleep-based timer. There, each coroutine could tell the scheduler exactly how long to wait. Real network IO makes no such promise: nobody knows in advance when a server will reply, so we need a different mechanism to decide when to resume a generator. In this post, let's take a simple HTTP GET request as an example and see how to bridge the gap between a toy generator and a real async coroutine.
Your application cannot communicate over the network directly; it must ask the OS to do so. The OS provides this capability through an abstraction called a socket: a kernel object that enables bidirectional communication over the network between two nodes. When you create a socket, the OS returns a file descriptor (an integer index) that refers to it. Every later operation on the socket uses this descriptor.
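To make the "file descriptor is just an integer" point concrete, here is a minimal sketch that asks Python for the descriptor behind a socket object via fileno(). Creating a socket sends no traffic, so it runs offline. The function name socket_fd_lifecycle is made up for illustration.

```python
import socket


def socket_fd_lifecycle():
    """Hypothetical helper: return a socket's descriptor before and after close()."""
    # socket() syscall: the kernel allocates a socket and hands back a descriptor.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # fileno() exposes that integer; connect/send/recv all operate on it.
    fd = sock.fileno()
    # close() releases the descriptor; Python then reports -1 for fileno().
    sock.close()
    return fd, sock.fileno()


if __name__ == "__main__":
    fd, after_close = socket_fd_lifecycle()
    print(f"descriptor while open: {fd}, after close: {after_close}")
```

On Linux the number is typically small (usually 3 or higher, since 0 to 2 are taken by stdin, stdout and stderr), but the exact value is up to the kernel.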
Here is a simple HTTP fetch using raw sockets:
import socket


def fetch_data():
    host = "example.com"
    port = 80

    # syscall: socket(). The kernel allocates a socket and returns a file descriptor.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # syscall: connect(). The kernel initiates the TCP three-way handshake
    # and blocks until the handshake completes.
    sock.connect((host, port))

    request = (
        "GET / HTTP/1.1\r\n"
        "Host: example.com\r\n"
        "Connection: close\r\n"
        "\r\n"
    )

    # syscall: send(). sendall() keeps calling send() until every byte has
    # been copied into the kernel's send buffer; the NIC transmits them later.
    # It blocks whenever that buffer is full.
    sock.sendall(request.encode())

    # syscall: recv(). Blocks until the server sends data back.
    response = b""
    while True:
        chunk = sock.recv(4096)
        if not chunk:
            break  # empty bytes: the server closed the connection
        response += chunk

    sock.close()
    return response.decode()


if __name__ == "__main__":
    print(fetch_data())
Each step above involves a syscall. The application drops into kernel mode, the kernel does its work, and only then returns control to your code. The important thing to notice is that during most of these calls the thread is simply waiting on the network, so the CPU does almost no work on its behalf.
connect(), sendall(), and recv() all block the thread: it cannot do anything else while it waits. Let's address this issue first.
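To see the blocking directly without depending on a real server, the sketch below uses socket.socketpair() (two already-connected local sockets, not part of the original example) and calls recv() on one end while nothing is ever written to the other. The timeout exists only so the demo terminates; without it, recv() would block forever. time_blocking_recv is a hypothetical name.

```python
import socket
import time


def time_blocking_recv(timeout=0.2):
    """Hypothetical helper: measure how long a blocking recv() stalls the thread."""
    a, b = socket.socketpair()
    try:
        # Still a blocking socket, but recv() gives up after `timeout` seconds.
        a.settimeout(timeout)
        start = time.monotonic()
        try:
            a.recv(1024)  # nobody writes to `b`, so the thread just waits here
        except socket.timeout:
            pass
        return time.monotonic() - start
    finally:
        a.close()
        b.close()


if __name__ == "__main__":
    print(f"recv() blocked the thread for {time_blocking_recv():.2f}s")
```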
Sockets have a non-blocking mode. When a socket is non-blocking, every syscall that would previously have blocked the thread returns immediately instead, with an error code that says "there's nothing ready yet, try again later." This frees the application to proceed with other tasks and come back later, which is the basis for concurrency on a single thread.
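In Python, switching modes is a single call. This short sketch (blocking_flags is a made-up name) shows that setblocking(False) is the same as setting a timeout of 0.0, which is how the socket object reports the change.

```python
import socket


def blocking_flags():
    """Hypothetical helper: report (getblocking(), gettimeout()) before and after."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Assumes no global default timeout was set via socket.setdefaulttimeout().
        before = (sock.getblocking(), sock.gettimeout())
        # Put the underlying descriptor into non-blocking mode at the OS level;
        # Python treats this as equivalent to settimeout(0.0).
        sock.setblocking(False)
        after = (sock.getblocking(), sock.gettimeout())
        return before, after
    finally:
        sock.close()


if __name__ == "__main__":
    print(blocking_flags())
```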
In C, when a non-blocking syscall cannot complete right away, it returns -1 and sets errno to EAGAIN or EWOULDBLOCK (or EINPROGRESS for connect()). Python's socket module surfaces these errno values as a single exception type, BlockingIOError. It is not an error in the traditional sense; it just indicates that the socket is not ready yet. We can catch it and treat it as a signal to yield control.
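A quick way to see the errno hiding inside BlockingIOError is to call recv() on a non-blocking socket that has no data. This sketch again uses socket.socketpair() as an offline stand-in for a network connection; nonblocking_recv_errno is a hypothetical name, and the EAGAIN/EWOULDBLOCK check assumes Linux or macOS.

```python
import errno
import socket


def nonblocking_recv_errno():
    """Hypothetical helper: return the errno carried by BlockingIOError."""
    a, b = socket.socketpair()
    a.setblocking(False)
    try:
        a.recv(1024)  # nothing has been written to `b`, so there is no data yet
    except BlockingIOError as exc:
        # Not a failure: the kernel is saying "nothing to read yet".
        return exc.errno
    finally:
        a.close()
        b.close()
    return None  # only reached if recv() unexpectedly returned data


if __name__ == "__main__":
    code = nonblocking_recv_errno()
    print(code, errno.errorcode.get(code))
```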
Here is the non-blocking version of fetch_data. When we call connect(), the kernel starts the TCP handshake in the background, and the non-blocking socket immediately raises BlockingIOError to signal "come back later." At this point, our generator can yield the socket to the scheduler together with the event it is waiting for. Essentially, it tells the scheduler: "Hey, I have initiated the TCP handshake with this socket. Come back to me when the socket is ready to write."
import socket


def fetch_data():
    host = "example.com"
    port = 80
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)

    # connect() on a non-blocking socket typically raises BlockingIOError
    # (errno EINPROGRESS), indicating that the kernel has started the
    # TCP handshake but the application need not wait for it.
    try:
        sock.connect((host, port))
    except BlockingIOError:
        pass  # expected: the handshake is in progress in the background

    # Yield back to the scheduler. Tell it to resume once this
    # socket is writable.
    yield sock, "write"

    # The handshake has finished. (A failed connect also makes the socket
    # writable; robust code would check SO_ERROR here.) Build the request.
    request = (
        "GET / HTTP/1.1\r\n"
        "Host: example.com\r\n"
        "Connection: close\r\n"
        "\r\n"
    )

    # sendall() is a poor fit for a non-blocking socket: if the kernel send
    # buffer fills up it raises BlockingIOError, and we cannot tell how many
    # bytes were already sent. send() returns the number of bytes it accepted,
    # so we call it in a loop until everything has been sent.
    data = request.encode()
    while data:
        try:
            sent = sock.send(data)
            data = data[sent:]
        except BlockingIOError:
            # Buffer full: yield and wait for the socket to become writable again
            yield sock, "write"

    # Tell the scheduler: "resume me once this socket has data to read"
    yield sock, "read"

    # Read the response. recv() on a non-blocking socket raises BlockingIOError
    # if no data is available yet. We yield and wait for another read event.
    response = b""
    while True:
        try:
            chunk = sock.recv(4096)
            if not chunk:
                break  # empty bytes = server closed the connection
            response += chunk
        except BlockingIOError:
            # No data yet: yield and wait for the next read event
            yield sock, "read"

    sock.close()
    return response.decode()
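The send loop above assumes the kernel's send buffer can actually fill up. To check that locally, this sketch uses socket.socketpair() (not part of the original code) and never reads from the peer, so repeated non-blocking send() calls eventually hit BlockingIOError. That is exactly the point where fetch_data would yield (sock, "write"). fill_send_buffer is a hypothetical name, and the total it reports depends on OS buffer sizes.

```python
import socket


def fill_send_buffer(chunk_size=4096):
    """Hypothetical helper: count bytes accepted before send() would block."""
    a, b = socket.socketpair()
    a.setblocking(False)
    total = 0
    payload = b"x" * chunk_size
    try:
        while True:
            try:
                # send() copies as much as fits into the kernel buffer and
                # returns that count, which may be less than len(payload).
                total += a.send(payload)
            except BlockingIOError:
                # Buffer full and `b` never reads: time to wait for EVENT_WRITE.
                return total
    finally:
        a.close()
        b.close()


if __name__ == "__main__":
    print(f"kernel accepted {fill_send_buffer()} bytes before blocking")
```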
Similarly, the generator can trigger other non-blocking syscalls and yield back to the scheduler.
Each yield is now a contract between the generator and its scheduler: "I'm pausing here —
resume me when this specific event fires on this specific socket." But the scheduler
still needs a mechanism to know when that event actually fires.
select / epoll
The kernel provides several IO multiplexing syscalls (select, poll, epoll on Linux, kqueue on macOS/BSD) that let an application hand a set of file descriptors to the kernel and ask: "please notify me when any of these are ready." The kernel does not burn CPU cycles continuously checking the file descriptors (sockets, in this context). Instead, it puts the calling thread to sleep and wakes it only when a requested event occurs on one of the watched descriptors, or when an optional timeout expires.
Python's selectors module wraps these syscalls behind a single interface. selectors.DefaultSelector is an alias for the most efficient implementation available on the current platform (epoll on Linux, kqueue on macOS). We can use it to build the scheduler.
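Before building the scheduler, here is the selector API on its own: register a socket with an event and some data, make it ready, and see what select() reports. The sketch uses socket.socketpair() as an offline stand-in for a server connection; wait_for_readable and the "reader" label are made up for illustration.

```python
import selectors
import socket


def wait_for_readable():
    """Hypothetical helper: register a socket for EVENT_READ and report what fires."""
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()
    a.setblocking(False)
    try:
        # Ask the kernel (through epoll/kqueue/...) to watch `a` for readability.
        sel.register(a, selectors.EVENT_READ, data="reader")
        b.sendall(b"hello")  # writing to the peer makes `a` readable
        # select() returns (SelectorKey, mask) pairs for the ready sockets.
        ready = sel.select(timeout=1.0)
        labels = [key.data for key, mask in ready if mask & selectors.EVENT_READ]
        return type(sel).__name__, labels, a.recv(1024)
    finally:
        sel.close()
        a.close()
        b.close()


if __name__ == "__main__":
    print(wait_for_readable())
```

The data field is arbitrary; the scheduler below stores a generator there instead of a string.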
At each yield, fetch_data hands the scheduler a (socket, event) pair. The scheduler registers that socket with the selector and then calls sel.select(), which blocks until the OS reports that one or more of the registered sockets are ready. There is no busy-waiting and no polling loop: the process is genuinely idle between events.
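The "genuinely idle" claim can be checked by comparing wall-clock time with CPU time while select() waits on a socket that never becomes readable. The timeout is only there so this sketch terminates, and socket.socketpair() again stands in for a real connection; idle_select is a hypothetical name.

```python
import selectors
import socket
import time


def idle_select(timeout=0.3):
    """Hypothetical helper: measure wall time vs CPU time spent inside select()."""
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()
    try:
        sel.register(a, selectors.EVENT_READ)  # nothing will ever be written to `b`
        wall0, cpu0 = time.monotonic(), time.process_time()
        ready = sel.select(timeout=timeout)  # the thread sleeps in the kernel
        wall = time.monotonic() - wall0
        cpu = time.process_time() - cpu0
        return ready, wall, cpu
    finally:
        sel.close()
        a.close()
        b.close()


if __name__ == "__main__":
    ready, wall, cpu = idle_select()
    print(f"ready={ready} wall={wall:.3f}s cpu={cpu:.4f}s")
```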
Here is the updated generator with selectors constants replacing the bare strings:
import socket
import selectors


def fetch_data():
    host = "example.com"
    port = 80
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        sock.connect((host, port))
    except BlockingIOError:
        pass
    # Yield the socket and the event the scheduler should wait for.
    # EVENT_WRITE fires once the TCP handshake completes and the
    # connection is ready to accept data.
    yield sock, selectors.EVENT_WRITE
    request = (
        "GET / HTTP/1.1\r\n"
        "Host: example.com\r\n"
        "Connection: close\r\n"
        "\r\n"
    )
    data = request.encode()
    while data:
        try:
            sent = sock.send(data)
            data = data[sent:]
        except BlockingIOError:
            yield sock, selectors.EVENT_WRITE
    # EVENT_READ fires once the server has sent at least some response bytes
    yield sock, selectors.EVENT_READ
    response = b""
    while True:
        try:
            chunk = sock.recv(4096)
            if not chunk:
                break
            response += chunk
        except BlockingIOError:
            yield sock, selectors.EVENT_READ
    sock.close()
    return response.decode()
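One caveat about the EVENT_WRITE comment in this generator: a socket also becomes writable when the handshake fails (for example, when the server refuses the connection), so writability alone does not prove the connection is up. The usual check is to read the socket's pending error with getsockopt(SOL_SOCKET, SO_ERROR) once the event fires. Below is a hedged sketch of that check as a standalone helper that calls select() directly instead of going through a generator. connect_nonblocking is a hypothetical name, and the demo connects to a throwaway local server rather than example.com so it runs offline.

```python
import os
import selectors
import socket


def connect_nonblocking(host, port, timeout=2.0):
    """Hypothetical helper: non-blocking connect, wait for EVENT_WRITE, verify success."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    sel = selectors.DefaultSelector()
    try:
        try:
            sock.connect((host, port))
        except BlockingIOError:
            pass  # EINPROGRESS: the handshake continues in the kernel
        sel.register(sock, selectors.EVENT_WRITE)
        if not sel.select(timeout=timeout):
            raise TimeoutError(f"connect to {host}:{port} timed out")
        # Writable only means the handshake is over; SO_ERROR says how it ended.
        err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # OSError(errno, msg) maps to a subclass such as ConnectionRefusedError.
            raise OSError(err, os.strerror(err))
        return sock
    except BaseException:
        sock.close()
        raise
    finally:
        sel.close()


if __name__ == "__main__":
    with socket.create_server(("127.0.0.1", 0)) as server:
        client = connect_nonblocking(*server.getsockname())
        print("connected to", client.getpeername())
        client.close()
```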
The scheduler's job is to drive a generator until it finishes, which Python signals by raising StopIteration with the generator's return value (here, the response) attached. Along the way, it registers each (socket, event) pair with the selector and resumes the generator when the kernel reports that the event has fired.
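The scheduler leans on one generator feature: when a generator executes return value, Python raises StopIteration and stores that value in its value attribute (PEP 380). A toy driver makes this visible without any sockets; tiny_task and drive are illustrative names, not part of the scheduler.

```python
def tiny_task():
    # Each yield hands something to whoever is driving the generator.
    yield "first pause"
    yield "second pause"
    return "final result"  # becomes StopIteration.value


def drive(gen):
    """Resume `gen` until it finishes; collect what it yielded and returned."""
    yielded = []
    while True:
        try:
            yielded.append(next(gen))
        except StopIteration as stop:
            return yielded, stop.value


if __name__ == "__main__":
    print(drive(tiny_task()))
```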
import selectors


def run(gen_func):
    # Instantiate the generator.
    gen = gen_func()
    # Ask the OS for the best available IO multiplexing mechanism
    # (epoll on Linux, kqueue on macOS).
    sel = selectors.DefaultSelector()
    # Run the generator to its first yield to get
    # the initial (socket, event) pair.
    try:
        sock, event = next(gen)
    except StopIteration:
        sel.close()
        return  # generator finished synchronously with no IO: unusual but valid
    # Register the socket. Attach the generator itself as metadata
    # so we can retrieve it when the event fires.
    sel.register(sock, event, data=gen)
    while True:
        # sel.select() BLOCKS here. The process is idle.
        # The OS wakes the application only when one of the registered sockets
        # has the event it asked for.
        ready = sel.select()
        for key, mask in ready:
            gen = key.data
            sel.unregister(key.fileobj)
            try:
                # Resume the generator; it runs until the next yield
                sock, event = next(gen)
                # Re-register for the new event the generator is now waiting on
                sel.register(sock, event, data=gen)
            except StopIteration as e:
                # Generator exhausted: e.value holds the return value
                print("Response received!")
                print(e.value[:500])
                sel.close()
                return
        if not sel.get_map():
            sel.close()
            break


if __name__ == "__main__":
    # fetch_data is the selectors-based generator defined above
    run(fetch_data)
Let's trace through exactly what happens when run(fetch_data) is called, step by step:
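1. The scheduler calls fetch_data(), which returns a generator object. No code inside fetch_data has run yet.
2. The scheduler calls next(gen). The generator runs until the first yield: it creates a socket object, calls connect(), catches the BlockingIOError, and yields (sock, EVENT_WRITE). The TCP handshake runs in the background.
3. The scheduler registers the socket for EVENT_WRITE and calls sel.select(). The process sleeps.
4. The handshake completes, the socket becomes writable, and sel.select() returns.
5. The scheduler unregisters the socket and calls next(gen) again. The generator sends the HTTP request and yields (sock, EVENT_READ). The scheduler registers for the read event and sleeps again.
6. Response bytes arrive and sel.select() returns. The generator resumes and reads chunks, yielding EVENT_READ again whenever recv() raises BlockingIOError, until the server closes the connection. It then returns the full response.
7. Returning from a generator raises StopIteration. The scheduler catches the exception, extracts e.value (the return value of the generator), prints it, and exits.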
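We can extend the above scheduler to handle multiple concurrent requests. We don't need a separate dict to map sockets to generators: the selector already keeps that mapping, because each registered socket gets a SelectorKey whose data field holds its generator. When a socket becomes ready, we pull the generator out of key.data and resume it.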
import selectors


def run_all(gen_funcs):
    """Run multiple fetch_data generators concurrently on a single thread."""
    sel = selectors.DefaultSelector()
    # Kick off all generators and register their initial (sock, event) pairs
    for gen_func in gen_funcs:
        gen = gen_func()
        try:
            sock, event = next(gen)
            sel.register(sock, event, data=gen)
        except StopIteration:
            pass  # finished synchronously
    while sel.get_map():
        # Block until any registered socket has an event
        ready = sel.select()
        for key, mask in ready:
            gen = key.data
            sel.unregister(key.fileobj)
            try:
                sock, event = next(gen)
                sel.register(sock, event, data=gen)
            except StopIteration as e:
                print(f"Done: {e.value[:80]}...")
    sel.close()


if __name__ == "__main__":
    # Fire off 5 concurrent fetches, all on one thread
    # (fetch_data is the selectors-based generator defined above)
    run_all([fetch_data] * 5)
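To watch the interleaving without depending on example.com, here is a hedged, self-contained variant. It swaps the network for socket.socketpair() and uses threading.Timer only to play the part of three remote servers that reply after different delays; the scheduling itself still happens on one thread. reader_task, run_all_collect, _reply and demo are hypothetical names, and run_all_collect is run_all with results collected instead of printed.

```python
import selectors
import socket
import threading
import time


def reader_task(name, sock):
    """Hypothetical task: read from `sock` until the peer closes it."""
    sock.setblocking(False)
    chunks = []
    while True:
        try:
            chunk = sock.recv(4096)
        except BlockingIOError:
            # Nothing yet: park until the kernel reports `sock` readable.
            yield sock, selectors.EVENT_READ
            continue
        if not chunk:  # empty bytes: the peer closed its end
            break
        chunks.append(chunk)
    sock.close()
    return name, b"".join(chunks)


def run_all_collect(gens):
    """Hypothetical variant of run_all that returns results in completion order."""
    sel = selectors.DefaultSelector()
    results = []
    for gen in gens:
        try:
            sock, event = next(gen)
            sel.register(sock, event, data=gen)
        except StopIteration as stop:
            results.append(stop.value)
    while sel.get_map():
        for key, _mask in sel.select():
            gen = key.data
            sel.unregister(key.fileobj)
            try:
                sock, event = next(gen)
                sel.register(sock, event, data=gen)
            except StopIteration as stop:
                results.append(stop.value)
    sel.close()
    return results


def _reply(sock, payload):
    # Runs on a timer thread: stands in for a slow remote server.
    sock.sendall(payload)
    sock.close()


def demo():
    delays = {"slow": 0.3, "fast": 0.1, "medium": 0.2}
    gens, timers = [], []
    for name, delay in delays.items():
        client, server = socket.socketpair()
        gens.append(reader_task(name, client))
        timers.append(threading.Timer(delay, _reply, args=(server, name.encode())))
    start = time.monotonic()
    for t in timers:
        t.start()
    results = run_all_collect(gens)  # one scheduler thread, three tasks in flight
    elapsed = time.monotonic() - start
    for t in timers:
        t.join()
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = demo()
    print(results, f"{elapsed:.2f}s")
```

Results come back in the order the "servers" answer, and the total time tracks the slowest reply rather than the sum of all three.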
All the requests are in flight simultaneously, and each generator is suspended waiting on its own socket event. The single application thread sleeps inside sel.select() and wakes up only when the kernel reports an event on one of the registered sockets. This is the core of what asyncio's event loop does: the same pattern, built with better error handling and cleaner abstractions.
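As a small preview, asyncio exposes this machinery fairly directly. The sketch below builds an asyncio.SelectorEventLoop around a selectors.DefaultSelector and uses the loop's low-level sock_sendall() and sock_recv() coroutines on a socket.socketpair(). It only illustrates the readiness-based loop; it is not how asyncio is usually used in application code, and ping_pong and asyncio_ping are hypothetical names.

```python
import asyncio
import selectors
import socket


async def ping_pong(a, b):
    loop = asyncio.get_running_loop()
    await loop.sock_sendall(b, b"ping")
    # If `a` had no data yet, sock_recv() would ask the loop to watch it for
    # readability and suspend this coroutine, much like yielding EVENT_READ.
    return await loop.sock_recv(a, 1024)


def asyncio_ping():
    # Build the selector-based loop explicitly to mirror the hand-written scheduler.
    loop = asyncio.SelectorEventLoop(selectors.DefaultSelector())
    a, b = socket.socketpair()
    a.setblocking(False)  # loop.sock_* methods expect non-blocking sockets
    b.setblocking(False)
    try:
        return loop.run_until_complete(ping_pong(a, b))
    finally:
        a.close()
        b.close()
        loop.close()


if __name__ == "__main__":
    print(asyncio_ping())
```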
In a future post, we'll dissect the abstractions provided by the asyncio library.