Real-time data stream processing with Kafka on Linux involves the following steps:
Install Java. On Debian/Ubuntu:
sudo apt install openjdk-8-jdk
On CentOS/RHEL:
sudo yum install java-1.8.0-openjdk
Verify the installation:
java -version
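Extract the downloaded Kafka archive, move the extracted directory (not the .tgz file) to /usr/local/kafka, and change into its config directory:
tar -zxvf kafka_2.13-3.2.0.tgz
sudo mv kafka_2.13-3.2.0 /usr/local/kafka
cd /usr/local/kafka/config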
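Edit the server.properties file:
vim server.properties
Change the following settings:
broker.id: a unique identifier for this broker, for example 1.
log.dirs: the directory where Kafka stores its message log data, for example /tmp/kafka-logs.
zookeeper.connect: the ZooKeeper connection address, for example zk_ip:2181.
listeners: the address the broker listens on, for example PLAINTEXT://your_server_ip:9092.
log.retention.hours: how long log data is retained, for example 168 (one week).
delete.topic.enable: set to true to allow topics to be deleted.
Return to the Kafka installation directory and start ZooKeeper in the background:
cd /usr/local/kafka
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &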
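Before the broker is started, it can help to see the server.properties settings listed above gathered in one place. The following is a minimal sketch that writes them to a separate file in the current directory. The file name server.properties.example is hypothetical, and zk_ip and your_server_ip are the placeholders from the text, not real addresses; the values would need to be merged into the actual server.properties by hand.

```shell
# Write the example settings to a scratch file (hypothetical name) so the
# real server.properties is not overwritten.
cat > server.properties.example <<'EOF'
# Example broker settings; zk_ip and your_server_ip are placeholders.
broker.id=1
log.dirs=/tmp/kafka-logs
zookeeper.connect=zk_ip:2181
listeners=PLAINTEXT://your_server_ip:9092
log.retention.hours=168
delete.topic.enable=true
EOF

# List only the key=value lines, skipping comments.
grep -v '^#' server.properties.example
```

Keeping the fragment in its own file makes it easy to compare against the shipped defaults before changing anything.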
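Start the Kafka broker in the background:
nohup bin/kafka-server-start.sh config/server.properties &
Check that both processes are running:
ps -ef | grep zookeeper
ps -ef | grep kafka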
Test the Kafka producer and consumer, running each command in its own terminal:
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server your_server_ip:9092
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server your_server_ip:9092
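The console commands above work without further setup only if the broker is allowed to create quickstart-events automatically. A sketch for creating and inspecting the topic explicitly is shown here; KAFKA_HOME and BOOTSTRAP are assumed variable names introduced for this example, and the partition and replication counts of 1 are assumptions suited to a single-broker test setup.

```shell
# Assumed locations and addresses; adjust to your environment.
KAFKA_HOME="${KAFKA_HOME:-/usr/local/kafka}"
BOOTSTRAP="${BOOTSTRAP:-your_server_ip:9092}"
TOPIC="quickstart-events"

if [ -x "$KAFKA_HOME/bin/kafka-topics.sh" ]; then
  # --if-not-exists makes the command safe to run more than once.
  "$KAFKA_HOME/bin/kafka-topics.sh" --create --if-not-exists \
    --topic "$TOPIC" --partitions 1 --replication-factor 1 \
    --bootstrap-server "$BOOTSTRAP"
  # Show the partition count, replication factor, and leader assignment.
  "$KAFKA_HOME/bin/kafka-topics.sh" --describe \
    --topic "$TOPIC" --bootstrap-server "$BOOTSTRAP"
else
  echo "kafka-topics.sh not found under $KAFKA_HOME/bin" >&2
fi
```

Creating the topic up front also makes the partition and replication settings a deliberate choice rather than whatever the broker defaults happen to be.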
Write a Kafka Streams application that reads from input-topic, processes each value, and writes the result to output-topic:
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class KafkaStreamsApp {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Read records from the input topic using the default String serdes.
        KStream<String, String> source = builder.stream("input-topic");
        // Transform each record value; keys are left unchanged.
        KStream<String, String> processed = source.mapValues(value -> processValue(value));
        // Write the results to the output topic.
        processed.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), getStreamsConfig());
        // Close the streams client cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    private static String processValue(String value) {
        // Processing logic goes here; this placeholder returns the value unchanged.
        return value;
    }

    private static Properties getStreamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}
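Package the application as an executable JAR that bundles the kafka-streams dependency, then run it. Create input-topic and output-topic before starting it, since Kafka Streams expects its input and output topics to be managed by the user:
java -jar kafka-streams-app.jar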
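With the application running, an end-to-end check could look like the sketch below, run from a second terminal. It assumes input-topic and output-topic already exist and that the broker is reachable at localhost:9092, as in the application's configuration; KAFKA_HOME, BOOTSTRAP, and the test message are hypothetical names chosen for this example.

```shell
# Assumed locations and addresses; adjust to your environment.
KAFKA_HOME="${KAFKA_HOME:-/usr/local/kafka}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
MESSAGE="hello-streams"

if [ -x "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
  # The console producer reads records from stdin, one per line.
  printf '%s\n' "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
    --topic input-topic --bootstrap-server "$BOOTSTRAP"
  # Read one record from the output topic, giving up after 10 seconds
  # without data.
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
    --topic output-topic --from-beginning --max-messages 1 \
    --timeout-ms 10000 --bootstrap-server "$BOOTSTRAP"
else
  echo "Kafka console tools not found under $KAFKA_HOME/bin" >&2
fi
```

Because processValue returns its input unchanged, the consumer should show the record as it was sent. With --from-beginning, the first record printed may be an older one if output-topic already holds data.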
With the steps above, you can install and configure Kafka on Linux and use Kafka Streams for real-time data stream processing.