MLSQL是如何集成TensorFlow Cluster的

前言

我们知道MLSQL支持SKLearn,TF等流行的算法框架,不过虽然支持了多个实例同时运行,但其实每个模型都需要跑全部数据。有的时候数据太大,确实是个问题,所以这个时候还是需要引入Cluster的。MLSQL基于Spark,所以问题就变成了如何在Spark里集成TF Cluster了。TFoS 已经实现了类似的功能,但遗憾的是,TFoS完全是用Python编写的,并且每次都需要启动一个新的Spark 实例来运行,overhead 是比较高的。

MLSQL集成TF Cluster

MLSQL集成TF Cluster 的主要优势有:

  1. 一个Spark实例可以运行多个TF Cluster,互不影响。
  2. 可以local模式运行TF Cluster
  3. 数据交互本地化(也可以消费Kafka),假设你配置了10个worker,数据会被切分成十份,然后同步到对应worker的本地目录。
  4. 易用,你只要写一个python脚本,所有调度相关工作全部由MLSQL来完成。

感兴趣的可以参看这个 PR ,看看具体实现源码。

一个示例

load libsvm.`/tmp/william/sample_libsvm_data.txt` as data;

train data as DTFAlg.`/tmp/jack`
where
pythonScriptPath="/tmp/tensorflow-distribute.py"
and `kafkaParam.bootstrap.servers`="127.0.0.1:9092"
and `kafkaParam.topic`="test"
and `kafkaParam.group_id`="g_test-1"
and  keepVersion="true"
and  enableDataLocal="true"
and  dataLocalFormat="json"
and distributeEveryExecutor="false"

and  `fitParam.0.jobName`="worker"
and  `fitParam.0.taskIndex`="0"

and  `fitParam.1.jobName`="worker"
and  `fitParam.1.taskIndex`="1"

and  `fitParam.2.jobName`="ps"
and  `fitParam.2.taskIndex`="0"


and `systemParam.pythonPath`="python"
and `systemParam.pythonVer`="2.7"
;

我们看到,只要配置一个python脚本,然后通过fitParam指定每个节点的jobName,taskIndex即可。

在python脚本中,你可以通过如下方式拿到这些参数:

jobName = param("jobName", "worker")
taskIndex = int(param("taskIndex", "0"))
clusterSpec = json.loads(mlsql.internal_system_param["clusterSpec"])
checkpoint_dir = mlsql.internal_system_param["checkpointDir"]

一个大致的TF脚本如下:

def run():
    # create the cluster configured by `ps_hosts' and 'worker_hosts'
    cluster = tf.train.ClusterSpec(clusterSpec)

    # create a server for local task
    server = tf.train.Server(cluster, job_name=jobName,
                             task_index=taskIndex)

    if jobName == "ps":
        server.join()  # ps hosts only join
    elif jobName == "worker":
       .......

当然,不可避免的,你可能需要用到MonitoredTrainingSession等和集群相关的API。

难点

这个需求我昨天早上提出,下午开始弄,我一开始以为一个下午就能搞定,但是最后还是做到了晚上十一点多,这里有几个问题需要注意:

  1. 用户可能取消任务,如何及时的杀掉TF cluster.
  2. spark 可能异常退出,如何保证也能退出TF cluster
  3. 如何区别对待PS/Worker角色

实现方式

worker需要能够和driver 进行交互。为什么呢?TF启动Cluster的时候,是需要ClusterSpec,也就是每个节点host和port。

Spark在分发Task的时候是并行的,你不知道会分发到哪个节点,并且分发后,你也不知道TF能够在对应的节点获取到哪个端口。为了完成这些信息的收集,需要走如下几个流程:

  1. 每个Task在启动TF Server之前,需要先获取端口,并且占用住,然后上报给Driver,Driver会记住这些。

  2. 接着Task会等待所有的Task都上报完成,然后释放占用的端口,启动对应的TF Server。

    TF Server 完成训练后会上报Driver。

  3. PS会监听是不是所有的Worker都已经完成了工作,如果是,则会自己把自己结束掉。

  4. 最后整个训练结束,并且把训练好的模型发送到HDFS上。

Executor 和Driver 交互,其实MLSQL里已经实现了自己的PRC层。不过因为这次比较简单,只需要单向通讯即可,所以直接基于Driver 的http接口完成。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章