在大数据场景下,Hadoop 本身不是一个传统意义上的“数据库”,而是一个分布式存储与计算平台。数据清洗与预处理通常是在 Hadoop 生态组件(Hive、Spark、MapReduce、HBase、HDFS) 中完成的。下面从流程、常用工具、典型做法几个层面系统说明。
清洗:去除或修正异常、脏数据
预处理:转换、标准化、增强数据,便于后续分析或建模
典型处理流程:
原始数据
↓
HDFS 存储
↓
格式解析(结构化 / 半结构化)
↓
数据清洗(去重、补全、纠错)
↓
数据转换(标准化、特征工程)
↓
输出到 Hive / HBase / 数据湖
| 工具 | 适用场景 | 特点 |
|---|---|---|
| Hive | SQL 化清洗 | 写 SQL,易上手,适合离线 |
| Spark(推荐) | 大规模复杂清洗 | 内存计算,速度快 |
| MapReduce | 极低层处理 | 编写复杂,已逐渐少用 |
| HBase | 实时半结构化数据 | 适合增量清洗 |
| Sqoop | 关系数据库导入 | ETL 前一步 |
| Flume / Kafka | 流式数据 | 实时清洗 |
INSERT OVERWRITE TABLE clean_table
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) as rn
FROM raw_table
) t
WHERE rn = 1;
df = spark.read.parquet("hdfs:///data/raw")
df_clean = df.dropDuplicates(["id"])
SELECT
COALESCE(age, 0) AS age,
CASE WHEN name IS NULL THEN 'unknown' ELSE name END AS name
FROM raw_table;
from pyspark.sql.functions import when, col
df = df.fillna({"age": 0, "name": "unknown"})
例如:年龄 < 0 或 > 120
WHERE age BETWEEN 0 AND 120
或
df = df.filter((col("age") >= 0) & (col("age") <= 120))
SELECT unix_timestamp(time_str, 'yyyy-MM-dd') AS ts
FROM raw_table;
SELECT get_json_object(log, '$.user_id') AS user_id
FROM raw_log;
CASE
WHEN gender = '1' THEN 'M'
WHEN gender = '0' THEN 'F'
ELSE 'Unknown'
END
Spark 示例:
from pyspark.sql.functions import regexp_replace
df = df.withColumn(
"content",
regexp_replace("content", "\\s+", " ")
)
map(LongWritable key, Text value) {
String line = value.toString();
if (line.contains("error")) return;
context.write(new Text(line), NullWritable.get());
}
✅ 适合:
✅ 层级化清洗
✅ 可重跑
✅ 数据质量监控
✅ 小文件问题
coalesce / repartitionKafka / Flume
↓
Spark Streaming / Flink
↓
HDFS / Hive(ETL后)
↓
Hive / Spark SQL
↓
数据仓库 / 模型训练
Hadoop 本身不做数据清洗,真正做清洗的是 Hive、Spark、MapReduce 等计算框架。
如果你愿意,我可以:
你可以直接告诉我:你现在的 Hadoop 版本、数据格式和清洗目标。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。