Flume

作用

将本地文件实时地, 动态上传到 HDFS 中

安装

  1. 官网下载
  2. 删除lib目录下面的guava-xxx.jar

案例

监控端口数据

总体流程

  1. 通过 netcat 工具向本机的 44444 端口发送数据
  2. Flume 监控本机的 44444 端口, 通过 Flume 的 source 端读取数据
  3. Flume 将获取的数据通过 Sink 端写出到控制台

操作步骤

  1. netcat 作为一个网络通信工具

    1. nc开启服务端

      image-20221117142416543

    2. nc开启监听

      image-20221117142552821
  2. 创建任务的配置文件

    # 创建存放所有配置文件的 job 目录
    mkdir /opt/module/flume-1.9.0/job
    
    # 创建见名知义的文件名
    touch /opt/module/flume-1.9.0/job/net-flume-logger.conf
  1. 基本配置文件内容

    # 为 flume 的各个组件进行命名
    # agent相当于flume job的id, 因此同一台主机上多个flume job的agent不能相同
    agent01.sources = source01
    agent01.sinks = sink01
    agent01.channels = channel01
    # 可以配置多个channels, 但如果这里指定, 但下面没有配置的话会启动报错
    # agent01.channels = channel02
    
    # source的配置信息
    agent01.sources.source01.type = netcat
    agent01.sources.source01.bind = localhost
    agent01.sources.source01.port = 44444
    
    # sink的配置信息
    agent01.sinks.sink01.type = logger
    
    # channel的配置信息
    agent01.channels.channel01.type = memory
    agent01.channels.channel01.capacity = 1000
    agent01.channels.channel01.transactionCapacity = 100
    
    # 一个source可以绑定多个channel
    # 一个sink只可以绑定一个channel
    agent01.sources.source01.channels = channel01
    agent01.sinks.sink01.channel = channel01
  1. 启动agent

    注意关闭netcat服务器, 保留netcat客户端发送数据

    /opt/module/flume-1.9.0/bin/flume-ng agent \
    # agent名称
    -n agent01 \
    # flume的配置文件
    -c /opt/module/flume-1.9.0/conf/ \
    # job任务的配置文件
    -f /opt/module/flume-1.9.0/job/net-flume-logger.conf \
    # 指定日志级别和输出位置
    -Dflume.root.logger=INFO,console
    /opt/module/flume-1.9.0/bin/flume-ng agent \
    -n agent01 \
    -c /opt/module/flume-1.9.0/conf/ \
    -f /opt/module/flume-1.9.0/job/net-flume-logger.conf \
    -Dflume.root.logger=INFO,console

    image-20221117153348783

  2. 效果展示

    1. netcat发送数据, 发送成功会显示OK

      image-20221117153759180

    2. flume接受到数据并展示到控制台上

      image-20221117153928972

实时监控文件并上传到HDFS

监控一个文件的内容变化, 上传到HDFS中, 按时间创建新的文件夹和文件

总体流程

  1. 使用exec source监控文件变化, 使用官方文档查看exec source相关的配置项

    exec表示使用Unix命令, 其中tail -F <文件名> 可以监控文件内容的变化

  2. 监听文件的内容变化

  3. 查看hdfs中的变化

操作步骤

  1. 从官网中查看对应source源的配置信息, 例如本案例中为exec source (如何确定source源类型???)

    image-20221117155056539

  2. 编写job的配置文件

    # 为 flume 的各个组件进行命名
    # agent相当于flume job的id, 因此同一台主机上多个flume job的agent不能相同
    agent02.sources = source01
    agent02.sinks = sink01
    agent02.channels = channel01
    
    

channel的配置信息

agent02.channels.channel01.type = memory
agent02.channels.channel01.capacity = 1000
agent02.channels.channel01.transactionCapacity = 100

source的配置信息

一个source可以绑定多个channel

agent02.sources.source01.channels = channel01
agent02.sources.source01.type = exec

监控指定的配置文件

agent02.sources.source01.command = tail -F /opt/module/test/flume.txt

sink配置信息

一个sink只可以绑定一个channel

agent02.sinks.sink01.channel = channel01
agent02.sinks.sink01.type = hdfs

注意这里不要添加端口号

agent02.sinks.sink01.hdfs.path = hdfs://hadoop001/flume/%Y%m%d/%H

上传文件前缀

agent02.sinks.sink01.hdfs.filePrefix = logs-

是否按照时间滚动文件夹

agent02.sinks.sink01.hdfs.round = true

#多少时间单位创建一个新的文件夹
agent02.sinks.sink01.hdfs.roundValue = 1

#重新定义时间单位
agent02.sinks.sink01.hdfs.roundUnit = hour

#是否使用本地时间戳
agent02.sinks.sink01.hdfs.useLocalTimeStamp = true

#积攒多少个Event才flush到HDFS一次
agent02.sinks.sink01.hdfs.batchSize = 100

#设置文件类型,可支持压缩
agent02.sinks.sink01.hdfs.fileType = DataStream

#多少秒生成一个新的文件
agent02.sinks.sink01.hdfs.rollInterval = 30

#设置每个文件的滚动大小
agent02.sinks.sink01.hdfs.rollSize = 134217700

#文件的滚动与Event数量无关
agent02.sinks.sink01.hdfs.rollCount = 0


3. 为 /opt/module/test/flume.txt 添加测试数据, 并在hadoop的web页面查看

   ![image-20221117170217185](https://namebucket.oss-cn-beijing.aliyuncs.com/img/image-20221117170217185.png)

### 实时监控文件夹的变化

#### 总体流程

1. 使用spooldir source监控目录变化
2. 

#### 操作流程

1. 配置文件内容

2. **在hdfs中提前创建上传文件夹**

3. 成功内容展示

   > 注意事项: 
   >
   > 不能在已有flume.txt.COMPLETED文件的情况下再添加flume.txt, 这样会导致无法更改文件名, 进而造成flume job任务崩溃, 看似还能正常运行, 但实际上后面再添加符合要求的文件也不能产生效果, 需要重启flume job

   ![image-20221117174454009](https://namebucket.oss-cn-beijing.aliyuncs.com/img/image-20221117174454009.png)

   ![image-20221117174602386](https://namebucket.oss-cn-beijing.aliyuncs.com/img/image-20221117174602386.png)

### 实时监控目录下的多个追加文件

+ Exec Source 适用于监控一个实时追加的文件, 不能实现断点续传
+ SpoolDir Source 适合用于同步新文件, 但不适合对实时追加的日志文件进行监听并同步上传
+ TailDir Source适合用于监听多个实时追加的文件, 并且能够实现断点续传

在文件夹中添加新文件在hdfs中会产生新的文件. 对监控目录下的文件追加内容, 也会在hdfs中产生新的文件, 而不是在hdfs的原文件中追加内容

**TailDir Source和Log4j搭配使用存在缺陷**: Log4j会有更名操作, 而TailDir Source对于更名后的文件会重新上传一次, 因此会导致日志数据存储两份. 

**解决方案:**

+ 不使用更名的日志框架, 例如使用logback
+ 修改源码flume-taildir-source包, **只使用inode的值来判断是否是同一个文件, 这样文件更名就不会被flume框架认为是新的文件**

   转载规则


《》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
Source -> [PUT事务] -> Channel <- [TAKE事务] Sink Source端 推送 到Channel中 Sink端 主动拉取 Channel中的数据 Flume进阶学习中断 http
2022-11-17
下一篇 
自定义 InputFormatPriorFileInputFormat.java 文件 package org.example; import lombok.SneakyThrows; import org.apache.hadoop.c
2022-11-16
  目录