Multiprocessing Example

Published on Aug. 17, 2019, 6:41 p.m.

This example uses the Process class from Python's multiprocessing module to start several worker processes (which the operating system can schedule on separate CPU cores). Each worker searches for a number in a list using one of two methods:

1) linear search

2) binary search 

import multiprocessing
from multiprocessing import Process
import time


# Module-level placeholders. Neither name is read anywhere else in the code shown here.
# NOTE(review): -1 matches the "not found" value the search functions return - confirm whether that is the intended use.
sentinel = -1
# NOTE(review): presumably meant as a "some worker found the item" flag; it is never updated in the code shown.
item_found = False


def binary_search(val_to_find, sorted_num_list):
    """
    Returns the index of val_to_find in sorted_num_list, or -1 if it is not present.

    The search narrows a half-open index window [low, high) over the original list instead of
    slicing it, so no copies are made and the lookup takes O(lg(n)) comparisons.
    :param val_to_find: The value which index we need to find.
    :param sorted_num_list: The values to search in; must be sorted in ascending order.
    :return: Index of an item equal to val_to_find (with duplicates - the first one the bisection lands on), -1 otherwise.
    """
    low, high = 0, len(sorted_num_list)
    while low < high:                                               # an empty window means the item was not found.
        mid = low + (high - low) // 2                               # pure integer arithmetic - no float rounding on very large indices.
        mid_val = sorted_num_list[mid]
        if mid_val == val_to_find:                                  # if the item has been found - return its' index.
            return mid
        if mid_val < val_to_find:                                   # if the middle item is smaller than the searched item - search the right side.
            low = mid + 1
        else:                                                       # if the middle item is bigger than the searched item - search the left side.
            high = mid
    return -1


def linear_search(searched_item, item_list):
    """
    Scans item_list from the start and returns the index of the first item equal to searched_item.
    :param searched_item: The value to look for.
    :param item_list: The values to scan, in order.
    :return: Index of the first matching item, -1 if there is no match.
    """
    matching_positions = (
        position
        for position, candidate in enumerate(item_list)
        if candidate == searched_item
    )
    # next() stops at the first match; the default covers the "no match" case.
    return next(matching_positions, -1)


def find_item(searched_item, part_list):
    """
    Runs a linear search for searched_item in part_list and prints the outcome:
    the index it was found at, or a "not found" message.
    """
    position = linear_search(searched_item, part_list)
    if position <= -1:
        print('{} not found in {}'.format(searched_item, part_list))
        return
    print('Found {} at index {} in {}'.format(searched_item, position, part_list))


def print_found():
    """
    Prints the fixed message 'found'.
    """
    message = 'found'
    print(message)


if __name__ == '__main__':
    # Demo driver: runs the same searches in worker processes and in the main process,
    # times each round and prints a comparison of linear search against binary search.
    data = [5, 10, 13, -1, 100, 300, 20, 111, 13, 221, 30, 44, 666, 71, 2002, 444, 10, 12, 24, 19, 50, 100, 130, -10, 1000, 3000, 200, 1110, 130, 2201, 300, 404, 6606, 710, 20002, 4440, 100, 120, 240, 190, 50, 100, 103, -101, 1090, 3090, 209, 1119, 193, 2219, 309, 494, 6966, 791, 20902, 4944, 190, 129, 294, 199]
    sorted_data = sorted(data)

    # Untimed first round: ten workers, each linearly scanning a five-item window for the value 10.
    # NOTE(review): the windows start at p_ind (0:5, 1:6, ...), so neighbouring windows overlap - confirm this is intended.
    p_pool = []
    for p_ind in range(10):
        p = Process(target=find_item, args=(10, sorted_data[p_ind:p_ind + 5]))
        p_pool.append(p)
        p.start()

    for p in p_pool:
        p.join()

    # cpu_count() reports the CPUs in the system, not how many of them are currently idle.
    print('Number of processors : {}'.format(multiprocessing.cpu_count()))

    # One (value to find, slice to search) pair per worker; both multi-process rounds use the same jobs.
    search_jobs = [
        (10, sorted_data[:5]),
        (1, sorted_data[5:10]),
        (666, sorted_data[10:15]),
        (5, sorted_data[15:20]),
        (10, sorted_data[20:25]),
        (1, sorted_data[25:30]),
        (666, sorted_data[30:35]),
        (5, sorted_data[15:20]),
        (5, sorted_data[15:20]),
    ]

    linear_workers = [Process(target=find_item, args=job) for job in search_jobs]
    print(' ================================= MP LINEAR SEARCH START ================================= ')
    print(sorted_data)
    mp_reg_time_start = time.perf_counter()
    for worker in linear_workers:
        worker.start()
    # The clock is stopped only after every worker has finished; stopping it right after
    # start() would measure process spawning instead of the searches themselves.
    for worker in linear_workers:
        worker.join()
    mp_reg_time_end = time.perf_counter() - mp_reg_time_start
    print(' ================================= MP LINEAR SEARCH END ================================= ')

    # NOTE(review): binary_search is used directly as the process target, so its return value is
    # discarded and nothing is printed, while the linear rounds print through find_item.
    # The printing cost therefore skews the comparison in favour of binary search.
    binary_workers = [Process(target=binary_search, args=job) for job in search_jobs]
    # signal.signal(signal.SIGTERM, print_found)

    print(' ================================= MP BINARY SEARCH START ================================= ')
    print(sorted_data)
    mp_bs_time_start = time.perf_counter()
    for worker in binary_workers:
        worker.start()
    for worker in binary_workers:
        worker.join()
    mp_bs_time_end = time.perf_counter() - mp_bs_time_start
    print(' ================================= MP BINARY SEARCH END ================================= ')

    # Single-process rounds look for the same values, but over the whole sorted list.
    items_to_find_list = [searched_value for searched_value, _ in search_jobs]
    print(' ================================= LINEAR SEARCH START ================================= ')
    print(sorted_data)
    reg_time_start = time.perf_counter()
    for item in items_to_find_list:
        find_item(item, sorted_data)
    reg_time_end = time.perf_counter() - reg_time_start
    print(' ================================= LINEAR SEARCH END ================================= ')

    print(' ================================= BINARY SEARCH START ================================= ')
    print(sorted_data)
    bs_time_start = time.perf_counter()
    for item in items_to_find_list:
        binary_search(item, sorted_data)
    bs_time_end = time.perf_counter() - bs_time_start
    print(' ================================= BINARY SEARCH END ================================= ')

    # The small constant added to each denominator guards against dividing by zero
    # when a round finishes faster than the clock can resolve.
    print('Multi processing Regular search took {} seconds\nMulti processing Binary search took {} seconds (BS {}% faster than LS)'.format(mp_reg_time_end, mp_bs_time_end, 100 - (mp_bs_time_end * 100) / (mp_reg_time_end + 0.000001)))
    print('Single processing Regular search took {} seconds\nSingle processing Binary search took {} seconds (BS {}% faster than LS)'.format(reg_time_end, bs_time_end, 100 - (bs_time_end * 100) / (reg_time_end + 0.000001)))
  • Multiprocessing Example
    (currently viewing)