Distributed TensorFlow

This post demonstrates a simple usage example of distributed TensorFlow with the Python multiprocessing package.


Distributed TensorFlow is combined with the Python multiprocessing package: the parameter server and each worker run in their own process.

A tf.FIFOQueue is used as storage that is shared across the processes.

Code on my GitHub

If GitHub is not loading the Jupyter notebook (a known GitHub issue), click here to view the notebook on Jupyter’s nbviewer.


Cluster definition:

2 workers, 1 parameter server.

import tensorflow as tf               # TF 1.x graph-mode API (tf.train.Server, tf.FIFOQueue)
from multiprocessing import Process
from time import sleep

cluster = tf.train.ClusterSpec({
    "worker": ["localhost:2223",
               "localhost:2224"
              ],
    "ps": ["localhost:2225"]
})
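
For reference, this spec maps the device names used below to addresses: /job:worker/task:0 is served at localhost:2223, /job:worker/task:1 at localhost:2224, and /job:ps/task:0 at localhost:2225. A quick way to inspect it (a small sketch, not part of the original notebook):

print(cluster.as_dict())             # {'worker': ['localhost:2223', 'localhost:2224'], 'ps': ['localhost:2225']}
print(cluster.num_tasks('worker'))   # 2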

Parameter server function:

A tf.Variable (var) and a tf.FIFOQueue (q) are declared on the parameter server. Both are sharable across processes.

For the tf.FIFOQueue to be sharable, it has to be declared on the same device (in this case, the ps device) in both the parameter_server function and the worker function, and it has to be given the same shared_name in both places.

The tf.Variable (var) is also declared under the ps device. Its value is printed in the first for loop.

At the end of the function, the values stored in q are printed in the last for loop.

def parameter_server():
    with tf.device("/job:ps/task:0"):
        var = tf.Variable(0.0, name='var')        
        q = tf.FIFOQueue(10, tf.float32, shared_name="shared_queue")

    server = tf.train.Server(cluster,
                             job_name="ps",
                             task_index=0)
    sess = tf.Session(target=server.target)

    print("Parameter server: waiting for cluster connection...")
    sess.run(tf.report_uninitialized_variables())
    print("Parameter server: cluster ready!")

    print("Parameter server: initializing variables...")
    sess.run(tf.global_variables_initializer())
    print("Parameter server: variables initialized")

    for i in range(10):
        print("Parameter server: var has value %.1f" % sess.run(var))
        sleep(1.0)
        if sess.run(var) == 10.0:
            break

    sleep(3.0)
    print("ps q.size(): ", sess.run(q.size()))  

    for j in range(sess.run(q.size())):
        print("ps: r", sess.run(q.dequeue()))

    #print("Parameter server: blocking...")
    #server.join() # currently blocks forever    
    print("Parameter server: ended...")

Worker function:

A tf.FIFOQueue (q) is declared on the ps device, with the same shared_name as in the parameter server.

The tf.Variable (var) is declared under tf.train.replica_device_setter with the worker device; it does not have to be pinned to the ps device explicitly.

The for loop increments var and enqueues each new value into q.

def worker(worker_n):
    with tf.device("/job:ps/task:0"):
        q = tf.FIFOQueue(10, tf.float32, shared_name="shared_queue")     
    with tf.device(tf.train.replica_device_setter(
                        worker_device='/job:worker/task:' + str(worker_n),
                        cluster=cluster)):
        var = tf.Variable(0.0, name='var')

    server = tf.train.Server(cluster,
                             job_name="worker",
                             task_index=worker_n)
    sess = tf.Session(target=server.target)

    print("Worker %d: waiting for cluster connection..." % worker_n)
    sess.run(tf.report_uninitialized_variables())
    print("Worker %d: cluster ready!" % worker_n)

    while sess.run(tf.report_uninitialized_variables()):
        print("Worker %d: waiting for variable initialization..." % worker_n)
        sleep(1.0)
    print("Worker %d: variables initialized" % worker_n)

    for i in range(5):
        print("Worker %d: incrementing var" % worker_n, sess.run(var))
        sess.run(var.assign_add(1.0))
        qe = q.enqueue(sess.run(var))
        sess.run(qe)
        sleep(1.0)

    print("Worker %d: ended..." % worker_n)

Main program:

Create the processes, start them, and finally terminate them in a for loop.

ps_proc = Process(target=parameter_server, daemon=True)
w1_proc = Process(target=worker, args=(0, ), daemon=True)
w2_proc = Process(target=worker, args=(1, ), daemon=True)

ps_proc.start()
w1_proc.start()
w2_proc.start()

ps_proc.join() # only the ps process needs to call join()

for proc in [w1_proc, w2_proc, ps_proc]:
    proc.terminate() # the only way to stop the server is to kill its process

print('All done.')            
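
If this is run as a standalone script instead of notebook cells, it is safer to guard the process creation (a sketch, assuming parameter_server and worker are defined at module level): the spawn start method used by multiprocessing on Windows, and on macOS since Python 3.8, re-imports the module in every child process.

if __name__ == '__main__':
    ps_proc = Process(target=parameter_server, daemon=True)
    w1_proc = Process(target=worker, args=(0,), daemon=True)
    w2_proc = Process(target=worker, args=(1,), daemon=True)

    for proc in [ps_proc, w1_proc, w2_proc]:
        proc.start()

    ps_proc.join()           # only the ps process needs to be joined

    for proc in [w1_proc, w2_proc, ps_proc]:
        proc.terminate()     # killing the process is the only way to stop the server

    print('All done.')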

Output:

Each of the two workers increments var five times, so var ends at 10.0 and the queue ends up holding the ten values 1.0 through 10.0.

WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/op_def_library.py:263: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/op_def_library.py:263: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.

WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/op_def_library.py:263: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.
Parameter server: waiting for cluster connection...
Worker 0: waiting for cluster connection...
Worker 1: waiting for cluster connection...
Worker 1: cluster ready!
Worker 1: waiting for variable initialization...
Parameter server: cluster ready!
Parameter server: initializing variables...
Parameter server: variables initialized
Parameter server: var has value 0.0
Worker 0: cluster ready!
/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:65: DeprecationWarning: The truth value of an empty array is ambiguous. Returning False, but in future this will result in an error. Use `array.size > 0` to check that an array is not empty.
Worker 0: variables initialized
Worker 0: incrementing var 0.0
/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:65: DeprecationWarning: The truth value of an empty array is ambiguous. Returning False, but in future this will result in an error. Use `array.size > 0` to check that an array is not empty.
Worker 1: variables initialized
Worker 1: incrementing var 1.0
Parameter server: var has value 2.0
Worker 0: incrementing var 2.0
Worker 1: incrementing var 3.0
Parameter server: var has value 4.0
Worker 0: incrementing var 4.0
Worker 1: incrementing var 5.0
Parameter server: var has value 6.0
Worker 0: incrementing var 6.0
Worker 1: incrementing var 7.0
Parameter server: var has value 8.0
Worker 0: incrementing var 8.0
Worker 1: incrementing var 9.0
Worker 0: ended...
Worker 1: ended...
ps q.size():  10
ps: r 1.0
ps: r 2.0
ps: r 3.0
ps: r 4.0
ps: r 5.0
ps: r 6.0
ps: r 7.0
ps: r 8.0
ps: r 9.0
ps: r 10.0
Parameter server: ended...
All done.

