Before starting the integration, make sure the following components are installed on the Linux system (the versions must be compatible):
- JDK: install with sudo apt-get install openjdk-8-jdk (Ubuntu/Debian) or brew install openjdk@8 (macOS).
- Kafka: start the ZooKeeper service (bin/zookeeper-server-start.sh config/zookeeper.properties) and the Kafka service (bin/kafka-server-start.sh config/server.properties).
- Spark: set the environment variables (export SPARK_HOME=/path/to/spark; export PATH=$PATH:$SPARK_HOME/bin).
- Kafka connector: Spark accesses Kafka through the spark-sql-kafka connector; add spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 to the Spark configuration (the version must match your Spark and Kafka versions). A sketch of attaching the package at startup follows the topic-creation step below.
Next, use the Kafka command-line tools to create a topic for data transfer (e.g. test_topic), specifying the broker address, partition count, and replication factor:
bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Once the topic is created, you can verify that it exists with bin/kafka-topics.sh --list --bootstrap-server localhost:9092.
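Before running the streaming examples below, the spark-sql-kafka connector mentioned above has to be on the Spark classpath. A minimal PySpark sketch, assuming the package can be resolved from Maven Central at startup (equivalent to passing --packages to spark-submit); the coordinates must match your Spark and Scala versions:
from pyspark.sql import SparkSession
# Let Spark download the Kafka connector when the session starts; equivalent to:
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 app.py
spark = SparkSession.builder \
    .appName("KafkaSparkIntegration") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3") \
    .getOrCreate()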
Use the Spark Structured Streaming API to consume data from Kafka. The key configuration options are:
- kafka.bootstrap.servers: the Kafka broker address (e.g. localhost:9092)
- subscribe: the name of the topic(s) to subscribe to (e.g. test_topic)
- startingOffsets: the starting offsets (latest by default for streaming queries; earliest is also available)
PySpark example:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("KafkaSparkIntegration") \
    .getOrCreate()

# Read the test_topic stream from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test_topic") \
    .load()

# Convert Kafka's key and value fields (byte arrays) to strings
processed_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Print each micro-batch to the console
processed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start() \
    .awaitTermination()
Scala example:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("KafkaSparkIntegration")
  .getOrCreate()

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test_topic")
  .load()

val processedDF = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

processedDF.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()
The code above reads data from Kafka and prints the key and value to the console as strings.
To write the data processed by Spark back to Kafka, use writeStream with the Kafka sink. The key configuration options are:
- kafka.bootstrap.servers: the Kafka broker address
- topic: the target Kafka topic
- checkpointLocation: the checkpoint directory (used for failure recovery)
PySpark example:
# Assume processed_df is the processed DataFrame; the Kafka sink expects a "value" column of string or binary type
processed_df.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output_topic") \
    .option("checkpointLocation", "/tmp/spark_kafka_checkpoint") \
    .start() \
    .awaitTermination()
Scala example:
processedDF.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "/tmp/spark_kafka_checkpoint")
  .start()
  .awaitTermination()
The code above writes the contents of processed_df to the output_topic Kafka topic.
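To confirm that records actually reached output_topic, you can read the topic back with a small consumer. A minimal sketch using the kafka-python library (the same library used in the example below); bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output_topic --from-beginning works just as well:
from kafka import KafkaConsumer
# Read output_topic from the beginning and print each record's value.
consumer = KafkaConsumer(
    'output_topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest'
)
for msg in consumer:
    print(msg.value.decode('utf-8'))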
Suppose Kafka has a sensor_data topic that stores temperature and humidity readings from IoT devices in JSON format. The following walks through a complete workflow in which Spark filters out high-temperature readings in real time.
Use the kafka-python library to simulate a sensor, sending one JSON message per second containing sensor_id, temperature, and humidity:
from kafka import KafkaProducer
import json
import random
import time

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

while True:
    data = {
        'sensor_id': random.randint(1, 100),
        'temperature': round(random.uniform(20.0, 30.0), 2),
        'humidity': round(random.uniform(30.0, 70.0), 2),
        'timestamp': int(time.time())
    }
    producer.send('sensor_data', data)
    time.sleep(1)
Use Spark Structured Streaming to read the sensor_data topic, filter out readings with a temperature above 25 °C, and write the results to the console:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, TimestampType

spark = SparkSession.builder \
    .appName("SensorDataProcessing") \
    .getOrCreate()

# Define the schema used to parse the JSON payload
schema = StructType([
    StructField("sensor_id", IntegerType()),
    StructField("temperature", FloatType()),
    StructField("humidity", FloatType()),
    StructField("timestamp", TimestampType())
])

# Read the sensor_data stream from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_data") \
    .load()

# Parse the JSON value with the schema and keep only readings above 25 °C
processed_df = kafka_df.selectExpr("CAST(value AS STRING) AS value") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*") \
    .filter("temperature > 25.0")

# Write the filtered rows to the console
query = processed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
Once running, Spark prints sensor readings with a temperature above 25 °C to the console in real time.
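Instead of the console sink, the filtered readings could also be pushed back to Kafka using the sink configuration from the earlier section. A hedged sketch, assuming a hypothetical high_temp_alerts topic and checkpoint path, that re-serializes each row to JSON and emits a micro-batch every 5 seconds:
from pyspark.sql.functions import to_json, struct
# Serialize each filtered row back to a JSON string in a "value" column
# and write it to a hypothetical high_temp_alerts topic.
alert_query = processed_df \
    .select(to_json(struct("sensor_id", "temperature", "humidity", "timestamp")).alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "high_temp_alerts") \
    .option("checkpointLocation", "/tmp/high_temp_checkpoint") \
    .trigger(processingTime="5 seconds") \
    .start()
alert_query.awaitTermination()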
Common issues and tuning tips:
- Connection failures such as ERROR Unable to connect to Kafka at localhost:9092: check that the Kafka service is running (ps -ef | grep kafka) and that the broker address is correct (the listeners setting in server.properties).
- To read from the beginning of a topic rather than only new messages, set auto.offset.reset=earliest (startingOffsets=earliest in Structured Streaming).
- Make sure the Kafka connector version matches your Spark build (e.g. spark-sql-kafka-0-10_2.12:3.1.3).
- Use repartition() or coalesce() to adjust the number of partitions of a Spark DataFrame and improve processing efficiency (e.g. processed_df.repartition(4)).
- Set a trigger interval (e.g. trigger(processingTime='5 seconds')) to balance latency against throughput.
- Enable compression on the producer (e.g. compression_type='gzip') to reduce the amount of data sent over the network.
With the steps above, Kafka and Spark can be integrated efficiently on Linux to support real-time data collection, processing, and analysis.