这篇文章主要介绍“Spark 3.0 on Kubernetes的模式是怎样的”,在日常操作中,相信很多人在Spark 3.0 on Kubernetes的模式是怎样的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark 3.0 on Kubernetes的模式是怎样的”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
Spark 3.0发布后,对Kubernetes的原生支持得到大幅增强,从而方便了Spark在云原生环境中的快速部署和运行实例的管理。
Spark 运行在 Kubernetes 集群上的第一种可行方式是将 Spark 以 Standalone 模式运行,但是很快社区就提出使用 Kubernetes 原生 Scheduler 的运行模式,也就是 Native 的模式。
Native 模式简而言之就是将 Driver 和 Executor Pod 化,用户将之前向 YARN 提交 Spark 作业的方式提交给 Kubernetes 的 apiserver,提交命令如下:
$ bin/spark-submit \ --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \ --deploy-mode cluster \ --name spark-pi \ --class org.apache.spark.examples.SparkPi \ --conf spark.executor.instances=5 \ --conf spark.kubernetes.container.image=<spark-image> \ local:///path/to/examples.jar
其中 master 就是 kubernetes 的 apiserver 地址。提交之后整个作业的运行方式如下,先将 Driver 通过 Pod 启动起来,然后 Driver 会启动 Executor 的 Pod。这些方式很多人应该都了解了,就不赘述了,详细信息可以参考:https://spark.apache.org/docs/latest/running-on-kubernetes.html 。
除了这种直接向 Kubernetes Scheduler 提交作业的方式,还可以通过 Spark Operator 的方式来提交。Operator 在 Kubernetes 中是一个里程碑似的产物。在 Kubernetes 刚面世的时候,关于有状态的应用如何部署在 Kubernetes 上一直都是官方不愿意谈论的话题,直到 StatefulSet 出现。StatefulSet 为有状态应用的部署实现了一种抽象,简单来说就是保证网络拓扑和存储拓扑。但是状态应用千差万别,并不是所有应用都能抽象成 StatefulSet,强行适配反正加重了开发者的心智负担。
然后 Operator 出现了。我们知道 Kubernetes 给开发者提供了非常开放的一种生态,你可以自定义 CRD,Controller 甚至 Scheduler。而 Operator 就是 CRD + Controller 的组合形式。开发者可以定义自己的 CRD,比如我定义一种 CRD 叫 EtcdCluster 如下:
apiVersion: "etcd.database.coreos.com/v1beta2"kind: "EtcdCluster"metadata: name: "example-etcd-cluster"spec: size: 3 version: "3.1.10" repository: "quay.io/coreos/etcd"
提交到 Kubernetes 之后 Etcd 的 Operator 就针对这个 yaml 中的各个字段进行处理,最后部署出来一个节点规模为 3 个节点的 etcd 集群。你可以在 github 的这个 repo:https://github.com/operator-framework/awesome-operators 中查看目前实现了 Operator 部署的分布式应用。
Google 云平台,也就是 GCP 在 github 上面开源了 Spark 的 Operator,repo 地址:GoogleCloudPlatform/spark-on-k8s-operator。Operator 部署起来也是非常的方便,使用 Helm Chart 方式部署如下,你可以简单认为就是部署一个 Kubernetes 的 API Object (Deployment)。
$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator $ helm install incubator/sparkoperator --namespace spark-operator
这个 Operator 涉及到的 CRD 如下:
ScheduledSparkApplication |__ ScheduledSparkApplicationSpec |__ SparkApplication |__ ScheduledSparkApplicationStatus |__ SparkApplication |__ SparkApplicationSpec |__ DriverSpec |__ SparkPodSpec |__ ExecutorSpec |__ SparkPodSpec |__ Dependencies |__ MonitoringSpec |__ PrometheusSpec |__ SparkApplicationStatus |__ DriverInfo
如果我要提交一个作业,那么我就可以定义如下一个 SparkApplication 的 yaml,关于 yaml 里面的字段含义,可以参考上面的 CRD 文档。
apiVersion: sparkoperator.k8s.io/v1beta1kind: SparkApplicationmetadata: ...spec: deps: {} driver: coreLimit: 200m cores: 0.1 labels: version: 2.3.0 memory: 512m serviceAccount: spark executor: cores: 1 instances: 1 labels: version: 2.3.0 memory: 512m image: gcr.io/ynli-k8s/spark:v2.4.0 mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar mainClass: org.apache.spark.examples.SparkPi mode: cluster restartPolicy: type: OnFailure onFailureRetries: 3 onFailureRetryInterval: 10 onSubmissionFailureRetries: 5 onSubmissionFailureRetryInterval: 20 type: Scalastatus: sparkApplicationId: spark-5f4ba921c85ff3f1cb04bef324f9154c9 applicationState: state: COMPLETED completionTime: 2018-02-20T23:33:55Z driverInfo: podName: spark-pi-83ba921c85ff3f1cb04bef324f9154c9-driver webUIAddress: 35.192.234.248:31064 webUIPort: 31064 webUIServiceName: spark-pi-2402118027-ui-svc webUIIngressName: spark-pi-ui-ingress webUIIngressAddress: spark-pi.ingress.cluster.com executorState: spark-pi-83ba921c85ff3f1cb04bef324f9154c9-exec-1: COMPLETED LastSubmissionAttemptTime: 2018-02-20T23:32:27Z
提交作业。
$ kubectl apply -f spark-pi.yaml
对比来看 Operator 的作业提交方式似乎显得更加的冗长复杂,但是这也是一种更 kubernetes 化的 api 部署方式,也就是 Declarative API,声明式 API。
基本上,目前市面的大部门公司都是使用上面两种方式来做 Spark on Kubernetes 的,但是我们也知道在 Spark Core 里面对 Kubernetes 的这种 Native 方式支持其实并不是特别成熟,还有很多可以改善的地方,下面简单举例几个地方:
资源调度器可以简单分类成集中式资源调度器和两级资源调度器。两级资源调度器有一个中央调度器负责宏观资源调度,对于某个应用的调度则由下面分区资源调度器来做。两级资源调度器对于大规模应用的管理调度往往能有一个良好的支持,比如性能方面,缺点也很明显,实现复杂。其实这种设计思想在很多地方都有应用,比如内存管理里面的 tcmalloc 算法,Go 语言的内存管理实现。大数据的资源调度器 Mesos/Yarn,某种程度上都可以归类为两级资源调度器。
集中式资源调度器对于所有的资源请求进行响应和决策,这在集群规模大了之后难免会导致一个单点瓶颈,毋庸置疑。但是 Kubernetes 的 scheduler 还有一点不同的是,它是一种升级版,一种基于共享状态的集中式资源调度器。Kubernetes 通过将整个集群的资源缓存到 scheduler 本地,在进行资源调度的时候在根据缓存的资源状态来做一个 “乐观” 分配(assume + commit)来实现调度器的高性能。
Kubernetes 的默认调度器在某种程度上并不能很好的 match Spark 的 job 调度需求,对此一种可行的技术方案是再提供一种 custom scheduler 或者直接重写,比如 Spark on Kubernetes Native 方式的参与者之一的大数据公司 Palantir 就开源了他们的 custom scheduler,github repo: https://github.com/palantir/k8s-spark-scheduler。
由于 Kubernetes 的 Executor Pod 的 Shuffle 数据是存储在 PV 里面,一旦作业失败就需要重新挂载新的 PV 从头开始计算。针对这个问题,Facebook 提出了一种 Remote Shuffle Service 的方案,简单来说就是将 Shuffle 数据写在远端。直观感受上来说写远端怎么可能比写本地快呢?而写在远端的一个好处是 Failover 的时候不需要重新计算,这个特性在作业的数据规模异常大的时候比较有用。
基本上现在可以确定的是 Kubernetes 会在集群规模达到五千台的时候出现瓶颈,但是在很早期的时候 Spark 发表论文的时候就声称 Spark Standalone 模式可以支持一万台规模。Kubernetes 的瓶颈主要体现在 master 上,比如用来做元数据存储的基于 raft 一致性协议的 etcd 和 apiserver 等。对此在刚过去的 2019 上海 KubeCon 大会上,阿里巴巴做了一个关于提高 master 性能的 session: 了解 Kubernetes Master 的可扩展性和性能,感兴趣的可以自行了解。
在 Kubernetes 中,资源分为可压缩资源(比如 CPU)和不可压缩资源(比如内存),当不可压缩资源不足的时候就会将一些 Pod 驱逐出当前 Node 节点。国内某个大厂在使用 Spark on kubernetes 的时候就遇到因为磁盘 IO 不足导致 Spark 作业失败,从而间接导致整个测试集都没有跑出来结果。如何保证 Spark 的作业 Pod (Driver/Executor) 不被驱逐呢?这就涉及到优先级的问题,1.10 之后开始支持。但是说到优先级,有一个不可避免的问题就是如何设置我们的应用的优先级?常规来说,在线应用或者 long-running 应用优先级要高于 batch job,但是显然对于 Spark 作业来说这并不是一种好的方式。
Spark on Yarn 的模式下,我们可以将日志进行 aggregation 然后查看,但是在 Kubernetes 中暂时还是只能通过 Pod 的日志查看,这块如果要对接 Kubernetes 生态的话可以考虑使用 fluentd 或者 filebeat 将 Driver 和 Executor Pod 的日志汇总到 ELK 中进行查看。
Prometheus 作为 CNCF 毕业的第二个项目,基本是 Kubernetes 监控的标配,目前 Spark 并没有提供 Prometheus Sink。而且 Prometheus 的数据读取方式是 pull 的方式,对于 Spark 中 batch job 并不适合使用 pull 的方式,可能需要引入 Prometheus 的 pushgateway。
到此,关于“Spark 3.0 on Kubernetes的模式是怎样的”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。