Chapter 9: Concurrency and Parallelism

Concurrency enables a computer to do many different things seemingly at the same time. For example, on a computer with one CPU core, the operating system rapidly changes which program is running on the single processor. In doing so, it interleaves execution of the programs, providing the illusion that the programs are running simultaneously.

Parallelism, in contrast, involves actually doing many different things at the same time. Computers with multiple CPU cores can execute multiple programs simultaneously. Each CPU core runs the instructions of a separate program, allowing each program to make forward progress during the same instant.

Within a single program, concurrency is a tool that makes it easier for programmers to solve certain types of problems. Concurrent programs enable many distinct paths of execution, including separate streams of I/O, to make forward progress in a way that seems to be both simultaneous and independent.

The key difference between parallelism and concurrency is speedup. When two distinct paths of execution in a program make forward progress in parallel, the time it takes to do the total work is cut in half; the speed of execution is faster by a factor of two. In contrast, concurrent programs may run thousands of separate paths of execution seemingly in parallel but provide no speedup for the total work.

Python makes it easy to write concurrent programs in a variety of styles. Threads support a relatively small amount of concurrency, while asynchronous coroutines enable vast numbers of concurrent functions. Python can also be used to do parallel work through system calls, subprocesses, and C extensions. But it can be very difficult to make concurrent Python code truly run in parallel. It’s important to understand how to best utilize Python in these different situations.

Item 67: Use subprocess to Manage Child Processes

Python has battle-hardened libraries for running and managing child processes. This makes Python a great language for gluing together other tools, such as command-line utilities. When existing shell scripts get complicated, as they often do over time, graduating them to a rewrite in Python for the sake of readability and maintainability is a natural choice.

Child processes started by Python are able to run in parallel, enabling you to use Python to consume all of the CPU cores of your machine and maximize the throughput of your programs. Although Python itself may be CPU bound (see Item 68: “Use Threads for Blocking I/O; Avoid for Parallelism”), it’s easy to use Python to drive and coordinate CPU-intensive workloads.

Python has many ways to run subprocesses (e.g., os.popen, os.exec*), but the best choice for managing child processes is to use the subprocess built-in module. Running a child process with subprocess is simple. Here I use the module’s run convenience function to start a process, read its output, and verify that it terminated cleanly:

import subprocess

result = subprocess.run(
    ["echo", "Hello from the child!"],
    capture_output=True,
    encoding="utf-8",
)

result.check_returncode()  # No exception means it exited cleanly
print(result.stdout)

>>>
Hello from the child!
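
Alternatively, passing check=True to run makes it raise subprocess.CalledProcessError automatically on a nonzero exit status, replacing the explicit call to check_returncode:

result = subprocess.run(
    ["echo", "Hello from the child!"],
    capture_output=True,
    encoding="utf-8",
    check=True,  # Raises CalledProcessError on nonzero exit status
)
print(result.stdout)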

Note

The examples in this item assume that your system has the echo, sleep, and openssl commands available. On Windows, this may not be the case. Please refer to the full example code for this item online to see specific directions on how to run these snippets on Windows.

Child processes run independently from their parent process, the Python interpreter. If you create a subprocess using the Popen class instead of the run function, you can poll child process status periodically while Python does other work:

proc = subprocess.Popen(["sleep", "1"])
while proc.poll() is None:
    print("Working...")
    # Some time-consuming work here
    ...

print("Exit status", proc.poll())

>>>
Working...
Working...
Working...
Working...
Exit status 0

Decoupling the child process from the parent frees up the parent process to run many child processes in parallel. Here I do this by starting all the child processes together with Popen upfront:

import time

start = time.perf_counter()
sleep_procs = []
for _ in range(10):
    proc = subprocess.Popen(["sleep", "1"])
    sleep_procs.append(proc)

Later, I can wait for them to finish their I/O and terminate with the communicate method:

for proc in sleep_procs:
    proc.communicate()

end = time.perf_counter()
delta = end - start
print(f"Finished in {delta:.3} seconds")

>>>
Finished in 1.01 seconds

If these processes ran in sequence, the total delay would be 10 seconds or more rather than the ~1 second that I measured.
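
For comparison, here’s a sketch of the serial version, where each call to communicate blocks until that child exits before the next one starts:

start = time.perf_counter()

for _ in range(10):
    proc = subprocess.Popen(["sleep", "1"])
    proc.communicate()  # Waits for this child before starting the next

end = time.perf_counter()
delta = end - start
print(f"Finished in {delta:.3} seconds")  # Roughly 10 seconds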

You can also pipe data from a Python program into a subprocess and retrieve its output. This allows you to utilize many other programs to do work in parallel. For example, say that I want to use the openssl command-line tool to encrypt some data. Starting the child process with command-line arguments and I/O pipes is easy:

import os

def run_encrypt(data):
    env = os.environ.copy()
    env["password"] = "zf7ShyBhZOraQDdE/FiZpm/m/8f9X+M1"
    proc = subprocess.Popen(
        ["openssl", "enc", "-des3", "-pass", "env:password"],
        env=env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    proc.stdin.write(data)
    proc.stdin.flush()  # Ensure that the child gets input
    return proc

Here I pipe random bytes into the encryption function, but in practice this input pipe would be fed data from user input, a file handle, a network socket, and so on:

procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_encrypt(data)
    procs.append(proc)

The child processes run in parallel and consume their input. Here I wait for them to finish and then retrieve their final output. The output is random encrypted bytes, as expected:

for proc in procs:
    out, _ = proc.communicate()
    print(out[-10:])

>>>
b'\x02a_\xd3\xd3\x9a\xd0\x8f\x14|'
b'S\x9c\x1a\x919\x9a-P\x0c\x1f'
b'\x1a\x7f\x1e\xbf\xac\xe5A>\xa3\xdd'

It’s also possible to create chains of parallel processes, just like UNIX pipelines, connecting the output of one child process to the input of another, and so on. Here’s a function that starts the openssl command-line tool as a subprocess to generate a Whirlpool hash of the input stream:

def run_hash(input_stdin):
    return subprocess.Popen(
        ["openssl", "dgst", "-whirlpool", "-binary"],
        stdin=input_stdin,
        stdout=subprocess.PIPE,
    )

Now I can kick off one set of processes to encrypt some data and another set of processes to subsequently hash their encrypted output. Note that I have to be careful with how the stdout instance of the upstream process is retained by the Python interpreter process that’s starting this pipeline of child processes:

encrypt_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(100)

    encrypt_proc = run_encrypt(data)
    encrypt_procs.append(encrypt_proc)

    hash_proc = run_hash(encrypt_proc.stdout)
    hash_procs.append(hash_proc)

    # Ensure that the child consumes the input stream and
    # the communicate() method doesn't inadvertently steal
    # input from the child. Also lets SIGPIPE propagate to
    # the upstream process if the downstream process dies.
    encrypt_proc.stdout.close()
    encrypt_proc.stdout = None

The I/O between the child processes happens automatically once they are started. All I need to do is wait for them to finish and print the final output:

for proc in encrypt_procs:
    proc.communicate()
    assert proc.returncode == 0

for proc in hash_procs:
    out, _ = proc.communicate()
    print(out[-10:])
    assert proc.returncode == 0

>>>
b'\xc6\n\x8a"cg\x85\xd2\x81|'
b'\x14\r\xc6J\xb0\xb0\xbf\x0c2X'
b'@\x90$\xcc\xc7\xf4\x08\x19Y\x0b'

If I’m worried about the child processes never finishing or somehow blocking on input or output pipes, I can pass the timeout parameter to the communicate method. This causes an exception to be raised if the child process hasn’t finished within the time period, giving me a chance to terminate the misbehaving subprocess:

proc = subprocess.Popen(["sleep", "10"])
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()
    proc.wait()

print("Exit status", proc.poll())

>>>
Exit status -15
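
The run convenience function accepts the same timeout parameter; it kills the child process when the timeout expires before re-raising the exception, so there’s no lingering subprocess to clean up:

try:
    subprocess.run(["sleep", "10"], timeout=0.1)
except subprocess.TimeoutExpired:
    print("Child took too long")

>>>
Child took too long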

Things to Remember

  • Use the subprocess module to run child processes and manage their input and output streams.

  • Child processes run in parallel with the Python interpreter, enabling you to maximize your usage of CPU cores.

  • Use the run convenience function for simple usage and the Popen class for advanced usage like UNIX-style pipelines.

  • Use the timeout parameter of the communicate method to avoid deadlocks and hanging child processes.

Item 68: Use Threads for Blocking I/O; Avoid for Parallelism

The standard implementation of Python is called CPython. CPython runs a Python program in two steps. First, it parses and compiles the source text into bytecode, which is a low-level representation of the program as 8-bit instructions (see Item 97: “Rely on Precompiled Bytecode and File System Caching to Improve Startup Time” for background). (As of Python 3.6, it’s technically wordcode with 16-bit instructions, but the idea is the same.) Then, CPython runs the bytecode using a stack-based interpreter. The bytecode interpreter has state that must be kept coherent while the Python program executes. CPython enforces coherence with a mechanism called the global interpreter lock (GIL).
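
You can inspect this bytecode directly with the dis built-in module, which prints the individual instructions the interpreter executes (the exact opcodes vary between CPython versions):

import dis

def add(a, b):
    return a + b

dis.dis(add)  # Prints each bytecode instruction for the function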

Essentially, the GIL is a mutual-exclusion lock (mutex) that prevents CPython from being affected by preemptive multithreading, where one thread takes control of a program by interrupting another thread. Such an interruption could corrupt the interpreter state (e.g., garbage collection reference counts) if it comes at an unexpected time. The GIL prevents these interruptions and ensures that every bytecode instruction works correctly with the CPython implementation and its C-extension modules (see Item 96: “Consider Extension Modules to Maximize Performance and Ergonomics” for background).

The GIL has an important negative side effect. With programs written in languages like C++ or Java, having multiple threads of execution means that a program can utilize multiple CPU cores at the same time. Although Python supports multiple threads of execution, the GIL causes only one of them to ever make forward progress at a time. This means that when you reach for threads to do parallel computation and speed up your Python programs, you will be sorely disappointed.

For example, say that I want to do something computationally intensive with Python. Here I use a naive number factorization algorithm as a proxy:

def factorize(number):
    for i in range(1, number + 1):
        if number % i == 0:
            yield i

Factoring a list of 16 numbers in serial takes quite a long time:

import time

numbers = [7775876, 6694411, 5038540, 5426782,
           9934740, 9168996, 5271226, 8288002,
           9403196, 6678888, 6776096, 9582542,
           7107467, 9633726, 5747908, 7613918]
start = time.perf_counter()

for number in numbers:
    list(factorize(number))

end = time.perf_counter()
delta = end - start

print(f"Took {delta:.3f} seconds")

>>>
Took 3.304 seconds

Using multiple threads to do this computation would make sense in other languages because I could take advantage of all the CPU cores of my computer. Let me try that in Python. Here I define a Python thread for doing the same computation as before:

from threading import Thread

class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number

    def run(self):
        self.factors = list(factorize(self.number))

Then, I start a thread for each number to factorize in parallel:

start = time.perf_counter()

threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)

Finally, I wait for all of the threads to finish:

for thread in threads:
    thread.join()

end = time.perf_counter()
delta = end - start
print(f"Took {delta:.3f} seconds")

>>>
Took 3.293 seconds

Surprisingly, this takes almost exactly the same amount of time as running factorize in serial. With one thread per number—again, 16 threads in total for this example—you might expect less than a 16x speedup in other languages due to the overhead of creating and coordinating threads. You might also expect only an 8x speedup on the 8-core machine I used to run this code. But you wouldn’t expect these threads to perform no better than serial execution when there are multiple CPUs to utilize. This demonstrates the effect of the GIL (e.g., lock contention, scheduling overhead) on programs running in the standard CPython interpreter.

There are ways to get CPython to utilize multiple cores, but they don’t work with the standard Thread class (see Item 79: “Consider concurrent.futures for True Parallelism” and Item 94: “Know When and How to Replace Python with Another Programming Language”), and they can require substantial effort.

Note

Starting in CPython version 3.13, there is an experimental option to compile Python without the GIL, thus enabling programs to avoid its constraints. This can improve parallel performance with multiple threads, but there are significant downsides: Many C-extension modules and common libraries aren’t yet compatible with this behavior; and the straight-line performance of individual threads is reduced because of synchronization overhead. It will be interesting to see how this experiment develops in subsequent releases.

Given these limitations, why does Python support threads at all? There are two good reasons.

First, multiple threads make it easy for a program to seem like it’s doing multiple things at the same time. Managing the juggling act of simultaneous tasks is difficult to implement yourself (see Item 71: “Know How to Recognize When Concurrency Is Necessary” for an example). With threads, you can leave it to Python to run your functions concurrently. This works because CPython ensures a level of fairness between Python threads of execution, even though only one of them makes forward progress at a time due to the GIL.

The second reason Python supports threads is to deal with blocking I/O, which happens when Python does certain types of system calls. A Python program uses system calls to ask the computer’s operating system to interact with the external environment on its behalf. Blocking I/O includes things like reading and writing files, interacting with networks, communicating with devices like displays, and so on. Threads help handle blocking I/O by insulating a program from the delay required for the operating system to respond to requests.

For example, say that I want to send a signal to a radio-controlled helicopter through a serial port. I’ll use a slow system call (select) as a proxy for this activity. This function asks the operating system to block for 0.1 seconds and then return control to my program, which is similar to what would happen when using a synchronous serial port:

import select
import socket

def slow_systemcall():
    select.select([socket.socket()], [], [], 0.1)

Running this system call in serial requires a linearly increasing amount of time—5 calls takes about 0.5 seconds:

start = time.perf_counter()

for _ in range(5):
    slow_systemcall()

end = time.perf_counter()
delta = end - start
print(f"Took {delta:.3f} seconds")

>>>
Took 0.525 seconds

The problem is that while the slow_systemcall function is running, my program can’t make any other progress. My program’s main thread of execution is blocked on the select system call. This situation is awful in practice. I need to be able to compute my helicopter’s next move while I’m sending it a signal; otherwise, it’ll crash. When you find yourself needing to do blocking I/O and computation simultaneously like this, it’s time to consider moving your system calls to threads.

Here I run multiple invocations of the slow_systemcall function in separate threads. This allows me to communicate with multiple serial ports (and helicopters) at the same time while leaving the main thread to do whatever computation is required:

start = time.perf_counter()

threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

With the threads started, here I do some work to calculate the next helicopter move before waiting for the system call threads to finish:

def compute_helicopter_location(index):
    ...

for i in range(5):
    compute_helicopter_location(i)

for thread in threads:
    thread.join()

end = time.perf_counter()
delta = end - start
print(f"Took {delta:.3f} seconds")

>>>
Took 0.106 seconds

The parallel time is ~5x less than the serial time from the example code earlier. This shows that all the system calls will run in parallel from multiple Python threads even though they’re limited by the GIL. The GIL prevents my Python code from running in parallel, but it doesn’t have an effect on system calls. Python threads release the GIL just before they make system calls, and they reacquire the GIL as soon as the system calls are done.

There are many other ways to deal with blocking I/O besides using threads, such as using the asyncio built-in module, and these alternatives have important benefits. But those options might require extra work in refactoring your code to fit a different model of execution (see Item 75: “Achieve Highly Concurrent I/O with Coroutines” and Item 77: “Mix Threads and Coroutines to Ease the Transition to asyncio”). Using threads is the simplest way to do blocking I/O in parallel with minimal changes to your program.

Things to Remember

  • Python threads can’t run in parallel on multiple CPU cores because of the global interpreter lock (GIL).

  • Python threads are still useful despite the GIL because they provide an easy way to do multiple things seemingly at the same time.

  • You can use Python threads to make multiple system calls in parallel, allowing you to do blocking I/O at the same time as computation.

Item 69: Use Lock to Prevent Data Races in Threads

After learning about the global interpreter lock (GIL) (see Item 68: “Use Threads for Blocking I/O; Avoid for Parallelism”), many new Python programmers assume that they can forgo using mutual-exclusion locks (also called mutexes) in their code altogether. If the GIL is already preventing Python threads from running on multiple CPU cores in parallel, it must also act as a lock for a program’s data structures, right? Some testing on types like lists and dictionaries may even show that this assumption appears to hold.

But beware: This is not truly the case. The GIL will not protect you. Although only one Python thread runs at a time, a thread’s operations on data structures can be interrupted between any two bytecode instructions in the Python interpreter. This is dangerous if you access the same objects from multiple threads simultaneously. The invariants of your data structures could be violated at practically any time because of these interruptions, potentially putting your program in a corrupted state.

For example, say that I want to write a program that counts many things in parallel, like sampling light levels from a network of sensors. Imagine that each sensor has its own worker thread because reading from the sensor requires blocking I/O. After each sensor measurement, the worker thread increments a shared counter variable with the number of photons received:

counter = 0

def read_sensor(sensor_index):
    # Returns sensor data or raises an exception
    ...

def get_offset(data):
    # Always returns 1 or greater
    ...

def worker(sensor_index, how_many):
    global counter
    for _ in range(how_many):
        data = read_sensor(sensor_index)
        counter += get_offset(data)

Here I run one worker thread for each sensor in parallel and wait for them all to finish their readings:

from threading import Thread

how_many = 10**6
sensor_count = 4

threads = []
for i in range(sensor_count):
    thread = Thread(target=worker, args=(i, how_many))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

expected = how_many * sensor_count
print(f"Counter should be {expected}, got {counter}")

>>>
Counter should be 4000000, got 1980032

Given that get_offset always returns 1 or greater, the result is way off! What happened here? How could something so simple go so wrong, especially since only one Python interpreter thread can run at a time due to the GIL?

The answer is preemption. The Python interpreter enforces fairness between all of the threads that are executing to ensure they get a roughly equal amount of processing time. To do this, Python suspends a thread as it’s running and resumes another thread in turn. The problem is that you don’t know exactly when Python will suspend your threads. A thread can even be paused seemingly halfway through what looks like an atomic operation.

That’s what happened in this case, on this line in the worker function from above:

counter += get_offset(data)

The += operator used on the counter variable actually instructs Python to do three separate operations behind the scenes. The statement above is equivalent to this:

value = counter
delta = get_offset(data)
result = value + delta
counter = result

Python threads incrementing the counter might be suspended between any two of these operations. This is problematic if the way the operations interleave causes old versions of value to be assigned to the counter. Here’s an example of bad interaction between two threads, A and B:

# Running in Thread A
value_a = counter
delta_a = get_offset(data_a)
# Context switch to Thread B
value_b = counter
delta_b = get_offset(data_b)
result_b = value_b + delta_b
counter = result_b
# Context switch back to Thread A
result_a = value_a + delta_a
counter = result_a

Thread B interrupted thread A before it had completely finished. Thread B ran and finished, but then thread A resumed mid-execution, overwriting all of thread B’s progress in incrementing the counter. This is exactly what happened in the light sensor example above.

To prevent data races like these, and other forms of data structure corruption, Python includes a robust set of tools in the threading built-in module. The simplest and most useful of them is the Lock class, a mutual-exclusion lock (mutex).

By using a lock, I can protect the counter variable’s current value against simultaneous access from multiple threads. Only one thread will be able to acquire the lock at a time. Here I use a with statement to acquire and release the lock; the extra level of indentation makes it easier to see which code is executing while the lock is held (see Item 82: “Consider contextlib and with Statements for Reusable try/finally Behavior” for background):

from threading import Lock

counter = 0
counter_lock = Lock()

def locking_worker(sensor_index, how_many):
    global counter
    for _ in range(how_many):
        data = read_sensor(sensor_index)
        with counter_lock:                  # Added
            counter += get_offset(data)

Now I run the sensor threads as before but use a locking_worker instead:

for i in range(sensor_count):
    thread = Thread(target=locking_worker, args=(i, how_many))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

expected = how_many * sensor_count
print(f"Counter should be {expected}, got {counter}")

>>>
Counter should be 4000000, got 4000000

The result is exactly what I expect. The Lock solved the problem.
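
The with statement above is equivalent to acquiring and releasing the lock with try/finally; writing that out makes the mutual exclusion explicit, at the cost of more noise:

def verbose_worker(sensor_index, how_many):
    global counter
    for _ in range(how_many):
        data = read_sensor(sensor_index)
        counter_lock.acquire()
        try:
            counter += get_offset(data)  # Only one thread at a time here
        finally:
            counter_lock.release()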

Things to Remember

  • Even though Python has a global interpreter lock, you’re still responsible for protecting against data races between the threads in your programs.

  • Your programs will corrupt their data structures if you allow multiple threads to modify the same objects without mutual exclusion locks (mutexes).

  • Use the Lock class from the threading built-in module to enforce your program’s invariants between multiple threads.

Item 70: Use Queue to Coordinate Work Between Threads

Python programs that do many things concurrently often need to coordinate their work. One of the most useful arrangements for concurrent work is a pipeline of functions.

A pipeline works like an assembly line used in manufacturing. Pipelines have many phases in serial, with a specific function for each phase. New pieces of work are constantly added to the beginning of the pipeline. The functions can operate concurrently, each processing the piece of work in its phase. The work moves forward as each function completes until there are no phases remaining. This approach is especially good for work that includes blocking I/O or subprocesses—activities that can easily be parallelized using Python (see Item 67: “Use subprocess to Manage Child Processes” and Item 68: “Use Threads for Blocking I/O; Avoid for Parallelism”).

For example, say that I want to build a system that will take a constant stream of images from my digital camera, resize them, and then add them to a photo gallery online. Such a program could be split into three phases of a pipeline. New images are retrieved in the first phase. The downloaded images are passed through the resize function in the second phase. The resized images are consumed by the upload function in the final phase.

Imagine that I’ve already written Python functions that execute the phases: download, resize, and upload. How do I assemble a pipeline to do the work concurrently?

def download(item):
    ...

def resize(item):
    ...

def upload(item):
    ...

The first thing I need is a way to hand off work between the pipeline phases. This can be modeled as a thread-safe producer–consumer queue (see Item 69: “Use Lock to Prevent Data Races in Threads” to understand the importance of thread safety in Python; see Item 103: “Prefer deque for Producer–Consumer Queues” to understand queue performance):

from collections import deque
from threading import Lock

class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

The producer, my digital camera, adds new images to the end of the deque of pending items:

    def put(self, item):
        with self.lock:
            self.items.append(item)

The consumer, the first phase of the processing pipeline, removes images from the front of the deque of pending items:

    def get(self):
        with self.lock:
            return self.items.popleft()

Here I represent each phase of the pipeline as a Python thread that takes work from one queue like this, runs a function on it, and puts the result on another queue. I also track how many times the worker has checked for new input and how much work it’s completed:

from threading import Thread
import time

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

The trickiest part is that the worker thread must properly handle the case where the input queue is empty because the previous phase hasn’t completed its work yet. This happens where I catch the IndexError exception below. You can think of this as a holdup in the assembly line:

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                time.sleep(0.01)  # No work to do
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

Now I can connect the three phases together by creating the queues for their coordination points and the corresponding worker threads:

download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

I can start the threads and then inject a bunch of work into the first phase of the pipeline. Here I use a plain object instance as a proxy for the real data required by the download function:

for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())

Now I wait for all of the items to be processed by the pipeline and end up in the done_queue:

while len(done_queue.items) < 1000:
    # Do something useful while waiting
    ...

This runs properly, but there’s an interesting side effect caused by the threads polling their input queues for new work. The tricky part, where I catch IndexError exceptions in the run method, executes a large number of times:

processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print(f"Processed {processed} items after "
      f"polling {polled} times")

>>>
Processed 1000 items after polling 3033 times

When the worker functions vary in their respective speeds, a slow early phase can starve the later phases of new work, leaving them to constantly check their input queues in a tight loop. The outcome is that worker threads waste CPU time doing nothing useful; they’re constantly raising and catching IndexError exceptions.

But that’s just the beginning of what’s wrong with this implementation. There are three more problems that you should also avoid. First, determining that all of the input work is complete requires yet another busy wait on the done_queue. Second, in Worker, the run method will execute forever in its busy loop. There’s no obvious way to signal to a worker thread that it’s time to exit.

Third, and worst of all, a backup in the pipeline can cause the program to crash arbitrarily. If the first phase makes rapid progress but the second phase makes slow progress, then the queue connecting the first phase to the second phase will constantly increase in size. The second phase won’t be able to keep up. Given enough time and input data, the program will eventually run out of memory and terminate.

The lesson here isn’t that pipelines are bad; it’s that it’s hard to build a good producer–consumer queue yourself. So why even try?

Queue to the Rescue

The Queue class from the queue built-in module provides all the functionality you need to solve the problems outlined above.

Queue eliminates busy waiting for new items by making the get method block the calling thread until data is available. For example, here I start a thread that waits for some input data on a queue:

from queue import Queue

my_queue = Queue()

def consumer():
    print("Consumer waiting")
    my_queue.get()  # Runs after put() below
    print("Consumer done")

thread = Thread(target=consumer)
thread.start()

Even though the consumer thread is running first, it won’t finish until the put method adds an item to the Queue instance and the get method has something to return:

print("Producer putting")
my_queue.put(object())  # Runs before get() above
print("Producer done")
thread.join()

>>>
Consumer waiting
Producer putting
Producer done
Consumer done
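
The get method doesn’t have to block forever, either: Passing a timeout (or block=False) makes it raise the Empty exception from the queue built-in module if no item arrives in time:

from queue import Empty

try:
    my_queue.get(timeout=0.01)  # Nothing left on the queue
except Empty:
    print("Timed out waiting for an item")

>>>
Timed out waiting for an item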

To solve the pipeline backup issue and avoid out-of-memory errors, the Queue class lets you specify the maximum amount of pending work to allow between two phases. This buffer size causes calls to put to block when the queue is already full. (Sometimes this behavior is called back pressure.) For example, here I define a thread that waits for a while before consuming a queue:

my_queue = Queue(1)  # Buffer size of 1

def consumer():
    time.sleep(0.1)  # Wait
    my_queue.get()   # Runs second
    print("Consumer got 1")
    my_queue.get()   # Runs fourth
    print("Consumer got 2")
    print("Consumer done")

thread = Thread(target=consumer)
thread.start()

The wait should allow the producer thread to call the queue’s put method for both objects before the consumer thread ever calls get. But the queue’s size is 1. This means that the producer adding items to the queue will have to wait for the consumer thread to call get at least once before the second call to put will stop blocking and actually add the second item to the queue:

my_queue.put(object())  # Runs first
print("Producer put 1")
my_queue.put(object())  # Runs third
print("Producer put 2")
print("Producer done")
thread.join()

>>>
Producer put 1
Consumer got 1
Producer put 2
Producer done
Consumer got 2
Consumer done
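
put supports the same nonblocking variants, raising the Full exception when a bounded queue can’t accept another item in time:

from queue import Full

bounded_queue = Queue(1)  # Buffer size of 1
bounded_queue.put(object())

try:
    bounded_queue.put(object(), timeout=0.01)  # Queue is already full
except Full:
    print("Timed out waiting for capacity")

>>>
Timed out waiting for capacity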

The Queue class can also track the progress of work using the task_done method. This lets you wait for a phase’s input queue to drain (using the join method) and eliminates the need to poll for the last phase of a pipeline (as with done_queue in the section above). For example, here I define a consumer thread that calls task_done when it finishes working on an item:

in_queue = Queue()

def consumer():
    print("Consumer waiting")
    work = in_queue.get()      # Runs second
    print("Consumer working")
    # Doing work
    ...
    print("Consumer done")
    in_queue.task_done()       # Runs third

thread = Thread(target=consumer)
thread.start()

Now the producer code doesn’t have to call join on the consumer thread or poll. The producer can just wait for in_queue to finish by calling join on the Queue instance. Even once it’s empty, in_queue won’t be joinable until after task_done is called for every item that was ever enqueued:

print("Producer putting")
in_queue.put(object())     # Runs first
print("Producer waiting")
in_queue.join()            # Runs fourth
print("Producer done")
thread.join()

>>>
Consumer waiting
Producer putting
Producer waiting
Consumer working
Consumer done
Producer done

The Queue class also allows for easy termination of worker threads by calling the shutdown method (a feature added in Python version 3.13). After the shutdown signal is received, any call to put on the queue will raise an exception, but the queue will permit get calls to drain the queue and complete pending work. Once the queue is fully empty, a ShutDown exception will be raised by get in the worker thread, giving it a chance to clean up and exit (see Item 80: “Take Advantage of Each Block in try/except/else/finally” for background). For example, here I show how a thread continues to process work after shutdown is called:

from queue import ShutDown

my_queue2 = Queue()

def consumer():
    while True:
        try:
            item = my_queue2.get()
        except ShutDown:
            print("Terminating!")
            return
        else:
            print("Got item", item)
            my_queue2.task_done()

thread = Thread(target=consumer)
my_queue2.put(1)
my_queue2.put(2)
my_queue2.put(3)
my_queue2.shutdown()

thread.start()

my_queue2.join()
thread.join()
print("Done")

>>>
Got item 1
Got item 2
Got item 3
Terminating!
Done
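
shutdown also accepts an immediate=True argument, which additionally marks all outstanding items as done, causing pending and future get calls to raise ShutDown right away instead of draining the queue first:

my_queue3 = Queue()
my_queue3.put(1)
my_queue3.put(2)
my_queue3.shutdown(immediate=True)  # Items 1 and 2 won't be processed

try:
    my_queue3.get()
except ShutDown:
    print("Terminated without draining")

>>>
Terminated without draining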

I can bring all of these behaviors together into a new worker thread class that processes input items one at a time, puts the results on an output queue, marks the input items as done, and terminates when the ShutDown exception is raised:

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            try:
                item = self.in_queue.get()
            except ShutDown:
                return
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.in_queue.task_done()

Now I can create a set of pipeline threads and queues using the new worker class; the resize and upload phases have a maximum number of items specified to prevent the program from running out of memory:

download_queue = Queue()
resize_queue = Queue(100)
upload_queue = Queue(100)
done_queue = Queue()

threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()

To start processing, I inject all of the input work into the beginning of the pipeline:

for _ in range(1000):
    download_queue.put(object())

Then I wait for the work in each phase to finish. I’m careful to call shutdown for each queue in the pipeline only after all work for that phase has been added to the corresponding queue. I use the join method to ensure that I wait for all of the work in the queue to be completed before sending the termination signal for the next phase:

download_queue.shutdown()
download_queue.join()

resize_queue.shutdown()
resize_queue.join()

upload_queue.shutdown()
upload_queue.join()

Once the prior phases are complete, I send the shutdown signal to the final queue, receive each of the output items in the main thread, and wait for the worker threads to terminate:

done_queue.shutdown()

counter = 0

while True:
    try:
        item = done_queue.get()
    except ShutDown:
        break
    else:
        # Process the item
        ...
        done_queue.task_done()
        counter += 1

done_queue.join()

for thread in threads:
    thread.join()

print(counter, "items finished")

>>>
1000 items finished

This approach can be extended to use multiple worker threads per phase, which can increase I/O parallelism and speed up this type of program significantly. To do this, first I define helper functions for starting replicas of worker threads and draining the final queue:

def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def drain_queue(input_queue):
    input_queue.shutdown()

    counter = 0

    while True:
        try:
            item = input_queue.get()
        except ShutDown:
            break
        else:
            input_queue.task_done()
            counter += 1

    input_queue.join()

    return counter

Then I connect the queues together as before and start the workers:

download_queue = Queue()
resize_queue = Queue(100)
upload_queue = Queue(100)
done_queue = Queue()

threads = (
    start_threads(3, download, download_queue, resize_queue)
    + start_threads(4, resize, resize_queue, upload_queue)
    + start_threads(5, upload, upload_queue, done_queue)
)

Following the same order of calls to put, shutdown, get, and join as in the example above, I can drive the work through the pipeline—but this time using multiple workers for each intermediate phase:

for _ in range(2000):
    download_queue.put(object())

download_queue.shutdown()
download_queue.join()

resize_queue.shutdown()
resize_queue.join()

upload_queue.shutdown()
upload_queue.join()

counter = drain_queue(done_queue)

for thread in threads:
    thread.join()

print(counter, "items finished")

>>>
2000 items finished

Although Queue works well with a linear pipeline, there are other tools that you should consider using in different situations (see Item 75: “Achieve Highly Concurrent I/O with Coroutines” for an example).

Things to Remember

  • Pipelines allow you to organize sequences of work—especially I/O-bound programs—that run concurrently using multiple Python threads.

  • Be aware of the many problems in building concurrent pipelines: busy waiting, telling workers to stop, knowing when work is done, and memory explosion.

  • The Queue class has all the facilities you need to build robust pipelines: blocking operations, buffer sizes, joining, and shutdown.

Item 71: Know How to Recognize When Concurrency Is Necessary

Inevitably, as the scope of a program grows, so does its complexity. Dealing with expanding requirements in a way that maintains clarity, testability, and efficiency is one of the most difficult parts of programming. Perhaps the hardest type of change to handle is moving from a single-threaded program to a program that needs multiple concurrent lines of execution.

Let me demonstrate how you might encounter this problem with an example. Say that I want to implement Conway’s Game of Life, a classic illustration of cellular automata. The rules of the game are simple: There’s a two-dimensional grid of an arbitrary size, and each cell in the grid can either be alive or empty:

ALIVE = "*"
EMPTY = "-"

The game progresses one tick of the clock at a time. Every tick, each cell counts how many of its neighboring eight cells are still alive. Based on its neighbor count, a cell decides if it will keep living, die, or regenerate. (I’ll explain the specific rules further below.) Here’s an example of a 5 × 5 Game of Life grid after four generations with time going to the right:

  0   |   1   |   2   |   3   |   4
----- | ----- | ----- | ----- | -----
-*--- | --*-- | --**- | --*-- | -----
--**- | --**- | -*--- | -*--- | -**--
---*- | --**- | --**- | --*-- | -----
----- | ----- | ----- | ----- | -----

I can represent the state of each cell with a simple container class. The class must have methods that allow me to get and set the value of any coordinate. Coordinates that are out of bounds should wrap around, making the grid act like an infinite looping space:

class Grid:
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def get(self, y, x):
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):
        ...

To see this class in action, I can create a Grid instance and set its initial state to a classic shape called a glider:

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)

>>>
---*-----
----*----
--***----
---------
---------

Now I need a way to retrieve the status of neighboring cells. I can do this with a helper function that queries the grid for information about its surrounding environment and returns the count of living neighbors. I use a simple function for the get_cell parameter instead of passing in a whole Grid instance in order to reduce coupling (see Item 48: “Accept Functions Instead of Classes for Simple Interfaces” for more about this approach):

def count_neighbors(y, x, get_cell):
    n_ = get_cell(y - 1, x + 0)  # North
    ne = get_cell(y - 1, x + 1)  # Northeast
    e_ = get_cell(y + 0, x + 1)  # East
    se = get_cell(y + 1, x + 1)  # Southeast
    s_ = get_cell(y + 1, x + 0)  # South
    sw = get_cell(y + 1, x - 1)  # Southwest
    w_ = get_cell(y + 0, x - 1)  # West
    nw = get_cell(y - 1, x - 1)  # Northwest
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

Now I define the simple logic for Conway’s Game of Life based on the game’s three rules: Die if a cell has fewer than two neighbors, die if a cell has more than three neighbors, or become alive if an empty cell has exactly three neighbors:

def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY     # Die: Too few
        elif neighbors > 3:
            return EMPTY     # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE     # Regenerate
    return state

I can connect count_neighbors and game_logic together in another function that transitions the state of a cell. This function will be called each generation to figure out a cell’s current state, inspect the neighboring cells around it, determine what its next state should be, and update the resulting grid accordingly. Again, I use a function interface for set_cell instead of passing in the Grid instance to make this code more decoupled:

def step_cell(y, x, get_cell, set_cell):
    state = get_cell(y, x)
    neighbors = count_neighbors(y, x, get_cell)
    next_state = game_logic(state, neighbors)
    set_cell(y, x, next_state)

Finally, I can define a function that progresses the whole grid of cells forward by a single step and then returns a new grid containing the state for the next generation. The important detail here is that I need all dependent functions to call the get method on the previous generation’s Grid instance and to call the set method on the next generation’s Grid instance. This is how I ensure that all of the cells move in lockstep, which is an essential part of how the game works. This is easy to achieve because I used function interfaces for get_cell and set_cell instead of passing Grid instances:

def simulate(grid):
    next_grid = Grid(grid.height, grid.width)
    for y in range(grid.height):
        for x in range(grid.width):
            step_cell(y, x, grid.get, next_grid.set)
    return next_grid

Now I can progress the grid forward one generation at a time. You can see how the glider moves down and to the right on the grid based on the simple rules from the game_logic function:

class ColumnPrinter:
    ...

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate(grid)

print(columns)

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------
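
The ColumnPrinter class is elided above; a minimal sketch that would produce this side-by-side layout, assuming every appended grid renders as equal-width lines, might look like this:

class ColumnPrinter:
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        split = [data.splitlines() for data in self.columns]
        widths = [len(lines[0]) for lines in split]
        # Header row centers each generation number over its column
        rows = [" | ".join(
            str(i).center(width) for i, width in enumerate(widths))]
        # Body rows join the corresponding grid lines side by side
        height = max(len(lines) for lines in split)
        for j in range(height):
            rows.append(" | ".join(
                lines[j] if j < len(lines) else " " * widths[i]
                for i, lines in enumerate(split)))
        return "\n".join(rows)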

This works great for a program that can run in one thread on a single machine. But imagine that the program’s requirements have changed—as I alluded to above—and now I need to do some I/O (e.g., with a network socket) from within the game_logic function. For example, this might be required if I’m trying to build a massively multiplayer online game where the state transitions are determined by a combination of the grid state and communication with other players over the Internet.

How can I extend this implementation to support such functionality? The simplest thing to do is to add blocking I/O directly into the game_logic function:

def game_logic(state, neighbors):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...

The problem with this approach is that it’s going to slow down the whole program. If the latency of the I/O required is 100 milliseconds (which is a reasonably good cross-continent, round-trip latency on the Internet), and there are 45 cells in the grid, then each generation will take a minimum of 4.5 seconds to evaluate because each cell is processed serially in the simulate function. That’s far too slow and will make the game unplayable. It also scales poorly: If I later wanted to expand the grid to 10,000 cells, I would need over 15 minutes to evaluate each generation.

The solution is to do the I/O in parallel so each generation takes roughly 100 milliseconds, regardless of how big the grid is. The process of spawning a concurrent line of execution for each unit of work—a cell in this case—is called fan-out. Waiting for all of those concurrent units of work to finish before moving on to the next phase in a coordinated process—a generation in this case—is called fan-in.

Python provides many built-in tools for achieving fan-out and fan-in with various trade-offs. You should understand the pros and cons of each approach and choose the best tool for the job, depending on the situation. See the following items for details: Item 72: “Avoid Creating New Thread Instances for On-Demand Fan-out,” Item 73: “Understand How Using Queue for Concurrency Requires Refactoring,” Item 74: “Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency,” and Item 75: “Achieve Highly Concurrent I/O with Coroutines.”

Things to Remember

  • As a program’s scope and complexity increase, it often starts requiring support for multiple concurrent lines of execution.

  • The most common types of concurrency coordination are fan-out (generating new units of concurrency) and fan-in (waiting for existing units of concurrency to complete).

  • Python has many different ways of achieving fan-out and fan-in.

Item 72: Avoid Creating New Thread Instances for On-Demand Fan-out

Threads are the natural first tool to reach for in order to do parallel I/O in Python (see Item 68: “Use Threads for Blocking I/O; Avoid for Parallelism”). However, they have significant downsides when you try to use them for fanning out to many concurrent lines of execution.

To demonstrate this, I’ll continue with the Game of Life example from before (see Item 71: “Know How to Recognize When Concurrency Is Necessary” for background and the implementations of various functions and classes below). I’ll use threads to solve the latency problem caused by doing I/O in the game_logic function. To begin, threads will require coordination using locks to ensure that assumptions within data structures are maintained properly (see Item 69: “Use Lock to Prevent Data Races in Threads” for details). I can create a subclass of the Grid class that adds locking behavior so an instance can be used by multiple threads simultaneously:

from threading import Lock

ALIVE = "*"
EMPTY = "-"

class Grid:
    ...

class LockingGrid(Grid):
    def __init__(self, height, width):
        super().__init__(height, width)
        self.lock = Lock()

    def __str__(self):
        with self.lock:
            return super().__str__()

    def get(self, y, x):
        with self.lock:
            return super().get(y, x)

    def set(self, y, x, state):
        with self.lock:
            return super().set(y, x, state)

Then, I can reimplement the simulate function to fan out by creating a thread for each call to step_cell. The threads will run in parallel and won’t have to wait on each other’s I/O. I can then fan in by waiting for all of the threads to complete before moving on to the next generation:

from threading import Thread

def count_neighbors(y, x, get_cell):
    ...

def game_logic(state, neighbors):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...

def step_cell(y, x, get_cell, set_cell):
    state = get_cell(y, x)
    neighbors = count_neighbors(y, x, get_cell)
    next_state = game_logic(state, neighbors)
    set_cell(y, x, next_state)

def simulate_threaded(grid):
    next_grid = LockingGrid(grid.height, grid.width)

    threads = []
    for y in range(grid.height):
        for x in range(grid.width):
            args = (y, x, grid.get, next_grid.set)
            thread = Thread(target=step_cell, args=args)
            thread.start()  # Fan-out
            threads.append(thread)

    for thread in threads:
        thread.join()  # Fan-in

    return next_grid

I can run this code using the same implementation of step_cell and the same driving code as before with only two lines changed to use the LockingGrid and simulate_threaded implementations:

class ColumnPrinter:
    ...

grid = LockingGrid(5, 9)  # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate_threaded(grid)  # Changed

print(columns)

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

This works as expected, and the I/O is now parallelized between the threads. However, this code has three big problems:

  • The Thread instances require special tools (i.e., Lock objects) to coordinate with each other safely. This makes the code that uses threads harder to reason about in comparison to the procedural, single-threaded code from before. This complexity makes threaded code more difficult to extend and maintain over time.

  • Threads require a lot of memory—about 8 MB per executing thread. On many computers, that amount of memory doesn’t matter for the 45 threads I’d need in this example. But if the game grid had to grow to 10,000 cells, I would need to create that many threads, which is so much memory (80 GB) that the program won’t fit on my machine. Although some operating systems play tricks to delay a thread’s full memory allocation until its execution stack is sufficiently deep, there’s still a risk that running one thread per concurrent activity won’t work reliably.

  • Starting a thread is costly, and threads have a negative performance impact when they run due to the overhead of context switching between them. In the code above, all of the threads are started and stopped each generation of the game, which adds so much overhead that it will increase latency beyond the expected I/O time of 100 milliseconds.

This code would also be very difficult to debug if something went wrong. For example, imagine that the game_logic function raises an exception, which is highly likely due to the generally flaky nature of I/O:

def game_logic(state, neighbors):
    ...
    raise OSError("Problem with I/O")
    ...

I can test what this would do by running a Thread instance pointed at this function and redirecting the sys.stderr output from the program to an in-memory StringIO buffer:

import contextlib
import io

fake_stderr = io.StringIO()
with contextlib.redirect_stderr(fake_stderr):
    thread = Thread(target=game_logic, args=(ALIVE, 3))
    thread.start()
    thread.join()

print(fake_stderr.getvalue())

>>>
Exception in thread Thread-226 (game_logic):
Traceback (most recent call last):
  File "threading.py", line 1039, in _bootstrap_inner
    self.run()
    ~~~~~~~~^^
  File "threading.py", line 990, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "example.py", line 205, in game_logic
    raise OSError('Problem with I/O')
OSError: Problem with I/O

An OSError exception is expected, but somehow the code that created the thread and called join on it is unaffected. How can this be? The reason is that the Thread class will independently catch any exceptions that are raised by the target function and then write their traceback to sys.stderr. Such exceptions are never re-raised to the caller that started the thread in the first place.
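
There are workarounds. For example, here’s a sketch of a Thread subclass (a hypothetical PropagatingThread helper, not something the threading module provides) that captures the exception and re-raises it when join is called; needing to build this yourself only underscores the problem:

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            super().run()  # Calls the target function as usual
        except BaseException as e:
            self.exc = e   # Save the exception for the joining thread

    def join(self, timeout=None):
        super().join(timeout)
        if self.exc:
            raise self.exc  # Re-raise in the caller's thread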

Given all these issues, it’s clear that threads are not the solution if you need to constantly create and finish new concurrent functions. Python provides other solutions that are a better fit (see Item 73: “Understand How Using Queue for Concurrency Requires Refactoring,” Item 74: “Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency,” and Item 75: “Achieve Highly Concurrent I/O with Coroutines”).

Things to Remember

  • Threads have many downsides: They’re costly to start and run if you need a lot of them, they each require a significant amount of memory, and they require special tools like Lock instances for coordination.

  • A Thread doesn’t have a built-in way to raise exceptions back in the code that started it or in a separate thread that is waiting for it to finish, which greatly hampers debugging.

Item 73: Understand How Using Queue for Concurrency Requires Refactoring

In the previous item (see Item 72: “Avoid Creating New Thread Instances for On-Demand Fan-out”) I covered the downsides of using Thread to solve the parallel I/O problem in the Game of Life example from earlier (see Item 71: “Know How to Recognize When Concurrency Is Necessary” for background and the implementations of various functions and classes below).

The next approach to try is implementing a threaded pipeline using the Queue class from the queue built-in module (see Item 70: “Use Queue to Coordinate Work Between Threads” for background; I rely on the implementation of StoppableWorker from that item in the example code below).

Here’s the general approach: Instead of creating one thread per cell per generation of the Game of Life, I can create a fixed number of worker threads upfront and have them do parallelized I/O as needed. This will keep my resource usage under control and eliminate the overhead of frequently starting new threads.

To do this, I need two Queue instances to use for communicating to and from the worker threads that execute the game_logic function:

from queue import Queue

in_queue = Queue()
out_queue = Queue()

I can start multiple threads that will consume items from in_queue, process them by calling game_logic, and put the results on out_queue. These threads will run concurrently, allowing for parallel I/O and reduced latency for each generation:

from threading import Thread

class StoppableWorker(Thread):
    ...

def game_logic(state, neighbors):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...

def game_logic_thread(item):
    y, x, state, neighbors = item
    try:
        next_state = game_logic(state, neighbors)
    except Exception as e:
        next_state = e
    return (y, x, next_state)

# Start the threads upfront
threads = []
for _ in range(5):
    thread = StoppableWorker(
        game_logic_thread, in_queue, out_queue)
    thread.start()
    threads.append(thread)

Now I can redefine the simulate function to interact with these queues to request state transition decisions and receive corresponding responses. Adding items to in_queue is what causes fan-out, and consuming items from the out_queue until it’s empty is what causes fan-in:

ALIVE = "*"
EMPTY = "-"

class SimulationError(Exception):
    pass

class Grid:
    ...

def count_neighbors(y, x, get_cell):
    ...

def simulate_pipeline(grid, in_queue, out_queue):
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            neighbors = count_neighbors(y, x, grid.get)
            in_queue.put((y, x, state, neighbors))  # Fan-out

    in_queue.join()
    item_count = out_queue.qsize()

    next_grid = Grid(grid.height, grid.width)
    for _ in range(item_count):
        item = out_queue.get()                      # Fan-in
        y, x, next_state = item
        if isinstance(next_state, Exception):
            raise SimulationError(y, x) from next_state
        next_grid.set(y, x, next_state)

    return next_grid

The calls to Grid.get and Grid.set both happen within this new simulate_pipeline function, which means I can use the single-threaded implementation of Grid instead of the implementation that requires Lock instances for synchronization (see Item 69: “Use Lock to Prevent Data Races in Threads” for background).

This code is also easier to debug than the Thread approach used in the previous item. If an exception occurs while doing I/O in the game_logic function, it will be caught by the surrounding game_logic_thread function, propagated to the out_queue, and then re-raised in the main thread:

def game_logic(state, neighbors):
    ...
    raise OSError("Problem with I/O in game_logic")
    ...

simulate_pipeline(Grid(1, 1), in_queue, out_queue)

>>>
Traceback ...
OSError: Problem with I/O in game_logic

The above exception was the direct cause of the following
exception:

Traceback ...
SimulationError: (0, 0)

I can drive this multithreaded pipeline for repeated generations by calling simulate_pipeline in a loop:

class ColumnPrinter:
    ...

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate_pipeline(grid, in_queue, out_queue)

print(columns)

in_queue.shutdown()
in_queue.join()

for thread in threads:
    thread.join()

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

The results are the same as before. Although I’ve addressed the memory explosion problem, startup costs, and debugging issues related to using threads on their own, many issues remain:

  • The simulate_pipeline function is even harder to follow than the simulate_threaded approach from the previous item.

  • Extra support functionality is required (e.g., StoppableWorker) to make the code easier to read, at the expense of increased complexity.

  • I have to specify the amount of potential parallelism—the number of threads running game_logic_thread—upfront based on my expectations of the workload instead of having the system automatically scale up parallelism as needed.

  • In order to enable debugging, I have to manually catch exceptions in worker threads, propagate them on a queue, and then re-raise them in the main thread.

However, the biggest problem with this code is apparent if the requirements change again. Imagine that later I needed to do I/O within the count_neighbors function in addition to the I/O that was needed within game_logic:

def count_neighbors(y, x, get_cell):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...

In order to make this parallelizable, I need to add another stage to the pipeline that runs count_neighbors in a thread. I need to make sure that exceptions propagate correctly between the worker threads and the main thread. And I need to use a lock for the Grid class in order to ensure safe synchronization between the worker threads (see Item 72: “Avoid Creating New Thread Instances for On-Demand Fan-out” for background and for the implementation of LockingGrid):

def count_neighbors_thread(item):
    y, x, state, get_cell = item
    try:
        neighbors = count_neighbors(y, x, get_cell)
    except Exception as e:
        neighbors = e
    return (y, x, state, neighbors)

def game_logic_thread(item):
    y, x, state, neighbors = item
    if isinstance(neighbors, Exception):
        next_state = neighbors
    else:
        try:
            next_state = game_logic(state, neighbors)
        except Exception as e:
            next_state = e
    return (y, x, next_state)

class LockingGrid(Grid):
    ...

I have to create another set of Queue instances for the count_neighbors_thread workers and the corresponding Thread instances:

in_queue = Queue()
logic_queue = Queue()
out_queue = Queue()

threads = []

for _ in range(5):
    thread = StoppableWorker(
        count_neighbors_thread, in_queue, logic_queue
    )
    thread.start()
    threads.append(thread)

for _ in range(5):
    thread = StoppableWorker(
        game_logic_thread, logic_queue, out_queue
    )
    thread.start()
    threads.append(thread)

Finally, I need to update simulate_pipeline to coordinate the multiple phases in the pipeline and ensure that work fans out and back in correctly:

def simulate_phased_pipeline(
    grid, in_queue, logic_queue, out_queue):
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            item = (y, x, state, grid.get)
            in_queue.put(item)            # Fan-out

    in_queue.join()
    logic_queue.join()                    # Pipeline sequencing
    item_count = out_queue.qsize()

    next_grid = LockingGrid(grid.height, grid.width)
    for _ in range(item_count):
        y, x, next_state = out_queue.get()  # Fan-in
        if isinstance(next_state, Exception):
            raise SimulationError(y, x) from next_state
        next_grid.set(y, x, next_state)

    return next_grid

With these updated implementations, now I can run the multiphase pipeline end-to-end:

grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate_phased_pipeline(
        grid, in_queue, logic_queue, out_queue
    )

print(columns)

in_queue.shutdown()
in_queue.join()

logic_queue.shutdown()
logic_queue.join()

for thread in threads:
    thread.join()

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

Again, this works as expected, but it required a lot of changes and boilerplate. The point here is that Queue does make it possible to solve fan-out and fan-in problems, but the complexity is very high. Although using Queue is a better approach than using Thread instances on their own, it’s still not nearly as good as using some of the other tools provided by Python (see Item 74: “Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency” and Item 75: “Achieve Highly Concurrent I/O with Coroutines”).

Things to Remember

  • Using Queue instances with a fixed number of worker threads improves the scalability of fan-out and fan-in using threads.

  • It takes a significant amount of work to refactor existing code to use Queue, especially when multiple stages of a pipeline are required.

  • Using Queue with a fixed number of worker threads fundamentally limits the total amount of I/O parallelism a program can leverage compared to alternative approaches provided by other built-in Python features and modules that are more dynamic.

Item 74: Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency

Python includes the concurrent.futures built-in module, which provides the ThreadPoolExecutor class. It combines the best of the Thread (see Item 72: “Avoid Creating New Thread Instances for On-Demand Fan-out”) and Queue (see Item 73: “Understand How Using Queue for Concurrency Requires Refactoring”) approaches to solving the parallel I/O problem from the Game of Life example (see Item 71: “Know How to Recognize When Concurrency Is Necessary” for background and the implementations of various functions and classes below):

ALIVE = "*"
EMPTY = "-"

class Grid:
    ...

class LockingGrid(Grid):
    ...

def count_neighbors(y, x, get_cell):
    ...

def game_logic(state, neighbors):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...

def step_cell(y, x, get_cell, set_cell):
    state = get_cell(y, x)
    neighbors = count_neighbors(y, x, get_cell)
    next_state = game_logic(state, neighbors)
    set_cell(y, x, next_state)

Instead of starting a new Thread instance for each Grid square, I can fan out by submitting a function to an executor that will be run in a separate thread. Later, I can wait for the result of all tasks in order to fan in:

from concurrent.futures import ThreadPoolExecutor

def simulate_pool(pool, grid):
    next_grid = LockingGrid(grid.height, grid.width)

    futures = []
    for y in range(grid.height):
        for x in range(grid.width):
            args = (y, x, grid.get, next_grid.set)
            future = pool.submit(step_cell, *args)  # Fan-out
            futures.append(future)

    for future in futures:
        future.result()                             # Fan-in

    return next_grid

The threads used for the executor can be allocated in advance, which means I don’t have to pay the startup cost on each execution of simulate_pool. I can also specify the maximum number of threads to use for the pool—using the max_workers parameter—to prevent the memory blow-up issues associated with the naive Thread solution to the parallel I/O problem (i.e., one thread per cell):

class ColumnPrinter:
    ...

grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
with ThreadPoolExecutor(max_workers=10) as pool:
    for i in range(5):
        columns.append(str(grid))
        grid = simulate_pool(pool, grid)

print(columns)

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

The best part about the ThreadPoolExecutor class is that it automatically propagates exceptions back to the caller when the result method is called on the Future instance returned by the submit method:

def game_logic(state, neighbors):
    ...
    raise OSError("Problem with I/O")
    ...

with ThreadPoolExecutor(max_workers=10) as pool:
    task = pool.submit(game_logic, ALIVE, 3)
    task.result()

>>>
Traceback ...
OSError: Problem with I/O

If I need to provide I/O parallelism for the count_neighbors function in addition to game_logic, no modifications to the program are required because ThreadPoolExecutor already runs these functions concurrently as part of step_cell. It’s even possible to achieve CPU parallelism by using the same interface if necessary (see Item 79: “Consider concurrent.futures for True Parallelism”).
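
For example, here’s a brief sketch of what that might look like with the ProcessPoolExecutor class from the same module (the slow_work function is hypothetical and not part of the Game of Life example):

from concurrent.futures import ProcessPoolExecutor

def slow_work(n):  # Hypothetical CPU-bound function
    return sum(i * i for i in range(n))

if __name__ == "__main__":  # Required when using process pools
    with ProcessPoolExecutor(max_workers=4) as pool:
        future = pool.submit(slow_work, 10_000_000)  # Same interface
        print(future.result())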

However, the big problem that remains is the limited amount of I/O parallelism that ThreadPoolExecutor provides. Even if I use a max_workers parameter of 100, this solution still won’t scale if I need 10,000+ cells in the grid that require simultaneous I/O. ThreadPoolExecutor is a good choice for situations where there is no asynchronous solution (e.g., blocking file system operations), but there are better ways to maximize I/O parallelism in many cases (see Item 75: “Achieve Highly Concurrent I/O with Coroutines”).

Things to Remember

  • ThreadPoolExecutor enables simple I/O parallelism with limited refactoring required.

  • You can use ThreadPoolExecutor to avoid the cost of thread startup each time fan-out concurrency is required.

  • ThreadPoolExecutor makes threaded code easier to debug by automatically propagating exceptions across thread boundaries.

  • Although ThreadPoolExecutor eliminates the potential memory blow-up issues of using threads directly, it also limits I/O parallelism by requiring max_workers to be specified upfront.

Item 75: Achieve Highly Concurrent I/O with Coroutines

The previous items have tried to solve the parallel I/O problem for the Game of Life example with varying degrees of success (see Item 71: “Know How to Recognize When Concurrency Is Necessary” for background and the implementations of various functions and classes below). All of the other approaches fall short in their ability to handle thousands of simultaneously concurrent functions (see Item 72: “Avoid Creating New Thread Instances for On-Demand Fan-out,” Item 73: “Understand How Using Queue for Concurrency Requires Refactoring,” and Item 74: “Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency”).

Python addresses the need for highly concurrent I/O with coroutines. Coroutines let you have a very large number of seemingly simultaneously executing functions in your Python programs. They’re implemented using the async and await keywords along with the same infrastructure that powers generators (see Item 43: “Consider Generators Instead of Returning Lists,” Item 46: “Pass Iterators into Generators as Arguments Instead of Calling the send Method,” and Item 47: “Manage Iterative State Transitions with a Class Instead of the Generator throw Method”).

The cost of starting a coroutine is a function call. Once a coroutine is active, it uses less than 1 KB of memory until it’s exhausted. Like threads, coroutines are independent functions that can consume inputs from their environment and produce resulting outputs. The difference is that coroutines pause at each await expression and resume executing an async function after the pending awaitable is resolved (similarly to how yield behaves in generators).

When many separate async functions are advanced in lockstep, they all seem to be running simultaneously, mimicking the concurrent behavior of Python threads. However, coroutines do this without the memory overhead, startup and context switching costs, and complex locking and synchronization code required for threads. The magical mechanism powering coroutines is the event loop, which can do highly concurrent I/O efficiently while rapidly interleaving execution between appropriately written functions.
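
As a quick, standalone illustration of this scale (separate from the Game of Life code), ten thousand coroutines that each await a 100 millisecond sleep can all finish in roughly 100 milliseconds of wall-clock time because the event loop overlaps their waiting:

import asyncio

async def sleep_and_return(i):
    await asyncio.sleep(0.1)  # Pauses here; others make progress
    return i

async def demo():
    tasks = [sleep_and_return(i) for i in range(10_000)]
    results = await asyncio.gather(*tasks)
    print(len(results))

asyncio.run(demo())

>>>
10000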

I can use coroutines to implement the Game of Life. My goal is to allow for I/O to occur within the game_logic function while overcoming the problems from the Thread, Queue, and ThreadPoolExecutor approaches in the previous items. To do this, first I indicate that the game_logic function is a coroutine by defining it using async def instead of def. This will allow me to use the await syntax for I/O, such as an asynchronous read from a socket:

ALIVE = "*"
EMPTY = "-"

class Grid:
    ...

def count_neighbors(y, x, get_cell):
    ...

async def game_logic(state, neighbors):
    ...
    # Do some input/output in here:
    data = await my_socket.read(50)
    ...

Similarly, I can turn step_cell into a coroutine by adding async to its definition and using await for the call to the game_logic function:

async def step_cell(y, x, get_cell, set_cell):
    state = get_cell(y, x)
    neighbors = count_neighbors(y, x, get_cell)
    next_state = await game_logic(state, neighbors)
    set_cell(y, x, next_state)

The simulate function also needs to become a coroutine:

import asyncio

async def simulate(grid):
    next_grid = Grid(grid.height, grid.width)

    tasks = []
    for y in range(grid.height):
        for x in range(grid.width):

            task = step_cell(
                y, x, grid.get, next_grid.set)        # Fan-out
            tasks.append(task)

    await asyncio.gather(*tasks)                      # Fan-in

    return next_grid

The coroutine version of the simulate function requires some explanation:

  • Calling step_cell doesn’t immediately run that function. Instead, it returns a coroutine instance that can be used with an await expression at a later time. This is similar to how a generator function that uses yield returns a generator instance when it’s called instead of executing immediately. Deferring execution like this is the mechanism that causes fan-out (see the short aside after this list).

  • The gather function from the asyncio built-in library is what causes fan-in. The await expression on gather instructs the event loop to run the step_cell coroutines concurrently and resume execution of the simulate coroutine when all the coroutines have been completed (see Item 77: “Mix Threads and Coroutines to Ease the Transition to asyncio” for another approach using asyncio.TaskGroup).

  • No locks are required for the Grid instance since all execution occurs within a single thread. The I/O becomes parallelized as part of the event loop that’s provided by asyncio.
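
Here’s a short aside demonstrating the deferred execution behavior from the first point above (the add coroutine is hypothetical, not part of the example):

async def add(a, b):
    return a + b

coro = add(1, 2)           # Nothing has executed yet
print(type(coro))
print(asyncio.run(coro))   # Running the event loop executes it

>>>
<class 'coroutine'>
3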

Finally, I can drive this code with a one-line change to the original example. This relies on the asyncio.run function to execute the simulate coroutine in an event loop and carry out its dependent I/O:

class ColumnPrinter:
    ...

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = asyncio.run(simulate(grid))  # Run the event loop

print(columns)

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

The result is the same as before. All the overhead associated with threads has been eliminated. Whereas the Queue and ThreadPoolExecutor approaches are limited in their exception handling—merely re-raising exceptions across thread boundaries—with coroutines I can even use the interactive debugger to step through the exception handling code line by line (see Item 114: “Consider Interactive Debugging with pdb”).

Later, if my requirements change and I also need to do I/O from within count_neighbors, I can easily accomplish this by adding async and await keywords to the existing functions and call sites instead of having to restructure everything as I would have to do if I were using Thread or Queue instances (see Item 76: “Know How to Port Threaded I/O to asyncio” for another example):

async def count_neighbors(y, x, get_cell):
    ...

async def step_cell(y, x, get_cell, set_cell):
    state = get_cell(y, x)
    neighbors = await count_neighbors(y, x, get_cell)
    next_state = await game_logic(state, neighbors)
    set_cell(y, x, next_state)

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = asyncio.run(simulate(grid))

print(columns)

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

The beauty of coroutines is that they decouple your code’s instructions for the external environment (i.e., I/O) from the implementation that carries out your wishes (i.e., the event loop). They let you focus on the logic of what you’re actually trying to do instead of wasting time trying to figure out how you’re going to accomplish your goals concurrently.

Things to Remember

  • Functions that are defined using async def are called coroutines. A caller can receive the result of a dependent coroutine by using the await keyword.

  • Coroutines provide an efficient way to run tens of thousands of functions seemingly at the same time.

  • Coroutines can use fan-out and fan-in in order to parallelize I/O, while also overcoming all the problems associated with doing I/O in threads.

Item 76: Know How to Port Threaded I/O to asyncio

Once you understand the advantage of coroutines (see Item 75: “Achieve Highly Concurrent I/O with Coroutines”), it may seem daunting to port an existing codebase to use them. Luckily, Python’s support for asynchronous execution is well integrated into the language. This makes it straightforward to move code that does threaded, blocking I/O over to coroutines and asynchronous I/O.

For example, say that I have a TCP-based server for playing a “guess the number” game. The server takes lower and upper parameters that determine the range of numbers to consider. Then the server returns guesses for integer values in that range as they are requested by the client. Finally, the server collects reports from the client on whether each of those numbers was closer to (warmer) or further away from (colder) the client’s secret number.

The most common way to build this type of client/server system is by using blocking I/O and threads (see Item 68: “Use Threads for Blocking I/O; Avoid for Parallelism”). To do this, I need a helper class that can manage sending and receiving messages. For my purposes, each line sent or received represents a command to be processed:

class EOFError(Exception):
    pass

class Connection:
    def __init__(self, connection):
        self.connection = connection
        self.file = connection.makefile("rb")

    def send(self, command):
        line = command + "\n"
        data = line.encode()
        self.connection.send(data)

    def receive(self):
        line = self.file.readline()
        if not line:
            raise EOFError("Connection closed")
        return line[:-1].decode()

The server is implemented as a class that handles one connection at a time and maintains the game’s session state:

import random

WARMER = "Warmer"
COLDER = "Colder"
SAME = "Same"
UNSURE = "Unsure"
CORRECT = "Correct"

class UnknownCommandError(Exception):
    pass

class ServerSession(Connection):
    def __init__(self, *args):
        super().__init__(*args)
        self.clear_state()

It has one primary method that handles incoming commands from the client and dispatches them to methods as needed. Here I use a match statement to parse the semi-structured data (see Item 9: “Consider match for Destructuring in Flow Control; Avoid When if Statements Are Sufficient” for details):

    def loop(self):
        while command := self.receive():
            match command.split(" "):
                case "PARAMS", lower, upper:
                    self.set_params(lower, upper)
                case ["NUMBER"]:
                    self.send_number()
                case "REPORT", decision:
                    self.receive_report(decision)
                case ["CLEAR"]:
                    self.clear_state()
                case _:
                    raise UnknownCommandError(command)

The first command sets the lower and upper bounds for the numbers that the server is trying to guess:

    def set_params(self, lower, upper):
        self.clear_state()
        self.lower = int(lower)
        self.upper = int(upper)

The second command makes a new guess based on the previous state that’s stored in the ServerSession instance. Specifically, this code ensures that the server will never try to guess the same number more than once per parameter assignment:

    def next_guess(self):
        if self.secret is not None:
            return self.secret

        while True:
            guess = random.randint(self.lower, self.upper)
            if guess not in self.guesses:
                return guess

    def send_number(self):
        guess = self.next_guess()
        self.guesses.append(guess)
        self.send(format(guess))

The third command receives the decision from the client about whether the guess was warmer, colder, the same, or correct, and it updates the ServerSession state accordingly:

    def receive_report(self, decision):
        last = self.guesses[-1]
        if decision == CORRECT:
            self.secret = last

        print(f"Server: {last} is {decision}")

The last command clears the state to end a game whether it was successful or not:

    def clear_state(self):
        self.lower = None
        self.upper = None
        self.secret = None
        self.guesses = []

A game is initiated by using a with statement to ensure that state is correctly managed on the server side (see Item 82: “Consider contextlib and with Statements for Reusable try/finally Behavior” for background and Item 78: “Maximize Responsiveness of asyncio Event Loops with async-Friendly Worker Threads” for another example). This new_game function sends the first and last commands to the server and provides a context object to use for the duration of the game:

import contextlib

@contextlib.contextmanager
def new_game(connection, lower, upper, secret):
    print(
        f"Guess a number between {lower} and {upper}!"
        f" Shhhhh, it's {secret}."
    )
    connection.send(f"PARAMS {lower} {upper}")
    try:
        yield ClientSession(
            connection.send,
            connection.receive,
            secret,
        )
    finally:
        connection.send("CLEAR")

I use a stateful class, with helper methods for game actions and references to the connection’s send and receive functions, to manage each game session (see Item 48: “Accept Functions Instead of Classes for Simple Interfaces” for why I pass in send and receive explicitly):

import math

class ClientSession:
    def __init__(self, send, receive, secret):
        self.send = send
        self.receive = receive
        self.secret = secret
        self.last_distance = None

New guesses are requested from the server using a method that implements the second command:

    def request_number(self):
        self.send("NUMBER")
        data = self.receive()
        return int(data)

Whether each guess from the server was warmer or colder than the last is reported using the third command:

    def report_outcome(self, number):
        new_distance = math.fabs(number - self.secret)

        if new_distance == 0:
            decision = CORRECT
        elif self.last_distance is None:
            decision = UNSURE
        elif new_distance < self.last_distance:
            decision = WARMER
        elif new_distance > self.last_distance:
            decision = COLDER
        else:
            decision = SAME

        self.last_distance = new_distance

        self.send(f"REPORT {decision}")
        return decision

The game session object can be iterated over (see Item 21: “Be Defensive when Iterating over Arguments” for background) to make new, unique guesses repeatedly until the correct answer is found:

    def __iter__(self):
        while True:
            number = self.request_number()
            decision = self.report_outcome(number)
            yield number, decision
            if decision == CORRECT:
                return

I can run the server by having one thread listen on a socket and spawn additional threads to handle each new client connection:

import socket
from threading import Thread

def handle_connection(connection):
    with connection:
        session = ServerSession(connection)
        try:
            session.loop()
        except EOFError:
            pass

def run_server(address):
    with socket.socket() as listener:
        listener.bind(address)
        listener.listen()
        while True:
            connection, _ = listener.accept()
            thread = Thread(
                target=handle_connection,
                args=(connection,),
                daemon=True,
            )
            thread.start()

The client runs in the main thread and returns the results of the guessing game back to the caller. Perhaps a bit awkwardly, this code exercises a variety of Python language features (for loops, with statements, generators, comprehensions, the iterator protocol) so that below I can show what it takes to port each of these over to using coroutines:

def run_client(address):
    with socket.create_connection(address) as server_sock:
        server = Connection(server_sock)

        with new_game(server, 1, 5, 3) as session:
            results = [outcome for outcome in session]

        with new_game(server, 10, 15, 12) as session:
            for outcome in session:
                results.append(outcome)

        with new_game(server, 1, 3, 2) as session:
            it = iter(session)
            while True:
                try:
                    outcome = next(it)
                except StopIteration:
                    break
                else:
                    results.append(outcome)

    return results

Finally, I can glue all this together and confirm that it works as expected:

def main():
    address = ("127.0.0.1", 1234)
    server_thread = Thread(
        target=run_server, args=(address,), daemon=True
    )
    server_thread.start()

    results = run_client(address)
    for number, outcome in results:
        print(f"Client: {number} is {outcome}")

main()

>>>
Guess a number between 1 and 5! Shhhhh, it's 3.
Server: 4 is Unsure
Server: 1 is Colder
Server: 5 is Same
Server: 3 is Correct
Guess a number between 10 and 15! Shhhhh, it's 12.
Server: 11 is Unsure
Server: 10 is Colder
Server: 12 is Correct
Guess a number between 1 and 3! Shhhhh, it's 2.
Server: 3 is Unsure
Server: 2 is Correct
Client: 4 is Unsure
Client: 1 is Colder
Client: 5 is Same
Client: 3 is Correct
Client: 11 is Unsure
Client: 10 is Colder
Client: 12 is Correct
Client: 3 is Unsure
Client: 2 is Correct

How much effort is needed to convert this example to using async, await, and the asyncio built-in module?

First, I need to update my Connection class to provide coroutine methods for send and receive instead of blocking I/O methods. I’ve marked each line that’s changed with a # Changed comment to make it clear what the delta is between this new example and the code above:

class AsyncConnection:
    def __init__(self, reader, writer):      # Changed
        self.reader = reader                 # Changed
        self.writer = writer                 # Changed

    async def send(self, command):
        line = command + "\n"
        data = line.encode()
        self.writer.write(data)              # Changed
        await self.writer.drain()            # Changed

    async def receive(self):
        line = await self.reader.readline()  # Changed
        if not line:
            raise EOFError("Connection closed")
        return line[:-1].decode()

I can create another stateful class to represent the server session state for a single connection. The only changes here are the class’s name and inheriting from AsyncConnection instead of Connection:

class AsyncServerSession(AsyncConnection):  # Changed
    def __init__(self, *args):
        ...

The primary entry point for the server’s command-processing loop requires only minimal changes to become a coroutine:

    async def loop(self):                       # Changed
        while command := await self.receive():  # Changed
            match command.split(" "):
                case "PARAMS", lower, upper:
                    self.set_params(lower, upper)
                case ["NUMBER"]:
                    await self.send_number()    # Changed
                case "REPORT", decision:
                    self.receive_report(decision)
                case ["CLEAR"]:
                    self.clear_state()
                case _:
                    raise UnknownCommandError(command)

No changes are required for handling the first command:

    def set_params(self, lower, upper):
        ...

The only change required for the second command is allowing asynchronous I/O to be used when guesses are transmitted to the client:

    def next_guess(self):
        ...

    async def send_number(self):                    # Changed
        guess = self.next_guess()
        self.guesses.append(guess)
        await self.send(format(guess))              # Changed

No changes are required in the third and fourth commands:

    def receive_report(self, decision):
        ...

    def clear_state(self):
        ...

Initiating a new game on the client requires a few async and await keywords to be added for sending the first and last commands. It also needs to use the asynccontextmanager helper function from the contextlib built-in module:

@contextlib.asynccontextmanager                       # Changed
async def new_async_game(
    connection, lower, upper, secret):                # Changed
    print(
        f"Guess a number between {lower} and {upper}!"
        f" Shhhhh, it's {secret}."
    )
    await connection.send(f"PARAMS {lower} {upper}")   # Changed
    try:
        yield AsyncClientSession(
            connection.send,
            connection.receive,
            secret,
        )
    finally:
        await connection.send("CLEAR")                # Changed

The asynchronous version of the ClientSession class for representing game state has the same constructor as before:

class AsyncClientSession:
    def __init__(self, send, receive, secret):
        ...

The second command only requires the addition of async and await anywhere asynchronous behavior is required:

    async def request_number(self):
        await self.send("NUMBER")    # Changed
        data = await self.receive()  # Changed
        return int(data)

The third command only requires adding one async and one await keyword:

    async def report_outcome(self, number):    # Changed
        new_distance = math.fabs(number - self.secret)

        if new_distance == 0:
            decision = CORRECT
        elif self.last_distance is None:
            decision = UNSURE
        elif new_distance < self.last_distance:
            decision = WARMER
        elif new_distance > self.last_distance:
            decision = COLDER
        else:
            decision = SAME

        self.last_distance = new_distance

        await self.send(f"REPORT {decision}")         # Changed
        return decision

To enable asynchronous iteration, I need to implement __aiter__ instead of __iter__, with corresponding additions of async and await:

    async def __aiter__(self):                        # Changed
        while True:
            number = await self.request_number()      # Changed
            decision = await self.report_outcome(
                number)                               # Changed
            yield number, decision
            if decision == CORRECT:
                return

The code that runs the server needs to be completely reimplemented to use the asyncio built-in module and its start_server function:

import asyncio

async def handle_async_connection(reader, writer):
    session = AsyncServerSession(reader, writer)
    try:
        await session.loop()
    except EOFError:
        pass

async def run_async_server(address):
    server = await asyncio.start_server(
        handle_async_connection, *address
    )
    async with server:
        await server.serve_forever()

The run_client function that initiates the game requires changes on nearly every line. Any code that previously interacted with the blocking socket instances has to be replaced with asyncio versions of similar functionality (which are marked with # New below). All other lines in the function that require interaction with coroutines need to use async and await keywords; coroutine-specific functions like aiter and anext; or async-specific exceptions like StopAsyncIteration. If you forget to add one of these keywords in a necessary place, an exception will be raised at runtime.

async def run_async_client(address):
    streams = await asyncio.open_connection(*address)  # New
    client = AsyncConnection(*streams)                 # New

    async with new_async_game(client, 1, 5, 3) as session:
        results = [outcome async for outcome in session]

    async with new_async_game(client, 10, 15, 12) as session:
        async for outcome in session:
            results.append(outcome)

    async with new_async_game(client, 1, 3, 2) as session:
        it = aiter(session)
        while True:
            try:
                outcome = await anext(it)
            except StopAsyncIteration:
                break
            else:
                results.append(outcome)

    _, writer = streams                                # New
    writer.close()                                     # New
    await writer.wait_closed()                         # New

    return results

What’s most interesting about run_async_client is that I didn’t have to restructure any of the substantive parts of interacting with AsyncClientSession in order to port this function over to use coroutines. Each of the language features that I needed has a corresponding asynchronous version, which made the migration straightforward.

This transition won’t always be easy. For example, in the standard library, there are currently no asynchronous versions of the utility functions from itertools (see Item 24: “Consider itertools for Working with Iterators and Generators”). There’s also no asynchronous version of yield from (see Item 45: “Compose Multiple Generators with yield from”), which makes composing generators noisier. Many community libraries help fill these gaps (see Item 116: “Know Where to Find Community-Built Modules”), but it can still take extra work, depending on the complexity of your code.
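
For instance, delegating to an async generator requires an explicit loop where yield from would have sufficed for a synchronous generator (a minimal sketch with hypothetical names):

async def child():
    yield 1
    yield 2

async def parent():
    async for value in child():  # No async version of "yield from"
        yield value

async def consume():
    async for value in parent():
        print(value)

asyncio.run(consume())

>>>
1
2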

Finally, the glue needs to be updated to run this new asynchronous example end-to-end. I use the asyncio.create_task function to enqueue the server for execution on the event loop so that it runs in parallel with the client when the await expression is reached. This is another approach to causing fan-out with different behavior than what occurs with the asyncio.gather function:

async def main_async():
    address = ("127.0.0.1", 4321)

    server = run_async_server(address)
    asyncio.create_task(server)

    results = await run_async_client(address)
    for number, outcome in results:
        print(f"Client: {number} is {outcome}")

asyncio.run(main_async())

>>>
Guess a number between 1 and 5! Shhhhh, it's 3.
Server: 5 is Unsure
Server: 4 is Warmer
Server: 2 is Same
Server: 1 is Colder
Server: 3 is Correct
Guess a number between 10 and 15! Shhhhh, it's 12.
Server: 14 is Unsure
Server: 10 is Same
Server: 15 is Colder
Server: 12 is Correct
Guess a number between 1 and 3! Shhhhh, it's 2.
Server: 2 is Correct
Client: 5 is Unsure
Client: 4 is Warmer
Client: 2 is Same
Client: 1 is Colder
Client: 3 is Correct
Client: 14 is Unsure
Client: 10 is Same
Client: 15 is Colder
Client: 12 is Correct
Client: 2 is Correct

This works as expected. The coroutine version is easier to follow because all the interactions with threads have been removed. The asyncio built-in module also provides many helper functions that reduce the amount of socket boilerplate code required to write a server like this.

Your use case may be more difficult to port for a variety of reasons. The asyncio module has a vast number of I/O, synchronization, and task management features that could make adopting coroutines easier for you (see Item 77: “Mix Threads and Coroutines to Ease the Transition to asyncio” and Item 78: “Maximize Responsiveness of asyncio Event Loops with async-Friendly Worker Threads”). Be sure to check out the online documentation for the library (https://docs.python.org/3/library/asyncio.html) to understand its full potential.

Things to Remember

  • Python provides asynchronous versions of for loops, with statements, generators, comprehensions, iterators, and library helper functions that can be used as drop-in replacements in coroutines.

  • The asyncio built-in module makes it straightforward to port existing code that uses threads and blocking I/O over to coroutines and asynchronous I/O.

Item 77: Mix Threads and Coroutines to Ease the Transition to asyncio

In the previous item (see Item 76: “Know How to Port Threaded I/O to asyncio”), I ported a TCP server that does blocking I/O with threads over to use asyncio with coroutines. It was a big-bang transition: I moved all of the code to the new style in one go. But it’s rarely feasible to port a large program this way. Instead, you usually need to incrementally migrate your codebase while also updating your tests as needed and verifying that everything works at each step along the way.

In order to do that, your codebase needs to be able to use threads for blocking I/O (see Item 68: “Use Threads for Blocking I/O; Avoid for Parallelism”) and coroutines for asynchronous I/O (see Item 75: “Achieve Highly Concurrent I/O with Coroutines”) at the same time in a way that’s mutually compatible. Practically, this means you need threads to be able to run coroutines, and you need coroutines to be able to start and wait on threads. Luckily, asyncio includes built-in facilities for making this type of interoperability straightforward.
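
As a tiny, standalone preview of both directions (hypothetical names; the rest of this item shows the real migration), a coroutine can hand blocking work to a worker thread with run_in_executor, while a plain thread can schedule a coroutine on a running event loop with asyncio.run_coroutine_threadsafe:

import asyncio

async def coro_work():
    return "ran on the event loop"

def thread_work(loop):
    # A plain worker thread scheduling a coroutine on the event loop:
    future = asyncio.run_coroutine_threadsafe(coro_work(), loop)
    return future.result()

async def main():
    loop = asyncio.get_running_loop()
    # A coroutine running a blocking function in a worker thread:
    result = await loop.run_in_executor(None, thread_work, loop)
    print(result)

asyncio.run(main())

>>>
ran on the event loop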

For example, say that I’m writing a program that merges log files together into one output stream in order to aid with debugging. Given a file handle for an input log, I need a way to detect whether new data is available and return the next line of input. I can do this by using the tell method of the file handle to check whether the current read position matches the length of the file. When no new data is present, an exception should be raised (see Item 32: “Prefer Raising Exceptions to Returning None” for background):

class NoNewData(Exception):
    pass

def readline(handle):
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()

    if length == offset:
        raise NoNewData

    handle.seek(offset, 0)
    return handle.readline()

By wrapping this function in a while loop, I can turn it into a worker thread. When a new line is available, I call a given callback function to write it to the output log (see Item 48: “Accept Functions Instead of Classes for Simple Interfaces” for why to use a function interface for this instead of a class). When no data is available, the thread sleeps to reduce the amount of busy waiting caused by polling for new data. When the input file handle is closed, the worker thread exits:

import time

def tail_file(handle, interval, write_func):
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
        else:
            write_func(line)

Now I can start one worker thread per input file and unify the output of these threads into a single output file. Below, the write closure function (see Item 33: “Know How Closures Interact with Variable Scope and nonlocal”) needs to use a Lock instance (see Item 69: “Use Lock to Prevent Data Races in Threads”) in order to serialize writes to the output stream and ensure that there are no intra-line conflicts:

from threading import Lock, Thread

def run_threads(handles, interval, output_path):
    with open(output_path, "wb") as output:
        lock = Lock()

        def write(data):
            with lock:
                output.write(data)

        threads = []
        for handle in handles:
            args = (handle, interval, write)
            thread = Thread(target=tail_file, args=args)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()

As long as an input file handle is still alive, its corresponding worker thread will also stay alive. That means it’s sufficient to wait for the join method from each thread to complete in order to know that the whole process is done.

Given a set of input paths and an output path, I can call run_threads and confirm that it works as expected. How the input file handles are created or closed isn’t important for demonstrating this code’s behavior, nor is the implementation of the confirm_merge output verification function below, which is why I’ve left them out here:

def confirm_merge(input_paths, output_path):
    ...

input_paths = ...
handles = ...
output_path = ...
run_threads(handles, 0.1, output_path)

confirm_merge(input_paths, output_path)

With this threaded implementation as the starting point, how can I incrementally convert this code to use asyncio and coroutines instead? There are two approaches: top-down and bottom-up.

Top-Down Approach

Top-down means starting at the highest parts of a codebase, like in the main entry points, and working down to the individual functions and classes that are the leaves of the call hierarchy. This approach can be useful when you maintain a lot of common modules that you use across many different programs. By porting the entry points first, you can wait to port the common modules until you’re already using coroutines everywhere else.

These are the concrete steps:

  1. Change a top function to use async def instead of def.

  2. Wrap all of its calls that do I/O—potentially blocking the event loop—in calls to the event loop’s run_in_executor method instead.

  3. Ensure that the resources or callbacks used by run_in_executor invocations are properly synchronized (i.e., using Lock or the asyncio.run_coroutine_threadsafe function with a fan-in event loop instance).

  4. Try to eliminate get_event_loop and run_in_executor calls by moving downward through the call hierarchy and converting intermediate functions and methods to coroutines (following the first three steps).

Here I apply steps 1–3 to the run_threads function:

import asyncio

async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()

    output = await loop.run_in_executor(
        None, open, output_path, "wb")
    try:
        async def write_async(data):
            await loop.run_in_executor(None, output.write, data)

        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(coro, loop)
            future.result()

        tasks = []
        for handle in handles:
            task = loop.run_in_executor(
                None, tail_file, handle, interval, write
            )
            tasks.append(task)

        await asyncio.gather(*tasks)

    finally:
        await loop.run_in_executor(None, output.close)

The run_in_executor method instructs the event loop to run a given function—which may include blocking I/O—using a ThreadPoolExecutor, ensuring that it doesn’t interfere with the event loop’s thread (see Item 74: “Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency” for background). By making multiple calls to run_in_executor without corresponding await expressions, the run_tasks_mixed coroutine fans out to have one concurrent line of execution for each input file. Then the await on asyncio.gather fans in, waiting until all of the tail_file worker threads complete (see Item 71: “Know How to Recognize When Concurrency Is Necessary” for more about fan-out and fan-in).

This code eliminates the need for the Lock instance in the write helper by using asyncio.run_coroutine_threadsafe. This function allows plain old worker threads to call a coroutine—write_async in this case—and have it execute in the explicitly supplied event loop, which runs in the main thread. This effectively synchronizes the worker threads, ensuring that all writes to the output file happen one at a time. Once the asyncio.gather awaitable is resolved, I can assume that all writes to the output file have also completed, and thus I can close the output file handle without having to worry about race conditions.

I can verify that this code works as expected by using the asyncio.run function to start the coroutine and run the main event loop:

input_paths = ...
handles = ...
output_path = ...
asyncio.run(run_tasks_mixed(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

Now I can apply step 4 to the run_tasks_mixed function by moving down the call stack. I can redefine the tail_file dependent function to be an asynchronous coroutine instead of doing blocking I/O by following steps 1–3:

async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(
                None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)

The new tail_async function allows me to eliminate the run_tasks_mixed function’s calls to run_coroutine_threadsafe and the write wrapper function. I can also use asyncio.TaskGroup (new in Python 3.11) to manage fan-out and fan-in for the tail_async coroutines, further shortening the code:

async def run_tasks(handles, interval, output_path):
    loop = asyncio.get_event_loop()

    output = await loop.run_in_executor(
        None, open, output_path, "wb")
    try:

        async def write_async(data):
            await loop.run_in_executor(None, output.write, data)

        async with asyncio.TaskGroup() as group:
            for handle in handles:
                group.create_task(
                    tail_async(handle, interval, write_async)
                )
    finally:
        await loop.run_in_executor(None, output.close)

I can verify that run_tasks works as expected, too:

input_paths = ...
handles = ...
output_path = ...
asyncio.run(run_tasks(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

It’s possible to continue this refactoring approach and convert readline into an asynchronous coroutine as well. However, that function requires so many blocking file I/O operations that it doesn’t seem worth porting, given how much that would reduce the clarity of the code. In some situations, it makes sense to move everything to asyncio, and in others it doesn’t.

Bottom-Up Approach

The bottom-up approach to adopting coroutines has four steps that are similar to the steps of the top-down style, but the process traverses the call hierarchy in the opposite direction: from leaves to entry points.

These are the concrete steps:

  1. Create a new asynchronous coroutine version of each leaf function that you’re trying to port.

  2. Change the existing synchronous functions so they call the coroutine versions and run the event loop instead of implementing any real asynchronous behavior.

  3. Move up a level of the call hierarchy, make another layer of coroutines, and replace existing calls to synchronous functions with calls to the coroutines defined in step 1.

  4. Delete synchronous wrappers around coroutines created in step 2 as you stop requiring them to glue the pieces together.

For the example above, I would start with the tail_file function since I decided that the readline function should keep using blocking I/O. I can rewrite tail_file so it merely wraps the tail_async coroutine that I defined above. The provided write_func, which uses blocking I/O, can be run by the write_async function using run_in_executor, making it compatible with what tail_async expects. To run each worker coroutine until it finishes, I can create an event loop for each tail_file thread and then call its run_until_complete method. This method will block the current thread and drive the event loop until the tail_async coroutine exits, achieving the same behavior as the threaded, blocking I/O version of tail_file:

def tail_file(handle, interval, write_func):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def write_async(data):
        await loop.run_in_executor(None, write_func, data)

    coro = tail_async(handle, interval, write_async)
    loop.run_until_complete(coro)

This new tail_file function is a drop-in replacement for the old one. I can verify that everything works as expected by calling run_threads again:

input_paths = ...
handles = ...
output_path = ...
run_threads(handles, 0.1, output_path)

confirm_merge(input_paths, output_path)

After wrapping tail_async with tail_file, the next step is to convert the run_threads function to a coroutine. This ends up being the same work as step 4 of the top-down approach above, so at this point, the styles converge.

This is all a great start for adopting asyncio, but there’s even more that you could do to increase the responsiveness of your program (see Item 78: “Maximize Responsiveness of asyncio Event Loops with async-Friendly Worker Threads”).

Things to Remember

  • The awaitable run_in_executor method of the asyncio event loop enables coroutines to run synchronous functions in ThreadPoolExecutor worker threads. This facilitates top-down migration to asyncio.

  • The run_until_complete method of the asyncio event loop enables synchronous code to run a coroutine until it finishes. The asyncio.run_coroutine_threadsafe function provides the same functionality across thread boundaries. Together these help with bottom-up migration to asyncio.

Item 78: Maximize Responsiveness of asyncio Event Loops with async-Friendly Worker Threads

In the previous item I showed how to migrate to asyncio incrementally (see Item 77: “Mix Threads and Coroutines to Ease the Transition to asyncio” for background and the implementation of various functions below). The resulting coroutine properly tails input files and merges them into a single output:

import asyncio

async def run_tasks(handles, interval, output_path):
    loop = asyncio.get_event_loop()

    output = await loop.run_in_executor(
        None, open, output_path, "wb")
    try:

        async def write_async(data):
            await loop.run_in_executor(None, output.write, data)

        async with asyncio.TaskGroup() as group:
            for handle in handles:
                group.create_task(
                    tail_async(handle, interval, write_async)
                )
    finally:
        await loop.run_in_executor(None, output.close)

This code is quite noisy and repetitive with all the run_in_executor boilerplate to handle the boundary between synchronous and asynchronous function calls. The function would be a lot shorter if I accepted the fact that calls to open, close, and write for the output file handle will block the event loop—and for the purpose of merging multiple file handles like this, it’s functionally correct, too:

async def run_tasks_simpler(handles, interval, output_path):
    with open(output_path, "wb") as output:  # Changed

        async def write_async(data):
            output.write(data)               # Changed

        async with asyncio.TaskGroup() as group:
            for handle in handles:
                group.create_task(
                    tail_async(handle, interval, write_async)
                )

However, avoiding run_in_executor like this is bad because these operations all require making system calls to the program’s host operating system, which may block the event loop for significant amounts of time and prevent other coroutines from making progress. This could hurt overall responsiveness and increase latency, especially for programs with event loops that are shared by many components, such as highly concurrent servers.

But how bad is it to block the event loop, really? And how often does it happen in practice? I can detect when this problem occurs in a real program by passing the debug=True parameter to the asyncio.run function. Here I show how the file and line of a bad coroutine, presumably blocked on a slow system call, can be identified:

import time

async def slow_coroutine():
    time.sleep(0.5)  # Simulating slow I/O

asyncio.run(slow_coroutine(), debug=True)

>>>
Executing <Task finished name='Task-1' coro=<slow_coroutine()
➥done, defined at example.py:61> result=None created at
➥.../asyncio/runners.py:100> took 0.506 seconds
...
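The warning threshold is controlled by the event loop's slow_callback_duration attribute, which defaults to 0.1 seconds in debug mode. Here's a minimal sketch of lowering it to surface shorter stalls (the 0.05 second threshold is just an illustration):

import asyncio
import time

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # Default is 0.1 seconds

    time.sleep(0.07)  # Under the default threshold, but now reported

asyncio.run(main(), debug=True)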

If I want the most responsive program possible, then I need to minimize the potential system calls that are made from within the main event loop. Using run_in_executor is one way to do that, but it requires a lot of boilerplate, as shown above. One potentially better alternative is to create a new Thread subclass (see Item 68: “Use Threads for Blocking I/O; Avoid for Parallelism”) that encapsulates everything required to write to the output file using its own independent event loop:

from threading import Thread

class WriteThread(Thread):
    def __init__(self, output_path):
        super().__init__()
        self.output_path = output_path
        self.output = None
        self.loop = asyncio.new_event_loop()

    def run(self):
        asyncio.set_event_loop(self.loop)
        with open(self.output_path, "wb") as self.output:
            self.loop.run_forever()

        # Run one final round of callbacks so the await on
        # stop() in another event loop will be resolved.
        self.loop.run_until_complete(asyncio.sleep(0))

Coroutines in other threads can directly call and await on the write method of this class, since it’s merely a thread-safe wrapper around the real_write method that actually does the I/O. This eliminates the need for Lock (see Item 69: “Use Lock to Prevent Data Races in Threads”):

    async def real_write(self, data):
        self.output.write(data)

    async def write(self, data):
        coro = self.real_write(data)
        future = asyncio.run_coroutine_threadsafe(
            coro, self.loop)
        await asyncio.wrap_future(future)

Other coroutines can tell the worker thread when to stop in a thread-safe manner, using similar boilerplate:

    async def real_stop(self):
        self.loop.stop()

    async def stop(self):
        coro = self.real_stop()
        future = asyncio.run_coroutine_threadsafe(
            coro, self.loop)
        await asyncio.wrap_future(future)
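Since write and stop share this pattern exactly, the cross-thread plumbing could be factored into a shared helper method; here's a minimal sketch (the _run_in_worker name is mine, not part of the original class):

    async def _run_in_worker(self, coro):
        # Schedule the coroutine on the worker thread's event loop
        # and await its completion from the caller's event loop
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        await asyncio.wrap_future(future)

    async def write(self, data):
        await self._run_in_worker(self.real_write(data))

    async def stop(self):
        await self._run_in_worker(self.real_stop())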

I can also define the __aenter__ and __aexit__ methods to allow this class to be used in async with statements (see Item 82: “Consider contextlib and with Statements for Reusable try/finally Behavior” and Item 76: “Know How to Port Threaded I/O to asyncio” for background). This ensures that the worker thread starts and stops at the right times without slowing down the main event loop thread:

    async def __aenter__(self):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.start)
        return self

    async def __aexit__(self, *_):
        await self.stop()

With this new WriteThread class, I can refactor run_tasks into a fully asynchronous version that’s easy to read, doesn’t interfere with the main event loop’s default executor, and completely avoids running slow system calls in the main event loop thread:

def readline(handle):
    ...

async def tail_async(handle, interval, write_func):
    ...

async def run_fully_async(handles, interval, output_path):
    async with (
        WriteThread(output_path) as output,
        asyncio.TaskGroup() as group,
    ):
        for handle in handles:
            group.create_task(
                tail_async(handle, interval, output.write)
            )

I can verify that this works as expected, given a set of input handles and an output file path:

def confirm_merge(input_paths, output_path):
    ...

input_paths = ...
handles = ...
output_path = ...
asyncio.run(run_fully_async(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)
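The confirm_merge helper is elided above; one plausible implementation, assuming the data is line oriented and every input line must appear verbatim in the merged output, would be:

def confirm_merge(input_paths, output_path):
    # Hypothetical check: every line from each input file must be
    # present somewhere in the merged output file
    with open(output_path, "rb") as f:
        merged = set(f.read().splitlines())
    for path in input_paths:
        with open(path, "rb") as f:
            for line in f.read().splitlines():
                assert line in merged, f"Missing {line!r} from {path}"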

Things to Remember

  • Making system calls in coroutines (including blocking I/O and starting threads) can reduce program responsiveness and increase the perception of latency.

  • Pass the debug=True parameter to asyncio.run in order to detect when certain coroutines are preventing the event loop from reacting quickly.

  • To improve the readability of code that must span the boundary between asynchronous and synchronous execution, consider defining helper thread classes that provide coroutine-friendly interfaces.

Item 79: Consider concurrent.futures for True Parallelism

At some point in writing Python programs, you might hit a performance wall. Even after optimizing your Python code (see Item 92: “Profile Before Optimizing”), your program’s execution might still be too slow for your needs. On modern computers that have an increasing number of CPU cores, it’s reasonable to assume that one solution could be parallelism. What if you could split your code’s computation into independent pieces of work that run simultaneously across multiple CPU cores?

Unfortunately, Python’s global interpreter lock (GIL) prevents true CPU parallelism in Python threads in most cases (see Item 68: “Use Threads for Blocking I/O; Avoid for Parallelism”). But the multiprocessing built-in module, which is easily accessed via the concurrent.futures built-in module, might be exactly what you need (see Item 74: “Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency” for a related example). multiprocessing enables Python to utilize multiple CPU cores in parallel by running additional interpreters as child processes. These child processes are separate from the main interpreter, so their global interpreter locks are also separate. Each child can fully utilize one CPU core. Each child also has a link to the main process where it receives instructions to do computation and returns results.

For example, say that I want to do something computationally intensive with Python and utilize multiple CPU cores. I’ll use an implementation of finding the greatest common divisor of two numbers as a proxy for a more computationally intense algorithm (like simulating fluid dynamics with the Navier–Stokes equation):

# my_module.py
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i
    raise RuntimeError("Not reachable")
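For example, the largest integer that evenly divides both 24 and 36 is 12:

print(gcd((24, 36)))

>>>
12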

Running this function in serial takes a linearly increasing amount of time because there is no parallelism:

# run_serial.py
import my_module
import time

NUMBERS = [
    (19633090, 22659730),
    (20306770, 38141720),
    (15516450, 22296200),
    (20390450, 20208020),
    (18237120, 19249280),
    (22931290, 10204910),
    (12812380, 22737820),
    (38238120, 42372810),
    (38127410, 47291390),
    (12923910, 21238110),
]

def main():
    start = time.perf_counter()
    results = list(map(my_module.gcd, NUMBERS))
    end = time.perf_counter()
    delta = end - start
    print(f"Took {delta:.3f} seconds")

if __name__ == "__main__":
    main()

>>>
Took 5.643 seconds

Running this code on multiple Python threads will yield no speed improvement because the GIL prevents Python from using multiple CPU cores in parallel. Here I do the same computation as above but using the concurrent.futures module with its ThreadPoolExecutor class and eight worker threads (to match the number of CPU cores on my computer):

# run_threads.py
import my_module
from concurrent.futures import ThreadPoolExecutor
import time

NUMBERS = [
    ...
]

def main():
    start = time.perf_counter()
    pool = ThreadPoolExecutor(max_workers=8)
    results = list(pool.map(my_module.gcd, NUMBERS))
    end = time.perf_counter()
    delta = end - start
    print(f"Took {delta:.3f} seconds")

if __name__ == "__main__":
    main()

>>>
Took 5.810 seconds

It’s even slower this time because of the overhead of starting and communicating with the pool of threads.

Now for the surprising part: Changing a single line of code causes something magical to happen. If I replace ThreadPoolExecutor with ProcessPoolExecutor from the concurrent.futures module, everything speeds up:

# run_parallel.py
import my_module
from concurrent.futures import ProcessPoolExecutor
import time

NUMBERS = [
    ...
]

def main():
    start = time.perf_counter()
    pool = ProcessPoolExecutor(max_workers=8)  # The one change
    results = list(pool.map(my_module.gcd, NUMBERS))
    end = time.perf_counter()
    delta = end - start
    print(f"Took {delta:.3f} seconds")

if __name__ == "__main__":
    main()

>>>
Took 1.684 seconds

Running on my multi-core machine, this is significantly faster! How is this possible? Here’s what the ProcessPoolExecutor class actually does (via the low-level constructs provided by the multiprocessing module):

  1. It takes each item from the NUMBERS input data passed to map.

  2. It serializes the item into binary data by using the pickle module (see Item 107: “Make pickle Serialization Maintainable with copyreg”).

  3. It copies the serialized data from the main interpreter process to a child interpreter process over a local socket.

  4. It deserializes the data back into Python objects, using pickle in the child process.

  5. It imports the Python module containing the gcd function.

  6. It runs the function on the input data in parallel with other child processes.

  7. It serializes the result back into binary data.

  8. It copies that binary data back through the socket.

  9. It deserializes the binary data back into Python objects in the parent process.

  10. It merges the results from multiple children into a single list to return.

Although using the pool.map method looks simple, the multiprocessing module and ProcessPoolExecutor class do a huge amount of work to make parallelism possible. In most other languages, the only touch point you need to coordinate two threads is a single lock or atomic operation (see Item 69: “Use Lock to Prevent Data Races in Threads” for an example). The overhead of using multiprocessing via ProcessPoolExecutor is high because of all the serialization and deserialization that must happen between the parent and child processes.
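For a sense of what ProcessPoolExecutor wraps, roughly the same parallel map can be written against the multiprocessing module's Pool class directly; this sketch is for illustration, not a recommendation:

# run_pool.py
from multiprocessing import Pool

import my_module

NUMBERS = [
    ...
]

def main():
    with Pool(processes=8) as pool:
        # The same pickle round-trips over local sockets happen here
        results = pool.map(my_module.gcd, NUMBERS)

if __name__ == "__main__":
    main()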

This scheme is well suited to certain types of isolated, high-leverage tasks. By isolated, I mean functions that don’t need to share state with other parts of the program. By high-leverage, I mean situations in which only a small amount of data must be transferred between the parent and child processes to enable a large amount of computation. The greatest common divisor algorithm is one example of this, but many other mathematical algorithms work similarly.

If your computation doesn’t have these characteristics, then the overhead of ProcessPoolExecutor may prevent it from speeding up your program through parallelization. When that happens, multiprocessing provides more advanced facilities for shared memory, cross-process locks, queues, and proxies. But all of these features are very complex. It’s hard enough to reason about such tools in the memory space of a single process shared between Python threads. Extending that complexity to other processes and involving sockets makes this much more difficult to understand.
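To give a flavor of those facilities, here is a minimal sketch of a counter stored in shared memory and protected by a cross-process lock; even this tiny example requires noticeably more ceremony than its threaded equivalent:

from multiprocessing import Process, Value

def work(counter):
    for _ in range(100_000):
        with counter.get_lock():   # Cross-process lock
            counter.value += 1

if __name__ == "__main__":
    counter = Value("i", 0)        # Shared-memory signed integer
    workers = [Process(target=work, args=(counter,)) for _ in range(4)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(counter.value)

>>>
400000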

I suggest that you initially avoid all parts of the multiprocessing built-in module. You can start by using the ThreadPoolExecutor class to run isolated, high-leverage functions in threads. Later you can move to ProcessPoolExecutor to get a speedup. Finally, when you’ve completely exhausted the other options, you can consider using the multiprocessing module directly or using more advanced techniques (see Item 94: “Know When and How to Replace Python with Another Programming Language”).

Things to Remember

  • The multiprocessing module provides powerful tools that can parallelize certain types of Python computation with minimal effort.

  • The power of multiprocessing is best accessed through the concurrent.futures built-in module and its simple ProcessPoolExecutor class.

  • Avoid the advanced (and complicated) parts of the multiprocessing module until you’ve exhausted all other options.