Flume使用指南

一、安装 Flume

首先检查机器是否联网

8.8.8.8 是Google提供的公共 DNS 服务器的IP地址。ping 命令是用来测试网络通信的,它通过向目标IP地址发送数据包并等待响应,来检查是否能够成功地与该地址进行通信。

通过向Google提供的公共DNS服务器发送数据包并等待响应,可以检查本地主机是否能够成功地连接到互联网上的其他计算机。因为Google的公共 DNS 服务器在全球多个位置提供服务,并且稳定可靠,所以它成为了Linux用户常用的网络测试地址之一。

当用户发现自己的电脑无法上网时,可以使用 ping 8.8.8.8 命令来检查是否能够与 Google 的DNS服务器进行通信。如果能够成功连接,那么问题可能出现在其他地方,如果连接失败,则意味着可能存在网络连接问题,需要进一步检查和解决。

1
$ ping 8.8.8.8

检查 nc(netcat) 的版本

netcat(也称为 nc)是一种网络工具,它是一个非常强大的用于创建 TCP/IP 连接的命令行工具。netcat 的功能十分强大,可以用于端口扫描、文件传输、调试协议等多种场景。

通过 netcat 可以进行以下操作:

  • 在主机之间传输数据,可以作为 client(客户端)或 server(服务器)。
  • 在不打开 FTP、SSH、Telnet 等服务的情况下,创建服务器程序,监听端口并处理连接请求。
  • 进行端口扫描和服务探测,可测试端口是否打开,并查看被连接服务的协议版本。
  • 通过网络发送文件或数据,如将文件拷贝到目标主机、进行数据备份等。
  • 调试网络协议,可以手动输入数据包并观察响应信息来调试各种协议的通信。

netcat 是一个非常灵活的工具,它可以作为一个网络的 Swiss Army Knife,可以被用于多种不同的目的。但同时,由于其强大的功能和灵活的使用方式,也可能被用于网络攻击和安全测试,因此在使用时需要谨慎。

1
$ nc -version

如果你的虚拟机没有 nc ,使用以下指令来安装

1
$ yum install nc

GUI 用户打开虚拟机中的浏览器,然后复制链接点击下载 https://archive.apache.org/dist/flume/1.9.0/

点击下载

命令行用户在安装目录下执行以下指令(需要安装 wget 工具)

1
$ wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

没有 wget 工具的执行以下指令进行安装

1
$ yum install wget

解压安装包

1
$ tar -xvf apache-flume-1.9.0-bin.tar.gz

将解压之后的文件夹进行重命名为 flume

1
$ mv apache-flume-1.9.0-bin flume

将重命名之后的文件夹移动到 /training 文件夹下(大数据所有组件都在这里)

1
$ mv flume /training/

修改配置文件

1
vi ~/.bash_profile

将下面两行加入配置文件

export FLUME_HOME=/training/flume/

export PATH=$PATH:$FLUME_HOME/bin

重新导入资源

1
$ source ~/.bash_profile

查看 flume 版本

1
$ flume-ng version

查看 JAVA_HOME

1
$ $JAVA_HOME

flume-env.sh.template 文件重命名为 flume-env.sh

1
$ mv /training/flume/conf/flume-env.sh.template flume-env.sh

修改 flume-env.sh 文件

1
$ vi /training/flume/conf/flume-env.sh

flume-env.sh 文件中的 JAVA_HOME 修改位自己的 jdk 路径

1
export JAVA_HOME=/training/jdk1.8.0_301

再次查看 flume 版本

1
$ flume-ng version

安装成功

二、什么是 Flume

Flume是一个高可靠性、高可用性、分布式、可定制化的海量日志采集、聚合和传输的系统工具。

Flume 是 Apache 的一个开源项目,是一个分布式日志收集系统,以流水线的方式收集、聚合日志。它具有高可扩展性和高可靠性,可以在不停机的情况下进行在线扩容和日志收集。

Flume 支持多种数据源,例如 Avro、Netcat、JMS、Exec、Thrift 等,也支持多种数据目的地,例如 HDFS、HBase、Solr、Elasticsearch 等。

Flume 的核心工作单元是 Agent,它由三个主要组件组成:Source、Channel 和 Sink,分别表示数据来源、数据缓存和数据目的地。

Flume 有许多不同的使用场景,例如日志收集、数据摄取、事件处理等,广泛应用于 Web 应用程序、大数据、云计算等领域。

