Debian系统Kafka生产者配置指南
Kafka基于JVM运行,需先安装Java开发工具包(JDK)。推荐使用OpenJDK 8或11(兼容性更好):
sudo apt update
sudo apt install openjdk-11-jdk # 或 openjdk-8-jdk
java -version # 验证安装(需显示Java版本信息)
从Apache Kafka官网下载最新稳定版(如3.5.2),解压至目标目录(如/opt/kafka):
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
tar -xzf kafka_2.12-3.5.2.tgz
sudo mv kafka_2.12-3.5.2 /opt/kafka # 移动至指定目录
Kafka依赖Zookeeper进行集群管理,需先启动Zookeeper再启动Kafka:
# 启动Zookeeper(默认端口2181)
/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties &
# 启动Kafka(默认端口9092)
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &
创建生产者配置文件(如/opt/kafka/config/producer.properties),设置核心参数:
# Kafka集群地址(单节点用localhost,集群用逗号分隔的broker地址)
bootstrap.servers=localhost:9092
# 键/值的序列化器(需与生产者代码中的类一致)
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# 消息确认机制(可靠性保障):
# 0:不等待broker确认(低可靠性,高吞吐);
# 1:等待leader分区确认(默认,平衡可靠性与性能);
# all:等待所有ISR副本确认(最高可靠性)
acks=all
# 发送失败重试次数(默认0,建议3次)
retries=3
# 重试间隔时间(毫秒,默认100)
retry.backoff.ms=100
# 批量发送大小(字节,默认16KB,调大可提升吞吐)
batch.size=32768
# 批量发送延迟(毫秒,默认0,等待积累更多消息)
linger.ms=10
# 消息压缩类型(可选gzip/snappy/lz4,减少网络传输)
compression.type=snappy
# 客户端唯一标识(便于监控)
client.id=my-producer
使用Kafka客户端API编写生产者程序,加载配置文件并发送消息:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 加载配置文件
Properties props = new Properties();
props.load(KafkaProducerExample.class.getClassLoader().getResourceAsStream("producer.properties"));
// 创建生产者实例
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// 发送消息(主题、key、value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "test-key", "Hello, Debian Kafka!");
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Message sent to topic %s, partition %d, offset %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用Maven管理依赖(pom.xml添加Kafka客户端依赖),编译并运行代码:
# 创建pom.xml文件(若未使用Maven,可直接下载kafka-clients.jar)
cat <<EOF > pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.2</version>
</dependency>
</dependencies>
</project>
EOF
# 编译代码
javac -cp "/opt/kafka/libs/*" KafkaProducerExample.java
# 运行生产者
java -cp ".:/opt/kafka/libs/*" KafkaProducerExample
使用Kafka自带的消费者工具,验证消息是否成功发送到指定主题:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
若输出Hello, Debian Kafka!,则表示生产者配置成功。
bootstrap.servers地址是否正确,防火墙是否开放9092端口(sudo ufw allow 9092)。key.serializer/value.serializer与代码中的类一致(如StringSerializer对应字符串类型)。acks设置为all,并启用retries(建议3次以上),提升可靠性。