Python Concurrency: An Example of a Queue

Python comes with a lot of cool concurrency tools builtin, such as threads, Queues, semaphores and multiprocessing. In this article, we'll spend some time learning how to use Queues. A Queue can be used for first-in-first out or last-in-last-out stack-like implementations if you just use them directly. If you'd like to see that in action, see the Hellman article at the end of this post. We're going to mix threads in and create a simple file downloader script to demonstrate how Queues work for cases where we want concurrency.

Creating a Downloading Application

This code is based loosely on Hellman's article and the IBM article as they both show how to download URLs in various ways. This implementation actually downloads files. We'll use the United States Infernal (oops, I mean Internal) Revenue Service's tax forms for our example. Let's pretend we're a small business owner and we need to download a bunch of these forms for our employees. Here's some code that will suit our needs:

import os
import Queue
import threading
import urllib2

########################################################################
class Downloader(threading.Thread):
    """Threaded File Downloader"""
    
    #----------------------------------------------------------------------
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    #----------------------------------------------------------------------
    def run(self):
        while True:
            # gets the url from the queue
            url = self.queue.get()
        
            # download the file
            self.download_file(url)
        
            # send a signal to the queue that the job is done
            self.queue.task_done()
            
    #----------------------------------------------------------------------
    def download_file(self, url):
        """"""
        handle = urllib2.urlopen(url)
        fname = os.path.basename(url)
        with open(fname, "wb") as f:
            while True:
                chunk = handle.read(1024)
                if not chunk: break
                f.write(chunk)

#----------------------------------------------------------------------
def main(urls):
    """
    Run the program
    """
    queue = Queue.Queue()
    
    # create a thread pool and give them a queue
    for i in range(5):
        t = Downloader(queue)
        t.setDaemon(True)
        t.start()
    
    # give the queue some data
    for url in urls:
        queue.put(url)
 
    # wait for the queue to finish
    queue.join()

if __name__ == "__main__":
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    main(urls)

Let's break this down a bit. First of all, we need to look at the main function definition to see how this all flows. Here we see that it accepts a list of urls. The main function then creates a queue instance that it passes to 5 daemonized threads. The main difference between daemonized and non-daemon threads is that you have to keep track of non-daemon threads and close them yourself whereas with a daemon thread you basically just set them and forget them and when your app closes, they close too. Next we load up the queue (using its put method) with the urls we passed in. Finally we tell the queue to wait for the threads to do their processing via the join method. In the download class, we have the line "self.queue.get()" which blocks until the queue has something to return. That means the threads just sit idly waiting to pick something up. It also means that for a thread to "get" something from the queue, it must call the queue's "get" method. Thus as we add or put items in the queue, the thread pool will pick up or "get" items and process them. This is also known as "dequeing". Once all the items in the queue are processed, the script ends and exits. On my machine, it downloads all 5 documents in under a second.

Further Reading

Copyright © 2024 Mouse Vs Python | Powered by Pythonlibrary