温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

apache spark中怎么实现端对端的 exactly once

发布时间:2021-12-13 10:24:44 来源:亿速云 阅读:161 作者:小新 栏目:大数据

这篇文章主要介绍了apache spark中怎么实现端对端的 exactly once,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

Exactly Once的实现

如果实时作业要实现端对端的 exactly once 则需要数据源、数据处理与数据存储的三个阶段都保证 exactly once 的语义。目前基 于Kafka Direct API加上Spark RDD 算子精确一次的保证能够实现端对端的 exactly once 的语义。

在数据存储阶段一般实现 exactly once 需要保证存储的过程是幂等操作或事务操作。很多系统本身就支持了幂等操作,比如相同数据写 hdfs 同一个文件,这本身就是幂等操作,保证了多次操作最终获取的值还是相同;HBase、ElasticSearch 与 redis 等都能够实现幂等操作。对于关系型数据库的操作一般都是能够支持事务性操作。

官方在创建 DirectKafkaInputStream 时只需要输入消费 Kafka 的 From Offset,然后其自行获取本次消费的 End Offset,也就是当前最新的 Offset。保存的 Offset 是本批次的 End Offset,下次消费从上次的 End Offset 开始消费。

当程序宕机或重启任务后,这其中存在一些问题。如果在数据处理完成前存储 Offset,则可能存在作业处理数据失败与作业宕机等情况,重启后会无法追溯上次处理的数据导致数据出现丢失。如果在数据处理完成后存储 Offset,但是存储 Offset 过程中发生失败或作业宕机等情况,则在重启后会重复消费上次已经消费过的数据。

而且此时又无法保证重启后消费的数据与宕机前的数据量相同数据相当,这又会引入另外一个问题,如果是基于聚合统计指标作更新操作,这会带来无法判断上次数据是否已经更新成功。

所以在 muise spark core 中我们加入了自己的实现用以保证 Exactly once 的语义。具体的实现是我们对 Spark 源码进行了改造,保证在创建 DirectKafkaInputStream 可以同时输入 From Offset 与 End Offset,并且我们在存储 Kafka Offset 的时候保存了每个批次的起始Offset 与结束 Offset,具体格式如下:

apache spark中怎么实现端对端的 exactly once

如此做的用意在于能够确保无论是宕机还是人为重启,重启后的第一个批次与重启前的最后一个批次数据一模一样。这样的设计使得后面用户在后面对于第一个批次的数据处理非常灵活可变,如果用户直接忽略第一个批次的数据,那此时保证的是 at most once 的语义,因为我们无法获知重启前的最后一个批次数据操作是否有成功完成。

如果用户依照原有逻辑处理第一个批次的数据,不对其做去重操作,那此时保证的是 at least once 的语义,最终结果中可能存在重复数据;最后如果用户想要实现 exactly once,muise spark core 提供了根据topic、partition 与 offset 生成 UID 的功能。

只要确保两个批次消费的 Offset 相同,则最终生成的 UID 也相同,用户可以根据此 UID 作为判断上个批次数据是否有存储成功的依据。下面简单的给出了重启后第一个批次操作的行为。

apache spark中怎么实现端对端的 exactly once

002

Metrics系统

Musie spark core 基于 Spark 本身的 metrics 系统进行了改造,添加了许多定制的 metrics,并且向用户暴露了 metrics 注册接口,用户可以非常方便的注册自己的 metrics 并在程序中更新 metrics 的数值。最后所有的 metrics 会根据作业设定的批次间隔写入 Graphite,基于公司定制的预警系统进行报警,前端可以通过 Grafana 展现各项 metrics 指标。

Muise spark core本身定制的metrics包含以下三种:

  • Fail 批次时间内 spark task 失败次数超过4次便报警,用于监控程序的运行状态。

  • Ack 批次时间内 spark streaming 处理的数据量小0便报警,用于监控程序是否在正常消费数据。

  • Lag 批次时间内数据消费延迟大于设定值便报警。

其中由于我们大部分作业开启了 Back Pressure 功能,这就导致在Spark UI 中看到每个批次数据都能在正常时间内消费完成,然而可能此时 kafka 中已经积压了大量数据,故每个批次我们都会计算当前消费时间与数据本身时间的一个平均差值,如果这个差值大于批次时间,说明本身数据消费就已经存在了延迟。

下图展现了预警系统中,基于用户自定义注册的Metrics以及系统定制的Metrics进行预警。

apache spark中怎么实现端对端的 exactly once

003

容错

其实在上面 Exactly Once 一章中已经详细的描述了 muise spark core如何在程序宕机后能够保证数据正确的处理。但是为了能够让 Spark Sreaming 能够长时间稳定的运行在Yarn集群上,还需要添加许多配置,感兴趣的朋友可以查看:Long running Spark Streaming Jobs on Yarn Cluster。

除了上述容错保证之外,Muise Portal(后面会讲)也提供了对 Spark Streaming 作业定时检测的功能。目前每过5分钟对当前所有数据库中状态标记为 Running 的 Spark Streaming 作业进行状态检测,通过Yarn提供的REST APIs可以根据每个作业的Application Id查询作业在Yarn上的状态,如果状态处于非运行状态,则会尝试重启作业。

感谢你能够认真阅读完这篇文章,希望小编分享的“apache spark中怎么实现端对端的 exactly once”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI