Combining multiprocessing with multithreading in Python
In this post I’ve created an example of using multiprocessing and multithreading to process a list of work on a Queue.
Personally I’ve used this example in web scraping large websites, but it can be used in many different scenarios to take full advantage of a computer’s CPU processing power. The full code is posted at the bottom of this article.
The first step is to create a list of work. Since this is just an example, I put a bunch of numbers onto the Queue.
```python
def populate_queue():
    """Fill up the queue with work to do.
    Additional work will not be added."""
    for x in range(10000):
        Q.put(x)
```
As part of the setup I've created a custom threading.Thread class. It acts exactly the same as the inherited class, except that it releases the semaphore after the thread has completed so there are no lockups.
```python
class CustomThread(threading.Thread):
    """This functions the same as the inherited Thread class except
    with an added semaphore parameter, which always releases the
    semaphore when the thread is done.
    """

    def __init__(self, semaphore=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._semaphore = semaphore

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            if self._semaphore:
                self._semaphore.release()
            del self._target, self._args, self._kwargs
```
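To sanity-check the release behavior on its own, here's a small standalone demo (it repeats the CustomThread definition so it runs by itself; the failing target is hypothetical). Even when the target raises, the finally block frees the semaphore, so a waiting acquire doesn't block forever:

```python
import threading


class CustomThread(threading.Thread):
    """Same as above: always release the semaphore when done."""

    def __init__(self, semaphore=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._semaphore = semaphore

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            if self._semaphore:
                self._semaphore.release()
            del self._target, self._args, self._kwargs


def failing_job():
    # simulate work that blows up partway through
    raise RuntimeError('work blew up')


sem = threading.BoundedSemaphore(1)
sem.acquire()  # take the only slot, as run_threads would

t = CustomThread(target=failing_job, semaphore=sem)
t.start()
t.join()

# The target raised (a traceback is printed to stderr), but the
# semaphore was still released, so this acquire succeeds.
ok = sem.acquire(timeout=1)
print(ok)  # True
```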
In the start_processes function I start the processes (the number is based on the computer's CPU count). A for loop starts a Process on each iteration and appends the started process to a list. A semaphore is created for each process and passed to the run_threads function, where we initiate the threads. After the loop has finished spawning processes, it iterates through the list of started processes and waits until they've all finished.
```python
def start_processes():
    """Initialize the queue with work and start a number of processes
    (defined above). With each process, we create a bounded semaphore
    to control the number of active threads per process."""
    populate_queue()
    process_list = []
    for x in range(NUMB_PROCESSES):
        semaphore = threading.BoundedSemaphore(NUMB_THREADS_PER_PROCESS)
        # pass the semaphore to the target func
        p = mp.Process(
            target=run_threads,
            args=(x, semaphore),
            name=str(x)
        )
        process_list.append(p)
        p.start()
    for process in process_list:
        # make sure all processes have completed
        process.join()
```
Meanwhile, in the run_threads function we start a number of threads (the number is based on the NUMB_THREADS_PER_PROCESS variable defined earlier in the script). This function continually checks for work to do and starts a thread to process that work. Remember that this entire function is being called once per process based on the NUMB_PROCESSES value defined at the beginning of the script, so the maximum number of threads running at any given time is NUMB_PROCESSES * NUMB_THREADS_PER_PROCESS.
```python
def run_threads(p_name, semaphore):
    """As long as there are items on the queue, i.e. work to do,
    then we keep threading."""
    while not Q.empty():
        semaphore.acquire()
        try:
            i = Q.get(timeout=1)
        except queue.Empty:
            # release the slot we just grabbed before bailing out
            semaphore.release()
            break
        CustomThread(
            target=do_work,
            daemon=True,
            args=(i, p_name),
            semaphore=semaphore
        ).start()
```
Finally we have the do_work function. Right now it doesn't do anything except sleep and print out some values; however, this is where you would do the work that takes time or processing power and really makes this script worthwhile.
```python
def do_work(i, process_name):
    """Do the heavy processing."""
    sleep(0.5)
    print('Process Number: {}, value: {}'.format(process_name, i * i))
```
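To see the pool actually earn its keep, swap the sleep for something CPU-bound. Here's a hypothetical replacement, where the repeated hashing is just a stand-in for whatever processing your own project needs:

```python
import hashlib


def do_work(i, process_name):
    """Hypothetical heavy processing: hash the value repeatedly."""
    digest = str(i).encode()
    for _ in range(10000):
        digest = hashlib.sha256(digest).digest()
    result = digest.hex()[:12]
    print('Process Number: {}, value: {}'.format(process_name, result))
    return result
```

The signature matches the original, so nothing else in the script needs to change.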
And here’s the complete code!
```python
import multiprocessing as mp
import threading
import queue
from time import sleep

try:
    NUMB_PROCESSES = mp.cpu_count()
except NotImplementedError:
    NUMB_PROCESSES = 1
NUMB_THREADS_PER_PROCESS = 8
Q = mp.Queue()


class CustomThread(threading.Thread):
    """This functions the same as the inherited Thread class except
    with an added semaphore parameter, which always releases the
    semaphore when the thread is done.
    """

    def __init__(self, semaphore=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._semaphore = semaphore

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            if self._semaphore:
                self._semaphore.release()
            del self._target, self._args, self._kwargs


def do_work(i, process_name):
    """Do the heavy processing."""
    sleep(0.5)
    print('Process Number: {}, value: {}'.format(process_name, i * i))


def populate_queue():
    """Fill up the queue with work to do.
    Additional work will not be added."""
    for x in range(10000):
        Q.put(x)


def run_threads(p_name, semaphore):
    """As long as there are items on the queue, i.e. work to do,
    then we keep threading."""
    while not Q.empty():
        semaphore.acquire()
        try:
            i = Q.get(timeout=1)
        except queue.Empty:
            # release the slot we just grabbed before bailing out
            semaphore.release()
            break
        CustomThread(
            target=do_work,
            daemon=True,
            args=(i, p_name),
            semaphore=semaphore
        ).start()


def start_processes():
    """Initialize the queue with work and start a number of processes
    (defined above). With each process, we create a bounded semaphore
    to control the number of active threads per process."""
    populate_queue()
    process_list = []
    for x in range(NUMB_PROCESSES):
        semaphore = threading.BoundedSemaphore(NUMB_THREADS_PER_PROCESS)
        # pass the semaphore to the target func
        p = mp.Process(
            target=run_threads,
            args=(x, semaphore),
            name=str(x)
        )
        process_list.append(p)
        p.start()
    for process in process_list:
        # make sure all processes have completed
        process.join()


if __name__ == '__main__':
    start_processes()
```
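One caveat worth flagging: this script assumes the "fork" start method (the Linux default). Under "spawn" (Windows, and the macOS default since Python 3.8), each child re-imports the module and gets its own fresh, empty Q, and a threading.BoundedSemaphore can't be pickled across the process boundary. Here's a rough spawn-friendly sketch of my own, not part of the original script: it passes the queue in explicitly and builds the semaphore inside each child, using plain Thread objects joined at the end instead of CustomThread:

```python
import multiprocessing as mp
import queue
import threading
from time import sleep

NUMB_THREADS_PER_PROCESS = 8


def do_work(i, process_name):
    sleep(0.01)
    print('Process Number: {}, value: {}'.format(process_name, i * i))


def run_threads(p_name, work_queue):
    # Build the semaphore here, inside the child process, so nothing
    # unpicklable has to cross the process boundary.
    semaphore = threading.BoundedSemaphore(NUMB_THREADS_PER_PROCESS)
    threads = []
    while not work_queue.empty():
        semaphore.acquire()
        try:
            i = work_queue.get(timeout=1)
        except queue.Empty:
            semaphore.release()
            break

        def worker(item=i):
            try:
                do_work(item, p_name)
            finally:
                semaphore.release()

        t = threading.Thread(target=worker, daemon=True)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()


def start_processes(n_processes=2, n_items=50):
    work_queue = mp.Queue()  # created here and passed in, not module-level
    for x in range(n_items):
        work_queue.put(x)
    processes = [
        mp.Process(target=run_threads, args=(str(x), work_queue))
        for x in range(n_processes)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()


if __name__ == '__main__':
    start_processes()
```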
Thanks for reading! Leave a comment or question below. If you’d like to use this code in your own project, I only ask that you tell me where it’s being used by contacting me or leaving a comment.