没有 Hadoop 我们也可以运行 Flume

从不同的数据源手机数据,然后传输到中心化储存的系统中

不依赖于任何操作系统,可以运行在任何设备

Flume Event

Flume Event 是 Apache Flume 中的一个概念,代表了一个在 Flume Agent 中传输的数据单元。一个 Flume Event 包含了一个数据载体和多个可选的头部信息(header)。其中,数据载体是必须的,而头部信息是可选的。

Flume Event 的数据载体是一个字节数组,在传输过程中可以被多个 SourceSink 共享使用。Flume Event 的头部信息是一组键值对,通过这些头部信息可以对数据进行标记和描述,例如可以包含数据的来源、时间戳、数据类型、序列号等信息。

在 Flume Agent 中,多个 Source 可以向一个 Channel 发送 Flume Event多个 Sink 可以从一个 Channel 中获取 Flume Event。在传输过程中,每个 Flume Event 可以通过拦截器(interceptor)进行处理,例如可以对数据进行清洗、格式化、过滤等操作,从而满足不同的业务需求。

需要注意的是,在 Flume Agent 中,一个 Flume Event 的传输是基于异步的,即在发送和接收 Flume Event 的过程中,不需要等待对方的响应。这种设计可以确保高效地传输大量的日志和事件数据。同时,Flume Event 的传输还可以进行批量处理,从而进一步提高传输效率。

Flume 用来传送数据的最小单位,一个 event 就是一个数据的单位,event 就是数据,许多数据流的集合

Flume Agent

Flume Agent 是 Apache Flume 中的一个概念,代表了一个在单个 JVM 进程中运行的 Flume 实例。可以将 Flume Agent 看作是一个数据传输管道,通过各种 SourceSink 连接起来,从而实现数据的采集、传输、处理等功能。

在 Flume Agent 中,每个 SourceSink 都是一个独立的组件。Source 用于采集数据,可以从各种数据源中获取数据,例如从日志文件、网络流、消息队列、Twitter 等等中获取数据。Sink 用于输出数据,可以将获取到的数据输出到各种存储或处理目的地,例如输出到 HDFS、HBase、MySQL、Kafka、Elasticsearch 等等。

除了 SourceSink 组件,Flume Agent 中还有一个 Channel 组件。Channel 是连接 SourceSink 的缓冲区,用于暂存从 Source 中获取到的数据,等待 Sink 进行输出。Flume Agent 中可以配置多个 Source 和多个 Sink,它们通过 Channel 进行连接,从而实现数据的传输和处理。

Flume Agent 的配置文件通过指定各个组件的名称、类型、属性等来实现对 Agent 的定义。Flume Agent 配置文件的结构与 XML 文件类似,由 <agent> 标签组成,其中包含了多个 <source><channel><sink> 标签,每个标签都包含了对应组件的相关配置信息。在配置好 Flume Agent 后,可以通过启动 Flume Agent 进程来启动整个数据传输管道,从而实现数据的采集、传输、处理等功能。

Flume Agent(Flume代理)是 Flume 中的核心概念之一,代表一个 Flume 进程实例,其中包含 SourceChannelSink 三个组件。Flume Agent 用于采集、聚合和传输日志和数据,可运行于单个节点或多个节点的集群中。每个 Flume Agent 进程都可以独立工作,同时也可以和其他 Flume Agent 进程协同工作,以实现更高的可靠性和扩展性。

Flume Agent 包含以下三个核心组件:

  1. Source(数据源):负责从数据源(例如文件、日志、数据流、消息队列等)收集数据,并将数据传递到 Channel
  2. Channel(数据缓存):负责存储 Source 收集到的数据,以便 Sink 进一步处理。Channel 可以是内存缓存、文件系统缓存、Kafka 等第三方缓存系统。
  3. Sink(数据目的地):负责将数据从 Channel 读取并将其传输到目标系统,例如 HDFS、HBase、Elasticsearch、Solr 等。

