
threading Module

We will now introduce the higher-level threading module, which gives you not only a Thread class but also a wide variety of synchronization mechanisms to use to your heart's content. Table 17.2 presents a list of all the objects provided by the threading module.

Table 17.2. threading Module Objects
Object      Description
Thread      object that represents a single thread of execution
Lock        primitive lock object (the same lock object as in the thread module)
RLock       re-entrant lock object; allows a single thread to (re)acquire an already-held lock (recursive locking)
Condition   condition variable object; causes one thread to wait until a certain "condition" has been satisfied by another thread, such as a change of state or of some data value
Event       general version of condition variables, whereby any number of threads wait for some event to occur, and all awaken when the event happens
Semaphore   provides a "waiting area"-like structure for threads waiting on a lock

In this section, we will examine how to use the Thread class to implement threading. Since we have already covered the basics of locking, we will not cover the locking primitives here. The Thread() class also contains a form of synchronization, so explicit use of locking primitives is not necessary.
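Although we will not examine the synchronization objects from Table 17.2 in detail, here is a small taste of one of them, Event, to show what the table entries mean in practice. This sketch is written with the modern print() function syntax rather than this chapter's print statement; the waiter() function and the results list are our own illustrative choices.

```python
import threading

evt = threading.Event()     # initially "unset"
results = []

def waiter(n):
    evt.wait()              # block until some other thread calls evt.set()
    results.append(n)

threads = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

evt.set()                   # the event occurs: all waiting threads awaken
for t in threads:
    t.join()
print(sorted(results))      # [0, 1, 2]
```

All three waiters awaken from the single set() call, which is exactly the "any number of threads are waiting... all will awaken" behavior described in the table.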

Thread Class

There are a variety of ways you can create threads using the Thread class. We cover three of them here, all quite similar. Pick the one you feel most comfortable with, not to mention the most appropriate for your application and future scalability (we like choice 3 the best):

  1. Create Thread instance, passing in function

  2. Create Thread instance, passing in callable class instance

  3. Subclass Thread and create subclass instance

Create Thread instance, passing in function

In our first example, we will just instantiate Thread, passing in our function (and its arguments) in a manner similar to our previous examples. This function is what will be executed when we direct the thread to begin execution. Taking our mtsleep2.py script and tweaking it, adding the use of Thread objects, we have mtsleep3.py, shown in Example 17.4.

When we run it, we see output similar to its predecessors':

% mtsleep3.py
starting threads…
start loop 0 at: Sun Aug 13 18:16:38 2000
start loop 1 at: Sun Aug 13 18:16:38 2000
loop 1 done at: Sun Aug 13 18:16:40 2000
loop 0 done at: Sun Aug 13 18:16:42 2000
all DONE at: Sun Aug 13 18:16:42 2000

So what did change? Gone are the locks which we had to implement when using the thread module. Instead, we create a set of Thread objects. When each Thread is instantiated, we dutifully pass in the function (target) and arguments (args) and receive a Thread instance in return. The biggest difference between instantiating Thread [calling Thread()] and invoking thread.start_new_thread() is that the new thread does not begin execution right away. This is a useful synchronization feature, especially when you don't want the threads to start immediately.
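A minimal sketch of this difference, using is_alive() (the current spelling of the liveness check) and modern print() syntax: instantiating Thread merely creates the object; execution begins only when start() is called.

```python
import threading
import time

def worker():
    time.sleep(0.25)        # stand-in for real work

t = threading.Thread(target=worker)
print(t.is_alive())         # False: instantiation alone does not begin execution
t.start()
print(t.is_alive())         # True: the thread is now running
t.join()
print(t.is_alive())         # False: it has terminated
```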

Example 17.4. Using the threading Module (mtsleep3.py)

The Thread class from the threading module has a join() method which lets the main thread wait for thread completion.

1  #!/usr/bin/env python
2
3  import threading
4  from time import sleep, time, ctime
5
6  loops = [ 4, 2 ]
7
8  def loop(nloop, nsec):
9      print 'start loop', nloop, 'at:', ctime(time())
10     sleep(nsec)
11     print 'loop', nloop, 'done at:', ctime(time())
12
13 def main():
14     print 'starting threads…'
15     threads = []
16     nloops = range(len(loops))
17
18     for i in nloops:
19         t = threading.Thread(target=loop,
20             args=(i, loops[i]))
21         threads.append(t)
22
23     for i in nloops:          # start threads
24         threads[i].start()
25
26     for i in nloops:          # wait for all
27         threads[i].join()     # threads to finish
28
29     print 'all DONE at:', ctime(time())
30
31 if __name__ == '__main__':
32     main()

Once all the threads have been allocated, we let them go off to the races by invoking each thread's start() method, but not a moment before that. And rather than having to manage a set of locks (allocating, acquiring, releasing, checking lock state, etc.), we simply call the join() method for each thread. join() will wait until a thread terminates or, if provided, until a timeout occurs. Using join() is much cleaner than spinning in an infinite loop waiting for locks to be released (which is why such locks are sometimes known as "spin locks").
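A short sketch of join()'s timeout behavior (the 0.1- and 1.0-second durations are arbitrary choices for illustration; is_alive() and print() are the modern spellings):

```python
import threading
import time

def task():
    time.sleep(1.0)         # simulated long-running work

t = threading.Thread(target=task)
t.start()
t.join(0.1)                 # wait at most a tenth of a second
print(t.is_alive())         # True: the timeout expired before the thread finished
t.join()                    # no timeout: block until the thread terminates
print(t.is_alive())         # False
```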

One other important aspect of join() is that it does not need to be called at all. Once threads are started, they will execute until their given function completes, whereupon they will exit. If your main thread has things to do other than wait for threads to complete (such as other processing or waiting for new client requests), it should by all means do so. join() is useful only when you want to wait for thread completion.

Create Thread instance, passing in callable class instance

A similar offshoot of passing in a function when creating a thread is to create a callable class and pass in an instance for execution; this is the more object-oriented approach to MT programming. Such a callable class embodies an execution environment that is much more flexible than a function or a choice from a set of functions. You now have the power of a class object behind you, as opposed to a single function or a list/tuple of functions.

Adding our new class ThreadFunc to the code and making other slight modifications to mtsleep3.py, we get mtsleep4.py, given in Example 17.5.

If we run mtsleep4.py, we get the expected output:

% mtsleep4.py
starting threads…
start loop 0 at: Sun Aug 13 18:49:17 2000
start loop 1 at: Sun Aug 13 18:49:17 2000
loop 1 done at: Sun Aug 13 18:49:19 2000
loop 0 done at: Sun Aug 13 18:49:21 2000
all DONE at: Sun Aug 13 18:49:21 2000

So what are the changes this time? The addition of the ThreadFunc class and a minor change to instantiate the Thread object, which also instantiates ThreadFunc, our callable class. In effect, we have a double instantiation going on here. Let's take a closer look at our ThreadFunc class.

We want to make this class general enough to use with other functions besides our loop() function, so we added some new infrastructure, such as having this class hold the arguments for the function, the function itself, and also a function name string. The constructor __init__() just sets all the values.

Example 17.5. Using Callable classes (mtsleep4.py)

In this example we pass in a callable class (instance) as opposed to just a function. It presents more of an OO approach than mtsleep3.py.

1  #!/usr/bin/env python
2
3  import threading
4  from time import sleep, time, ctime
5
6  loops = [ 4, 2 ]
7
8  class ThreadFunc:
9
10     def __init__(self, func, args, name=''):
11         self.name = name
12         self.func = func
13         self.args = args
14
15     def __call__(self):
16         apply(self.func, self.args)
17
18 def loop(nloop, nsec):
19     print 'start loop', nloop, 'at:', ctime(time())
20     sleep(nsec)
21     print 'loop', nloop, 'done at:', ctime(time())
22
23 def main():
24     print 'starting threads…'
25     threads = []
26     nloops = range(len(loops))
27
28     for i in nloops: # create all threads
29         t = threading.Thread( \
30             target=ThreadFunc(loop, (i, loops[i]),
31             loop.__name__))
32         threads.append(t)
33
34     for i in nloops: # start all threads
35         threads[i].start()
36
37     for i in nloops: # wait for completion
38         threads[i].join()
39
40     print 'all DONE at:', ctime(time())
41
42 if __name__ == '__main__':
43     main()

When a new thread is created, the Thread code invokes our ThreadFunc object, which means calling its __call__() special method. Because we already have our set of arguments, we do not need to pass them to the Thread() constructor, but we do have to use apply() in our code now because we have an argument tuple. Those of you who have Python 1.6 or higher can use the new function invocation syntax described in Section 11.6.3 instead of using apply() on line 16:

self.func(*self.args)
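Putting that newer invocation syntax to work, here is a sketch of the callable-class idea in Python 3 flavored code (the results list and the use of list.append as the target function are our own illustrative choices):

```python
import threading

class ThreadFunc:
    """Wrap a function and its argument tuple in a callable object."""
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)   # the modern replacement for apply()

results = []
t = threading.Thread(target=ThreadFunc(results.append, (42,), 'append'))
t.start()
t.join()
print(results)                  # [42]
```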
Subclass Thread and create subclass instance

The final introductory example involves subclassing Thread, which turns out to be extremely similar to creating a callable class as in the previous example. Subclassing is a bit easier to read when you are creating your threads (lines 29–30). We will present the code for mtsleep5.py in Example 17.6 as well as the output obtained from its execution, and leave it as an exercise for the reader to compare mtsleep5.py to mtsleep4.py.

Here is the output for mtsleep5.py, again, just what we expected:

% mtsleep5.py
starting threads…
start loop 0 at: Sun Aug 13 19:14:26 2000
start loop 1 at: Sun Aug 13 19:14:26 2000
loop 1 done at: Sun Aug 13 19:14:28 2000
loop 0 done at: Sun Aug 13 19:14:30 2000
all DONE at: Sun Aug 13 19:14:30 2000

While the reader compares the source between the mtsleep4 and mtsleep5 modules, we want to point out the most significant changes: (1) our MyThread subclass constructor must first invoke the base class constructor (line 10), and (2) the former special method __call__() must be named run() in the subclass.

We now modify our MyThread class with some diagnostic output and store it in a separate module called myThread (see Example 17.7) and import this class for the upcoming examples. Rather than simply calling apply() to run our functions, we also save the result to instance attribute self.res, and create a new method to retrieve that value, getResult().

Example 17.6. Subclassing Thread (mtsleep5.py)

Rather than instantiating the Thread class, we subclass it. This gives us more flexibility in customizing our threading objects and simplifies the thread creation call.

1  #!/usr/bin/env python
2
3  import threading
4  from time import sleep, time, ctime
5
6  loops = ( 4, 2 )
7
8  class MyThread(threading.Thread):
9      def __init__(self, func, args, name=''):
10         threading.Thread.__init__(self)
11         self.name = name
12         self.func = func
13         self.args = args
14
15     def run(self):
16         apply(self.func, self.args)
17
18 def loop(nloop, nsec):
19     print 'start loop', nloop, 'at:', ctime(time())
20     sleep(nsec)
21     print 'loop', nloop, 'done at:', ctime(time())
22
23 def main():
24     print 'starting threads…'
25     threads = []
26     nloops = range(len(loops))
27
28     for i in nloops:
29         t = MyThread(loop, (i, loops[i]), \
30             loop.__name__)
31         threads.append(t)
32
33     for i in nloops:
34         threads[i].start()
35
36     for i in nloops:
37         threads[i].join()
38
39     print 'all DONE at:', ctime(time())
40
41 if __name__ == '__main__':
42     main()
Example 17.7. MyThread Subclass of Thread (myThread.py)

To generalize our subclass of Thread from mtsleep5.py, we move the subclass to a separate module and add a getResult() method for callables which produce return values.

1  #!/usr/bin/env python
2
3  import threading
4  from time import time, ctime
5
6  class MyThread(threading.Thread):
7      def __init__(self, func, args, name=''):
8          threading.Thread.__init__(self)
9          self.name = name
10         self.func = func
11         self.args = args
12
13     def getResult(self):
14         return self.res
15
16     def run(self):
17         print 'starting', self.name, 'at:', \
18             ctime(time())
19         self.res = apply(self.func, self.args)
20         print self.name, 'finished at:', \
21             ctime(time())

Fibonacci and factorial… take 2, plus summation

The mtfacfib.py script, given in Example 17.8, compares execution of the recursive Fibonacci, factorial, and summation functions. This script runs all three functions in a single-threaded manner, then performs the same task using threads to illustrate one of the advantages of having a threading environment.

Example 17.8. Fibonacci, Factorial, Summation (mtfacfib.py)

In this MT application, we execute 3 separate recursive functions—first in a single-threaded fashion, followed by the alternative with multiple threads.

1  #!/usr/bin/env python
2
3  from myThread import MyThread
4  from time import time, ctime, sleep
5
6  def fib(x):
7      sleep(0.005)
8      if x < 2: return 1
9      return (fib(x-2) + fib(x-1))
10
11 def fac(x):
12     sleep(0.1)
13     if x < 2: return 1
14     return (x * fac(x-1))
15
16 def sum(x):
17     sleep(0.1)
18     if x < 2: return 1
19     return (x + sum(x-1))
20
21 funcs = [fib, fac, sum]
22 n = 12
23
24 def main():
25     nfuncs = range(len(funcs))
26
27     print '*** SINGLE THREAD'
28     for i in nfuncs:
29         print 'starting', funcs[i].__name__, 'at:', \
30             ctime(time())
31         print funcs[i](n)
32         print funcs[i].__name__, 'finished at:', \
33             ctime(time())
34
35     print '\n*** MULTIPLE THREADS'
36     threads = []
37     for i in nfuncs:
38         t = MyThread(funcs[i], (n,),
39             funcs[i].__name__)
40         threads.append(t)
41
42     for i in nfuncs:
43         threads[i].start()
44
45     for i in nfuncs:
46         threads[i].join()
47         print threads[i].getResult()
48
49     print 'all DONE'
50
51 if __name__ == '__main__':
52     main()

Running in single-threaded mode simply involves calling the functions one at a time and displaying the corresponding results right after each function call.

When running in multithreaded mode, we do not display the result right away. Because we want to keep our MyThread class as general as possible (being able to execute callables which do and do not produce output), we wait until the end to call the getResult() method to finally show you the return values of each function call.
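As a sketch of this result-retrieval pattern in modern syntax (the class below mirrors MyThread but omits the diagnostic printing; the name ResultThread and the use of pow() are our own illustrative choices):

```python
import threading

class ResultThread(threading.Thread):
    """Thread subclass that saves its target's return value."""
    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        self.res = self.func(*self.args)

    def getResult(self):
        return self.res          # only meaningful after join() returns

t = ResultThread(pow, (2, 10), 'pow')
t.start()
t.join()                         # wait first, then ask for the result
print(t.getResult())             # 1024
```

Waiting with join() before calling getResult() guarantees that run() has finished and self.res has been assigned.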

Because these functions execute so quickly (well, except perhaps for the Fibonacci function), you will notice that we had to add calls to sleep() to each function to slow things down enough to see how threading can improve performance when the actual work has varying execution times; you certainly wouldn't pad real work with calls to sleep(). Anyway, here is the output:

% mtfacfib.py
*** SINGLE THREAD
starting fib at: Sun Jun 18 19:52:20 2000
233
fib finished at: Sun Jun 18 19:52:24 2000
starting fac at: Sun Jun 18 19:52:24 2000
479001600
fac finished at: Sun Jun 18 19:52:26 2000
starting sum at: Sun Jun 18 19:52:26 2000
78
sum finished at: Sun Jun 18 19:52:27 2000

*** MULTIPLE THREADS
starting fib at: Sun Jun 18 19:52:27 2000
starting fac at: Sun Jun 18 19:52:27 2000
starting sum at: Sun Jun 18 19:52:27 2000
fac finished at: Sun Jun 18 19:52:28 2000
sum finished at: Sun Jun 18 19:52:28 2000
fib finished at: Sun Jun 18 19:52:31 2000
233
479001600
78
all DONE

Producer-Consumer Problem and the Queue Module

The final example illustrates the producer-consumer scenario, in which a producer of goods or services creates goods and places them in a data structure such as a queue. The amount of time between producing goods is non-deterministic, as is the amount of time the consumer takes to consume the goods produced by the producer.

We use the Queue module to provide an interthread communication mechanism which allows threads to share data with each other. In particular, we create a queue for the producer (thread) to place new goods into and where the consumer (thread) can consume goods from.

Specifically, we will use the following attributes from the Queue module (see Table 17.3).

Table 17.3. Common Queue Module Attributes
Function/Method        Description

Queue Module Function
Queue(size)            creates a Queue object of the given size

Queue Object Methods
qsize()                returns the queue size (approximate, since the queue may be being updated by other threads)
empty()                returns 1 if the queue is empty, 0 otherwise
full()                 returns 1 if the queue is full, 0 otherwise
put(item, block=0)     puts item in the queue; if block is given (and not 0), block until room is available
get(block=0)           gets an item from the queue; if block is given (and not 0), block until an item is available
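A quick sketch of these methods in action. Note that in current Python the module is spelled queue (lowercase) and the truth-valued methods return True/False rather than 1/0; the two-slot size and the 'xxx' string are arbitrary choices for illustration.

```python
from queue import Queue     # the Queue module of this chapter, renamed in Python 3

q = Queue(2)                # a queue that can hold at most two items
q.put('xxx')                # would block if the queue were full
print(q.qsize())            # 1
print(q.full())             # False: one slot remains
item = q.get()              # would block if the queue were empty
print(item, q.empty())      # xxx True
```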

Without further ado, we present the code for prodcons.py, shown in Example 17.9.

Example 17.9. Producer-Consumer Problem (prodcons.py)

We feature an implementation of the Producer–Consumer problem using Queue objects and a random number of goods produced (and consumed). The producer and consumer are individually—and concurrently—executing threads.

1  #!/usr/bin/env python
2
3  from random import randint
4  from time import time, ctime, sleep
5  from Queue import Queue
6  from myThread import MyThread
7
8  def writeQ(queue):
9      print 'producing object for Q…',
10     queue.put('xxx', 1)
11     print "size now", queue.qsize()
12
13 def readQ(queue):
14     val = queue.get(1)
15     print 'consumed object from Q… size now', \
16         queue.qsize()
17
18 def writer(queue, loops):
19     for i in range(loops):
20         writeQ(queue)
21         sleep(randint(1, 3))
22
23 def reader(queue, loops):
24     for i in range(loops):
25         readQ(queue)
26         sleep(randint(2, 5))
27
28 funcs = [writer, reader]
29 nfuncs = range(len(funcs))
30
31 def main():
32     nloops = randint(2, 5)
33     q = Queue(32)
34
35     threads = []
36     for i in nfuncs:
37         t = MyThread(funcs[i], (q, nloops), \
38             funcs[i].__name__)
39         threads.append(t)
40
41     for i in nfuncs:
42         threads[i].start()
43
44     for i in nfuncs:
45         threads[i].join()
46
47     print 'all DONE'
48
49 if __name__ == '__main__':
50     main()

Here is the output from one execution of this script:

% prodcons.py
starting writer at: Sun Jun 18 20:27:07 2000
producing object for Q… size now 1
starting reader at: Sun Jun 18 20:27:07 2000
consumed object from Q… size now 0
producing object for Q… size now 1
consumed object from Q… size now 0
producing object for Q… size now 1
producing object for Q… size now 2
producing object for Q… size now 3
consumed object from Q… size now 2
consumed object from Q… size now 1
writer finished at: Sun Jun 18 20:27:17 2000
consumed object from Q… size now 0
reader finished at: Sun Jun 18 20:27:25 2000
all DONE

As you can see, the producer and consumer do not necessarily alternate in execution. (Thank goodness for random numbers!) Seriously though, real life is generally random and non-deterministic.

Line-by-line explanation
Lines 1–6

In this module, we will use the Queue.Queue object as well as our thread class myThread.MyThread which we gave in Example 17.7. We will use random.randint() to make production and consumption somewhat varied, and also grab the usual suspects from the time module.

Lines 8–16

The writeQ() and readQ() functions each have a specific purpose: to place an object in the queue (we are using the string 'xxx' as an example) and to consume a queued object, respectively. Notice that we are producing one object and reading one object each time.

Lines 18–26

The writer() is going to run as a single thread whose sole purpose is to produce an item for the queue, wait for a bit, and then do it again, up to the specified number of times, chosen randomly per script execution. The reader() will do likewise, with the exception of consuming an item, of course.

You will notice that the random number of seconds that the writer sleeps is in general shorter than the amount of time the reader sleeps. This is to discourage the reader from trying to take items from an empty queue. By giving the writer a shorter waiting period, it is more likely that there will already be an object for the reader to consume by the time its turn rolls around again.

Lines 28–29

These are just setup lines to set the total number of threads that are to be spawned and executed.

Lines 31–47

Finally, our main() function, which should look quite similar to the main() in all of the other scripts in this chapter. We create the appropriate threads and send them on their way, finishing up when both threads have concluded execution.

We infer from this example that a program with multiple tasks to perform can be organized to use a separate thread for each task. This can result in a much cleaner design than that of a single-threaded program attempting to do all of the tasks.

In this chapter, we illustrated how a single-threaded process may limit an application's performance. In particular, programs with independent, non-deterministic, and non-causal tasks that execute sequentially can be improved by dividing them into separate tasks executed by individual threads. Not all applications will benefit from multithreading and its overhead, but you are now cognizant enough of Python's threading capabilities to use this tool to your advantage when appropriate.


Last updated on 9/14/2001
Core Python Programming, © 2002 Prentice Hall PTR
