在Ubuntu上配置Kafka的生产者和消费者,你需要遵循以下步骤:
Kafka需要Java运行环境,首先确保你的系统上安装了Java。
sudo apt update
sudo apt install openjdk-11-jdk
验证Java安装:
java -version
从Apache Kafka官网下载最新版本的Kafka,并解压到你选择的目录。
wget https://downloads.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0
Kafka使用Zookeeper来管理集群,所以你需要先启动Zookeeper,然后启动Kafka服务器。
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
在Kafka中发送和接收消息之前,你需要创建一个或多个主题。
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Kafka生产者可以通过配置文件或编程方式配置。以下是通过配置文件producer.properties进行配置的示例:
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
保存这个文件,然后使用以下命令启动生产者:
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092 --property print.key=true --property parsing.mode=DELIMITED --property key.separator=,
在控制台中输入消息,按回车发送。
同样,Kafka消费者也可以通过配置文件或编程方式配置。以下是通过配置文件consumer.properties进行配置的示例:
bootstrap.servers=localhost:9092
group.id=test-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
enable.auto.commit=true
auto.commit.interval.ms=1000
保存这个文件,然后使用以下命令启动消费者:
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092 --property group.id=test-group
这将使消费者从头开始读取消息。
bootstrap.servers中的主机名和端口。以上步骤应该可以帮助你在Ubuntu上配置Kafka的生产者和消费者。如果你遇到任何问题,请检查Kafka和Zookeeper的日志文件以获取更多信息。