Python's multiprocessing package

This post demonstrates how to use Python's multiprocessing package to achieve parallel data generation.


The main program has a Chief that creates multiple Workers. Each Worker spawns a single work process. The work process generates random integers in the range [1, 3].

Each Worker has its own local queue. When data is generated, it is stored in this local queue. When the local queue's size is greater than 5, an item is retrieved, 0.1 is added to it, and the result is stored in the Chief's global queue. When the Chief's global queue's size is greater than 3, a result is retrieved and printed on screen.


Code on my GitHub

If GitHub is not loading the Jupyter notebook (a known GitHub issue), click here to view the notebook on Jupyter's nbviewer.
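
The snippets below assume the following imports (NumPy for the random data, and Process/Queue from the multiprocessing package):

import os
import time
import numpy as np
from multiprocessing import Process, Queue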


The Worker class:

class Worker(object):
  def __init__(self, worker_id, g_queue):
    self.g_queue = g_queue
    self.worker_id = worker_id
    self.queue = Queue() # local worker queue
    self.work_process = Process(target=self.work, args=())
    self.work_process.start()
    info(worker_id, self.work_process, "Worker")

  def work(self):

    info(self.worker_id, self.work_process, "work")

    while True:
      data = np.random.randint(1, 4) # random integer in [1, 3]
      self.queue.put(data)

      # process data in queue
      if self.queue.qsize() > 5:
        data = self.queue.get()
        result = data + 0.1
        self.g_queue.put(result) # send result to global queue

      time.sleep(1) # work every x sec interval

    return self.worker_id # unreachable: the loop above never breaks

The Chief class:

class Chief(object):
  def __init__(self, num_workers):
    self.g_queue = Queue() # global queue    
    self.num_workers = num_workers

  def dispatch_workers(self):
    # each Worker starts its own work process in __init__
    workers = [Worker(w_id, self.g_queue) for w_id in range(self.num_workers)]
    return workers

  def result(self):
    # pop one result per call once enough results have accumulated
    if self.g_queue.qsize() > 3:
      result = self.g_queue.get()
      print("result", result)

The main program:

if __name__ == '__main__':  
  print('main parent process id:', os.getppid())
  print('main process id:', os.getpid())

  num_workers = 2
  chief = Chief(num_workers)
  workers = chief.dispatch_workers()

  i = 0
  while True:    
    time.sleep(2) # check g_queue every x sec interval to get result
    chief.result()
    print("i=", i)

    if i>9:
      break
    i+=1    
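
One caveat: the work processes loop forever, so once the main loop breaks the program will not exit on its own, because non-daemonic child processes keep the parent waiting. A minimal sketch of one way to shut them down, assuming dispatch_workers returns the Worker objects as above:

  # stop the never-ending work processes before exiting
  for w in workers:
    w.work_process.terminate()
    w.work_process.join()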

A helper display function:

def info(worker_id, process, function_name):
  # show which function runs in which process, and the id of the process it spawned
  print("worker_id=", worker_id,
        'module name:', __name__,
        'function name:', function_name,
        'parent process:', os.getppid(),
        'current process id:', os.getpid(),
        'spawn process id:', process.pid)

