Ubuntu环境下实现Hadoop实时数据处理的技术方案
传统Hadoop MapReduce以批处理为核心,无法满足实时数据处理需求。需结合流处理框架(如Flink、Spark Streaming)与实时数据管道(如Kafka),构建“采集-传输-处理-存储”的完整实时流水线。以下是具体实施步骤:
sudo apt update
sudo apt install openjdk-11-jdk
验证安装:java -version(需显示Java版本信息)。/usr/local/hadoop-3.3.1),配置核心文件:
core-site.xml:设置HDFS默认文件系统地址(fs.defaultFS=hdfs://localhost:9000);hdfs-site.xml:配置NameNode数据目录(dfs.namenode.name.dir=/opt/hadoopdata/namenode)与DataNode数据目录(dfs.datanode.data.dir=/opt/hadoopdata/datanode),并设置副本数(dfs.replication=1,单机环境)。hadoop namenode -format,启动HDFS:start-dfs.sh(通过jps验证NameNode、DataNode进程是否运行)。Kafka作为分布式消息队列,负责实时数据的接收、存储与传输,是连接数据源与流处理框架的关键组件。
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
test_topic):bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic),消费者接收消息(bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning)。Flink是原生流处理框架,支持事件时间、状态管理与容错,适合实时分析、实时ETL等场景。
~/.bashrc添加export FLINK_HOME=/usr/local/flink-1.13.2、export PATH=$PATH:$FLINK_HOME/bin),启动集群:bin/start-cluster.sh
验证:访问http://localhost:8081查看Flink Web UI。from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
# 配置Kafka消费者
kafka_consumer = FlinkKafkaConsumer(
topics='test_topic',
deserialization_schema=SimpleStringSchema(),
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'flink_group'}
)
# 添加数据源
stream = env.add_source(kafka_consumer)
# 数据处理:按空格分割单词,统计词频
result = stream \
.flat_map(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.key_by(lambda x: x[0]) \
.sum(1)
# 输出到控制台
result.print()
# 执行任务
env.execute("Kafka Flink WordCount")
提交任务:flink run -c your_module_name your_script.py。Spark Streaming通过微批处理模拟流处理,继承Spark的易用性与生态(如Hive、HDFS集成),适合已有Spark基础的场景。
~/.bashrc添加export SPARK_HOME=/usr/local/spark-3.2.1-bin-hadoop3.2、export PATH=$PATH:$SPARK_HOME/bin)。KafkaUtils读取Kafka数据,进行实时单词计数:from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="KafkaSparkStreaming")
ssc = StreamingContext(sc, batch_duration=1) # 1秒批处理间隔
# 配置Kafka参数
kafka_params = {"metadata.broker.list": "localhost:9092"}
topics = ["test_topic"]
# 创建Kafka流
kafka_stream = KafkaUtils.createDirectStream(ssc, topics, kafka_params)
# 数据处理:提取值,按空格分割单词,统计词频
lines = kafka_stream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# 输出结果
word_counts.pprint()
# 启动流处理
ssc.start()
ssc.awaitTermination()
提交任务:spark-submit --master local[2] your_script.py。处理后的实时数据可存储至以下系统,满足不同场景需求:
FileSystemSink写入HDFS);parallelism.default)、批处理间隔(Spark Streaming的batchDuration),优化GC策略(如使用G1GC减少停顿时间)。通过以上步骤,可在Ubuntu环境下构建基于Hadoop生态的实时数据处理系统,结合流处理框架的高吞吐与低延迟特性,满足实时分析、监控等场景需求。