2021-05-12 14:32:11
Tensorflow 分散式部署簡介
tensorflow-0.8 的一大特性為可以部署在分散式的叢集上,本文的內容由Tensorflow的分散式部署手冊翻譯而來,該手冊連結為TensorFlow分散式部署手冊
分散式TensorFlow
本文介紹了如何搭建一個TensorFlow伺服器的叢集,並將一個計算圖部署在該分散式叢集上。以下操作建立在你對 TensorFlow的基礎操作已經熟練掌握的基礎之上。
Hello world的分散式範例的編寫
以下是一個簡單的TensorFlow分散式程式的編寫範例
# Start a TensorFlow server as a single-process "cluster".
$ Python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target) # Create a session on the server.
>>> sess.run(c)
'Hello, distributed TensorFlow!'
tf.train.Server.create_local_server() 會在本地建立一個單進程叢集,該叢集中的服務預設為啟動狀態。
建立叢集(cluster)
TensorFlow中的叢集(cluster)指的是一系列能夠對TensorFlow中的圖(graph)進行分散式計算的任務(task)。每個任務是同服務(server)相關聯的。TensorFlow中的服務會包含一個用於建立session的主節點和一個用於圖運算的工作節點。另外, TensorFlow中的叢集可以拆分成一個或多個作業(job), 每個作業可以包含一個或多個任務。下圖為作者對叢集內關係的理解。
建立叢集的必要條件是為每個任務啟動一個服務。這些任務可以執行在不同的機器上,但你也可以在同一台機器上啟動多個任務(比如說在本地多個不同的GPU上執行)。每個任務會做如下的兩步工作:
- 建立一個
tf.train.ClusterSpec
用於對叢集中的所有任務進行描述,該描述內容對於所有任務應該是相同的。 - 建立一個
tf.train.Server
並將tf.train.ClusterSpec
中的引數傳入建構函式,並將作業的名稱和當前任務的編號寫入本地任務中。
建立tf.train.ClusterSpec
的具體方法
tf.train.ClusterSpec
的傳入引數是作業和任務之間的關係對映,該對映關係中的任務是通過ip地址和埠號表示的。具體對映關係如下表所示:
tf.train.ClusterSpec construction | Available tasks |
---|---|
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}) |
/job:local/task:0 local |
tf.train.ClusterSpec({ "worker": [ "worker0.example.com:2222", "worker1.example.com:2222", "worker2.example.com:2222" ], "ps": [ "ps0.example.com:2222", "ps1.example.com:2222" ]}) |
/job:worker/task:0 /job:worker/task:1 /job:worker/task:2 /job:ps/task:0 /job:ps/task:1 |
為每一個任務建立tf.train.Server
的範例
每一個tf.train.Server 物件都包含一個本地裝置的集合, 一個向其他任務的連線集合,以及一個可以利用以上資源進行分散式計算的“對談目標”(“session target“)。每一個服務程式都是一個指定作業的一員,其在作業中擁有自己獨立的任務號。每一個服務程式都可以和叢集中的其他任何服務程式進行通訊。
以下兩個程式碼片段講述了如何在原生的2222和2223兩個埠上設定不同的任務。
# In task 0:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
# In task 1:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)
註 :當前手動設定任務節點還是一個比較初級的做法,尤其是在遇到較大的叢集管理的情況下。tensorflow團隊正在開發一個自動程式化設定任務的節點的工具。例如:叢集管理工具Kubernetes。如果你希望tensorflow支援某個特定的管理工具,可以將該請求發到GitHub issue 裡。
為模型指定分散式的裝置
為了將某些操作執行在特定的進程上,可以使用tf.device() 函數來指定程式碼執行在CPU或GPU上。例如:
with tf.device("/job:ps/task:0"):
weights_1 = tf.Variable(...)
biases_1 = tf.Variable(...)
with tf.device("/job:ps/task:1"):
weights_2 = tf.Variable(...)
biases_2 = tf.Variable(...)
with tf.device("/job:worker/task:7"):
input, labels = ...
layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
# ...
train_op = ...
with tf.Session("grpc://worker7.example.com:2222") as sess:
for _ in range(10000):
sess.run(train_op)
在上面的例子中,引數的宣告是通過ps作業中的兩個任務完成的,而模型計算相關的部分則是在work作業裡進行的。TensorFlow將在內部實現作業間的資料傳輸。(ps到work間的向前傳遞;work到ps的計算梯度)
計算流程
在上面的這個稱為“資料並行化”的公用訓練設定項裡,一般會包含多個用於對不同資料大小進行計算的任務(構成了work
作業) 和 一個或多個分布在不同機器上用於不停更新共用引數的任務(構成了ps
作業)。 所有的這些任務都可以執行在不同的機器上。實現這養的邏輯有很多的方法,目前TensorFlow團隊採用的是構建連結庫(lib)的方式來簡化模型的工作,其實現了如下幾種方法:
- 圖內的拷貝(In-graph replication). 在這種方法下,用戶端程式會建立一個獨立的
tf.Graph
,該圖中的一系列節點 (tf.Variable
)會通過ps
作業(/job:ps)宣告,而計算相關的多份拷貝會通過work作業(/job:worker)來進行。 - 圖間的拷貝(Between-graph replication). 在這種方法下,每一個任務(
/job:worker
) 都是通過獨立用戶端單獨宣告的。其相互之間結構類似,每一個用戶端都會建立一個相似的圖結構, 該結構中包含的引數均通過ps
作業(/job:ps)進行宣告並使用tf.train.replica_device_setter() 方法將引數對映到不同的任務中。模型中每一個獨立的計算單元都會對映到/job:worker
的原生的任務中。 - 非同步訓練(Asynchronous training). 在這種方法下,每一個圖的備份都會使用獨立的訓練邏輯進行獨立訓練,該方法需要配合上面的兩種方法一同使用。
- 同步訓練(Synchronous training). 在這種方法下,所有的計算任務會讀取當前引數中相同的值並用於並行化的計算梯度,然後將計算結果合併。這種方法需要和圖內的拷貝(In-graph replication)方法(例如,在CIFAR-10 multi-GPU trainer 中我們使用該方法對梯度求平均值) 和圖間的拷貝(Between-graph replication)(例如,
tf.train.SyncReplicasOptimizer
)一同使用。
分散式訓練程式的舉例說明
接下來的程式碼是一個分散式訓練程式的大致程式碼框架,其中實現了圖間的拷貝和非同步訓練兩種方法。該範例中包含了引數服務(parameter server)和工作任務(work task)的程式碼。
import tensorflow as tf
# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
"Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
"Comma-separated list of hostname:port pairs")
# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts(",")
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
# 建立並啟動服務
# 其引數中使用task_index 指定任務的編號
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
# Assigns ops to the local worker by default.
# 將op 掛載到各個原生的worker上
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# Build model...
loss = ...
global_step = tf.Variable(0)
train_op = tf.train.AdagradOptimizer(0.01).minimize(
loss, global_step=global_step)
saver = tf.train.Saver()
summary_op = tf.merge_all_summaries()
init_op = tf.initialize_all_variables()
# Create a "supervisor", which oversees the training process.
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
logdir="/tmp/train_logs",
init_op=init_op,
summary_op=summary_op,
saver=saver,
global_step=global_step,
save_model_secs=600)
# The supervisor takes care of session initialization, restoring from
# a checkpoint, and closing when done or an error occurs.
with sv.managed_session(server.target) as sess:
# Loop until the supervisor shuts down or 1000000 steps have completed.
step = 0
while not sv.should_stop() and step < 1000000:
# Run a training step asynchronously.
# See `tf.train.SyncReplicasOptimizer` for additional details on how to
# perform *synchronous* training.
_, step = sess.run([train_op, global_step])
# Ask for all the services to stop.
sv.stop()
if __name__ == "__main__":
tf.app.run()
使用以下命令可以啟動兩個引數服務和兩個工作任務。(假設上面的python指令碼名字為 train.py)
# On ps0.example.com:
$ python trainer.py
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222
--job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222
--job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222
--job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222
--job_name=worker --task_index=1
名詞解釋
用戶端(Client)
- 用戶端是一個用於建立TensorFlow計算圖並創立與叢集進行互動的對談層
tensorflow::Session
的程式。一般用戶端是通過python或C++實現的。一個獨立的用戶端進程可以同時與多個TensorFlow的伺服器端相連 (上面的計算流程一節),同時一個獨立的伺服器端也可以與多個用戶端相連。
叢集(Cluster)
- 一個TensorFlow的叢集裡包含了一個或多個作業(job), 每一個作業又可以拆分成一個或多個任務(task)。叢集的概念主要用與一個特定的高層次物件中,比如說訓練神經網路,並行化操作多台機器等等。叢集物件可以通過tf.train.ClusterSpec
來定義。
作業(Job)
- 一個作業可以拆封成多個具有相同目的的任務(task),比如說,一個稱之為ps(parameter server,引數伺服器)的作業中的任務主要是儲存和更新變數,而一個名為work(工作)的作業一般是管理無狀態且主要從事計算的任務。一個作業中的任務可以執行於不同的機器上,作業的角色也是靈活可變的,比如說稱之為”work”的作業可以儲存一些狀態。
主節點的服務邏輯(Master service)
- 一個RPC 服務程式可以用來遠端連線一系列的分散式裝置,並扮演一個對談終端的角色,主服務程式實現了一個tensorflow::Session
的藉口並負責通過工作節點的服務進程(worker service)與工作的任務進行通訊。所有的主服務程式都有了主節點的服務邏輯。
任務(Task)
- 任務相當於是一個特定的TesnsorFlow伺服器端,其相當於一個獨立的進程,該進程屬於特定的作業並在作業中擁有對應的序號。
TensorFlow伺服器端(TensorFlow server)
- 一個執行了tf.train.Server
範例的進程,其為叢集中的一員,並有主節點和工作節點之分。
工作節點的服務邏輯(Worker service)
- 其為一個可以使用本地裝置對部分圖進行計算的RPC 邏輯,一個工作節點的服務邏輯實現了worker_service.proto
介面, 所有的TensorFlow伺服器端均包含工作節點的服務邏輯。
本文永久更新連結地址:http://www.linuxidc.com/Linux/2016-07/133217.htm
相關文章