简述

Cloud云服务日益普遍,其后的支撑但是容器化的集群,所谓容器化(Containerization)就是将各种系统或用户服务,通过熟知的容器编排(Orchestration)工具,如docker、kubernetes等,部署运行,而不是直接与服务器直接打交道,容器化的服务大大减轻了服务部署、运维及资源控制等的繁琐程度,同时也使服务更加的健壮。

而Spark 2.3之前的版本,如果想要使用容器化的部署方式,只能通过mesos、yarn,但还不支持在Kubenetes集群上部署,至于他们之间的差异,或是优缺点,这里就不过多分析。总之,kubernetes凭借其优秀的代码质量、模块化设计、松耦合的插件功能等,在整个容器化领域已经收获很多的赞誉。

因此Spark官方2.4版本,通过External Manger的方式,加入了对kubernetes任务和资源调度的功能,且功能相对稳定,但整个集群的部署和用户作业的提交更加偏向于Spark传统的部署和提交方式,这对于熟悉Kubernetes用户而不是很熟悉Spark的用户,或是相反的用户来说,在技术栈上存在一差异,因此同样是Google的团队,开源了Spark Operator项目,从Kubernetes的视角封装了Spark on Kubertes的功能,有兴趣的同学可以深入了解。

Spark Kubernetes模块设计及实现

这一节的内容根据Spark 2.4.3官方kubernetes模块的实现分析,如果有发现与自己的理解不同的地方,请注意版本号,或是随意指出文章的错误。

核心类UML图

在这里插入图片描述
图1. Spark Kubernetes模块核心类的UML图

作业提交运行流程

用户通过spark-submit命令,提交一个Spark应用,比如spark-submit --master k8s://http://${k8s_apiserver_address} --deploy-mode cluster myapp.jar,则会在本地启动一个JAVA进程,进入SparkSubmit类的实例化过程,完成命令行参数进行解析。

Spark 2.4.x目前只支持以Cluster的部署模式运行在Kubernetes集群,因此默认必须指定–deploy-mode cluster。

SparkSubmit类解析到,当前作业希望被提交到一个k8s集群,同时的部署模式Cluster,也就意味着Driver进程需要在对应的集群中启动,而不是启动在本地,因此就会创建一个KubernetesClientApplication类的实例,用来向指定的Master提交作业(即k8s集群中的apiserver)。KubernetesClientApplication类继承自Spark中的SparkApplication类,它是一个作业程序的入口类,因此也就由这个类负责提交Driver进程的描述信息(Pod的描述信息,如环境变量、启动命令等),然后发送给k8s的apiserver拉起对应的Pod,并监听Driver容器直到作业完成。

可以发现KubernetesClientApplication类型的实例的行为,不同于提交任务到Standalone集群的过程,会创建AppClient实例并与Spark Master通信,(实际上AppClient就是一个RpcEndpoint),最终通过Master在随机的在可用的Worker节点上拉起Driver进程。

K8S API Server接收到创建Driver Pod的请求,就会在集群中合适的结点上拉起一个名字为xx-driver的Pod,同时也会将配置信息写入到Driver容器内的文件/opt/spark/conf/spark.properties中,并将配置文件映射成一个k8s中的ConfigMap对象,以便Executor Pod读取配置信息。

在生成Driver容器的启动命令时,会设置待运行的Driver角色的工作模式为client,即–deploy-mode client,以便driver进程启动在当前容器内。

Diver进程启动时,会执行容器中已经被放置的entrypoint.sh脚本,根据待启动的角色类的不同,driver/driver-py/driver-r/executor,执行不同的命令,(这里对应于driver类型),真正地执行用户指定的入口类,也就是用户提交的作业程序。

用户作业进程启动后,在创建SparkSession时会实例化SparkContext类,这个类会实例化所有与作业执行相关所必需的对象。由于即将运行的作业是一个k8s的作业,因此会实例化KubernetesClusterSchedulerBackend类,进行资源的分配。

KubernetesClusterSchedulerBackend继承自Spark默认的executor调度后端CoarseGrainedSchedulerBackend类,负责在K8s集群上的资源分配及状态监控,即executor容器的管理,而至于如何具体地生成、分发任务,还是使用Spark默认的实现方式,即TaskSchedulerImpl

