温馨提示×

Ubuntu Kafka如何与其他服务集成

小樊
39
2025-12-30 23:45:20
栏目: 编程语言

Ubuntu 上 Kafka 与其他服务的集成实践

一 基础准备与网络连通

  • 安装 OpenJDK 11,下载并解压 Apache Kafka(示例版本:3.5.1),将目录移至 /usr/local/kafka
  • 建议使用 systemd 管理 ZookeeperKafka,便于开机自启与集中日志:
    • Zookeeper 服务单元示例:ExecStart 指向 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties;Kafka 服务单元示例:ExecStart 指向 /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties,并在 [Service] 中设置 Environment=“JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64”
  • 关键网络配置(server.properties):设置 listeners=PLAINTEXT://:9092advertised.listeners=PLAINTEXT://<服务器公网或内网IP>:9092,确保其它主机可连;创建测试 Topic
    • bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sampleTopic
  • 以上步骤完成后,可通过 systemctl 启动并在本机验证 Topic 列表与连通性。

二 与日志采集 Flume 集成

  • 典型链路:应用日志/文件 → Flume Source → MemoryChannel → Kafka Sink → Kafka Topic
  • 示例(exec-memory-avro.conf + avro-memory-kafka.conf):
    • Source:exec 读取日志文件(如 tail -F /home/hadoop/data/data.log)。
    • Channel:memory。
    • Sink:KafkaSink,配置 brokerList=192.168.0.230:9092topic=hello_topicbatchSize=5requiredAcks=1
  • 启动顺序与验证:
    • 先启动 avro-memory-kafka,再启动 exec-memory-avro;随后启动 Kafka 服务与消费者:
      • kafka-console-consumer.sh --zookeeper 192.168.0.230:2181 --topic hello_topic
  • 该方案适合将服务器日志、业务埋点等实时写入 Kafka 做解耦与缓冲。

三 与 Spark Streaming 集成

  • Ubuntu 环境可搭建 Zookeeper + Kafka + Spark Streaming 单机开发环境,使用 Scala 2.11Spark 2.x 进行流式消费与处理。
  • 要点:
    • 启动 ZookeeperKafka;创建目标 Topic
    • Spark Streaming 应用通过 KafkaUtils.createDirectStream(或旧版 Receiver API)订阅 Topic,进行实时计算与落地。
  • 该集成路径适合入门与功能验证,生产环境建议使用更现代的 Structured Streaming 与更稳定的版本组合。

四 与 Storm 集成

  • 使用 KafkaSpout 从 Kafka 读取数据,使用 KafkaBolt 写回 Kafka,构建 Storm Topology
    • 代码示例中通过 KafkaSpout<String, String> 读取、KafkaBolt<String, String> 写出,设置 KafkaBolt.KAFKA_BROKER_PROPERTIESwithTopicSelector/withTupleToKafkaMapper 完成序列化与路由。
    • 提交拓扑前准备输入输出 Topic,安全模式需准备 keytab 并完成 Kerberos 登录与授权。
  • 运行与验证:
    • 通过 storm jar 提交拓扑;用 kafka-console-consumer.sh 观察输出 Topic 数据是否生成,以确认端到端链路正常。

五 与 Spring Boot 微服务集成

  • Spring Boot 项目中引入依赖 spring-kafka,在 application.yml 配置 bootstrap-servers、序列化器,必要时开启 SASL/SSL
    • 示例(精简):
      • spring.kafka.bootstrap-servers=localhost:9092
      • spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
      • spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
      • 安全示例:spring.kafka.security.protocol=SASL_PLAINTEXT;spring.kafka.sasl.mechanism=SCRAM-SHA-256;并配置 jaas-config
  • 生产建议:启用 SASL/SSL、合理设置 acks、retries、batch.size、linger.ms,并使用 AdminClient 管理 TopicACL

0