Flume
作用
将本地文件实时地, 动态上传到 HDFS 中
安装
- 官网下载
- 删除lib目录下面的guava-xxx.jar
案例
监控端口数据
总体流程
- 通过 netcat 工具向本机的 44444 端口发送数据
- Flume 监控本机的 44444 端口, 通过 Flume 的 source 端读取数据
- Flume 将获取的数据通过 Sink 端写出到控制台
操作步骤
netcat 作为一个网络通信工具
nc开启服务端

nc开启监听
创建任务的配置文件
# 创建存放所有配置文件的 job 目录 mkdir /opt/module/flume-1.9.0/job # 创建见名知义的文件名 touch /opt/module/flume-1.9.0/job/net-flume-logger.conf
基本配置文件内容
# 为 flume 的各个组件进行命名 # agent相当于flume job的id, 因此同一台主机上多个flume job的agent不能相同 agent01.sources = source01 agent01.sinks = sink01 agent01.channels = channel01 # 可以配置多个channels, 但如果这里指定, 但下面没有配置的话会启动报错 # agent01.channels = channel02 # source的配置信息 agent01.sources.source01.type = netcat agent01.sources.source01.bind = localhost agent01.sources.source01.port = 44444 # sink的配置信息 agent01.sinks.sink01.type = logger # channel的配置信息 agent01.channels.channel01.type = memory agent01.channels.channel01.capacity = 1000 agent01.channels.channel01.transactionCapacity = 100 # 一个source可以绑定多个channel # 一个sink只可以绑定一个channel agent01.sources.source01.channels = channel01 agent01.sinks.sink01.channel = channel01
启动agent
注意关闭netcat服务器, 保留netcat客户端发送数据
/opt/module/flume-1.9.0/bin/flume-ng agent \ # agent名称 -n agent01 \ # flume的配置文件 -c /opt/module/flume-1.9.0/conf/ \ # job任务的配置文件 -f /opt/module/flume-1.9.0/job/net-flume-logger.conf \ # 指定日志级别和输出位置 -Dflume.root.logger=INFO,console/opt/module/flume-1.9.0/bin/flume-ng agent \ -n agent01 \ -c /opt/module/flume-1.9.0/conf/ \ -f /opt/module/flume-1.9.0/job/net-flume-logger.conf \ -Dflume.root.logger=INFO,console
效果展示
netcat发送数据, 发送成功会显示
OK
flume接受到数据并展示到控制台上

实时监控文件并上传到HDFS
监控一个文件的内容变化, 上传到HDFS中, 按时间创建新的文件夹和文件
总体流程
使用exec source监控文件变化, 使用官方文档查看exec source相关的配置项
exec表示使用Unix命令, 其中
tail -F <文件名>可以监控文件内容的变化监听文件的内容变化
查看hdfs中的变化
操作步骤
从官网中查看对应source源的配置信息, 例如本案例中为
exec source(如何确定source源类型???)
编写job的配置文件
# 为 flume 的各个组件进行命名 # agent相当于flume job的id, 因此同一台主机上多个flume job的agent不能相同 agent02.sources = source01 agent02.sinks = sink01 agent02.channels = channel01
channel的配置信息
agent02.channels.channel01.type = memory
agent02.channels.channel01.capacity = 1000
agent02.channels.channel01.transactionCapacity = 100
source的配置信息
一个source可以绑定多个channel
agent02.sources.source01.channels = channel01
agent02.sources.source01.type = exec
监控指定的配置文件
agent02.sources.source01.command = tail -F /opt/module/test/flume.txt
sink配置信息
一个sink只可以绑定一个channel
agent02.sinks.sink01.channel = channel01
agent02.sinks.sink01.type = hdfs
注意这里不要添加端口号
agent02.sinks.sink01.hdfs.path = hdfs://hadoop001/flume/%Y%m%d/%H
上传文件前缀
agent02.sinks.sink01.hdfs.filePrefix = logs-
是否按照时间滚动文件夹
agent02.sinks.sink01.hdfs.round = true
#多少时间单位创建一个新的文件夹
agent02.sinks.sink01.hdfs.roundValue = 1
#重新定义时间单位
agent02.sinks.sink01.hdfs.roundUnit = hour
#是否使用本地时间戳
agent02.sinks.sink01.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
agent02.sinks.sink01.hdfs.batchSize = 100
#设置文件类型,可支持压缩
agent02.sinks.sink01.hdfs.fileType = DataStream
#多少秒生成一个新的文件
agent02.sinks.sink01.hdfs.rollInterval = 30
#设置每个文件的滚动大小
agent02.sinks.sink01.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
agent02.sinks.sink01.hdfs.rollCount = 0
3. 为 /opt/module/test/flume.txt 添加测试数据, 并在hadoop的web页面查看

### 实时监控文件夹的变化
#### 总体流程
1. 使用spooldir source监控目录变化
2.
#### 操作流程
1. 配置文件内容
2. **在hdfs中提前创建上传文件夹**
3. 成功内容展示
> 注意事项:
>
> 不能在已有flume.txt.COMPLETED文件的情况下再添加flume.txt, 这样会导致无法更改文件名, 进而造成flume job任务崩溃, 看似还能正常运行, 但实际上后面再添加符合要求的文件也不能产生效果, 需要重启flume job


### 实时监控目录下的多个追加文件
+ Exec Source 适用于监控一个实时追加的文件, 不能实现断点续传
+ SpoolDir Source 适合用于同步新文件, 但不适合对实时追加的日志文件进行监听并同步上传
+ TailDir Source适合用于监听多个实时追加的文件, 并且能够实现断点续传
在文件夹中添加新文件在hdfs中会产生新的文件. 对监控目录下的文件追加内容, 也会在hdfs中产生新的文件, 而不是在hdfs的原文件中追加内容
**TailDir Source和Log4j搭配使用存在缺陷**: Log4j会有更名操作, 而TailDir Source对于更名后的文件会重新上传一次, 因此会导致日志数据存储两份.
**解决方案:**
+ 不使用更名的日志框架, 例如使用logback
+ 修改源码flume-taildir-source包, **只使用inode的值来判断是否是同一个文件, 这样文件更名就不会被flume框架认为是新的文件**