KubernetesClusterSchedulerBackend启动后,就会开始在集群拉起指定数据的Executor容器,每个Executor容器的创建过程与创建Driver容器的过程类似,不同的地方在于executor容器启动执行的是如下的命令:

    CMD=(
      ${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
    )

当一定数量的executor容器也拉起后,就可以开始执行用户的编写的程序了。比如有以下的用户代码:

// SparkContext已经实例化,sql(...)方法会将sql语句解析成逻辑计划,
// 被封装成Dataset[T] / DataFrame对象,但不会执行作业的子任务。
val df =  sparkSession.sql("select * from test")
// 调用collect()方法,即一个action方法,触发SparkContext.runJob(...)方法,
// 最终调用DagScheduler.runJob(...)方法,完成Stage(ResultStage和ShuffleMapStage)
// 的划分及Task(ResultTask和ShuffleMapTask)的创建,及调度执行。
df.collect()

至于Spark Job的详细执行流程,可以浏览我之前的博客文章

Spark on Kubernetes的设计实现

Spark 2.4中kubernetes模块的设计实现,可以说更多地站在了熟悉Spark开发模式的用户的角度来设计实现的,从提交作业的命令参数的方式,参数不仅包含spark本身的参数,也包含了创建k8s资源对象,如Pod,所需要的参数;到跟踪整个作业生命周期的流程等,这些细节的地方目前看是如此的繁琐、凌乱且不系统,因此Google Cloud Platform开源了另外一个项目Spark Operator,以期解决这些问题。

Spark Operator大体的设计方向是,基于Spark kubernetes框架,通过外包spark-submit命令并利用Kubernetes的Operator开发模式的方式,将Spark提交作业的行为,实现成kubectl命令可支持的方式,同时将一个Spark Job抽象成SparkApplication资源对象(Custom Resource Definition,CRD,用户资源定义),以期简化原来的过程,并提供更加友好的作业交互过程。

整体架构图

整个过程的详述,可以翻阅官方github的地址,这里就不翻译了。
在这里插入图片描述
图2. Spark Operator整体架构图

简单来说,通过Spark operator服务,k8s用户可以使用kubectl命令,提交一个spark-pi.yaml配置文件(定义了driver和executor的运行时的各种参数),然后由Operator服务,图2中中间浅蓝色部分,负责命令和参数拼接、替换,最终完成作业集群的创建。
用户也可以通过kubectl命令,跟踪SparkApplication的生命周期,也就是Spark Job的生命周期。

作业提交及运行流程

Spark Operator提供了Kubernetes传统提交作业的方式,即通过yaml来描述一个SparkApplication作业的具体细节,然后通过命令行工具kubectl提交这个yaml文件到APIServer。

APIServer接收到用户自定义(CRD)的对象SparkApplication的创建请求后并创建了对象后,Spark Operator就会监听到这个事件,并可以拿到新创建的CRD对象。由于CRD对象里面包含了运行Spark Job所需要的所有参数,因此就可以从这个对象构建spark-submit命令,然后提交执行,提交执行的过程同前面一节分析的过程是一样的。从这个地方可以看到,SparkOpeator帮我们做了Spark参数和k8s参数间映射的过程,省了不少力。

这里要注意的是,Submission Runner提交作业是在Controller的worker线程中执行的,为了在执行spark-submit命令后,能够处理其它的事件,构建命令时添加了config.SparkWaitAppCompletion(= false),作业一旦提交成功或是失败就返回,而不用等待整个作业的完成

当driver和executor被拉起后,Spark Operator通过监听Driver容器的完成状态,来定义事个SparkApplication的状态,也可以说是Spark Job的状态;而通过监听Executor容器的状态,来进行资源的分配或是回收。因此Spark原本on kubernetes的能力是有限的,如果不定义kuberntes集群支持的参数,就不能使用相应的特性,而通过Operator的封装,使得用户可以更加自由的使用Kubernetes特性,比如标签选择、亲和性、失败策略等。

最终如果整个Spark作业执行完成,不论是失败或是成功,如果没有配置立即删除作业的参数,用户可以通过kubectl命令,随时查看SparkApplication的状态,而不像是Spark官方默认的做法那样,直接删除Pod。

总 结

Spark 3.0版本的kubernetes功能更加健壮,同时也支持通过Pod的模板yaml文件来配置参数等其它Kubernetes的主流特性。

Logo

开源、云原生的融合云平台

更多推荐