在 CentOS 上处理 Kafka 消息的实用指南
一 环境准备与启动
bin/zkServer.sh start
bin/zkServer.sh status
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://<服务器IP>:9092
log.dirs=/var/lib/kafka/logs
zookeeper.connect=localhost:2181
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
firewall-cmd --add-port=2181/tcp --permanent && firewall-cmd --add-port=9092/tcp --permanent && firewall-cmd --reload
nc -vz <服务器IP> 9092 与 nc -vz <服务器IP> 2181 应返回 succeeded。
二 命令行收发消息
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test
三 消费模式与关键机制
四 可靠性与顺序性配置要点
acks=0:不等待确认,延迟最低、可靠性最差;acks=1:等待 Leader 落盘后确认;acks=all(或 -1):等待 Leader+所有 ISR 副本落盘后确认,可靠性最高。开启幂等(enable.idempotence=true)可降低失败重试导致的重复与乱序风险。
五 编程消费示例 Java
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka consumer example: subscribes to the {@code test} topic and
 * prints every record received, polling forever.
 *
 * <p>Offsets are auto-committed every 5 seconds (at-least-once delivery).
 * Set {@code enable.auto.commit} to {@code "false"} and call
 * {@code commitSync()} after processing for manual offset control.
 */
public class SimpleConsumer {
    public static void main(String[] args) {
        Properties config = new Properties();
        // Broker bootstrap address and consumer group membership.
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // Keys and values are plain UTF-8 strings.
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Where to start when the group has no committed offset: earliest/latest.
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Automatic offset commits, flushed every 5000 ms.
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

        // try-with-resources guarantees the consumer (and its network
        // connections) is closed even if the loop throws.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(config)) {
            kafkaConsumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> batch = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : batch) {
                    System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                // With enable.auto.commit=false, commit manually here:
                // kafkaConsumer.commitSync();
            }
        }
    }
}