温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Flume如何采集到HDFS

发布时间:2021-12-09 11:22:28 来源:亿速云 阅读:204 作者:小新 栏目:大数据

这篇文章主要介绍Flume如何采集到HDFS,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

一、需求:

采集指定文件的内容到HDFS

技术选型:exec - memory - hdfs

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.0.129:9000/user/hadoop/flume
a1.sinks.k1.hdfs.batchSize = 10            #10行产生新文件
a1.sinks.k1.hdfs.fileType = DataStream     #压缩格式
a1.sinks.k1.hdfs.writeFormat = Text        #格式类型
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动:

./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343

添加测试数据:

[hadoop@hadoop001 data]$ touch data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log

检查HDFS:

[hadoop@hadoop001 flume]$ hdfs dfs -text hdfs://192.168.0.129:9000/user/hadoop/flume/*
18/08/09 20:59:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
test
test
test
test
test

二、需求:

采集指定文件夹的内容到(HDFS或者控制台)

==》文件夹下文件不能修改切不能重名

==》处理完当前文件添加.COMPLETED标识

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

三、需求:(生产使用,记录偏移量)

采集指定文件夹和文件内容到(控制台或者HDFS)

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
#记录偏移量,重启续传
a1.sources.r1.positionFile = /home/hadoop/script/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
#监控指定log文件
a1.sources.r1.filegroups.f1 =/home/hadoop/data/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
#监控文加下的所有log*文件夹和内容
a1.sources.r1.filegroups.f2 = /home/hadoop/data/test/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
# 控制台输出
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动:

./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/taildir-memory-logger.conf \
-Dflume.root.logger=INFO,console

记录偏移量:

[hadoop@hadoop001 flume]$ cat taildir_position.json

[{"inode":679982,"pos":14,"file":"/home/hadoop/data/example.log"}

{"inode":679984,"pos":0,"file":"/home/hadoop/data/test/log1.log"}]

以上是“Flume如何采集到HDFS”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI