温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spark中ContinuousExecution执行流程是怎么样的

发布时间:2021-12-16 11:22:29 来源:亿速云 阅读:135 作者:小新 栏目:云计算

这篇文章主要介绍Spark中ContinuousExecution执行流程是怎么样的,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

最重要的是看ContinuousExecution怎么重写LogicalPlan的,详细代码不贴了,最后是创建了Sink类型的LogicalPlan。

    val writer = sink.createStreamWriter(
      s"$runId",
      triggerLogicalPlan.schema,
      outputMode,
      new DataSourceOptions(extraOptions.asJava))
    val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)

    val reader = withSink.collect {
      case DataSourceV2Relation(_, r: ContinuousReader) => r
    }.head

这里的sink可以看成就是DataSource。然后用withSink作为入参创建了IncrementalExecution。

triggerLogicalPlan是StreamingDataSourceV2Relation类。

IncrementalExecution本身没啥,只是在每一个处理的时候包装了一些额外的辅助处理而已。

WriteToDataSourceV2的作用是将triggerLogicalPlan的物理计划的执行结果通过writer写入到外部存储中,所有这里也不看WriteToDataSourceV2了,就看看triggerLogicalPlan的对应的物理计划是什么,前面说过了它对应的逻辑计划是:StreamingDataSourceV2Relation。

直接找是不是StreamingDataSourceV2Relation对应的物理计划的,所以我们先看看StreamingDataSourceV2Relation类的定义:

class StreamingDataSourceV2Relation(
    output: Seq[AttributeReference],
    reader: DataSourceReader) extends DataSourceV2Relation(output, reader) {
  override def isStreaming: Boolean = true
}

原来是DataSourceV2Relation的子类啊!

直接找DataSourceV2Relation的物理计划吧,在DataSourceV2Strategy.scala文件中定义了。

object DataSourceV2Strategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case DataSourceV2Relation(output, reader) =>
      DataSourceV2ScanExec(output, reader) :: Nil

    case WriteToDataSourceV2(writer, query) =>
      WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

    case _ => Nil
  }
}

DataSourceV2Relation对应的物理计划是DataSourceV2ScanExec。

DataSourceV2ScanExec的代码也不多。

DataSourceV2ScanExec是用DataSourceReader来作为数据源的读取器的,它的inputRDDs返回的是DataSourceRDD或者ContinuousDataSourceRDD,ContinuousDataSourceRDD肯定是对应的ContinuousExecution,其他方式就是DataSourceRDD了。

不管是DataSourceRDD或者ContinuousDataSourceRDD,他们的读取数据源的类都是一样的,都是DataSourceReader过来的。DataSourceRDD或者ContinuousDataSourceRDD这两者的代码都非常少,一看就知道怎么回事了。

以上是“Spark中ContinuousExecution执行流程是怎么样的”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI