Python has long supported different approaches to concurrent programming, including programming with threads, launching subprocesses, and various tricks involving generator functions. In this chapter, recipes related to various aspects of concurrent programming are presented, including common thread programming techniques and approaches for parallel processing.
As experienced programmers know, concurrent programming is fraught with potential peril. Thus, a major focus of this chapter is on recipes that tend to lead to more reliable and debuggable code.
The threading library can be used to execute any Python callable
in its own thread. To do this, you create a Thread instance and
supply the callable that you wish to execute as a target. Here is a
simple example:
# Code to execute in an independent thread
import time

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create and launch a thread
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()
When you create a thread instance, it doesn’t start executing until
you invoke its start() method (which invokes the target function
with the arguments you supplied).
Each thread runs in its own system-level thread (e.g., a POSIX thread or a Windows thread) that is fully managed by the host operating system. Once started, threads run independently until the target function returns. You can query a thread instance to see if it’s still running:
if t.is_alive():
    print('Still running')
else:
    print('Completed')
You can also request to join with a thread, which waits for it to terminate:
t.join()
The interpreter remains running until all threads terminate. For long-running threads or background tasks that run forever, you should consider making the thread daemonic. For example:
t = Thread(target=countdown, args=(10,), daemon=True)
t.start()
Daemonic threads can’t be joined. However, they are destroyed automatically when the main thread terminates.
Beyond the two operations shown, there aren’t many other things you can do with threads. For example, there are no operations to terminate a thread, signal a thread, adjust its scheduling, or perform any other high-level operations. If you want these features, you need to build them yourself.
If you want to be able to terminate threads, the thread must be programmed to poll for exit at selected points. For example, you might put your thread in a class such as this:
class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)

c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()
...
c.terminate()   # Signal termination
t.join()        # Wait for actual termination (if needed)
Polling for thread termination can be tricky to coordinate if threads perform blocking operations such as I/O. For example, a thread blocked indefinitely on an I/O operation may never return to check if it’s been killed. To correctly deal with this case, you’ll need to carefully program the thread to utilize timeout loops. For example:
import socket

class IOTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, sock):
        # sock is a socket
        sock.settimeout(5)        # Set timeout period
        while self._running:
            # Perform a blocking I/O operation w/ timeout
            try:
                data = sock.recv(8192)
                break
            except socket.timeout:
                continue
            # Continued processing
            ...
        # Terminated
        return
Due to a global interpreter lock (GIL), Python threads are restricted to an execution model that only allows one thread to execute in the interpreter at any given time. For this reason, Python threads should generally not be used for computationally intensive tasks where you are trying to achieve parallelism on multiple CPUs. They are much better suited for I/O handling and handling concurrent execution in code that performs blocking operations (e.g., waiting for I/O, waiting for results from a database, etc.).
Sometimes you will see threads defined via inheritance from the Thread
class. For example:
from threading import Thread
import time

class CountdownThread(Thread):
    def __init__(self, n):
        super().__init__()
        self.n = n

    def run(self):
        while self.n > 0:
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)

c = CountdownThread(5)
c.start()
Although this works, it introduces an extra dependency between the code and
the threading library. That is, you can only use the resulting code
in the context of threads, whereas the technique shown earlier involves
writing code with no explicit dependency on threading. By freeing
your code of such dependencies, it becomes usable in other contexts
that may or may not involve threads. For instance, you might be able
to execute your code in a separate process using the multiprocessing module
using code like this:
import multiprocessing
c = CountdownTask()
p = multiprocessing.Process(target=c.run, args=(5,))
p.start()
...
Again, this only works if the CountdownTask class has been written
in a manner that is neutral to the actual means of concurrency
(threads, processes, etc.).
A key feature of threads is that they execute independently and
nondeterministically. This can present a tricky synchronization
problem if other threads in the program need to know if a thread has
reached a certain point in its execution before carrying out further
operations. To solve such problems, use the Event object from
the threading library.
Event instances are similar to a “sticky” flag that allows threads to
wait for something to happen. Initially, an event is set to 0. If the
event is unset and a thread waits on the event, it will block (i.e.,
go to sleep) until the event gets set. A thread that sets the event
will wake up all of the threads that happen to be waiting (if any).
If a thread waits on an event that has already been set, it merely moves
on, continuing to execute.
Here is some sample code that uses an Event to coordinate the startup of
a thread:
from threading import Thread, Event
import time

# Code to execute in an independent thread
def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set()
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create the event object that will be used to signal startup
started_evt = Event()

# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10, started_evt))
t.start()

# Wait for the thread to start
started_evt.wait()
print('countdown is running')
When you run this code, the “countdown is running” message will always
appear after the “countdown starting” message. This is coordinated
by the event that makes the main thread wait until the countdown() function
has first printed the startup message.
Event objects are best used for one-time events. That is, you create
an event, threads wait for the event to be set, and once set, the
Event is discarded. Although it is possible to clear an event using
its clear() method, safely clearing an event and waiting for it to
be set again is tricky to coordinate, and can lead to missed events,
deadlock, or other problems (in particular, you can’t guarantee that a
request to clear an event after setting it will execute
before a released thread cycles back to wait on the event again).
If a thread is going to signal an event over and over, you’re
probably better off using a Condition object instead. For example,
this code implements a periodic timer that other threads can monitor
to see whenever the timer expires:
import threading
import time

class PeriodicTimer:
    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()

    def start(self):
        t = threading.Thread(target=self.run)
        t.daemon = True
        t.start()

    def run(self):
        '''
        Run the timer and notify waiting threads after each interval
        '''
        while True:
            time.sleep(self._interval)
            with self._cv:
                self._flag ^= 1
                self._cv.notify_all()

    def wait_for_tick(self):
        '''
        Wait for the next tick of the timer
        '''
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait()

# Example use of the timer
ptimer = PeriodicTimer(5)
ptimer.start()

# Two threads that synchronize on the timer
def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print('T-minus', nticks)
        nticks -= 1

def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting', n)
        n += 1

threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()
A critical feature of Event objects is that they wake all waiting
threads. If you are writing a program where you only want to wake up
a single waiting thread, it is probably better to use a Semaphore or
Condition object instead.
For example, consider this code involving semaphores:
import threading

# Worker thread
def worker(n, sema):
    # Wait to be signaled
    sema.acquire()

    # Do some work
    print('Working', n)

# Create some threads
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema))
    t.start()
If you run this, a pool of threads will start, but nothing happens because they’re all blocked waiting to acquire the semaphore. Each time the semaphore is released, only one worker will wake up and run. For example:
>>> sema.release()
Working 0
>>> sema.release()
Working 1
>>>
Writing code that involves a lot of tricky synchronization between threads is likely to make your head explode. A saner approach is to treat threads as communicating tasks using queues or as actors. Queues are described in the next recipe. Actors are described in Recipe 12.10.
You have multiple threads in your program and you want to safely communicate or exchange data between them.
Perhaps the safest way to send data from one thread to another is
to use a Queue from the queue library. To do this, you create
a Queue instance that is shared by the threads. Threads then use
put() or get() operations to add or remove items from the queue.
For example:
from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(data)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
Queue instances already have all of the required locking, so they
can be safely shared by as many threads as you wish.
When using queues, it can be somewhat tricky to coordinate the shutdown of the producer and consumer. A common solution to this problem is to rely on a special sentinel value, which when placed in the queue, causes consumers to terminate. For example:
from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

    # Put the sentinel on the queue to indicate completion
    out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break

        # Process the data
        ...
A subtle feature of this example is that the consumer, upon receiving the special sentinel value, immediately places it back onto the queue. This propagates the sentinel to other consumer threads that might be listening on the same queue—thus shutting them all down one after the other.
Although queues are the most common thread communication mechanism, you can build your own data structures as long as you add the required locking and synchronization. The most common way to do this is to wrap your data structures with a condition variable. For example, here is how you might build a thread-safe priority queue, as discussed in Recipe 1.5.
import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()

    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]
Thread communication with a queue is a one-way and nondeterministic
process. In general, there is no way to know when the receiving
thread has actually received a message and worked on it. However,
Queue objects do provide some basic completion features, as
illustrated by the task_done() and join() methods in this example:
from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Process the data
        ...

        # Indicate completion
        in_q.task_done()

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for all produced items to be consumed
q.join()
If a thread needs to know immediately when a consumer thread has
processed a particular item of data, you should pair the
sent data with an Event object that allows the producer
to monitor its progress. For example:
from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        # Make a (data, event) pair and hand it to the consumer
        evt = Event()
        out_q.put((data, evt))
        ...
        # Wait for the consumer to process the item
        evt.wait()

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data, evt = in_q.get()
        # Process the data
        ...
        # Indicate completion
        evt.set()
Writing threaded programs based on simple queuing is often a good way to maintain sanity. If you can break everything down to simple thread-safe queuing, you’ll find that you don’t need to litter your program with locks and other low-level synchronization. Also, communicating with queues often leads to designs that can be scaled up to other kinds of message-based communication patterns later on. For instance, you might be able to split your program into multiple processes, or even a distributed system, without changing much of its underlying queuing architecture.
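As a minimal sketch of that idea, here is what the move to processes might look like, assuming the producer() and consumer() functions from the first example are defined at module level and the queued data is picklable; the functions themselves are unchanged, and only the queue and task launching change:

import multiprocessing

if __name__ == '__main__':
    # multiprocessing.Queue stands in for queue.Queue
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=consumer, args=(q,))
    p2 = multiprocessing.Process(target=producer, args=(q,))
    p1.start()
    p2.start()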
One caution with thread queues is that putting an item in a queue doesn’t make a copy of the item. Thus, communication actually involves passing an object reference between threads. If you are concerned about shared state, it may make sense to only pass immutable data structures (e.g., integers, strings, or tuples) or to make deep copies of the queued items. For example:
from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...
Queue objects provide a few additional features that may prove to be
useful in certain contexts. If you create a Queue with an optional
size, such as Queue(N), it places a limit on the number of items that
can be enqueued before put() blocks the producer. Adding an upper bound to a queue might make sense if there is a mismatch in speed between a producer and consumer, such as a producer generating items at a much faster rate than they can be consumed. On
the other hand, making a queue block when it’s full can also have an
unintended cascading effect throughout your program, possibly causing
it to deadlock or run poorly. In general, the problem of “flow
control” between communicating threads is a much harder problem than
it seems. If you ever find yourself trying to fix a problem by fiddling with queue sizes, it could be an indicator of a fragile design or some
other inherent scaling problem.
Both the get() and put() methods support nonblocking operation and timeouts.
For example:
import queue
q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    ...

try:
    q.put(item, block=False)
except queue.Full:
    ...

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...
Both of these options can be used to avoid the problem of just blocking
indefinitely on a particular queuing operation. For example, a nonblocking
put() could be used with a fixed-sized queue to implement different kinds
of handling code for when a queue is full, such as issuing a log message and discarding the item:
def producer(q):
    ...
    try:
        q.put(item, block=False)
    except queue.Full:
        log.warning('queued item %r discarded!', item)
A timeout is useful if you’re trying to make consumer threads
periodically give up on operations such as q.get() so that they can
check things such as a termination flag, as described in
Recipe 12.1.
_running = True

def consumer(q):
    while _running:
        try:
            item = q.get(timeout=5.0)
            # Process item
            ...
        except queue.Empty:
            pass
Lastly, there are utility methods q.qsize(), q.full(), q.empty()
that can tell you the current size and status of the queue. However,
be aware that all of these are unreliable in a multithreaded
environment. For example, a call to q.empty() might tell you that
the queue is empty, but in the time that has elapsed since making the
call, another thread could have added an item to the queue.
Frankly, it’s best to write your code not to rely on such functions.
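To make the risk concrete, here is a small sketch of the check-then-act race, along with the safer exception-based style shown earlier:

import queue

q = queue.Queue()

# Racy: the queue can change between the test and the get()
if not q.empty():
    item = q.get()            # may still block if another consumer won

# Safer: the check and the operation are a single atomic call
try:
    item = q.get(block=False)
except queue.Empty:
    item = None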
Your program uses threads and you want to lock critical sections of code to avoid race conditions.
To make mutable objects safe to use by multiple threads, use Lock
objects in the threading library, as shown here:
import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        '''
        Increment the counter with locking
        '''
        with self._value_lock:
            self._value += delta

    def decr(self, delta=1):
        '''
        Decrement the counter with locking
        '''
        with self._value_lock:
            self._value -= delta
A Lock guarantees mutual exclusion when used with the with
statement—that is, only one thread is allowed to execute the block
of statements under the with statement at a time.
The with statement acquires the lock for the
duration of the indented statements and releases the lock when
control flow exits the indented block.
Thread scheduling is inherently nondeterministic. Because of this, failure to use locks in threaded programs can result in randomly corrupted data and bizarre behavior known as a “race condition.” To avoid this, locks should always be used whenever shared mutable state is accessed by multiple threads.
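As a small illustration of the danger, the following unsynchronized counter may lose updates, because += is a read-modify-write sequence rather than an atomic operation (whether corruption actually appears in any given run depends on the interpreter version and on timing):

import threading

class Counter:
    value = 0

def work():
    for _ in range(100000):
        Counter.value += 1      # load, add, store: not atomic

threads = [threading.Thread(target=work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Without a lock, this may print less than 400000
print(Counter.value)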
In older Python code, it is common to see locks explicitly acquired and released. For example, in this variant of the last example:
import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        '''
        Increment the counter with locking
        '''
        self._value_lock.acquire()
        self._value += delta
        self._value_lock.release()

    def decr(self, delta=1):
        '''
        Decrement the counter with locking
        '''
        self._value_lock.acquire()
        self._value -= delta
        self._value_lock.release()
The with statement is more elegant and less prone to
error—especially in situations where a programmer might forget to
call the release() method or if a program happens to raise an
exception while holding a lock (the with statement guarantees that
locks are always released in both cases).
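Roughly speaking, to get the same guarantee by hand, the manual version would need a try-finally block around each operation. Here is a sketch of what the with statement is doing for you, written as a drop-in replacement for the incr() method above:

def incr(self, delta=1):
    '''
    Increment the counter with explicit locking (equivalent to the
    with-based version)
    '''
    self._value_lock.acquire()
    try:
        self._value += delta
    finally:
        self._value_lock.release()   # runs even if an exception occurs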
To avoid the potential for deadlock, programs that use locks should be written in a way such that each thread is only allowed to acquire one lock at a time. If this is not possible, you may need to introduce more advanced deadlock avoidance into your program, as described in Recipe 12.5.
In the threading library, you’ll find other synchronization
primitives, such as RLock and Semaphore objects. As a general rule
of thumb, these are more special purpose and should not be used for
simple locking of mutable state. An RLock or re-entrant lock object
is a lock that can be acquired multiple times by the same thread. It
is primarily used to implement code-based locking or synchronization based
on a construct known as a “monitor.” With this kind of locking,
only one thread is allowed to use an entire function or the
methods of a class while the lock is held. For example, you could
implement the SharedCounter class like this:
import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    _lock = threading.RLock()

    def __init__(self, initial_value=0):
        self._value = initial_value

    def incr(self, delta=1):
        '''
        Increment the counter with locking
        '''
        with SharedCounter._lock:
            self._value += delta

    def decr(self, delta=1):
        '''
        Decrement the counter with locking
        '''
        with SharedCounter._lock:
            self.incr(-delta)
In this variant of the code, there is just a single class-level lock
shared by all instances of the class. Instead of the lock being tied
to the per-instance mutable state, the lock is meant to synchronize
the methods of the class. Specifically, this lock ensures that only
one thread is allowed to be using the methods of the class at once.
However, unlike a standard lock, it is OK for methods that already
have the lock to call other methods that also use the lock (e.g., see the decr() method).
One feature of this implementation is that only one lock is created, regardless of how many counter instances are created. Thus, it is much more memory-efficient in situations where there are a large number of counters. However, a possible downside is that it may cause more lock contention in programs that use a large number of threads and make frequent counter updates.
A Semaphore object is a synchronization primitive based on a shared
counter. If the counter is nonzero, the with statement decrements
the count and a thread is allowed to proceed. The counter is incremented
upon the conclusion of the with block. If the counter is zero,
progress is blocked until the counter is incremented by another thread.
Although a semaphore can be used in the same manner as a standard Lock,
the added complexity in implementation negatively impacts performance.
Instead of simple locking, Semaphore objects are more useful for
applications involving signaling between threads or throttling. For
example, if you want to limit the amount of concurrency in a part
of code, you might use a semaphore, as follows:
from threading import Semaphore
import urllib.request

# At most, five threads allowed to run at once
_fetch_url_sema = Semaphore(5)

def fetch_url(url):
    with _fetch_url_sema:
        return urllib.request.urlopen(url)
If you’re interested in the underlying theory and implementation of thread synchronization primitives, consult almost any textbook on operating systems.
You’re writing a multithreaded program where threads need to acquire more than one lock at a time while avoiding deadlock.
In multithreaded programs, a common source of deadlock is due to threads that attempt to acquire multiple locks at once. For instance, if a thread acquires the first lock, but then blocks trying to acquire the second lock, that thread can potentially block the progress of other threads and make the program freeze.
One solution to deadlock avoidance is to assign each lock in the program a unique number, and to enforce an ordering rule that only allows multiple locks to be acquired in ascending order. This is surprisingly easy to implement using a context manager as follows:
import threading
from contextlib import contextmanager

# Thread-local state to store information on locks already acquired
_local = threading.local()

@contextmanager
def acquire(*locks):
    # Sort locks by object identifier
    locks = sorted(locks, key=lambda x: id(x))

    # Make sure lock order of previously acquired locks is not violated
    acquired = getattr(_local, 'acquired', [])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # Acquire all of the locks
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]
To use this context manager, you simply allocate lock objects in
the normal way, but use the acquire() function whenever you want
to work with one or more locks. For example:
import threading
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    while True:
        with acquire(x_lock, y_lock):
            print('Thread-1')

def thread_2():
    while True:
        with acquire(y_lock, x_lock):
            print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()
If you run this program, you’ll find that it happily runs forever without deadlock—even though the acquisition of locks is specified in a different order in each function.
The key to this recipe lies in the first statement that sorts the locks
according to object identifier. By sorting the locks, they always get
acquired in a consistent order regardless of how the user might have
provided them to acquire().
The solution uses thread-local storage to solve a subtle problem with
detecting potential deadlock if multiple acquire() operations are nested.
For example, suppose you wrote the code like this:
import threading
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    while True:
        with acquire(x_lock):
            with acquire(y_lock):
                print('Thread-1')

def thread_2():
    while True:
        with acquire(y_lock):
            with acquire(x_lock):
                print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()
If you run this version of the program, one of the threads will crash with an exception such as this:
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.3/threading.py", line 639, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.3/threading.py", line 596, in run
    self._target(*self._args, **self._kwargs)
  File "deadlock.py", line 49, in thread_1
    with acquire(y_lock):
  File "/usr/local/lib/python3.3/contextlib.py", line 48, in __enter__
    return next(self.gen)
  File "deadlock.py", line 15, in acquire
    raise RuntimeError("Lock Order Violation")
RuntimeError: Lock Order Violation
>>>
This crash is caused by the fact that each thread remembers the locks it has
already acquired. The acquire() function checks the list of previously acquired
locks and enforces the ordering constraint that previously acquired locks must
have an object ID that is less than the new locks being acquired.
The issue of deadlock is a well-known problem with programs involving threads (as well as a common subject in textbooks on operating systems). As a rule of thumb, as long as you can ensure that threads can hold only one lock at a time, your program will be deadlock free. However, once multiple locks are being acquired at the same time, all bets are off.
Detecting and recovering from deadlock is an extremely tricky problem with few elegant solutions. For example, a common deadlock detection and recovery scheme involves the use of a watchdog timer. As threads run, they periodically reset the timer, and as long as everything is running smoothly, all is well. However, if the program deadlocks, the watchdog timer will eventually expire. At that point, the program “recovers” by killing and then restarting itself.
Deadlock avoidance is a different strategy where locking operations are carried out in a manner that simply does not allow the program to enter a deadlocked state. The solution in which locks are always acquired in strict order of ascending object ID can be mathematically proven to avoid deadlock, although the proof is left as an exercise to the reader (the gist of it is that by acquiring locks in a purely increasing order, you can’t get cyclic locking dependencies, which are a necessary condition for deadlock to occur).
As a final example, a classic thread deadlock problem is the so-called “dining philosophers’ problem.” In this problem, five philosophers sit around a table on which there are five bowls of rice and five chopsticks. Each philosopher represents an independent thread and each chopstick represents a lock. In the problem, philosophers either sit and think or they eat rice. However, in order to eat rice, a philosopher needs two chopsticks. Unfortunately, if all of the philosophers reach over and grab the chopstick to their left, they’ll all just sit there with one stick and eventually starve to death. It’s a gruesome scene.
Using the solution, here is a simple deadlock-free implementation of the dining philosophers’ problem:
import threading

# The philosopher thread
def philosopher(left, right):
    while True:
        with acquire(left, right):
            print(threading.current_thread(), 'eating')

# The chopsticks (represented by locks)
NSTICKS = 5
chopsticks = [threading.Lock() for n in range(NSTICKS)]

# Create all of the philosophers
for n in range(NSTICKS):
    t = threading.Thread(target=philosopher,
                         args=(chopsticks[n], chopsticks[(n + 1) % NSTICKS]))
    t.start()
Last, but not least, it should be noted that in order to avoid deadlock,
all locking operations must be carried out using our acquire() function.
If some fragment of code decided to acquire a lock directly, then
the deadlock avoidance algorithm wouldn’t work.
You need to store state that’s specific to the currently executing thread and not visible to other threads.
Sometimes in multithreaded programs, you need to store data that is only
specific to the currently executing thread. To do this, create a
thread-local storage object using threading.local(). Attributes stored
and read on this object are only visible to the executing thread and no
others.
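As a minimal sketch of the mechanism (the worker function and attribute names here are made up for illustration), each thread stores its own value on the same object without interfering with the others:

import threading

_local = threading.local()

def worker(name):
    _local.name = name       # stored in this thread's private dictionary
    print(threading.current_thread().name, 'sees', _local.name)

for n in ('thread-a', 'thread-b'):
    threading.Thread(target=worker, args=(n,)).start()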
As an interesting practical example of using thread-local storage,
consider the LazyConnection context-manager class that was first defined in
Recipe 8.3. Here is a slightly modified version that safely works
with multiple threads:
from socket import socket, AF_INET, SOCK_STREAM
import threading

class LazyConnection:
    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        self.address = address
        self.family = family
        self.type = type
        self.local = threading.local()

    def __enter__(self):
        if hasattr(self.local, 'sock'):
            raise RuntimeError('Already connected')
        self.local.sock = socket(self.family, self.type)
        self.local.sock.connect(self.address)
        return self.local.sock

    def __exit__(self, exc_ty, exc_val, tb):
        self.local.sock.close()
        del self.local.sock
In this code, carefully observe the use of the self.local attribute.
It is initialized as an instance of threading.local(). The other
methods then manipulate a socket that’s stored as self.local.sock.
This is enough to make it possible to safely use an instance of
LazyConnection in multiple threads. For example:
from functools import partial
import threading

def test(conn):
    with conn as s:
        s.send(b'GET /index.html HTTP/1.0\r\n')
        s.send(b'Host: www.python.org\r\n')
        s.send(b'\r\n')
        resp = b''.join(iter(partial(s.recv, 8192), b''))

    print('Got {} bytes'.format(len(resp)))

if __name__ == '__main__':
    conn = LazyConnection(('www.python.org', 80))

    t1 = threading.Thread(target=test, args=(conn,))
    t2 = threading.Thread(target=test, args=(conn,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
The reason it works is that each thread actually creates
its own dedicated socket connection (stored as self.local.sock).
Thus, when the different threads perform socket operations, they
don’t interfere with one another as they are being performed on
different sockets.
Creating and manipulating thread-specific state is not a problem that often arises in most programs. However, when it does, it commonly involves situations where an object being used by multiple threads needs to manipulate some kind of dedicated system resource, such as a socket or file. You can’t just have a single socket object shared by everyone because chaos would ensue if multiple threads ever started reading and writing on it at the same time. Thread-local storage fixes this by making such resources only visible in the thread where they’re being used.
In this recipe, the use of threading.local() makes the
LazyConnection class support one connection per thread, as opposed to
one connection for the entire process. It’s a subtle but interesting
distinction.
Under the covers, an instance of threading.local() maintains a
separate instance dictionary for each thread. All of the usual
instance operations of getting, setting, and deleting values just
manipulate the per-thread dictionary. The fact that each thread uses
a separate dictionary is what provides the isolation of data.
You want to create a pool of worker threads for serving clients or performing other kinds of work.
The concurrent.futures library has a ThreadPoolExecutor class
that can be used for this purpose. Here is an example of a
simple TCP server that uses a thread pool to serve clients:
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
    '''
    Handle a client connection
    '''
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr):
    pool = ThreadPoolExecutor(128)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        pool.submit(echo_client, client_sock, client_addr)

echo_server(('', 15000))
If you want to manually create your own thread pool, it’s usually
easy enough to do it using a Queue. Here is a slightly
different, but manual implementation of the same code:
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
    '''
    Handle a client connection
    '''
    sock, client_addr = q.get()
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr, nworkers):
    # Launch the client workers
    q = Queue()
    for n in range(nworkers):
        t = Thread(target=echo_client, args=(q,))
        t.daemon = True
        t.start()

    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        q.put((client_sock, client_addr))

echo_server(('', 15000), 128)
One advantage of using ThreadPoolExecutor over a manual
implementation is that it makes it easier for the submitter to receive
results from the called function. For example, you could write code
like this:
from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
    u = urllib.request.urlopen(url)
    data = u.read()
    return data

pool = ThreadPoolExecutor(10)

# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()
The result objects in the example handle all of the blocking
and coordination needed to get data back from the worker thread.
Specifically, the operation a.result() blocks until the
corresponding function has been executed by the pool and returned a value.
Generally, you should avoid writing programs that allow unlimited growth in the number of threads. For example, take a look at the following server:
from threading import Thread
from socket import socket, AF_INET, SOCK_STREAM

def echo_client(sock, client_addr):
    '''
    Handle a client connection
    '''
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr):
    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        t = Thread(target=echo_client, args=(client_sock, client_addr))
        t.daemon = True
        t.start()

echo_server(('', 15000))
Although this works, it doesn’t prevent some asynchronous hipster from launching an attack on the server that makes it create so many threads that your program runs out of resources and crashes (thus further demonstrating the “evils” of using threads). By using a pre-initialized thread pool, you can carefully put an upper limit on the amount of supported concurrency.
You might be concerned with the effect of creating a large number of threads. However, modern systems should have no trouble creating pools of a few thousand threads. Moreover, having a thousand threads just sitting around waiting for work isn’t going to have much, if any, impact on the performance of other code (a sleeping thread does just that—nothing at all). Of course, if all of those threads wake up at the same time and start hammering on the CPU, that’s a different story—especially in light of the Global Interpreter Lock (GIL). Generally, you only want to use thread pools for I/O-bound processing.
One possible concern with creating large thread pools might be memory
use. For example, if you create 2,000 threads on OS X, the system
shows the Python process using up more than 9 GB of virtual
memory. However, this is actually somewhat misleading. When creating
a thread, the operating system reserves a region of virtual
memory to hold the thread’s execution stack (often as large as 8 MB). Only a small
fragment of this memory is actually mapped to real memory, though.
Thus, if you look a bit closer, you might find the Python process is
using far less real memory (e.g., for 2,000 threads, only 70 MB of real
memory is used, not 9 GB). If the size of the virtual memory
is a concern, you can dial it down using the threading.stack_size()
function. For example:
import threading
threading.stack_size(65536)
If you add this call and repeat the experiment of creating 2,000 threads, you’ll find that the Python process is now only using about 210 MB of virtual memory, although the amount of real memory in use remains about the same. Note that the thread stack size must be at least 32,768 bytes, and is usually restricted to be a multiple of the system memory page size (4096, 8192, etc.).
You have a program that performs a lot of CPU-intensive work, and you want to make it run faster by having it take advantage of multiple CPUs.
The concurrent.futures library provides a ProcessPoolExecutor
class that can be used to execute computationally intensive functions
in a separately running instance of the Python interpreter. However,
in order to use it, you first need to have some computationally
intensive work. Let’s illustrate with a simple yet practical example.
Suppose you have an entire directory of gzip-compressed Apache web server logs:
logs/
   20120701.log.gz
   20120702.log.gz
   20120703.log.gz
   20120704.log.gz
   20120705.log.gz
   20120706.log.gz
   ...
Further suppose each log file contains lines like this:
124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...
Here is a simple script that takes this data and identifies all hosts that have accessed the robots.txt file:
# findrobots.py

import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f, encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir + '/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
The preceding program is written in the commonly used map-reduce style.
The function find_robots() is mapped across a collection of
filenames and the results are combined into a single result (the
all_robots set in the find_all_robots() function).
Now, suppose you want to modify this program to use multiple CPUs.
It turns out to be easy—simply replace the map() operation with a
similar operation carried out on a process pool from the
concurrent.futures library. Here is a slightly modified version of
the code:
# findrobots.py

import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f, encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir + '/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
With this modification, the script produces the same result but runs about 3.5 times faster on our quad-core machine. The actual performance will vary according to the number of CPUs available on your machine.
Typical usage of a ProcessPoolExecutor is as follows:
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    ...
    do work in parallel using pool
    ...
Under the covers, a ProcessPoolExecutor creates N independent
running Python interpreters where N is the number of available CPUs
detected on the system. You can change the number of processes created by supplying an optional argument, as in ProcessPoolExecutor(N).
The pool runs until the last statement in the with block is
executed, at which point the process pool is shut down. However, the
program will wait until all submitted work has been processed.
Work to be submitted to a pool must be defined in a function. There are
two methods for submission. If you are trying to parallelize a list
comprehension or a map() operation, you use pool.map():
# A function that performs a lot of work
def work(x):
    ...
    return result

# Nonparallel code
results = map(work, data)

# Parallel implementation
with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)
Alternatively, you can manually submit single tasks using the pool.submit() method:
# Some function
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()
    ...
If you manually submit a job, the result is an instance of Future. To obtain
the actual result, you call its result() method. This blocks until
the result is computed and returned by the pool.
Instead of blocking, you can also arrange to have a callback function triggered upon completion. For example:
def when_done(r):
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
    future_result = pool.submit(work, arg)
    future_result.add_done_callback(when_done)
The user-supplied callback function receives an instance of Future
that must be used to obtain the actual result (i.e., by calling
its result() method).
Although process pools can be easy to use, there are a number of important considerations to be made in designing larger programs. In no particular order:

- Function arguments and return values must be compatible with pickle. Work is carried out in a separate interpreter using interprocess communication. Thus, data exchanged between interpreters has to be serialized.

- Process pools are created by calling the fork() system call on Unix. This makes a clone of the Python interpreter, including all of the program state at the time of the fork. On Windows, an independent copy of the interpreter that does not clone state is launched. The actual forking process does not occur until the first pool.map() or pool.submit() method is called.
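To illustrate the pickle constraint, here is a small sketch (the square() function is hypothetical): work defined as a module-level function can be submitted, whereas a lambda or closure fails because it cannot be pickled:

from concurrent.futures import ProcessPoolExecutor

def square(x):              # module-level function: picklable
    return x * x

if __name__ == '__main__':
    with ProcessPoolExecutor() as pool:
        print(list(pool.map(square, range(5))))    # works
        # pool.map(lambda x: x * x, range(5))      # fails: lambdas can't be pickled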
You’ve heard about the Global Interpreter Lock (GIL), and are worried that it might be affecting the performance of your multithreaded program.
Although Python fully supports thread programming, parts of the C implementation of the interpreter are not entirely thread safe to a level of allowing fully concurrent execution. In fact, the interpreter is protected by a so-called Global Interpreter Lock (GIL) that only allows one Python thread to execute at any given time. The most noticeable effect of the GIL is that multithreaded Python programs are not able to fully take advantage of multiple CPU cores (e.g., a computationally intensive application using more than one thread only runs on a single CPU).
Before discussing common GIL workarounds, it is important to emphasize that the GIL tends to only affect programs that are heavily CPU bound (i.e., dominated by computation). If your program is mostly doing I/O, such as network communication, threads are often a sensible choice because they’re mostly going to spend their time sitting around waiting. In fact, you can create thousands of Python threads with barely a concern. Modern operating systems have no trouble running with that many threads, so it’s simply not something you should worry much about.
For CPU-bound programs, you really need to study the nature of the computation being performed. For instance, careful choice of the underlying algorithm may produce a far greater speedup than trying to parallelize a suboptimal algorithm with threads. Similarly, given that Python is interpreted, you might get a far greater speedup simply by moving performance-critical code into a C extension module. Extensions such as NumPy are also highly effective at speeding up certain kinds of calculations involving array data. Last, but not least, you might investigate alternative implementations, such as PyPy, which features optimizations such as a JIT compiler (although, as of this writing, it does not yet support Python 3).
It’s also worth noting that threads are not necessarily used exclusively for performance. A CPU-bound program might be using threads to manage a graphical user interface, a network connection, or provide some other kind of service. In this case, the GIL can actually present more of a problem, since code that holds it for an excessively long period will cause annoying stalls in the non-CPU-bound threads. In fact, a poorly written C extension can actually make this problem worse, even though the computation part of the code might run faster than before.
Having said all of this, there are two common strategies for working around
the limitations of the GIL. First, if you are working entirely in Python,
you can use the multiprocessing module to create a process pool
and use it like a co-processor. For example, suppose you have
the following thread code:
# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = some_work(args)
        ...
Here’s how you would modify the code to use a pool:
# Processing pool (see below for initialization)
pool = None

# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = pool.apply(some_work, (args,))
        ...

# Initialize the pool
if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool()
This example with a pool works around the GIL using a neat trick. Whenever a thread wants to perform CPU-intensive work, it hands the work to the pool. The pool, in turn, hands the work to a separate Python interpreter running in a different process. While the thread is waiting for the result, it releases the GIL. Moreover, because the calculation is being performed in a separate interpreter, it’s no longer bound by the restrictions of the GIL. On a multicore system, you’ll find that this technique easily allows you to take advantage of all the CPUs.
The second strategy for working around the GIL is to focus on C extension programming. The general idea is to move computationally intensive tasks to C, independent of Python, and have the C code release the GIL while it’s working. This is done by inserting special macros into the C code like this:
#include "Python.h"...PyObject*pyfunc(PyObject*self,PyObject*args){...Py_BEGIN_ALLOW_THREADS// Threaded C code...Py_END_ALLOW_THREADS...}
If you are using other tools to access C, such as the ctypes library
or Cython, you may not need to do anything. For example,
ctypes releases the GIL when calling into C by default.
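As a rough demonstration of this behavior (assuming a Unix system where the C library's sleep() function can be located via ctypes), two threads making a foreign call run concurrently because the GIL is dropped for the duration of each call:

import ctypes
import ctypes.util
import threading
import time

libc = ctypes.CDLL(ctypes.util.find_library('c'))

start = time.time()
ts = [threading.Thread(target=libc.sleep, args=(2,)) for _ in range(2)]
for t in ts:
    t.start()
for t in ts:
    t.join()

# Elapsed time is about 2 seconds, not 4, because the GIL is
# released while each thread is inside the C call
print('Elapsed:', time.time() - start)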
Many programmers, when faced with thread performance problems, are quick to blame the GIL for all of their ills. However, doing so is shortsighted and naive. Just as a real-world example, mysterious “stalls” in a multithreaded network program might be caused by something entirely different (e.g., a stalled DNS lookup) rather than anything related to the GIL. The bottom line is that you really need to study your code to know if the GIL is an issue or not. Again, realize that the GIL is mostly concerned with CPU-bound processing, not I/O.
If you are going to use a process pool as a workaround, be aware that
doing so involves data serialization and communication with a
different Python interpreter. For this to work, the operation to be
performed needs to be contained within a Python function defined by
the def statement (i.e., no lambdas, closures, callable instances,
etc.), and the function arguments and return value must be compatible
with pickle. Also, the amount of work to be performed must be
sufficiently large to make up for the extra communication overhead.
Another subtle aspect of pools is that mixing threads and process pools together can be a good way to make your head explode. If you are going to use both of these features together, it is often best to create the process pool as a singleton at program startup, prior to the creation of any threads. Threads will then use the same process pool for all of their computationally intensive work.
For C extensions, the most important feature is maintaining isolation from the Python interpreter process. That is, if you’re going to offload work from Python to C, you need to make sure the C code operates independently of Python. This means using no Python data structures and making no calls to Python’s C API. Another consideration is that you want to make sure your C extension does enough work to make it all worthwhile. That is, it’s much better if the extension can perform millions of calculations as opposed to just a few small calculations.
Needless to say, these solutions to working around the GIL don’t apply to all possible problems. For instance, certain kinds of applications don’t work well if separated into multiple processes, nor may you want to code parts in C. For these kinds of applications, you may have to come up with your own solution (e.g., multiple processes accessing shared memory regions, multiple interpreters running in the same process, etc.). Alternatively, you might look at some other implementations of the interpreter, such as PyPy.
See Recipes 15.7 and 15.10 for additional information on releasing the GIL in C extensions.
The “actor model” is one of the oldest and simplest approaches to concurrency and distributed computing. In fact, its underlying simplicity is part of its appeal. In a nutshell, an actor is a concurrently executing task that simply acts upon messages sent to it. In response to these messages, it may decide to send further messages to other actors. Communication with actors is one-way and asynchronous. Thus, the sender of a message does not know when a message actually gets delivered, nor does it receive a response or acknowledgment that the message has been processed.
Actors are straightforward to define using a combination of a thread and a queue. For example:
from queue import Queue
from threading import Thread, Event

# Sentinel used for shutdown
class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        '''
        Send a message to the actor
        '''
        self._mailbox.put(msg)

    def recv(self):
        '''
        Receive an incoming message
        '''
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        '''
        Close the actor, thus shutting it down
        '''
        self.send(ActorExit)

    def start(self):
        '''
        Start concurrent execution
        '''
        self._terminated = Event()
        t = Thread(target=self._bootstrap)
        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

    def run(self):
        '''
        Run method to be implemented by the user
        '''
        while True:
            msg = self.recv()

# Sample Actor task
class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)

# Sample use
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()
In this example, Actor instances are things that you simply send a
message to using their send() method. Under the covers, this places
the message on a queue and hands it off to an internal thread that
runs to process the received messages. The close() method is
programmed to shut down the actor by placing a special sentinel value (ActorExit)
on the queue. Users define new actors by inheriting from Actor and
redefining the run() method to implement their custom processing.
The usage of the ActorExit exception is such that user-defined code
can be programmed to catch the termination request and handle it if
appropriate (the exception is raised by the get() method and
propagated).
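For example, here is a sketch of a user-defined actor that intercepts the termination request to perform cleanup, then re-raises the exception so shutdown proceeds normally:

class CleanupActor(Actor):
    def run(self):
        try:
            while True:
                msg = self.recv()
                print('Got:', msg)
        except ActorExit:
            print('Cleaning up before exit')
            raise            # re-raise; _bootstrap() catches it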
If you relax the requirement of concurrent and asynchronous message delivery, actor-like objects can also be minimally defined by generators. For example:
def print_actor():
    while True:
        try:
            msg = yield      # Get a message
            print('Got:', msg)
        except GeneratorExit:
            print('Actor terminating')

# Sample use
p = print_actor()
next(p)     # Advance to the yield (ready to receive)
p.send('Hello')
p.send('World')
p.close()
Part of the appeal of actors is their underlying simplicity. In
practice, there is just one core operation, send(). Plus, the
general concept of a “message” in actor-based systems is something
that can be expanded in many different directions. For example, you
could pass tagged messages in the form of tuples and have actors take
different courses of action like this:
class TaggedActor(Actor):
    def run(self):
        while True:
            tag, *payload = self.recv()
            getattr(self, 'do_' + tag)(*payload)

    # Methods corresponding to different message tags
    def do_A(self, x):
        print('Running A', x)

    def do_B(self, x, y):
        print('Running B', x, y)

# Example
a = TaggedActor()
a.start()
a.send(('A', 1))      # Invokes do_A(1)
a.send(('B', 2, 3))   # Invokes do_B(2, 3)
As another example, here is a variation of an actor that allows
arbitrary functions to be executed in a worker and results to be
communicated back using a special Result object:
from threading import Event

class Result:
    def __init__(self):
        self._evt = Event()
        self._result = None

    def set_result(self, value):
        self._result = value
        self._evt.set()

    def result(self):
        self._evt.wait()
        return self._result

class Worker(Actor):
    def submit(self, func, *args, **kwargs):
        r = Result()
        self.send((func, args, kwargs, r))
        return r

    def run(self):
        while True:
            func, args, kwargs, r = self.recv()
            r.set_result(func(*args, **kwargs))

# Example use
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)
print(r.result())
Last, but not least, the concept of “sending” a task a message is
something that can be scaled up into systems involving multiple
processes or even large distributed systems. For example,
the send() method of an actor-like object could be programmed
to transmit data on a socket connection or deliver it via
some kind of messaging infrastructure (e.g., AMQP, ZMQ, etc.).
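For instance, here is a hedged sketch of an actor-like proxy whose send() serializes each message and ships it over a socket; the class name and the length-prefixed framing scheme are made up for illustration:

import pickle
import socket

class SocketActorProxy:
    '''
    Actor-like object: send() delivers messages to a remote mailbox
    '''
    def __init__(self, address):
        self._sock = socket.create_connection(address)

    def send(self, msg):
        data = pickle.dumps(msg)
        # Simple length-prefixed framing: 4-byte size, then the payload
        self._sock.sendall(len(data).to_bytes(4, 'big') + data)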
You have a program based on communicating threads and want them to implement publish/subscribe messaging.
To implement publish/subscribe messaging, you typically introduce a separate “exchange” or “gateway” object that acts as an intermediary for all messages. That is, instead of directly sending a message from one task to another, a message is sent to the exchange and it delivers it to one or more attached tasks. Here is one example of a very simple exchange implementation:
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]
An exchange is really nothing more than an object that keeps a set of
active subscribers and provides methods for attaching,
detaching, and sending messages. Each exchange
is identified by a name, and the get_exchange() function simply returns
the Exchange instance associated with a given name.
Here is a simple example that shows how to use an exchange:
# Example of a task.  Any object with a send() method
class Task:
    ...
    def send(self, msg):
        ...

task_a = Task()
task_b = Task()

# Example of getting an exchange
exc = get_exchange('name')

# Examples of subscribing tasks to it
exc.attach(task_a)
exc.attach(task_b)

# Example of sending messages
exc.send('msg1')
exc.send('msg2')

# Example of unsubscribing
exc.detach(task_a)
exc.detach(task_b)
Although there are many different variations on this theme, the overall idea is the same. Messages will be delivered to an exchange and the exchange will deliver them to attached subscribers.
The concept of tasks or threads sending messages to one another (often via queues) is easy to implement and quite popular. However, the benefits of using a publish/subscribe (pub/sub) model instead are often overlooked.
First, the use of an exchange can simplify much of the plumbing
involved in setting up communicating threads. Instead of trying to
wire threads together across multiple program modules, you only worry
about connecting them to a known exchange. In some sense, this is
similar to how the logging library works. In practice, it can make
it easier to decouple various tasks in the program.
Second, the ability of the exchange to broadcast messages to multiple subscribers opens up new communication patterns. For example, you could implement systems with redundant tasks, broadcasting, or fan-out. You could also build debugging and diagnostic tools that attach themselves to exchanges as ordinary subscribers. For example, here is a simple diagnostic class that would display sent messages:
class DisplayMessages:
    def __init__(self):
        self.count = 0

    def send(self, msg):
        self.count += 1
        print('msg[{}]: {!r}'.format(self.count, msg))

exc = get_exchange('name')
d = DisplayMessages()
exc.attach(d)
Last, but not least, a notable aspect of the implementation is that it
works with a variety of task-like objects. For example, the receivers
of a message could be actors (as described in Recipe 12.10),
coroutines, network connections, or just about anything that
implements a proper send() method.
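For instance, since generators provide a send() method, a coroutine can be attached directly as a subscriber. Here is a small sketch (the logger() coroutine is made up for this example):

# A coroutine works as a subscriber because generators have send()
def logger():
    while True:
        msg = yield
        print('LOG:', msg)

log = logger()
next(log)            # Prime the coroutine to the first yield
exc = get_exchange('name')
exc.attach(log)
exc.send('hello')    # Prints 'LOG: hello'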
One potentially problematic aspect of an exchange concerns the proper attachment and detachment of subscribers. In order to properly manage resources, every subscriber that attaches must eventually detach. This leads to a programming model similar to this:
exc = get_exchange('name')
exc.attach(some_task)
try:
    ...
finally:
    exc.detach(some_task)
In some sense, this is similar to the usage of files, locks, and
similar objects. Experience has shown that it is quite easy to
forget the final detach() step. To simplify this, you might
consider the use of the context-management protocol. For example,
you could add a subscribe() method to the exchange, like this:
from contextlib import contextmanager
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    @contextmanager
    def subscribe(self, *tasks):
        for task in tasks:
            self.attach(task)
        try:
            yield
        finally:
            for task in tasks:
                self.detach(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]

# Example of using the subscribe() method
exc = get_exchange('name')
with exc.subscribe(task_a, task_b):
    ...
    exc.send('msg1')
    exc.send('msg2')
    ...

# task_a and task_b detached here
Finally, it should be noted that there are numerous possible extensions to the exchange idea. For example, exchanges could implement an entire collection of message channels or apply pattern matching rules to exchange names. Exchanges can also be extended into distributed computing applications (e.g., routing messages to tasks on different machines, etc.).
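For example, here is one possible sketch of a pattern-matching extension. The publish() helper is hypothetical, but it builds only on the _exchanges dictionary already defined:

from fnmatch import fnmatch

# Hypothetical helper: deliver a message to every exchange whose
# name matches a glob-style pattern (e.g., 'log.*')
def publish(pattern, msg):
    for name, exc in _exchanges.items():
        if fnmatch(name, pattern):
            exc.send(msg)

# Example use, assuming exchanges 'log.info' and 'log.error' exist
publish('log.*', 'something happened')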
You want to implement concurrency using generators (coroutines) as an alternative to system threads. This is sometimes known as user-level threading or green threading.
To implement your own concurrency using generators, you first need a
fundamental insight concerning generator functions and the yield
statement. Specifically, the fundamental behavior of yield is that
it causes a generator to suspend its execution. By suspending
execution, it is possible to write a scheduler that treats generators
as a kind of “task” and alternates their execution using a kind of
cooperative task switching.
To illustrate this idea, consider the following two generator
functions using a simple yield:
# Two simple generator functions
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1
    print('Blastoff!')

def countup(n):
    x = 0
    while x < n:
        print('Counting up', x)
        yield
        x += 1
These functions probably look a bit funny using yield all by itself.
However, consider the following code that implements a simple task
scheduler:
from collections import deque

class TaskScheduler:
    def __init__(self):
        self._task_queue = deque()

    def new_task(self, task):
        '''
        Admit a newly started task to the scheduler
        '''
        self._task_queue.append(task)

    def run(self):
        '''
        Run until there are no more tasks
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                # Run until the next yield statement
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # Generator is no longer executing
                pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()
In this code, the TaskScheduler class runs a collection of
generators in a round-robin manner, each one running until it reaches
a yield statement. For this sample, the output will be as follows:
T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...
At this point, you’ve essentially implemented the tiny core of an
“operating system” if you will. Generator functions are the tasks
and the yield statement is how tasks signal that they want to
suspend. The scheduler simply cycles over the tasks until none
are left executing.
In practice, you probably wouldn’t use generators to implement concurrency for something as simple as shown. Instead, you might use generators to replace the use of threads when implementing actors (see Recipe 12.10) or network servers.
The following code illustrates the use of generators to implement a thread-free version of actors:
from collections import deque

class ActorScheduler:
    def __init__(self):
        self._actors = {}          # Mapping of names to actors
        self._msg_queue = deque()  # Message queue

    def new_actor(self, name, actor):
        '''
        Admit a newly started actor to the scheduler and give it a name
        '''
        self._msg_queue.append((actor, None))
        self._actors[name] = actor

    def send(self, name, msg):
        '''
        Send a message to a named actor
        '''
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor, msg))

    def run(self):
        '''
        Run as long as there are pending messages.
        '''
        while self._msg_queue:
            actor, msg = self._msg_queue.popleft()
            try:
                actor.send(msg)
            except StopIteration:
                pass

# Example use
if __name__ == '__main__':
    def printer():
        while True:
            msg = yield
            print('Got:', msg)

    def counter(sched):
        while True:
            # Receive the current count
            n = yield
            if n == 0:
                break
            # Send to the printer task
            sched.send('printer', n)
            # Send the next count to the counter task (recursive)
            sched.send('counter', n - 1)

    sched = ActorScheduler()
    # Create the initial actors
    sched.new_actor('printer', printer())
    sched.new_actor('counter', counter(sched))

    # Send an initial message to the counter to initiate
    sched.send('counter', 10000)
    sched.run()
The execution of this code might take a bit of study, but the
key is the queue of pending messages. Essentially, the scheduler
runs as long as there are messages to deliver. A remarkable
feature is that the counter generator sends messages to itself and ends
up in a recursive cycle not bound by Python’s recursion limit.
Here is an advanced example showing the use of generators to implement a concurrent network application:
from collections import deque
from select import select

# This class represents a generic yield event in the scheduler
class YieldEvent:
    def handle_yield(self, sched, task):
        pass
    def handle_resume(self, sched, task):
        pass

# Task Scheduler
class Scheduler:
    def __init__(self):
        self._numtasks = 0       # Total num of tasks
        self._ready = deque()    # Tasks ready to run
        self._read_waiting = {}  # Tasks waiting to read
        self._write_waiting = {} # Tasks waiting to write

    # Poll for I/O events and restart waiting tasks
    def _iopoll(self):
        rset, wset, eset = select(self._read_waiting,
                                  self._write_waiting, [])
        for r in rset:
            evt, task = self._read_waiting.pop(r)
            evt.handle_resume(self, task)
        for w in wset:
            evt, task = self._write_waiting.pop(w)
            evt.handle_resume(self, task)

    def new(self, task):
        '''
        Add a newly started task to the scheduler
        '''
        self._ready.append((task, None))
        self._numtasks += 1

    def add_ready(self, task, msg=None):
        '''
        Append an already started task to the ready queue.
        msg is what to send into the task when it resumes.
        '''
        self._ready.append((task, msg))

    # Add a task to the reading set
    def _read_wait(self, fileno, evt, task):
        self._read_waiting[fileno] = (evt, task)

    # Add a task to the write set
    def _write_wait(self, fileno, evt, task):
        self._write_waiting[fileno] = (evt, task)

    def run(self):
        '''
        Run the task scheduler until there are no tasks
        '''
        while self._numtasks:
            if not self._ready:
                self._iopoll()
            task, msg = self._ready.popleft()
            try:
                # Run the coroutine to the next yield
                r = task.send(msg)
                if isinstance(r, YieldEvent):
                    r.handle_yield(self, task)
                else:
                    raise RuntimeError('unrecognized yield event')
            except StopIteration:
                self._numtasks -= 1

# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
    def __init__(self, sock, nbytes):
        self.sock = sock
        self.nbytes = nbytes
    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        data = self.sock.recv(self.nbytes)
        sched.add_ready(task, data)

class WriteSocket(YieldEvent):
    def __init__(self, sock, data):
        self.sock = sock
        self.data = data
    def handle_yield(self, sched, task):
        sched._write_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        nsent = self.sock.send(self.data)
        sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
    def __init__(self, sock):
        self.sock = sock
    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        r = self.sock.accept()
        sched.add_ready(task, r)

# Wrapper around a socket object for use with yield
class Socket(object):
    def __init__(self, sock):
        self._sock = sock
    def recv(self, maxbytes):
        return ReadSocket(self._sock, maxbytes)
    def send(self, data):
        return WriteSocket(self._sock, data)
    def accept(self):
        return AcceptSocket(self._sock)
    def __getattr__(self, name):
        return getattr(self._sock, name)

if __name__ == '__main__':
    from socket import socket, AF_INET, SOCK_STREAM
    import time

    # Example of a function involving generators. This should
    # be called using line = yield from readline(sock)
    def readline(sock):
        chars = []
        while True:
            c = yield sock.recv(1)
            if not c:
                break
            chars.append(c)
            if c == b'\n':
                break
        return b''.join(chars)

    # Echo server using generators
    class EchoServer:
        def __init__(self, addr, sched):
            self.sched = sched
            sched.new(self.server_loop(addr))

        def server_loop(self, addr):
            s = Socket(socket(AF_INET, SOCK_STREAM))
            s.bind(addr)
            s.listen(5)
            while True:
                c, a = yield s.accept()
                print('Got connection from ', a)
                self.sched.new(self.client_handler(Socket(c)))

        def client_handler(self, client):
            while True:
                line = yield from readline(client)
                if not line:
                    break
                line = b'GOT:' + line
                while line:
                    nsent = yield client.send(line)
                    line = line[nsent:]
            client.close()
            print('Client closed')

    sched = Scheduler()
    EchoServer(('', 16000), sched)
    sched.run()
This code will undoubtedly require a certain amount of careful study. However, it is essentially implementing a small operating system. There is a queue of tasks ready to run and there are waiting areas for tasks sleeping for I/O. Much of the scheduler involves moving tasks between the ready queue and the I/O waiting area.
When building generator-based concurrency frameworks, it is most
common to work with the more general form of yield:
def some_generator():
    ...
    result = yield data
    ...
Functions that use yield in this manner are more generally referred
to as “coroutines.” Within a scheduler, the yield statement gets
handled in a loop as follows:
f = some_generator()

# Initial result. Is None to start since nothing has been computed
result = None
while True:
    try:
        data = f.send(result)
        result = ... do some calculation ...
    except StopIteration:
        break
The logic concerning the result is a bit convoluted. However, the
value passed to send() defines what gets returned when the yield
statement wakes back up. So, if a yield is going to produce a result
in response to data that was previously yielded, that result is
supplied by the next send() operation. If a generator function has
just started, sending in a value of None simply makes it advance to
the first yield statement.
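If this seems abstract, the following small example (the adder() generator is invented for illustration) shows the handshake between send() and yield:

def adder():
    total = 0
    while True:
        # The value passed to send() becomes the result of yield;
        # the value given to yield is what send() returns
        x = yield total
        total += x

g = adder()
print(g.send(None))   # Advance to the first yield; prints 0
print(g.send(5))      # Prints 5
print(g.send(10))     # Prints 15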
In addition to sending in values, it is also possible to execute a
close() method on a generator. This causes a silent GeneratorExit
exception to be raised at the yield statement, which stops execution.
If desired, a generator can catch this exception and perform
cleanup actions. It's also possible to use the throw() method of
a generator to raise an arbitrary exception at the yield statement.
A task scheduler might use this to communicate errors into running
generators.
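Here is a small sketch illustrating both operations (the worker() coroutine is invented for illustration):

def worker():
    try:
        while True:
            item = yield
            print('Working on', item)
    except GeneratorExit:
        # Raised at the yield when close() is invoked
        print('Cleaning up')

g = worker()
next(g)             # Advance to the first yield
g.send('task1')     # Prints 'Working on task1'
g.close()           # Prints 'Cleaning up'

# throw() raises an arbitrary exception at the yield instead:
# g.throw(RuntimeError, 'scheduler error')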
The yield from statement, used in the last example, implements
coroutines that serve as subroutines or procedures to be
called from other generators. Essentially, control transparently
transfers to the new function. Unlike normal generators, a function
that is called using yield from can return a value that becomes the
result of the yield from statement. More information about yield
from can be found in PEP 380.
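As a quick illustration of the return-value behavior (both generators here are invented for the example):

def subtask():
    result = yield 'working'
    return result * 2          # Becomes the value of the yield from

def main_task():
    value = yield from subtask()
    print('Got', value)

t = main_task()
print(next(t))                 # Prints 'working', yielded by subtask()
try:
    t.send(10)                 # subtask() returns 20; prints 'Got 20'
except StopIteration:
    pass                       # main_task() has finished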
Finally, if programming with generators, it is important to stress that there are some major limitations. In particular, you get none of the benefits that threads provide. For instance, if you execute any code that is CPU bound or which blocks for I/O, it will suspend the entire task scheduler until the completion of that operation. To work around this, your only real option is to delegate the operation to a separate thread or process where it can run independently. Another limitation is that most Python libraries have not been written to work well with generator-based threading. If you take this approach, you may find that you need to write replacements for many standard library functions.
As basic background on coroutines and the techniques utilized in this recipe, see PEP 342 and “A Curious Course on Coroutines and Concurrency”.
PEP 3156 also has a modern take on asynchronous I/O involving coroutines. In practice, it is extremely unlikely that you will write a low-level coroutine scheduler yourself. However, ideas surrounding coroutines are the basis for many popular libraries, including gevent, greenlet, Stackless Python, and similar projects.
You have a collection of thread queues, and you would like to be able to poll them for incoming items, much in the same way as you might poll a collection of network connections for incoming data.
A common solution to polling problems involves a little-known trick
involving a hidden loopback network connection. Essentially, the idea
is as follows: for each queue (or any object) that you want to poll,
you create a pair of connected sockets. You then write on one of the
sockets to signal the presence of data. The other socket is then
passed to select() or a similar function to poll for the arrival of
data. Here is some sample code that illustrates this idea:
import queue
import socket
import os

class PollableQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()
In this code, a new kind of Queue instance is defined where
there is an underlying pair of connected sockets. The socketpair()
function on Unix machines can establish such sockets easily. On
Windows, you have to fake it using code similar to that shown (it looks
a bit weird, but a server socket is created and a client immediately
connects to it afterward). The normal get() and put() methods
are then redefined slightly to perform a small bit of I/O on these
sockets. The put() method writes a single byte of data to one of
the sockets after putting data on the queue. The get() method reads
a single byte of data from the other socket when removing an item from
the queue.
The fileno() method is what makes the queue pollable using a
function such as select(). Essentially, it just exposes the
underlying file descriptor of the socket used by the get()
method.
Here is an example of some code that defines a consumer which monitors multiple queues for incoming items:
import select
import threading

def consumer(queues):
    '''
    Consumer that reads data on multiple queues simultaneously
    '''
    while True:
        can_read, _, _ = select.select(queues, [], [])
        for r in can_read:
            item = r.get()
            print('Got:', item)

q1 = PollableQueue()
q2 = PollableQueue()
q3 = PollableQueue()
t = threading.Thread(target=consumer, args=([q1, q2, q3],))
t.daemon = True
t.start()

# Feed data to the queues
q1.put(1)
q2.put(10)
q3.put('hello')
q2.put(15)
...
If you try it, you’ll find that the consumer indeed receives all of the put items, regardless of which queues they are placed in.
The problem of polling non-file-like objects, such as queues, is often a lot trickier than it looks. For instance, if you don’t use the socket technique shown, your only option is to write code that cycles through the queues and uses a timer, like this:
import time

def consumer(queues):
    while True:
        for q in queues:
            if not q.empty():
                item = q.get()
                print('Got:', item)

        # Sleep briefly to avoid 100% CPU
        time.sleep(0.01)
This might work for certain kinds of problems, but it’s clumsy and introduces other weird performance problems. For example, if new data is added to a queue, it won’t be detected for as long as 10 milliseconds (an eternity on a modern processor).
You run into even further problems if the preceding polling is mixed with the polling of other objects, such as network sockets. For example, if you want to poll both sockets and queues at the same time, you might have to use code like this:
import select

def event_loop(sockets, queues):
    while True:
        # Polling with a timeout
        can_read, _, _ = select.select(sockets, [], [], 0.01)
        for r in can_read:
            handle_read(r)
        for q in queues:
            if not q.empty():
                item = q.get()
                print('Got:', item)
The solution shown solves a lot of these problems by simply putting
queues on equal status with sockets. A single select() call can be
used to poll for activity on both. It is not necessary to use
timeouts or other time-based hacks to periodically check. Moreover,
if data gets added to a queue, the consumer will be notified almost
instantaneously. Although there is a tiny amount of overhead
associated with the underlying I/O, it often is worth it to have better
response time and simplified coding.
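For instance, the earlier mixed event loop might be rewritten along these lines. This is only a sketch (the dispatch test r in queues is one possible way to tell the two kinds of objects apart):

import select

def event_loop(sockets, queues):
    while True:
        # One select() call now covers both sockets and queues
        can_read, _, _ = select.select(sockets + queues, [], [])
        for r in can_read:
            if r in queues:
                item = r.get()
                print('Queue item:', item)
            else:
                data = r.recv(8192)
                print('Socket data:', data)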
You would like to write a program that runs as a proper daemon process on Unix or Unix-like systems.
Creating a proper daemon process requires a precise sequence of system calls and careful attention to detail. The following code shows how to define a daemon process along with the ability to easily stop it once launched:
#!/usr/bin/env python3
# daemon.py

import os
import sys
import atexit
import signal

def daemonize(pidfile, *, stdin='/dev/null',
                          stdout='/dev/null',
                          stderr='/dev/null'):

    if os.path.exists(pidfile):
        raise RuntimeError('Already running')

    # First fork (detaches from parent)
    try:
        if os.fork() > 0:
            raise SystemExit(0)   # Parent exit
    except OSError as e:
        raise RuntimeError('fork #1 failed.')

    os.chdir('/')
    os.umask(0)
    os.setsid()
    # Second fork (relinquish session leadership)
    try:
        if os.fork() > 0:
            raise SystemExit(0)
    except OSError as e:
        raise RuntimeError('fork #2 failed.')

    # Flush I/O buffers
    sys.stdout.flush()
    sys.stderr.flush()

    # Replace file descriptors for stdin, stdout, and stderr
    with open(stdin, 'rb', 0) as f:
        os.dup2(f.fileno(), sys.stdin.fileno())
    with open(stdout, 'ab', 0) as f:
        os.dup2(f.fileno(), sys.stdout.fileno())
    with open(stderr, 'ab', 0) as f:
        os.dup2(f.fileno(), sys.stderr.fileno())

    # Write the PID file
    with open(pidfile, 'w') as f:
        print(os.getpid(), file=f)

    # Arrange to have the PID file removed on exit/signal
    atexit.register(lambda: os.remove(pidfile))

    # Signal handler for termination (required)
    def sigterm_handler(signo, frame):
        raise SystemExit(1)

    signal.signal(signal.SIGTERM, sigterm_handler)

def main():
    import time
    sys.stdout.write('Daemon started with pid {}\n'.format(os.getpid()))
    while True:
        sys.stdout.write('Daemon Alive! {}\n'.format(time.ctime()))
        time.sleep(10)

if __name__ == '__main__':
    PIDFILE = '/tmp/daemon.pid'

    if len(sys.argv) != 2:
        print('Usage: {} [start|stop]'.format(sys.argv[0]), file=sys.stderr)
        raise SystemExit(1)

    if sys.argv[1] == 'start':
        try:
            daemonize(PIDFILE,
                      stdout='/tmp/daemon.log',
                      stderr='/tmp/daemon.log')
        except RuntimeError as e:
            print(e, file=sys.stderr)
            raise SystemExit(1)

        main()

    elif sys.argv[1] == 'stop':
        if os.path.exists(PIDFILE):
            with open(PIDFILE) as f:
                os.kill(int(f.read()), signal.SIGTERM)
        else:
            print('Not running', file=sys.stderr)
            raise SystemExit(1)

    else:
        print('Unknown command {!r}'.format(sys.argv[1]), file=sys.stderr)
        raise SystemExit(1)
To launch the daemon, the user would use a command like this:
bash % daemon.py start
bash % cat /tmp/daemon.pid
2882
bash % tail -f /tmp/daemon.log
Daemon started with pid 2882
Daemon Alive! Fri Oct 12 13:45:37 2012
Daemon Alive! Fri Oct 12 13:45:47 2012
...
Daemon processes run entirely in the background, so the command returns immediately. However, you can view its associated pid file and log, as just shown. To stop the daemon, use:
bash % daemon.py stop
bash %
This recipe defines a function, daemonize(), that should be called at
program startup to make the program run as a daemon. The signature
of daemonize() uses keyword-only arguments to make the purpose
of the optional arguments clearer. This forces the user to use a
call such as this:
daemonize('daemon.pid',
          stdin='/dev/null',
          stdout='/tmp/daemon.log',
          stderr='/tmp/daemon.log')
As opposed to a more cryptic call such as:
# Illegal. Must use keyword arguments
daemonize('daemon.pid',
          '/dev/null', '/tmp/daemon.log', '/tmp/daemon.log')
The steps involved in creating a daemon are fairly cryptic, but the
general idea is as follows. First, a daemon has to detach itself
from its parent process. This is the purpose of the first os.fork()
operation and immediate termination by the parent.
After the child has been orphaned, the call to os.setsid()
creates an entirely new process session with the child as its
leader. This also makes the child the leader of a new process
group and ensures that there is no controlling terminal.
If this all sounds a bit too magical, it has to do with getting the
daemon to detach properly from the terminal and making sure that
things like signals don’t interfere with its operation.
The calls to os.chdir() and os.umask(0) change the current working
directory and reset the file mode mask. Changing the directory is
usually a good idea so that the daemon is no longer working in the
directory from which it was launched.
The second call to os.fork() is by far the more mysterious operation
here. This step makes the daemon process give up the ability
to acquire a new controlling terminal and provides even more isolation
(essentially, the daemon gives up its session leadership and thus no
longer has the permission to open controlling terminals). Although
you could probably omit this step, it’s typically recommended.
Once the daemon process has been properly detached, it performs steps
to reinitialize the standard I/O streams to point at files specified
by the user. This part is actually somewhat tricky.
References to file objects associated with the standard I/O streams
are found in multiple places in the interpreter (sys.stdout,
sys.__stdout__, etc.). Simply closing sys.stdout and reassigning
it is not likely to work correctly, because there’s no way to know if
it will fix all uses of sys.stdout. Instead, a separate file object
is opened, and the os.dup2() call is used to have it replace the file
descriptor currently being used by sys.stdout. When this happens,
the original file for sys.stdout will be closed and the new one
takes its place. It must be emphasized that any file encoding or text
handling already applied to the standard I/O streams will remain in
place.
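If the dup2() trick seems obscure, this minimal standalone sketch shows the effect (the path /tmp/out.log is just illustrative):

import os
import sys

# Replace the file descriptor underneath sys.stdout. The sys.stdout
# object is untouched, but everything written to file descriptor 1,
# including output from C extensions, now goes to the log file.
with open('/tmp/out.log', 'ab', 0) as f:
    os.dup2(f.fileno(), sys.stdout.fileno())

print('This line ends up in /tmp/out.log')
sys.stdout.flush()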
A common practice with daemon processes is to write the process ID
of the daemon in a file for later use by other programs. The last part of the
daemonize() function writes this file, but also arranges to have
the file removed on program termination. The atexit.register() function
registers a function to execute when the Python interpreter terminates.
The definition of a signal handler for SIGTERM is also required for
a graceful termination. The signal handler merely raises SystemExit()
and nothing more. This might look unnecessary, but without it,
termination signals kill the interpreter without performing the cleanup
actions registered with atexit.register(). An example of
code that kills the daemon can be found in the handling of the
stop command at the end of the program.
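The interplay can be demonstrated in a few lines. This sketch assumes a POSIX system; without the handler, a SIGTERM delivered to the process would bypass the atexit callback entirely:

import atexit
import signal

atexit.register(lambda: print('cleanup'))   # Runs on normal interpreter exit

# Convert SIGTERM into SystemExit so that atexit callbacks still run
def sigterm_handler(signo, frame):
    raise SystemExit(1)

signal.signal(signal.SIGTERM, sigterm_handler)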
More information about writing daemon processes can be found in Advanced Programming in the UNIX Environment, 2nd Edition, by W. Richard Stevens and Stephen A. Rago (Addison-Wesley, 2005). Although focused on C programming, all of the material is easily adapted to Python, since all of the required POSIX functions are available in the standard library.