温馨提示×

Ubuntu HDFS与Kafka如何配合使用

小樊
37
2025-12-24 17:19:11
栏目: 智能运维

Ubuntu 下 HDFS 与 Kafka 配合的常用方案

一、方案总览与适用场景

  • Spark Structured Streaming:从 Kafka 持续消费,做转换后写入 HDFS(支持 Parquet/ORC/CSV 等),适合需要实时计算、窗口聚合、Exactly-once 语义的场景。
  • Apache Flume:以 Kafka Source → Memory/File Channel → HDFS Sink 的管道方式落盘,适合日志归档、解耦生产与存储、按时间/大小滚动文件。
  • Hudi DeltaStreamer:借助 HudiKafka 数据以 COPY_ON_WRITE/MERGE_ON_READ 模式写入 HDFS,并可与 Hive 同步,适合增量更新、近实时湖仓一体。

二、方案一 Spark Structured Streaming 消费 Kafka 写入 HDFS

  • 前置准备
    • 启动 HDFS(确保 NameNode/DataNode 正常,HDFS Web/CLI 可访问)。
    • 启动 Kafka(单机或集群均可,创建测试 topic)。
    • 准备 Spark(建议与 Kafka 客户端版本匹配),并在提交作业时引入 spark-sql-kafka 连接器。
  • 核心配置与示例
    • 连接器依赖(示例):org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3(按你的 Spark/Kafka 版本调整)。
    • 读取 Kafka(关键参数示例)
      • kafka.bootstrap.servers=localhost:9092
      • subscribe=your_topic
      • 反序列化:key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
      • 容错:startingOffsets=earliest(首次消费从最早开始)
    • 写入 HDFS(示例)
      • 格式:parquet/orc/csv
      • 分区:partitionBy(“dt”)
      • 模式:append / complete / update(依据业务语义选择)
    • 提交示例
      • spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3
        –class your.main.Class your-app.jar
        –master yarn
        –deploy-mode cluster
  • 实践要点
    • 合理设置 maxOffsetsPerTriggertrigger(interval) 控制微批/持续处理速率。
    • 选择列式格式(如 Parquet)并配合 partitionBy 提升后续查询性能。
    • 若需端到端一致性,结合 checkpointLocation 与幂等/两阶段提交(如 foreachBatch + commit)。

三、方案二 Apache Flume 从 Kafka 写入 HDFS

  • 组件与拓扑
    • Source:Kafka(org.apache.flume.source.kafka.KafkaSource)
    • Channel:Memory(快但易丢)或 File(持久化,容错更好)
    • Sink:HDFS(HdfsSink)
  • 关键配置示例(kafka-to-hdfs.conf)
    • agent.sources = kafka-source
    • agent.channels = file-channel
    • agent.sinks = hdfs-sink
    • agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
    • agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
    • agent.sources.kafka-source.kafka.topics = my-topic
    • agent.sources.kafka-source.kafka.consumer.group.id = flume-hdfs
    • agent.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest
    • agent.channels.file-channel.type = file
    • agent.channels.file-channel.checkpointDir = /var/flume/checkpoint
    • agent.channels.file-channel.dataDirs = /var/flume/data
    • agent.sinks.hdfs-sink.type = hdfs
    • agent.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/flume/kafka/%Y-%m-%d
    • agent.sinks.hdfs-sink.hdfs.filePrefix = log-
    • agent.sinks.hdfs-sink.hdfs.fileType = DataStream
    • agent.sinks.hdfs-sink.hdfs.writeFormat = Text
    • agent.sinks.hdfs-sink.hdfs.rollInterval = 3600
    • agent.sinks.hdfs-sink.hdfs.rollSize = 134217728
    • agent.sinks.hdfs-sink.hdfs.rollCount = 0
    • agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
    • agent.sources.kafka-source.channels = file-channel
    • agent.sinks.hdfs-sink.channel = file-channel
  • 运行与验证
    • 启动 Flume:bin/flume-ng agent -n agent -c conf -f conf/kafka-to-hdfs.conf
    • 验证:hdfs dfs -ls /flume/kafka/2025-08-14 查看按日期滚动落盘文件。

四、方案三 Hudi DeltaStreamer 近实时入湖并写入 HDFS

  • 适用场景
    • 需要 增量更新/去重/回撤时光回溯、与 Hive 无缝对接的湖仓场景。
  • 快速上手
    • 准备 Hudi 工具包(HoodieDeltaStreamer 及其依赖),以及 Kafka 源配置(如 JSON 格式、bootstrap.servers、topic、消费组等)。
    • 写入 COPY_ON_WRITE(快照查询快,写入放大高)
      • spark-submit
        –class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE
        –table-type COPY_ON_WRITE
        –source-class org.apache.hudi.utilities.sources.JsonKafkaSource
        –source-ordering-field ts
        –target-base-path /user/hive/warehouse/stock_ticks_cow
        –target-table stock_ticks_cow
        –props /var/demo/config/kafka-source.properties
        –schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
    • 写入 MERGE_ON_READ(写入延迟低,查询视合并策略而定)
      • 在上述命令基础上将 –table-type 改为 MERGE_ON_READ,可按需关闭/开启压缩合并。
    • Hive 同步
      • /var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh
        –jdbc-url jdbc:hive2://hiveserver:10000
        –user hive --pass hive
        –partitioned-by dt
        –base-path /user/hive/warehouse/stock_ticks_cow
        –database default --table stock_ticks_cow
  • 结果验证
    • HDFS 路径下可见分区目录、.hoodie 元数据目录与 parquet 数据文件;Hive 中可查询对应表。

五、实践要点与常见问题

  • 时间分区与滚动策略
    • dt=yyyy-MM-dd 分区便于管理与查询;HDFS Sink 通过 rollInterval/rollSize/rollCount 控制文件滚动,避免小文件过多。
  • 容错与一致性
    • Flume 优先选 File Channel;Spark 启用 checkpoint 与幂等写入;Kafka 消费位点由 group.id 管理,必要时重置为 earliest
  • 权限与安全
    • 确保运行用户对 HDFS 目标路径具备写权限;如使用 Kerberos,需 kinit 并在提交作业时配置 JAAS/keytab。
  • 版本兼容
    • 对齐 Scala/Spark/Kafka/Hudi 版本,避免依赖冲突;连接器与库版本需与集群一致。
  • 监控与告警
    • 关注 消费滞后(lag)落盘延迟HDFS 容量小文件数量,按需调整批次大小与分区数。

0