Flume Agent 可以根据需要自定义配置,如定义多个 SourceChannelSink`,也可以将多个 Agent 配置成一个 Agent 组,以实现更高效的数据采集和传输。

web server 网路服务器,不停产生日志文件,是一个数据源

Flume agent 的任务是将日志文件传输到HDFS中

source 将日志数据从网络服务器传输到 channel 中,拉取数据

channel 将存储数据并传输到 sink

sink 消费从 channel 传来的数据并将他们传输到 HDFS,推送数据

-put 每次只能传输一个文件(从本地到HDFS),这就是第一个限制

flume 可以实时与 hadoop 链接,这样就不用来一个数据执行一次 -put命令了

flume 可以发送大量 实时数据,可以管理数据流

channel会存储数据,直到sink将所有数据都存储到HDFS 中,channel才会把这些数据删除

如果hdfs接收的速度小于webserver发送的速度,channel就会把剩下的数据存储起来,直到sink将其存储到hdfs

channel就像水阀

临时保存

一种是临时保存在内存上,另一种是保存在一个文件上

内存会比硬盘更快,

complex data flow

“复杂数据流”是指在数据处理系统中的一种数据流,它包含一系列复杂的计算和处理过程,用于处理和转换复杂的数据结构。这种处理方式通常需要使用一些先进的数据处理技术和算法,如机器学习、图像处理、自然语言处理等。

复杂数据流通常涉及到多个数据处理阶段,每个阶段都需要处理原始数据并输出处理结果。每个阶段的处理方式和输出结果通常会影响到下一个阶段的处理方式和输出结果。因此,在设计和实现复杂数据流时需要考虑到数据处理阶段之间的依赖关系和数据流的整个工作流程。

复杂数据流可以应用于各种领域,例如金融、医疗、电商、物流等等,用于解决各种数据处理和决策方面的问题。然而,设计和实现复杂数据流也是具有挑战性的,需要考虑到数据的质量、效率、可靠性和安全性等因素。因此,许多现代数据处理系统提供了各种复杂数据处理和分析的工具和框架,以帮助企业处理和分析大量数据,并从中获得更多价值。

Flume Avro 是 Flume 中用于实现数据流的一种机制。Avro 是一种数据序列化格式,它具有良好的可扩展性和高效性能,使得 Flume Avro 可以非常高效地传输大量数据。Flume Avro 可以通过 Avro Source 和 Avro Sink 实现数据的传输,使用 Avro 的优点是可以减少数据传输时所需的带宽和网络延迟。

Avro Source 是 Flume 中的一种 Source 类型,用于接收 Avro 格式的数据。它可以从任何支持 Avro 协议和数据类型的源系统中接收数据,并将其传输到 Flume 的 Channel 组件中,以便后续处理。 Avro Sink 是 Flume 中的一种 Sink 类型,用于将数据传输到支持 Avro 格式的目的地系统。它可以从 Flume 的 Channel 组件中获取数据,并将其传输到目标系统,例如 Avro RPC 服务、HDFS、HBase、Elasticsearch 等。

使用 Flume Avro 的好处是,它可以支持面向数据的实时流处理,同时也可以有效地降低数据传输时所需的带宽和延迟。在 Flume Avro 中,数据是以二进制格式传输的,数据包括数据类型和数据值,可以快速地进行序列化、反序列化和传输。同时,使用 Avro 的模式定义语言,可以定义和更新数据模式,从而支持数据的演进。这种灵活的模式定义方式使得 Flume Avro 可以支持向后兼容和向前兼容,使得数据流处理更加容易和灵活。

当我们使用复杂数据流的时候,我们需要使用avro

。在计算机领域中,replicating 通常指复制数据或实现数据的冗余备份,以增加数据的可靠性和可用性。通过复制数据或备份数据,可以保证数据不会因为单点故障而丢失,同时也可以提高数据的读取速度和吞吐量。Replication 在数据库、分布式系统以及数据中心等领域中被广泛应用。

netcat是一个数据生成器

在 Flume 中,Replicating 是一种 Channel 类型,用于在多个 Sink 之间复制消息。当使用 Replicating Channel 时,Flume 会将每个事件广播到多个 Sink,从而提高了消息的可靠性和可用性。Replicating Channel 相对于其他 Channel 类型的优点是,可以同时将消息发送到多个 Sink,从而减少了单点故障的风险,并且可以提供更好的消息传递效率。

在使用 Replicating Channel 时,每个 Sink 都会接收到相同的消息,并且处理消息的方式也相同。这种方式适用于许多场景,例如在多个数据中心之间复制消息,或者在多个目的地系统之间分发消息。使用 Replicating Channel 可以确保消息的一致性,并提高消息的投递成功率。

需要注意的是,在使用 Replicating Channel 时,由于每个事件都要广播到多个 Sink,因此会增加网络带宽的使用量。同时,如果 Sink 数量很多,也会增加 Flume Agent 的负载和延迟。因此,在使用 Replicating Channel 时需要根据具体情况权衡利弊,选择合适的 Channel 类型和 Sink 数量。

multiplexing可以分发数据到不同的hdfs

Flume 可以传输非结构化数据

FLumeData = filePrefix

是个默认名字后缀

改名字只是更改前缀

三、Flume 的操作

使用另一台机器作为数据源

  1. 设置agent name设置source name设置channel name设置sink name

    1
    agentname.source = sourcename //我们可以使用任何名字来命名
  2. 定义source 的属性

    1
    2
    3
    4
    agentname.sources.sourcename.type = netcat; //有专门的几个词,不能随意
    agentname.sources.sourcename.bind = localhost; //ip地址
    agentname.sources.sourcename.port = portnumber(1-65000);
    //我们不能
  3. 定义 channel的属性

    1
    agentname.channel.channelname.type = memory;//有专门的几个词,不能随意
  4. 定义sink 的属性

    1
    agentnamename.sink.sinkname.type = logger //是hdfs的话就写和hdfs,logger是终端(terminal)
  5. 在channel的帮助下链接source和sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# names of agent,source,channel,sink

andrew.sources=source1
andrew.channels=channel1
andrew.sinks=sink1

# source information

andrew.sources.source1.type=netcat
andrew.sources.source1.bind=localhost
andrew.sources.source1.port=22222

# channel information

andrew.channels.channel1.type=memory

# sink information

andrew.sinks.sink1.type=logger

# connection between source and sink with the help of channel

andrew.sources.source1.channels=channel1
andrew.sinks.sink1.channel=channel1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# names of agent,source,channel,sink

andrew.sources=source1 # 这里可以用任何名字,Andrew 是 agentname
andrew.channels=channel1
andrew.sinks=sink1

# source information

andrew.sources.source1.type=netcat
andrew.sources.source1.bind=localhost # 接收所有来自这个 IP 的信息,不是这个 IP 发出的信息
andrew.sources.source1.port=22222 # 阈值是 1-65000

# channel information

andrew.channels.channel1.type=memory
# 可选值为 memory、file

# sink information

andrew.sinks.sink1.type=logger
# logger 的意思是会把数据显示在终端上(另一个可选:HDFS)

# connection between source and sink with the help of channel

andrew.sources.source1.channels=channel1
andrew.sinks.sink1.channel=channel1 # 只有这里用channel,其他所有都加s

启动 Flume

1
$ flume-ng agent -n agentname -f filename

指令截图

1
2
3
4
$ flume-ng agent -n agentname -f filename

# -n 后面的是 agentname,一定要跟文件里的一样,不然会报错
# -f 后面的是 数据源文件

agentname输错结果报错

运行指令后出现端口号表示执行成功

执行并互通

我们启动另一个窗口来当做另一台机器来链接 Flume

输入指令来链接 Flume

1
$ nc localhost 22222
1
2
3
4
$ nc localhost 22222

# local 是文件中绑定的 IP 地址
# 22222 是文件中设定的端口号

链接Flume

互通成功

使用 Windows 作为数据源

现在我们用Windows来当做source源来运行flume

在 Linux 中写入配置文件,带不带 .config 都可以

1
2
3
4
5
6
7
8
9
10
11
12
13
14
andrew.sources=win_source
andrew.channels=win_channel
andrew.sinks=win_sink

andrew.sources.win_source.type=netcat
andrew.sources.win_source.bind=192.168.1.130
andrew.sources.win_source.port=22222

andrew.channels.win_channel.type=memory

andrew.sinks.win_sink.type=logger

andrew.sources.win_source.channels=win_channel
andrew.sinks.win_sink.channel=win_channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
andrew.sources=win_source
andrew.channels=win_channel
andrew.sinks=win_sink

andrew.sources.win_source.type=netcat
andrew.sources.win_source.bind=192.168.1.130

# 因为是要与其他机器相连,所以不能用localhost,要用本机地址
# localhost 是相对于本机而言的,在网络上无法访问 localhost,所以这里必须使用本机 IP

andrew.sources.win_source.port=22222

andrew.channels.win_channel.type=memory

andrew.sinks.win_sink.type=logger

andrew.sources.win_source.channels=win_channel
andrew.sinks.win_sink.channel=win_channel

配置文件这样写

启动 flume

1
$ flume-ng agent -n andrew -f win

启动成功

在 Windows 中断中输入指令,连接虚拟机

1
$ ncat 192.168.1.130 22222

输点什么

连接成功

Flume 与防火墙

这一部分是以后工作中可能需要用到的

注意一下指定都需要使用 sudo 执行

查看防火墙状态

1
$ sudo systemctl status firewalld

启动防火墙

1
$ sudo systemctl start firewalld

启动防火墙之后我们饿就不能接收这个 IP 地址的消息了

防火墙打开,端口就被封闭住了

连得上但是不能接收消息

查看端口是否开放(允许使用)

1
$ sudo firewall-cmd --list-ports

啥都没显示,没有一个端口在白名单

开放 22222 端口

1
$ sudo firewall-cmd --add-port=22222/tcp --permanent

image-20230426090742090

重新加载

1
$ sudo firewall-cmd --reload

成功开放22222端口

再次关闭防火墙,方便日后学习

1
$ sudo systemctl stop firewalld

使用 Linux 指令作为数据源

exec source:

我们使用Linux指令来收集数据(Linux指令作为源代码)

通过命令产生数据,flume 会监听这个产生的数据

cat

我们修改这个文件,cat 不会更新,只能重新使用这个命令来显示

tail

tail -F

会追踪这个文件,文件更新,这个指令的显示结果就回更新

适合用于流处理

我们的数据源是执行 -cat 命令来读取这些命令

我们的source会收集数据传输到channel然后再传输到sink,再在终端上输出数据

1
2
3
4
5
6
7
8
9
10
11
12
13
andrew.sources=lin_source
andrew.channels=lin_channel
andrew.sinks=lin_sink

andrew.sources.lin_source.type=exec
andrew.sources.lin_source.command=cat /root/bd3/data.txt

andrew.channels.lin_channel.type=memory

andrew.sinks.lin_sink.type=logger

andrew.sources.lin_source.channels=lin_channel
andrew.sinks.lin_sink.channel=lin_channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
andrew.sources=lin_source
andrew.channels=lin_channel
andrew.sinks=lin_sink

# 注意这两句跟之前的不一样
andrew.sources.lin_source.type=exec # source 的类型为 exec,表示执行(指令)
# 只适配一个文件,只能读取一个文件,2 个就不行了

andrew.sources.lin_source.command=cat /root/bd3/data.txt
# 这里是要写执行的指令,这次是 cat 指令,后面加上要抓取文件的路径

andrew.channels.lin_channel.type=memory
# memory 表示 channel 将数据暂存在内存够中

andrew.sinks.lin_sink.type=logger
# sink 的数据类型是日志

andrew.sources.lin_source.channels=lin_channel
andrew.sinks.lin_sink.channel=lin_channel

要抓取文件的内容

运行指令

1
$ flume-ng agent -n andrew -f netcat2.conf

抓取文件内容作为数据源

使用 tail 指令

1
2
3
4
5
6
7
8
9
10
11
12
13
andrew.sources=lin_source
andrew.channels=lin_channel
andrew.sinks=lin_sink

andrew.sources.lin_source.type=exec
andrew.sources.lin_source.command=tail -F /root/bd3/data.txt

andrew.channels.lin_channel.type=memory

andrew.sinks.lin_sink.type=logger

andrew.sources.lin_source.channels=lin_channel
andrew.sinks.lin_sink.channel=lin_channel

刚启动的时候

1
$ echo "curry" >> data.txt

插入些东西

追踪显示

更换追踪目录为 /var/log/message

这个文件是记录我们 Linux 的系统日程的

随着我们对 Linux 系统的操作,这个文件会不断更新日程

借助 tail -F 我们可以追踪更新实时数据

无论是什么系统级的操作,它都会记录下来,我们都会实时读取到

1
2
3
4
5
6
7
8
9
10
11
12
13
andrew.sources=lin_source
andrew.channels=lin_channel
andrew.sinks=lin_sink

andrew.sources.lin_source.type=exec
andrew.sources.lin_source.command=tail -F /var/log/messages

andrew.channels.lin_channel.type=memory

andrew.sinks.lin_sink.type=logger

andrew.sources.lin_source.channels=lin_channel
andrew.sinks.lin_sink.channel=lin_channel

image-20230427181146687

使用一整个文件夹作为数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
andrew.sources = fireworm_source
andrew.channels = fireworm_channel
andrew.sinks = fireworm_sink

andrew.sources.fireworm_source.type = spooldir
andrew.sources.fireworm_source.spoolDir = /root/bd3

andrew.channels.fireworm_channel.type = memory

andrew.sinks.fireworm_sink.type = logger

andrew.sources.fireworm_source.channels = fireworm_channel
andrew.sinks.fireworm_sink.channel = fireworm_channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
andrew.sources = fireworm_source
andrew.channels = fireworm_channel
andrew.sinks = fireworm_sink

andrew.sources.fireworm_source.type = spooldir # 这里的类型就写 spooldir 即可

andrew.sources.fireworm_source.spoolDir = /root/bd3 # 这里要写一个文件夹,不能是一个文件
# 这里的 spoolDir 的 D 要大写,这是大小写敏感的

andrew.channels.fireworm_channel.type = memory

andrew.sinks.fireworm_sink.type = logger

andrew.sources.fireworm_source.channels = fireworm_channel
andrew.sinks.fireworm_sink.channel = fireworm_channel

Flume 的一个文件被传输之后会被加上 .COMPLETED 标识,Flume 之后就不会传输有这个标识的文件

(传输了我所有的 .conf 文件)

标识传输过的文件

配置文件被重命名之后,文件就会被重命名,下次用它运行 Flume 的时候要用新的、被重命名之后的、后缀为 .COMPLETED 的文件

尝试一下使用之前的文件名,报错

使用新文件名,成功

只要我们不退出程序,flume就回一直监听这个文件夹,只要有新的文件就回继续传输,不需要重启flume

新建个文件

成功监听

如果我们手动删除了这个标识,这个文件会被当成新文件重新传输

手动去掉Flume标识

又把这个文件重新传输了一遍

修改 sink

得先启动 Hadoop

这里也是可以监听文件夹的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
andrew.sources = fireworm_source
andrew.channels = fireworm_channel
andrew.sinks = fireworm_sink

andrew.sources.fireworm_source.type = spooldir
andrew.sources.fireworm_source.spoolDir = /root/bd3

andrew.channels.fireworm_channel.type = memory

andrew.sinks.fireworm_sink.type = hdfs
andrew.sinks.fireworm_sink.hdfs.path = /batch3/spool
andrew.sinks.fireworm_sink.hdfs.fileType = DataStream
andrew.sinks.fireworm_sink.hdfs.hdfs.writeFormat = Text

andrew.sources.fireworm_source.channels = fireworm_channel
andrew.sinks.fireworm_sink.channel = fireworm_channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
andrew.sources = fireworm_source
andrew.channels = fireworm_channel
andrew.sinks = fireworm_sink

andrew.sources.fireworm_source.type = spooldir
andrew.sources.fireworm_source.spoolDir = /root/bd3

andrew.channels.fireworm_channel.type = memory

andrew.sinks.fireworm_sink.type = hdfs # 修改类型为 hdfs

andrew.sinks.fireworm_sink.hdfs.path = /batch3/spool # 指定要上传的 hdfs 路径

andrew.sinks.fireworm_sink.hdfs.fileType = DataStream # 指定传输格式为数据流传输
# DataStream 大小写敏感

andrew.sinks.fireworm_sink.hdfs.hdfs.writeFormat = Text # 指定上传格式为 .txt 文件
# writeFormat 的 F 与 Text 的 T 都大小写敏感

andrew.sources.fireworm_source.channels = fireworm_channel
andrew.sinks.fireworm_sink.channel = fireworm_channel

这是在等待上传数据

这是传输完成

这里也在等待上传数据

上传完成

如果文件夹里有同名的文件,会报错

现在我们假设有一个 log1.txt 文件,我们在这个文件夹中使用 Flume 监听后,这个文件的文件名变成了 log1.txt.COMPLETED

现在我们尝试将另一个 log1.txt 文件从根目录中移动到这个目录下,移动过程是不会报错的

因为一个文件是 log1.txt 另一个文件是 log1.txt.COMPLETED

但是此时 Flume 监听程序会报错,因为它给这个新文件 log1.txt 加上传输标识之后,一个目录下的文件会出现重名,此时不会进行数据传输,也不会将文件改名

这时将已经有标识的 log1.txt.COMPLETED 文件删除,依然不会将新文件 log1.txt 传输,必须重新运行 Flume

报错信息

使用 exec 作为数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
andrew.sources = lin_source
andrew.channels = total_channel
andrew.sinks = total_sink

andrew.sources.lin_source.type = exec
andrew.sources.lin_source.command = tail -F /root/batch3/log1.txt

andrew.channels.total_channel.type = memory

andrew.sinks.total_sink.type = hdfs
andrew.sinks.total_sink.hdfs.path = /batch3/total
andrew.sinks.total_sink.hdfs.fileType = DataStream
andrew.sinks.total_sink.hdfs.hdfs.writeFormat = Text

andrew.sources.lin_source.channels = total_channel
andrew.sinks.total_sink.channel = total_channel

追踪成功

追踪成功

使用 netcat 作为数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
andrew.sources = win_source
andrew.channels = total_channel
andrew.sinks = total_sink

andrew.sources.win_source.type = netcat
andrew.sources.win_source.bind = 192.168.1.130
andrew.sources.win_source.port = 12345

andrew.channels.total_channel.type = memory

andrew.sinks.total_sink.type = hdfs
andrew.sinks.total_sink.hdfs.path = /batch3/total
andrew.sinks.total_sink.hdfs.fileType = DataStream
andrew.sinks.total_sink.hdfs.hdfs.writeFormat = Text

andrew.sources.win_source.channels = total_channel
andrew.sinks.total_sink.channel = total_channel

传输成功

传输成功

tailDir 使用文件,可以指定一个文件夹下的文件,一直传输一直上传

taildir 不会重复传输一个文件

有一个Jason文件会存储我们发送的数据和位置

要么删除文件要么修改JSON文件,否则不能重复发送文件

1
$ ls -a
1
$ cd .flume

JSON文件在这个文件夹下

taildir和spoodir //使用taildir时必须全部大写表示
ipoodir有一些限制,比如当文件传输完成后,文件后缀会变成,completed,此时当你再向这个文件中写入新数据时,fume并不会将这些新数据传输poodir: 我们可以多个文件,只要文件在我们要进行传输的路径中,flume就会全部进行传输aildir:我们可以针对特定的文件,而且它不会传输重复的内容,而且当心内容呗增加到我们传输的文件中时,它也会同步更新
gentname.sources.sourcename.tvpe=TAILDIRagentname.sources.sourcename.filegroups=f1//你想要传输多少个文件,想要一个就f1,想要两个就f2aqentname.sources.sourcename.filegroups.f1= //文件路径positionFile= //因为taildir会把传输的文件位置储存在-个ison 文件中,这就是规定这个json文件的位置
如果你要传输多个名字近似的文件,比og1.,og2.,你可以后面写”og或者说你没有这个文件,你在echo的时候添加了新文件,你也可以用这个,但是新文件必须也得是差不多这样子

HTTP Source

post 和 get 是两个方法,可以用来接收 flume 请求

.type =

.bind = 产生数据的 IP 地址

.port = 1-65535

1
2
3
4
5
6
7
8
9
10
11
12
13
14
andrew.sources = win_source
andrew.channels = total_channel
andrew.sinks = total_sink

andrew.sources.win_source.type = http
andrew.sources.win_source.bind = 192.168.1.130
andrew.sources.win_source.port = 12345

andrew.channels.total_channel.type = memory

andrew.sinks.total_sink.type = logger

andrew.sources.win_source.channels = total_channel
andrew.sinks.total_sink.channel = total_channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
andrew.sources = win_source
andrew.channels = total_channel
andrew.sinks = total_sink

andrew.sources.win_source.type = http # 要用 http
andrew.sources.win_source.bind = 192.168.1.130 # 接收消息的 IP 地址
andrew.sources.win_source.port = 12345

andrew.channels.total_channel.type = memory

andrew.sinks.total_sink.type = logger

andrew.sources.win_source.channels = total_channel
andrew.sinks.total_sink.channel = total_channel

用postman发送数据

接收成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
andrew.sources = win_source
andrew.channels = total_channel
andrew.sinks = total_sink

andrew.sources.win_source.type = http
andrew.sources.win_source.bind = 192.168.1.130
andrew.sources.win_source.port = 12347

andrew.channels.total_channel.type = memory

andrew.sinks.total_sink.type = hdfs
andrew.sinks.total_sink.hdfs.path = /batch3/spool1
andrew.sinks.total_sink.hdfs.filePrefix = httpdata
andrew.sinks.total_sink.hdfs.fileType = DataStream
andrew.sinks.total_sink.hdfs.rollInterval = 60
andrew.sinks.total_sink.hdfs.rollCount = 0
andrew.sinks.total_sink.hdfs.rollSize = 0

andrew.sources.win_source.channels = total_channel
andrew.sinks.total_sink.channel = total_channel

用postman发送数据

创建文件成功

在hdfs中检查

channels

capacity = 100 (默认 100) 存储容量

transactionCapacity = 100 传输容量

Hbase

sinks.sink1.type = hbase2

使用HBASE进行数据存储

sinks.sink1.table =

sinks.sink1.columnFamily

1
$ start-hbase.sh
1
$ hbase shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
andrew.sources = hbase_source
andrew.channels = hbase_channel
andrew.sinks = hbase_sink

andrew.sources.hbase_source.type = TAILDIR
andrew.sources.hbase_source.filegroups = f1 f2
andrew.sources.hbase_source.filegroups.f1 = /root/batch3/."log1."
andrew.sources.habse_source.filegroups.f2 = /root/batch3/."log2."

andrew.channels.hbase_channel.type = memory
andrew.channels.hbase_channel.capacity = 1000
andrew.channels.hbase_channel.transactionCapacity = 100

andrew.sinks.hbase_sink.type = hbase2
andrew.sinks.hbase_sink.table = batche
andrew.sinks.hbase_sink.columnFamile = data

andrew.sources.hbase_source.channels = hbase_channel
andrew.sinks.hbase_sink.channel = hbase_channel

将 win 中 flume 输出的数据作为 lin 下 flume 的输入

Windows

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
andrew.sources = source1
andrew.channels = channel1
andrew.sinks = sink1

andrew.sources.source1.type = spooldir
andrew.sources.source1.spoolDir = D:\spool

andrew.channels.channel1.type = memory
andrew.channels.channel1.capacity = 1000

andrew.sinks.sink1.type = avro
andrew.sinks.sink1.hostname = 192.168.1.130
andrew.sinks.sink1.port = 22222

andrew.sources.source1.channels = channel1
andrew.sinks.sink1.channel = channel1

Linux

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
andrew.sources = source1
andrew.channels = channel1
andrew.sinks = sink1

andrew.sources.source1.type = avro
andrew.sources.source1.bind = 192.168.1.130
andrew.sources.source1.port = 22222

andrew.channels.channel1.type = memory
andrew.channels.channel1.capacity = 1000

andrew.sinks.sink1.type = logger

andrew.sources.source1.channels = channel1
andrew.sinks.sink1.channel = channel1

在Windows上运行flume

sink为logger

sink 修改为 ``hdfs`

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
andrew.sources = source1
andrew.channels = channel1
andrew.sinks = sink1

