Tuesday, September 22, 2015

用 Docker 搭建 Spark 集群

简介

Spark 是 Berkeley 开发的分布式计算的框架,相对于 Hadoop 来说,Spark 可以缓存中间结果到内存而提高某些需要迭代的计算场景的效率,目前收到广泛关注。
熟悉 Hadoop 的同学也不必担心,Spark 很多设计理念和用法都跟 Hadoop 保持一致和相似,并且在使用上完全兼容 HDFS。但是 Spark 的安装并不容易,依赖包括 Java、Scala、HDFS 等。
通过使用 Docker,可以快速的在本地搭建一套 Spark 环境,方便大家开发 Spark 应用,或者扩展到生产环境。
Spark 的设计理念很简单master 节点负责接收要执行的应用和指令worker 节点负责具体处理
除了自带的集群机制,还兼容集成到其他的集群框架,包括 mesos、yarn 等。

安装

其实官方的源码中已经提供了支持 mesos 的 Dockerfile 文件,在 docker\spark-mesos 下面,可以据此生成 Docker 镜像。
这里,我们采用比较热门的 sequenceiq/docker-spark 镜像,这个镜像已经安装了对 Spark 的完整依赖。
无论官方还是 sequenceiq/docker-spark 镜像,都是基于提前打包的二进制文件来制作的,都可以放心使用。
由于镜像比较大(2+ GB),首先下载该镜像到本地。
$ docker pull sequenceiq/spark:1.4.0
下载完毕后,从 https://github.com/yeasy/docker-compose-files/tree/master/spark_cluster 下载编写好的支持 Spark 集群的 docker-compose.yml 文件,并启动它
$ sudo pip install docker-compose==1.4.0
$ docker-compose up
可以看到类似如下的输出:
Creating sparkcompose_master_1...
Creating sparkcompose_slave_1...
Attaching to sparkcompose_master_1, sparkcompose_slave_1
master_1 | /
master_1 | Starting sshd: [  OK  ]
slave_1  | /
slave_1  | Starting sshd: [  OK  ]
master_1 | Starting namenodes on [master]
slave_1  | Starting namenodes on [5d0ea02da185]
master_1 | master: starting namenode, logging to /usr/local/hadoop/logs/hadoop-root-namenode-master.out
slave_1  | 5d0ea02da185: starting namenode, logging to /usr/local/hadoop/logs/hadoop-root-namenode-5d0ea02da185.out
master_1 | localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-root-datanode-master.out
slave_1  | localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-root-datanode-5d0ea02da185.out
master_1 | Starting secondary namenodes [0.0.0.0]
slave_1  | Starting secondary namenodes [0.0.0.0]
master_1 | 0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-root-secondarynamenode-master.out
master_1 | starting yarn daemons
master_1 | starting resourcemanager, logging to /usr/local/hadoop/logs/yarn--resourcemanager-master.out
master_1 | localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-root-nodemanager-master.out
master_1 | starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark--org.apache.spark.deploy.master.Master-1-master.out
slave_1  | 0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-root-secondarynamenode-5d0ea02da185.out
slave_1  | starting yarn daemons
slave_1  | starting resourcemanager, logging to /usr/local/hadoop/logs/yarn--resourcemanager-5d0ea02da185.out
slave_1  | localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-root-nodemanager-5d0ea02da185.out
slave_1  | starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark--org.apache.spark.deploy.worker.Worker-1-5d0ea02da185.out
docker-compose 服务起来后我们还可以用 scale 命令来动态扩展 Spark 的 worker 节点数例如
$ docker-compose scale worker=2
Creating and starting 2... done
注意,需要先用 pip 安装 docker-compose,1.4.1 版本有 bug,推荐使用 1.4.0 版本。

docker-compose 文件

我们来解析下 docker-compose 文件,核心内容很简单,包括 master 服务和 worker 服务。
master:
  image: sequenceiq/spark:1.4.0
  hostname: master
  ports:
  - "4040:4040"
  - "8042:8042"
  - "7077:7077"
  - "8080:8080"
  - "8088:8088"
  restart: always
  command: bash /usr/local/spark/sbin/start-master.sh && ping localhost > /dev/null

worker:
  image: sequenceiq/spark:1.4.0
  links:
  - master:master
  expose
  - "8081"
  restart: always
  command: bash /usr/local/spark/sbin/start-slave.sh spark://master:7077 && ping localhost >/dev/null

master 服务

首先,master 服务映射了好几组端口到本地,分别功能为: 4040:Spark 运行任务时候提供 web 界面观测任务的具体执行状况,包括执行到哪个阶段、在哪个 executor 上执行; 8042:Hadoop 的节点管理界面; 7077:Spark 主节点的监听端口,用户可以提交应用到这个端口,worker 节点也可以通过这个端口连接到主节点构成集群; 8080:Spark 的监控界面,可以看到所有的 worker、应用整体信息; * 8088:Hadoop 集群的整体监控界面。
master 服务启动后执行了 bash /usr/local/spark/sbin/start-master.sh 命令来配置自己为 master 节点然后通过 ping 来避免容器退出

worker 服务

类似 master 节点,启动后,执行了 /usr/local/spark/sbin/start-slave.sh spark://master:7077 命令来配置自己为 worker 节点,然后通过 ping 来避免容器退出。
注意,启动脚本后面需要提供 spark://master:7077 参数来指定 master 节点地址。
8081 端口提供的 web 界面,可以看到该 worker 节点上任务的具体执行情况。

执行应用

Spark 推荐用 spark-submit 命令来提交执行的命令基本语法为
spark-submit \
  --class your-class-name \
  --master master_url \
  your-jar-file
  app_params
例如,我们可以使用 spark 自带样例中的计算 Pi 的应用。
在 master 节点上执行命令
/urs/local/spark/bin/spark-submit --master spark://master:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark/lib/spark-examples-1.4.0-hadoop2.6.0.jar 1000
最后的参数 1000 表示要计算的迭代次数为 1000 次。
任务运行中,可以用浏览器访问 4040 端口,看到任务被分配到了两个 worker 节点上执行。 
计算过程中也会输出结果,如下:
...
15/09/22 03:07:54 INFO scheduler.TaskSetManager: Finished task 998.0 in stage 0.0 (TID 998) in 201 ms on 172.17.0.49 (998/1000)
15/09/22 03:07:54 INFO scheduler.TaskSetManager: Finished task 999.0 in stage 0.0 (TID 999) in 142 ms on 172.17.0.49 (999/1000)
15/09/22 03:07:54 INFO scheduler.TaskSetManager: Finished task 997.0 in stage 0.0 (TID 997) in 220 ms on 172.17.0.49 (1000/1000)
15/09/22 03:07:54 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/09/22 03:07:54 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:35) finished in 23.149 s
15/09/22 03:07:54 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:35, took 23.544018 s
Pi is roughly 3.1417124
15/09/22 03:07:54 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/09/22 03:07:54 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/09/22 03:07:54 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
...

总结

集群系统作为现代 IT 基础架构的重要组成部分,一直是运维部门的“硬骨头”。上线、部署、扩容……规模大了后各方面都可能出问题。而结合 Docker 这样的容器技术,可以很大程度上解耦对周边环境的依赖。如果再配合 mesos 等资源管理系统,可以大大降低人工运维的风险和复杂性。
当然,优秀的系统一定要配合正确的流程。生产环境中如果缺乏一套成熟的 devops 流程,是不可能有高效的生产力的。加班,也就成了常态。
转载请注明。