Combining multiprocessing with multithreading in Python

In this post I’ve created an example of using multiprocessing and multithreading together to process a queue of work.

Personally I’ve used this pattern when scraping large websites, but it can be applied in many different scenarios to take full advantage of a computer’s CPU. The full code is posted at the bottom of this article.

The first step is to create a list of work. For this example, I simply put a range of numbers onto the Queue.

def populate_queue():
    """Fill up the queue with work to do.
    Additional work will not be added."""
    for x in range(10000):
        Q.put(x)

As part of the setup I’ve created a custom threading.Thread subclass. It behaves exactly like the parent class, except that it releases the semaphore after the thread completes so there are no lockups.

class CustomThread(threading.Thread):
    """This functions the same as the inherited Thread
    class except with added semaphore parameter, which
    always releases the semaphore when the thread is done.
    """
    def __init__(self, semaphore=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._semaphore = semaphore

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            if self._semaphore:
                self._semaphore.release()
            del self._target, self._args, self._kwargs
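To sanity-check that release behavior, here’s a small self-contained demo (the class is repeated so the snippet runs on its own). We take the only slot of a one-slot semaphore before starting the thread; after join, a non-blocking acquire succeeds because the thread gave the slot back:

```python
import threading


class CustomThread(threading.Thread):
    """Thread that releases a semaphore when its target finishes."""
    def __init__(self, semaphore=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._semaphore = semaphore

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            if self._semaphore:
                self._semaphore.release()
            del self._target, self._args, self._kwargs


sem = threading.BoundedSemaphore(1)
sem.acquire()                        # take the only slot
t = CustomThread(target=lambda: None, semaphore=sem)
t.start()
t.join()
# the thread released the slot, so a non-blocking acquire now succeeds
reacquired = sem.acquire(blocking=False)
print('slot released by thread:', reacquired)
```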

The start_processes function starts the processes (the number is based on the computer’s CPU count). A for loop starts a Process on each iteration and appends it to a list. A bounded semaphore is created for each process and passed to the run_threads target function, where we initiate the threads. After the loop has finished spawning processes, we iterate through the list of started processes and join each one, so the script waits until they’ve all finished.

def start_processes():
    """Initialize the queue with work and start
    a number of processes (defined above). With
    each process, we create a bounded semaphore
    to control the number of active threads
    per process."""
    populate_queue()
    process_list = []
    for x in range(NUMB_PROCESSES):
        semaphore = threading.BoundedSemaphore(
            NUMB_THREADS_PER_PROCESS)
        # pass the semaphore to the target func
        p = mp.Process(
            target=run_threads,
            args=(x, semaphore), name=str(x)
        )
        process_list.append(p)
        p.start()
    for process in process_list:
        # make sure all processes have completed
        process.join()

Meanwhile, the run_threads function starts a number of threads (based on the NUMB_THREADS_PER_PROCESS variable defined at the top of the script). It continually checks for work to do and starts a thread to process each item. Remember that this entire function is called once per process, so the maximum number of threads running at any given time is NUMB_PROCESSES * NUMB_THREADS_PER_PROCESS.

def run_threads(p_name, semaphore):
    """As long as there are items on the queue,
    i.e. work to do, then we keep threading."""
    while not Q.empty():
        semaphore.acquire()
        try:
            i = Q.get(timeout=1)
        except queue.Empty:
            # give back the slot we acquired before exiting
            semaphore.release()
            break
        CustomThread(
            target=do_work, daemon=True,
            args=(i, p_name), semaphore=semaphore
        ).start()
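To see the cap in action within a single process, here’s a hypothetical self-contained sketch of the same acquire/release loop that records the peak number of simultaneously running workers and shows it never exceeds the semaphore bound:

```python
import threading
import queue
from time import sleep

LIMIT = 3
sem = threading.BoundedSemaphore(LIMIT)
work = queue.Queue()
for x in range(20):
    work.put(x)

active = 0   # workers currently running
peak = 0     # highest concurrency observed
lock = threading.Lock()


def tracked_work(i):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    sleep(0.01)  # simulate work
    with lock:
        active -= 1


threads = []
while not work.empty():
    sem.acquire()  # blocks once LIMIT workers are running
    try:
        i = work.get(timeout=1)
    except queue.Empty:
        sem.release()
        break

    # plain Thread here; the wrapper releases the slot when done
    def runner(i=i):
        try:
            tracked_work(i)
        finally:
            sem.release()

    t = threading.Thread(target=runner, daemon=True)
    threads.append(t)
    t.start()

for t in threads:
    t.join()
print('peak concurrent workers:', peak)  # never more than LIMIT
```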

Finally we have the do_work function. Right now it doesn’t do anything except sleep and print out some values; however, this is where you’d put work that actually takes time or processing power, which is the whole point of this script.

def do_work(i, process_name):
    """Do the heavy processing."""
    sleep(0.5)
    print('Process Number: {}, value: {}'.format(
        process_name, i * i))
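As a hypothetical example of a heavier replacement, do_work could do something CPU-bound instead of sleeping, such as repeatedly hashing the value:

```python
import hashlib


def do_work(i, process_name):
    """Hypothetical CPU-bound replacement: hash the value repeatedly."""
    digest = str(i).encode()
    for _ in range(1000):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()


print(do_work(42, 'demo')[:8])  # first characters of the final digest
```

Keep in mind that CPU-bound work inside threads is limited by the GIL, so for this kind of task the per-process parallelism is what buys you the speedup; the threads help most with I/O-bound work like web scraping.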

And here’s the complete code! One caveat: the child processes reach the module-level Queue by inheriting it, which relies on the fork start method (the default on Linux). Under the spawn start method (the default on Windows, and on macOS since Python 3.8), each child re-imports the module and gets its own empty queue, so there you’d need to pass the queue to each Process explicitly.

import multiprocessing as mp
import threading
import queue
from time import sleep

try:
    NUMB_PROCESSES = mp.cpu_count()
except NotImplementedError:
    NUMB_PROCESSES = 1
NUMB_THREADS_PER_PROCESS = 8
Q = mp.Queue()


class CustomThread(threading.Thread):
    """This functions the same as the inherited Thread
    class except with added semaphore parameter, which
    always releases the semaphore when the thread is done.
    """
    def __init__(self, semaphore=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._semaphore = semaphore

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            if self._semaphore:
                self._semaphore.release()
            del self._target, self._args, self._kwargs


def do_work(i, process_name):
    """Do the heavy processing."""
    sleep(0.5)
    print('Process Number: {}, value: {}'.format(
        process_name, i * i))


def populate_queue():
    """Fill up the queue with work to do.
    Additional work will not be added."""
    for x in range(10000):
        Q.put(x)


def run_threads(p_name, semaphore):
    """As long as there are items on the queue,
    i.e. work to do, then we keep threading."""
    while not Q.empty():
        semaphore.acquire()
        try:
            i = Q.get(timeout=1)
        except queue.Empty:
            # give back the slot we acquired before exiting
            semaphore.release()
            break
        CustomThread(
            target=do_work, daemon=True,
            args=(i, p_name), semaphore=semaphore
        ).start()


def start_processes():
    """Initialize the queue with work and start
    a number of processes (defined above). With
    each process, we create a bounded semaphore
    to control the number of active threads
    per process."""
    populate_queue()
    process_list = []
    for x in range(NUMB_PROCESSES):
        semaphore = threading.BoundedSemaphore(
            NUMB_THREADS_PER_PROCESS)
        # pass the semaphore to the target func
        p = mp.Process(
            target=run_threads,
            args=(x, semaphore), name=str(x)
        )
        process_list.append(p)
        p.start()
    for process in process_list:
        # make sure all processes have completed
        process.join()

if __name__ == '__main__':
    start_processes()

Thanks for reading! Leave a comment or question below. If you’d like to use this code in your own project, I only ask that you tell me where it’s being used by contacting me or leaving a comment.