andrew.sources.source1.type = avro
andrew.sources.source1.bind = 192.168.1.130
andrew.sources.source1.port = 22222

andrew.channels.channel1.type = memory
andrew.channels.channel1.capacity = 1000

andrew.sinks.sink1.type = hdfs
andrew.sinks.sink1.hdfs.path = /batch3/spool1
andrew.sinks.sink1.hdfs.fileType = DataStream
andrew.sinks.sink1.hdfs.hdfs.writeFormat = Text

andrew.sources.source1.channels = channel1
andrew.sinks.sink1.channel = channel1

sink为hdfs

多路复用扇出

传输同一个数据到两个不同的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
andrew.sources = source1
andrew.channels = channel1 channel2
andrew.sinks = sink1 sink2

andrew.sources.source1.type = spooldir
andrew.sources.source1.spoolDir = /root/batch3
andrew.sources.source1.batchSize = 500
andrew.sources.source1.selector.type = replicating

andrew.channels.channel1.type = memory
andrew.channels.channel1.transactionCapacity = 1000
andrew.channels.channel1.capacity = 1000

andrew.channels.channel2.type = memory
andrew.channels.channel2.transactionCapacity = 1000
andrew.channels.channel2.capacity = 1000

andrew.sinks.sink1.type = hdfs
andrew.sinks.sink1.hdfs.path = /batch3/spool
andrew.sinks.sink1.hdfs.fileType = DataStream
andrew.sinks.sink1.hdfs.filePrefix = repdata

andrew.sinks.sink2.type = logger
# andrew.sinks.sink2.type = hbase
# andrew.sinks.sink2.table =
# andrew.sinks.sink2.columnFamily =

andrew.sources.source1.channels = channel1 channel2
andrew.sinks.sink1.channel = channel1
andrew.sinks.sink2.channel = channel2

传输完成logger

传输完成hdfs