The multiprocessing module was added to Python in version 2.6. It was originally defined in PEP 371 by Jesse Noller and Richard Oudkerk. The multiprocessing module allows you to spawn processes in much the same manner as you can spawn threads with the threading module. The idea here is that because you are now spawning processes, you can avoid the Global Interpreter Lock (GIL) and take full advantage of multiple processors on a machine.
The multiprocessing package also includes some APIs that are not in the threading module at all. For example, there is a neat Pool class that you can use to parallelize the execution of a function across multiple input values. We will be looking at Pool in a later section. We will start with the multiprocessing module's Process class.
The Process class is very similar to the threading module's Thread class. Let's try creating a series of processes that call the same function and see how that works:
import os

from multiprocessing import Process


def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc = os.getpid()
    print('{0} doubled to {1} by process id: {2}'.format(
        number, result, proc))


if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []

    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()
For this example, we import Process and create a doubler function. Inside the function, we double the number that was passed in. We also use Python's os module to get the current process's ID (or pid). This will tell us which process is calling the function. Then in the block of code at the bottom, we create a series of Processes and start them. The very last loop just calls the join() method on each process, which tells Python to wait for the process to terminate. If you need to stop a process, you can call its terminate() method.
When you run this code, you should see output that is similar to the following:
5 doubled to 10 by process id: 10468
10 doubled to 20 by process id: 10469
15 doubled to 30 by process id: 10470
20 doubled to 40 by process id: 10471
25 doubled to 50 by process id: 10472
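As mentioned above, you can stop a process early by calling its terminate() method. Here's a minimal sketch of how that might look; count_forever is just a contrived long-running task so that we have something to stop:

import time

from multiprocessing import Process


def count_forever():
    """A contrived task that never finishes on its own"""
    while True:
        time.sleep(1)


if __name__ == '__main__':
    proc = Process(target=count_forever)
    proc.start()

    time.sleep(3)
    proc.terminate()  # forcibly stop the child process
    proc.join()       # wait until it has actually exited
    print('Process alive? {}'.format(proc.is_alive()))  # False

Note that we still call join() after terminate(); that gives the process time to actually exit before we check on it with is_alive().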
Sometimes it's nicer to have a more human-readable name for your process, though. Fortunately, the Process class does allow you to set the name of your process. Let's take a look:
import os

from multiprocessing import Process, current_process


def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc_name = current_process().name
    print('{0} doubled to {1} by: {2}'.format(
        number, result, proc_name))


if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []

    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()

    proc = Process(target=doubler, name='Test', args=(2,))
    proc.start()
    procs.append(proc)

    for proc in procs:
        proc.join()
This time around, we import something extra: current_process. The current_process function is basically the multiprocessing version of the threading module's current_thread function. We use it to grab the name of the process that is calling our function. You will note that for the first five processes, we don't set a name. Then for the sixth, we set the process name to "Test". Let's see what we get for output:
5 doubled to 10 by: Process-1
10 doubled to 20 by: Process-2
15 doubled to 30 by: Process-3
20 doubled to 40 by: Process-4
25 doubled to 50 by: Process-5
2 doubled to 4 by: Test
The output demonstrates that the multiprocessing module assigns a number to each process as a part of its name by default. Of course, when we specify a name, a number isn't going to get added to it.
The multiprocessing module supports locks in much the same way as the threading module does. All you need to do is import Lock, acquire it, do something and release it. Let's take a look:
from multiprocessing import Process, Lock


def printer(item, lock):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()


if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]

    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()
Here we create a simple printing function that prints whatever you pass to it. To prevent the processes from interfering with each other, we use a Lock object. This code will loop over our list of three items and create a process for each of them. Each process will call our function and pass it one of the items from the iterable. Because we're using locks, the next process in line will wait for the lock to be released before it can continue.
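As a side note, a Lock can also be used as a context manager. The with statement acquires the lock and guarantees that it gets released, even if the print were to raise an exception, so printer could be written a little more succinctly:

from multiprocessing import Process, Lock


def printer(item, lock):
    """
    Prints out the item that was passed in
    """
    with lock:  # acquired on entry, released automatically on exit
        print(item)


if __name__ == '__main__':
    lock = Lock()

    for item in ['tango', 'foxtrot', 10]:
        p = Process(target=printer, args=(item, lock))
        p.start()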
Logging processes is a little different than logging threads. The reason for this is that Python's logging package doesn't use process-shared locks, so it's possible for you to end up with messages from different processes getting mixed up. Let's try adding basic logging to the previous example. Here's the code:
import logging
import multiprocessing

from multiprocessing import Process, Lock


def printer(item, lock):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()


if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()
The simplest way to log is to send it all to stderr. We can do this by calling the log_to_stderr() function. Then we call the get_logger() function to get access to a logger and set its logging level to INFO. The rest of the code is the same. I will note that I'm not calling the join() method here. Instead, the parent process (i.e. your script) will join the child processes implicitly when it exits.
When you do this, you should get output like the following:
[INFO/Process-1] child process calling self.run()
tango
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] child process calling self.run()
[INFO/MainProcess] process shutting down
foxtrot
[INFO/Process-2] process shutting down
[INFO/Process-3] child process calling self.run()
[INFO/Process-2] process exiting with exitcode 0
10
[INFO/MainProcess] calling join() for process Process-3
[INFO/Process-3] process shutting down
[INFO/Process-3] process exiting with exitcode 0
[INFO/MainProcess] calling join() for process Process-2
Now if you want to save the log to disk, then it gets a little trickier. You can read about that topic in Python's logging Cookbook.
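To give you a taste of what's involved, one common recipe from the Cookbook has each worker hand its log records to a shared queue, while a listener in the main process owns the file handler, so only one process ever touches the log file. Here's a minimal sketch of that idea; the worker function, the mp_demo.log file name and the item list are just placeholders:

import logging
import logging.handlers
import multiprocessing


def worker(log_queue, item):
    # Send records to the shared queue instead of writing to disk directly
    logger = logging.getLogger()
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.INFO)
    logger.info('Processing %s', item)


if __name__ == '__main__':
    log_queue = multiprocessing.Queue()

    # The listener drains the queue in the main process and
    # forwards each record to the file handler it owns
    listener = logging.handlers.QueueListener(
        log_queue, logging.FileHandler('mp_demo.log'))
    listener.start()

    procs = []
    for item in ['tango', 'foxtrot', 10]:
        proc = multiprocessing.Process(target=worker, args=(log_queue, item))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()

    listener.stop()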
The Pool class is used to represent a pool of worker processes. It has methods which can allow you to offload tasks to the worker processes. Let's look at a really simple example:
from multiprocessing import Pool


def doubler(number):
    return number * 2


if __name__ == '__main__':
    numbers = [5, 10, 20]
    pool = Pool(processes=3)
    print(pool.map(doubler, numbers))
Basically what's happening here is that we create an instance of Pool and tell it to create three worker processes. Then we use the map method to apply our function to each item in the iterable, distributing the calls across the worker processes. Finally we print the result, which in this case is actually a list: [10, 20, 40].
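As an aside, in Python 3 a Pool can also be used as a context manager, which takes care of shutting the workers down for you when the block ends. The example above could be written like this:

from multiprocessing import Pool


def doubler(number):
    return number * 2


if __name__ == '__main__':
    numbers = [5, 10, 20]

    with Pool(processes=3) as pool:  # workers are cleaned up on exit
        print(pool.map(doubler, numbers))  # [10, 20, 40]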
You can also get the result of your process in a pool by using the apply_async method:
from multiprocessing import Pool


def doubler(number):
    return number * 2


if __name__ == '__main__':
    pool = Pool(processes=3)
    result = pool.apply_async(doubler, (25,))
    print(result.get(timeout=1))
What this allows us to do is actually ask for the result of the process. That is what the get function is all about: it tries to get our result. You will note that we also have a timeout set, just in case something goes wrong in the function we're calling. We don't want it to block indefinitely, after all.
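If the result isn't ready when the timeout expires, get() raises multiprocessing.TimeoutError. Here's a sketch of the previous example with that case handled:

from multiprocessing import Pool, TimeoutError


def doubler(number):
    return number * 2


if __name__ == '__main__':
    pool = Pool(processes=3)
    result = pool.apply_async(doubler, (25,))

    try:
        print(result.get(timeout=1))
    except TimeoutError:
        print('The result was not ready within one second')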
When it comes to communicating between processes, the multiprocessing module has two primary mechanisms: Queues and Pipes. The Queue implementation is actually both thread and process safe. Let's take a look at a fairly simple example that's based on the Queue code from one of my threading articles:
from multiprocessing import Process, Queue

sentinel = -1


def creator(data, q):
    """
    Creates data to be consumed and puts it on the queue
    """
    print('Creating data and putting it on the queue')
    for item in data:
        q.put(item)


def my_consumer(q):
    """
    Consumes some data and works on it

    In this case, all it does is double the input
    """
    while True:
        data = q.get()
        if data == sentinel:
            break

        print('data found to be processed: {}'.format(data))
        processed = data * 2
        print(processed)


if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13, -1]
    process_one = Process(target=creator, args=(data, q))
    process_two = Process(target=my_consumer, args=(q,))
    process_one.start()
    process_two.start()

    q.close()
    q.join_thread()

    process_one.join()
    process_two.join()
Here we just need to import Queue and Process. Then we create two functions, one to create data and add it to the queue and the second to consume the data and process it. Adding data to the Queue is done by using the Queue's put() method, whereas getting data from the Queue is done via the get() method. The last chunk of code just creates the Queue object and a couple of Processes and then runs them. You will note that we call join() on our process objects rather than the Queue itself.
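We won't cover Pipes in any depth in this article, but since they were mentioned above, here is a minimal sketch of what a Pipe-based version of a worker conversation might look like; the worker function and the doubling task are just placeholders:

from multiprocessing import Process, Pipe


def worker(conn):
    """Receives a number from the parent, doubles it and sends it back"""
    number = conn.recv()
    conn.send(number * 2)
    conn.close()


if __name__ == '__main__':
    # Pipe() returns the two ends of a duplex connection
    parent_conn, child_conn = Pipe()
    proc = Process(target=worker, args=(child_conn,))
    proc.start()

    parent_conn.send(21)
    print(parent_conn.recv())  # prints 42

    proc.join()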
We covered a lot of material here. You have learned how to use the multiprocessing module to target regular functions, communicate between processes using Queues, name your processes and much more. There is also a lot more in the Python documentation that isn't even touched on in this article, so be sure to dive into that as well. In the meantime, you now know how to utilize all your computer's processing power with Python!