温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

SparkStreaming整合kafka的补充

发布时间:2020-07-06 09:43:00 来源:网络 阅读:586 作者:原生zzy 栏目:大数据

(1)SparkStreaming 整合 kafka 两种方式对比

Direct 方式的优缺点分析

  • 优点:
    • 简化并行(Simplified Parallelism)。不现需要创建以及 union 多输入源,Kafka topic 的partition 与 RDD 的 partition 一一对应。
    • 高效(Efficiency)。基于 Receiver-based 的方式保证数据零丢失(zero-data loss)需要配置 spark.streaming.receiver.writeAheadLog.enable=true,此种方式需要保存两份数据,浪费存储空间也影响效率。而 Direct 方式则不存在这个问题。
    • 强一致语义(Exactly-once semantics)。High-level 数据由 Spark Streaming 消费,但是Offsets 则是由 Zookeeper 保存。通过参数配置,可以实现 at-least once 消费,此种情况有重复消费数据的可能。
    • 降低资源。Direct 不需要 Receivers,其申请的 Executors 全部参与到计算任务中;而Receiver-based 则需要专门的 Receivers 来读取 Kafka 数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
    • 降低内存。Receiver-based 的 Receiver 与其他 Exectuor 是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高 Receiver 的内存,但是参与计算的 Executor 并无需那么多的内存。而 Direct 因为没有 Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
  • 缺点:
    • 提高成本。Direct 需要用户采用 checkpoint 或者第三方存储来维护 offsets,而不像Receiver-based 那样,通过 ZooKeeper 来维护 Offsets,此提高了用户的开发成本。
    • 监控可视化。Receiver-based 方式指定 topic 指定 consumer 的消费情况均能通过ZooKeeper 来监控,而 Direct 则没有这种便利,不能自动保存 offset 到 zookeeper,如果做到监控并可视化,则需要投入人力开发。
      Receiver 方式的优缺点分析
  • 优点:
    • 专注计算。Kafka 的 high-level 数据读取方式让用户可以专注于所读数据,而不用关注或维护 consumer 的 offsets,这减少用户的工作量以及代码量而且相对比较简单。
  • 缺点:
    • 防数据丢失。做 checkpoint 操作以及配置 spark.streaming.receiver.writeAheadLog.enable参数,配置 spark.streaming.receiver.writeAheadLog.enable 参数,每次处理之前需要将该batch 内的日志备份到 checkpoint 目录中,这降低了数据处理效率,反过来又加重了Receiver 端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
    • 单 Receiver 内存。由于 receiver 也是属于 Executor 的一部分,那么为了提高吞吐量
    • 重复消费。在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新 offset的情况,这导致数据重复消费。
    • Receiver 和计算的 Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver 则在一直接收数据,这非常容易导致程序崩溃。

      (2)对kafka消费的offset的管理

  • spark自带的checkpoint:
    • 启用spark streaming的checkpoint是存储偏移量的最简单方法
    • 流式checkpoint专门保存用户应用程序的状态
    • 但是checkpoint的目录是不能共享的,无法跨越应用程序进行恢复
    • 一般不使用checkpoint管理offset
  • 使用zookeeper管理offset
    • 如果Zookeeper中未保存offset,根据kafkaParam的配置使用最新或者最旧的offset
    • 如果 zookeeper中有保存offset,我们会利用这个offset作为kafkaStream 的起始位置
  • 使用hbase保存offset
    • Rowkey的设计:topic名称 + groupid + streaming的batchtime.milliSeconds
  • 使用hdfs管理offset:当然这种情况不推荐使用,因为在hdfs中会生成大量的小文件,导致,hdfs的性能急剧下降

    (3)Driver的HA

      介绍:他能够在driver失败的时候,通过读取checkpoint目录下的元数据,恢复当前streamingContext对象的状态;它能够察觉到driver进程异常退出之后,自动重启。
      具体流程:当第一次运行程序时,发现checkpoint中没有数据,则根据定义的函数来第一次创建StreamingContext对象,当程序异常退出的时候,此时会根据checkpoint中的元数据恢复一个StreamingContext对象,达到异常退出之前的状态,而实现异常退出并自动启动则是sparkStreaming应用程序对driver进行监控,并且在他失败的时候感知,并进行重启。
      必要条件
      - spark-submit提交作业的时候,必须是集群模式(cluster),并且必须在spark-standalong下。

    spark-submit \
    --class com.aura.mazh.spark.streaming.kafka.SparkStreamDemo_Direct \
    //这里只能使用spark的standalong模式,所以配置为spark集群
    --master spark://hadoop02:7077,hadoop04:7077 \
    --driver-memory 512m \
    --total-executor-cores 3 \
    --executor-memory 512m \
    #这句代码一定要加,他可以使异常退出的driver程序,重新启动
    --supervise \   
    --name SparkStreamDemo_Direct \
    --jars /home/hadoop/lib/kafka_2.11-0.8.2.1.jar,\
    /home/hadoop/lib/metrics-core-2.2.0.jar,\
    /home/hadoop/lib/spark-streaming_2.11-2.3.2.jar,\
    /home/hadoop/lib/spark-streaming-kafka-0-8_2.11-2.3.2.jar,\
    /home/hadoop/lib/zkclient-0.3.jar \
    /home/hadoop/original-spark-1.0-SNAPSHOT.jar \
    spark://hadoop02:7077,hadoop04:7077

      - 需要添加--supervise \,才能实现失败自启动
      - 需要配置checkpoint目录,并且是存储在hdfs上,jar也要放置在hdfs上

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI