
Flume 1.6.0 Getting Started: Installation, Deployment, and Flume Examples


Questions this article answers: 1. What is Flume? 2. Where is Flume's official website? 3. What are Flume's core terms? 4. How do you configure a Flume data source?

I. What is Flume?
Flume is a real-time log collection system developed by Cloudera that has been widely recognized and adopted in the industry. The initial releases are now collectively referred to as Flume OG (original generation) and belonged to Cloudera. As Flume's functionality grew, the shortcomings of Flume OG became apparent: a bloated code base, poorly designed core components, and non-standard core configuration. In the last OG release, 0.94.0, unstable log delivery was a particularly serious problem. To address these issues, on October 22, 2011 Cloudera completed Flume-728, a milestone overhaul that rewrote the core components, the core configuration, and the code architecture; the refactored versions are collectively called Flume NG (next generation). Another reason for the change was moving Flume under the Apache umbrella, with Cloudera Flume renamed Apache Flume.

Flume's characteristics: Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting massive volumes of log data. It supports custom data senders in the logging system for collecting data, and it can perform simple processing on the data and write it to a variety of destinations (such as text files, HDFS, HBase, and so on).

Flume's data flow is carried end to end by events. An Event is Flume's basic unit of data; it carries the log data (as a byte array) together with header information. Events are generated by a Source from data outside the Agent. When the Source captures an event it applies a specific format and then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds events until a Sink has processed them. The Sink is responsible for persisting the log or forwarding the event to another Source.

Flume's reliability: when a node fails, logs can be delivered to other nodes without being lost. Flume offers three levels of reliability guarantees, from strongest to weakest: end-to-end (the receiving agent first writes the event to disk and deletes it only after delivery succeeds; if delivery fails, the data can be resent), store on failure (the strategy also used by Scribe: when the receiver crashes, data is written locally and sent again once the receiver recovers), and best effort (data is sent to the receiver without any acknowledgment).

Flume's recoverability: this again relies on the Channel. The FileChannel is recommended; it persists events on the local file system (at the cost of lower performance).
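All of the examples later in this article use a memory channel for simplicity. If you want the recoverability described above, you can switch the channel to a file channel instead; a minimal sketch (the checkpoint and data directories below are example paths, not taken from this article):

# example file channel; events survive an agent restart, directory paths are placeholders
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/flume_data/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/flume_data/data

The rest of an agent's configuration stays the same; only the channel definition changes.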

Some core Flume concepts:
1. Agent: a JVM process that runs Flume. Each machine runs one agent, but a single agent can contain multiple sources and sinks.
2. Client: produces the data; runs in its own thread.
3. Source: collects data from the Client and passes it to the Channel.
4. Sink: collects data from the Channel; runs in its own thread.
5. Channel: connects sources and sinks; it behaves somewhat like a queue.
6. Event: can be a log record, an Avro object, and so on.

Flume's smallest independent unit of operation is the agent; one agent is one JVM. A single agent is made up of three main components: Source, Sink, and Channel.

It is worth noting that Flume ships with a large number of built-in Source, Channel, and Sink types, and different types of Sources, Channels, and Sinks can be freely combined. The combinations are driven entirely by the user's configuration file, which makes Flume very flexible. For example, a Channel can buffer events in memory or persist them to local disk, and a Sink can write logs to HDFS, to HBase, or even to another Source. Flume also lets users build multi-hop flows, meaning multiple agents can work together, with support for fan-in, fan-out, contextual routing, and backup routes; this is where Flume really shines.

II. Where is Flume's official website?
http://flume.apache.org/

III. Where to download?
http://www.apache.org/dyn/closer.lua/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

IV. How to install?
1) Extract the downloaded Flume package into the /home/hadoop directory and you are already 50% done :) simple, right? 2) Edit the flume-env.sh configuration file, mainly to set the JAVA_HOME variable, and add the following environment variables to /etc/profile:

export FLUME_HOME=/home/hadoop/apache-flume-1.6.0-bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=.:$PATH:$FLUME_HOME/bin
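For step 2), flume-env.sh is created from the template shipped in the conf directory; a minimal sketch is shown below (the JDK path is only an example, point it at your own installation):

root@m1:/home/hadoop# cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
root@m1:/home/hadoop# vi $FLUME_HOME/conf/flume-env.sh
# set JAVA_HOME to the JDK Flume should use (example path)
export JAVA_HOME=/usr/lib/jvm/jdk1.7.0

Remember to run source /etc/profile (or open a new shell) so the variables above take effect.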

3) Verify the installation
root@m1:/home/hadoop# flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 8633220df808c4cd0c13d1cf0320454a94f1ea97
Compiled by hshreedharan on Wed May 7 14:49:18 PDT 2014
From source with checksum a01fe726e4380ba0c9f7a7d222db961f
root@m1:/home/hadoop#

If you see output like the above, the installation was successful.

V. Flume examples

1) Example 1: Avro
The Avro client can send a given file to Flume; the Avro source uses the Avro RPC mechanism. a) Create the agent configuration file
root@m1:/home/hadoop# vi /home/hadoop/flume-1.6.0-bin/conf/avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


b) Start flume agent a1


root@m1:flume-1.6.0-bin# flume-ng agent -c conf -f conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

Explanation of the command parameters:
-c conf   use conf as the configuration directory
-f conf/avro.conf   use conf/avro.conf as the configuration file (mind the path relative to where you run the command)
-n a1   use a1 as the agent name; it must match the name used in avro.conf
-Dflume.root.logger=INFO,console   set the root logger to INFO and print the log output to the console
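The same command can also be written with the long-form options, which some people find easier to read; a sketch of the equivalent invocation:

# equivalent to the short options above
flume-ng agent --conf conf --conf-file conf/avro.conf --name a1 -Dflume.root.logger=INFO,console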
c) Create a test file

root@m1:/home/hadoop# echo "hello world" > /home/hadoop/flume-1.6.0-bin/log.00

d) Send the file with avro-client
root@m1:/home/hadoop# flume-ng avro-client -c . -H m1 -p 4141 -F /home/hadoop/flume-1.6.0-bin/log.00

Note the m1 parameter: the host passed with -H must be the machine where agent a1 is listening.

e) On the m1 console you can see the following output; note the last line:

root@m1:/home/hadoop/flume-1.5.0-bin/conf# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Sourcing environment configuration script /home/hadoop/flume-1.5.0-bin/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/hadoop/hadoop-2.2.0/bin/hadoop) for HDFS access
Info: Excluding /home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar from classpath
...
2014-08-10 10:43:25,112 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x92464c4f, /192.168.1.50:59850 :> /192.168.1.50:4141] UNBOUND
2014-08-10 10:43:25,112 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x92464c4f, /192.168.1.50:59850 :> /192.168.1.50:4141] CLOSED
2014-08-10 10:43:25,112 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /192.168.1.50:59850 disconnected.
2014-08-10 10:43:26,718 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64    hello world }

2) Example 2: Spool
The spooling directory source watches the configured directory for new files and reads the data out of them. Two things to note: 1) files copied into the spool directory must not be opened and edited afterwards; 2) the spool directory must not contain subdirectories.
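Because a file must not change once it is inside the spool directory, a common pattern is to write the file somewhere else first and then move it into the directory in one step; a sketch (the temporary path is just an example):

# write the file outside the spool directory, then move it in
echo "spool test1" > /home/hadoop/spool_text.log.tmp
mv /home/hadoop/spool_text.log.tmp /home/hadoop/flume-1.5.0-bin/logs/spool_text.log

Keeping the temporary file on the same filesystem makes the mv a simple rename, so the spooling source never sees a half-written file.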

a) Create the agent configuration file
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/spool.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/hadoop/flume-1.5.0-bin/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1


root@m1:apache-flume-1.6.0-bin# flume-ng agent -c conf -f conf/spool.conf -n a1 -Dflume.root.logger=INFO,console

c) Add a file to the /home/hadoop/flume-1.5.0-bin/logs directory
root@m1:/home/hadoop# echo "spool test1" > /home/hadoop/flume-1.5.0-bin/logs/spool_text.log

d) On the m1 console you can see the following output:
14/08/10 11:37:13 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:13 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:14 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/flume-1.5.0-bin/logs/spool_text.log to /home/hadoop/flume-1.5.0-bin/logs/spool_text.log.COMPLETED
14/08/10 11:37:14 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:14 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:14 INFO sink.LoggerSink: Event: { headers:{file=/home/hadoop/flume-1.5.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31    spool test1 }
14/08/10 11:37:15 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:15 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:16 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:16 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:17 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.

3) Example 3: Exec
The exec source runs a given command and uses its output as the data source. If you use the tail command, make sure the file grows large enough, or you will not see any output. a) Create the agent configuration file
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/flume-1.5.0-bin/log_exec_tail

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1
root@m1:apache-flume-1.6.0-bin# flume-ng agent -c conf -f conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate enough content in the tailed file

root@m1:/home/hadoop# for i in {1..1000}
> do
> echo "exec tail$i" >> /home/hadoop/flume-1.5.0-bin/log_exec_tail
> done
d) On the m1 console you can see the following output:
2014-08-10 10:59:25,513 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74    exec tail test }
2014-08-10 10:59:34,535 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74    exec tail test }
2014-08-10 11:01:40,557 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31    exec tail1 }
2014-08-10 11:01:41,180 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 32    exec tail2 }
2014-08-10 11:01:41,180 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 33    exec tail3 }
2014-08-10 11:01:41,181 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 34    exec tail4 }
2014-08-10 11:01:41,181 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 35    exec tail5 }
2014-08-10 11:01:41,181 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 36    exec tail6 }
....
2014-08-10 11:01:51,550 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 36    exec tail96 }
2014-08-10 11:01:51,550 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 37    exec tail97 }
2014-08-10 11:01:51,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 38    exec tail98 }
2014-08-10 11:01:51,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 39    exec tail99 }
2014-08-10 11:01:51,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31 30 30    exec tail100 }

4) Example 4: Syslogtcp
The syslogtcp source listens on a TCP port and uses it as the data source. a) Create the agent configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1

root@m1:apache-flume-1.6.0-bin# flume-ng agent -c conf -f conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate a test syslog message


root@m1:apache-flume-1.6.0-bin# echo "hello idoall.org syslog" | nc localhost 5140

Note: the command above requires netcat. To install it, run yum search nc to find the available nc packages (for example nc.i686), then install it with yum install nc.i686.

d) On the m1 console you can see the following output:

14/08/10 11:41:45 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf
14/08/10 11:41:45 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
14/08/10 11:41:45 INFO conf.FlumeConfiguration: Processing:k1
14/08/10 11:41:45 INFO conf.FlumeConfiguration: Processing:k1
14/08/10 11:41:45 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
14/08/10 11:41:45 INFO node.AbstractConfigurationProvider: Creating channels
14/08/10 11:41:45 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
14/08/10 11:41:45 INFO node.AbstractConfigurationProvider: Created channel c1
14/08/10 11:41:45 INFO source.DefaultSourceFactory: Creating instance of source r1, type syslogtcp
14/08/10 11:41:45 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
14/08/10 11:41:45 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/10 11:41:45 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6538b14 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/10 11:41:45 INFO node.Application: Starting Channel c1
14/08/10 11:41:45 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/10 11:41:45 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/10 11:41:45 INFO node.Application: Starting Sink k1
14/08/10 11:41:45 INFO node.Application: Starting Source r1
14/08/10 11:41:45 INFO source.SyslogTcpSource: Syslog TCP Source starting...
14/08/10 11:42:15 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/10 11:42:15 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67    hello idoall.org }

5) Example 5: JSONHandler
a) Create the agent configuration file
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/post_json.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1

root@m1:/home/hadoop# flume-ng agent -c conf -f /home/hadoop/flume-1.5.0-bin/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console

c) Send a JSON-formatted POST request
root@m1:/home/hadoop# curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888

d) On the m1 console you can see the following output:
14/08/10 11:49:59 INFO node.Application: Starting Channel c1
14/08/10 11:49:59 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/10 11:49:59 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/10 11:49:59 INFO node.Application: Starting Sink k1
14/08/10 11:49:59 INFO node.Application: Starting Source r1
14/08/10 11:49:59 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
14/08/10 11:49:59 INFO mortbay.log: jetty-6.1.26
14/08/10 11:50:00 INFO mortbay.log: Started SelectChannelConnector@0.0.0.0:8888
14/08/10 11:50:00 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
14/08/10 11:50:00 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
14/08/10 12:14:32 INFO sink.LoggerSink: Event: { headers:{b=b1, a=a1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79    idoall.org_body }

6) Example 6: HDFS sink (requires Hadoop to be configured; the steps below had not yet been tested)
a) Create the agent configuration file
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://m1:9000/user/flume/syslogtcp
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1


root@m1:apache-flume-1.6.0-bin# flume-ng agent -c conf -f /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console


c) Generate a test syslog message
root@m1:/home/hadoop# echo "hello idoall flume -> hadoop testing one" | nc localhost 5140

d) On the m1 console you can see the following output:
14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/10 12:20:39 INFO node.Application: Starting Sink k1
14/08/10 12:20:39 INFO node.Application: Starting Source r1
14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
14/08/10 12:20:39 INFO source.SyslogTcpSource: Syslog TCP Source starting...
14/08/10 12:21:46 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/10 12:21:49 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/08/10 12:21:49 INFO hdfs.BucketWriter: Creating hdfs://m1:9000/user/flume/syslogtcp//Syslog.1407644509504.tmp
14/08/10 12:22:20 INFO hdfs.BucketWriter: Closing hdfs://m1:9000/user/flume/syslogtcp//Syslog.1407644509504.tmp
14/08/10 12:22:20 INFO hdfs.BucketWriter: Close tries incremented
14/08/10 12:22:20 INFO hdfs.BucketWriter: Renaming hdfs://m1:9000/user/flume/syslogtcp/Syslog.1407644509504.tmp to hdfs://m1:9000/user/flume/syslogtcp/Syslog.1407644509504
14/08/10 12:22:20 INFO hdfs.HDFSEventSink: Writer callback called.

e) Open another window on m1 and check on Hadoop whether the file was created
root@m1:/home/hadoop# /home/hadoop/hadoop-2.2.0/bin/hadoop fs -ls /user/flume/syslogtcp
Found 1 items
-rw-r--r--   3 root supergroup        155 2014-08-10 12:22 /user/flume/syslogtcp/Syslog.1407644509504
root@m1:/home/hadoop# /home/hadoop/hadoop-2.2.0/bin/hadoop fs -cat /user/flume/syslogtcp/Syslog.1407644509504
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^;>Gv$hello idoall flume -> hadoop testing one

7) Example 7: File Roll Sink. a) Create the agent configuration file
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5555
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/flume-1.5.0-bin/logs

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate test log messages
root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5555
root@m1:/home/hadoop# echo "hello idoall.org syslog 2" | nc localhost 5555

d) Check whether files were created under /home/hadoop/flume-1.5.0-bin/logs; by default a new file is rolled every 30 seconds
root@m1:/home/hadoop# ll /home/hadoop/flume-1.5.0-bin/logs
total 272
drwxr-xr-x 3 root root 4096 Aug 10 12:50 ./
drwxr-xr-x 9 root root 4096 Aug 10 10:59 ../
-rw-r--r-- 1 root root   50 Aug 10 12:49 1407646164782-1
-rw-r--r-- 1 root root    0 Aug 10 12:49 1407646164782-2
-rw-r--r-- 1 root root    0 Aug 10 12:50 1407646164782-3
root@m1:/home/hadoop# cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-1 /home/hadoop/flume-1.5.0-bin/logs/1407646164782-2
hello idoall.org syslog
hello idoall.org syslog 2
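If a new file every 30 seconds is too frequent, the file_roll sink's roll interval can be changed; a sketch of the extra line you could add to the file_roll.conf above:

# roll a new file every 600 seconds instead of the default 30 (0 disables time-based rolling)
a1.sinks.k1.sink.rollInterval = 600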

8) Example 8: Replicating Channel Selector. Flume supports fanning the flow out from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. With replicating, the event is sent to all configured channels; with multiplexing, the event is sent to only a subset of the eligible channels. A fan-out flow requires specifying the source and the fan-out rules for the channels. This example uses two machines, m1 and m2. a) Create the replicating_Channel_Selector configuration file on m1
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

b) Create the replicating_Channel_Selector_avro configuration file on m1
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c) Copy both configuration files from m1 to m2
root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf
root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf

d) Open four windows and start the two flume agents on both m1 and m2
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, on either m1 or m2, generate a test syslog message
root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140

f) In the sink windows on both m1 and m2 you can see the following output, which shows the event was replicated to both machines:
14/08/10 14:08:18 INFO ipc.NettyServer: Connection to /192.168.1.51:46844 disconnected.
14/08/10 14:08:52 INFO ipc.NettyServer: [id: 0x90f8fe1f, /192.168.1.50:35873 => /192.168.1.50:5555] OPEN
14/08/10 14:08:52 INFO ipc.NettyServer: [id: 0x90f8fe1f, /192.168.1.50:35873 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 14:08:52 INFO ipc.NettyServer: [id: 0x90f8fe1f, /192.168.1.50:35873 => /192.168.1.50:5555] CONNECTED: /192.168.1.50:35873
14/08/10 14:08:59 INFO ipc.NettyServer: [id: 0xd6318635, /192.168.1.51:46858 => /192.168.1.50:5555] OPEN
14/08/10 14:08:59 INFO ipc.NettyServer: [id: 0xd6318635, /192.168.1.51:46858 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 14:08:59 INFO ipc.NettyServer: [id: 0xd6318635, /192.168.1.51:46858 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:46858
14/08/10 14:09:20 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67    hello idoall.org }

9) Example 9: Multiplexing Channel Selector. a) Create the Multiplexing_Channel_Selector configuration file on m1
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing

a1.sources.r1.selector.header = type
# Mappings may overlap, so a value can go to more than one channel; the default may list any number of channels.
a1.sources.r1.selector.mapping.baidu = c1
a1.sources.r1.selector.mapping.ali = c2
a1.sources.r1.selector.default = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

b) Create the Multiplexing_Channel_Selector_avro configuration file on m1
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c) Copy both configuration files to m2
root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf
root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

d) Open four windows and start the two flume agents on both m1 and m2
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, on either m1 or m2, send some test events (the source here is an HTTP source, so we POST JSON):
root@m1:/home/hadoop# curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]' http://localhost:5140

f) In the sink window on m1 you can see the following output:
14/08/10 14:32:21 INFO node.Application: Starting Sink k1
14/08/10 14:32:21 INFO node.Application: Starting Source r1
14/08/10 14:32:21 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
14/08/10 14:32:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
14/08/10 14:32:21 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
14/08/10 14:32:21 INFO source.AvroSource: Avro source r1 started.
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] OPEN
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] CONNECTED: /192.168.1.50:35916
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] OPEN
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:46945
14/08/10 14:34:11 INFO sink.LoggerSink: Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31    idoall_TEST1 }
14/08/10 14:34:57 INFO sink.LoggerSink: Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33    idoall_TEST3 }

g) In the sink window on m2 you can see the following output:
14/08/10 14:32:27 INFO node.Application: Starting Sink k1
14/08/10 14:32:27 INFO node.Application: Starting Source r1
14/08/10 14:32:27 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
14/08/10 14:32:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
14/08/10 14:32:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
14/08/10 14:32:27 INFO source.AvroSource: Avro source r1 started.
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0x7c2f0aec, /192.168.1.50:38104 => /192.168.1.51:5555] OPEN
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0x7c2f0aec, /192.168.1.50:38104 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0x7c2f0aec, /192.168.1.50:38104 => /192.168.1.51:5555] CONNECTED: /192.168.1.50:38104
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x3d36f553, /192.168.1.51:48599 => /192.168.1.51:5555] OPEN
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x3d36f553, /192.168.1.51:48599 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x3d36f553, /192.168.1.51:48599 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48599
14/08/10 14:34:33 INFO sink.LoggerSink: Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32    idoall_TEST2 }

As you can see, events are routed to different channels according to the value of the header. 10) Example 10: Flume Sink Processors. With the failover processor, events are always sent to one sink; when that sink becomes unavailable, they are automatically sent to the next sink. a) Create the Flume_Sink_Processors configuration file on m1
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# This is the key to configuring failover: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priorities: a higher number means higher priority, and each sink's priority must be different
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Set to 10 seconds here; adjust faster or slower to suit your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

b) Create the Flume_Sink_Processors_avro configuration file on m1
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c) Copy both configuration files to m2
root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf
root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf

d) Open four windows and start the two flume agents on both m1 and m2
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, on either m1 or m2, generate a test log message
root@m1:/home/hadoop# echo "idoall.org test1 failover" | nc localhost 5140


f) Because m2 has the higher priority, you can see the following output in m2's sink window, while m1 shows nothing:
14/08/10 15:02:46 INFO ipc.NettyServer: Connection to /192.168.1.51:48692 disconnected.
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0x09a14036, /192.168.1.51:48704 => /192.168.1.51:5555] OPEN
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0x09a14036, /192.168.1.51:48704 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0x09a14036, /192.168.1.51:48704 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48704
14/08/10 15:03:26 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31    idoall.org test1 }

g) Now stop the sink on m2 (Ctrl+C) and send more test data:
root@m1:/home/hadoop# echo "idoall.org test2 failover" | nc localhost 5140

h) In m1's sink window you can now see both of the test messages that were sent:
14/08/10 15:02:46 INFO ipc.NettyServer: Connection to /192.168.1.51:47036 disconnected.
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0xbcf79851, /192.168.1.51:47048 => /192.168.1.50:5555] OPEN
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0xbcf79851, /192.168.1.51:47048 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0xbcf79851, /192.168.1.51:47048 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:47048
14/08/10 15:07:56 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31    idoall.org test1 }
14/08/10 15:07:56 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32    idoall.org test2 }

i) Now start the sink again in m2's sink window:
root@m2:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

j) Send two more batches of test data:
root@m1:/home/hadoop# echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140

k) In m2's sink window we can see the following output; because of the priorities, the log messages land on m2 again:
14/08/10 15:09:47 INFO node.Application: Starting Sink k1
14/08/10 15:09:47 INFO node.Application: Starting Source r1
14/08/10 15:09:47 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
14/08/10 15:09:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
14/08/10 15:09:47 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
14/08/10 15:09:47 INFO source.AvroSource: Avro source r1 started.
14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] OPEN
14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48741
14/08/10 15:09:57 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32    idoall.org test2 }
14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] OPEN
14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] CONNECTED: /192.168.1.50:38166
14/08/10 15:10:43 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33    idoall.org test3 }
14/08/10 15:10:43 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34    idoall.org test4 }

11) Example 11: Load balancing Sink Processor. Unlike failover, the load_balance type offers two selection strategies: round robin and random. In either case, if the selected sink is unavailable, the processor automatically tries to send to the next available sink. a) Create the Load_balancing_Sink_Processors configuration file on m1
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# This is the key to configuring load balancing: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
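To use random selection instead of round robin, only the selector line needs to change; a sketch:

# pick a sink at random for each batch instead of cycling through them
a1.sinkgroups.g1.processor.selector = random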

b) Create the Load_balancing_Sink_Processors_avro configuration file on m1
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c) Copy both configuration files to m2
root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf
root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

d) Open four windows and start the two flume agents on both m1 and m2

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, on either m1 or m2, generate test log messages. Enter them one line at a time; if you send them too quickly they tend to all land on one machine.
root@m1:/home/hadoop# echo "idoall.org test1" | nc localhost 5140
root@m1:/home/hadoop# echo "idoall.org test2" | nc localhost 5140
root@m1:/home/hadoop# echo "idoall.org test3" | nc localhost 5140
root@m1:/home/hadoop# echo "idoall.org test4" | nc localhost 5140

f) In the sink window on m1 you can see the following output:
14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32    idoall.org test2 }
14/08/10 15:35:33 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34    idoall.org test4 }

g) In the sink window on m2 you can see the following output:
14/08/10 15:35:27 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31    idoall.org test1 }
14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33    idoall.org test3 }

This shows that the round-robin mode is working. 12) Example 12: HBase sink

a) Before testing, start HBase first; for setting up the environment, see the separate deployment guide 《ubuntu12.04+hadoop2.2.0+zookeeper3.4.5+hbase0.96.2+hive0.13.1 分布式环境部署》. b) Then copy the following jar files into Flume's lib directory:
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/protobuf-java-2.5.0.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-client-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-common-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-protocol-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-server-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop2-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/htrace-core-2.04.jar /home/hadoop/flume-1.5.0-bin/lib

c) Make sure the test_idoall_org table already exists in HBase.
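If the table does not exist yet, it can be created from the HBase shell first; a sketch assuming the column family name used in the configuration below:

# create the target table with the 'name' column family
hbase(main):001:0> create 'test_idoall_org', 'name'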
d) Create the hbase_simple configuration file on m1

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.column = idoall
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

e) Start the flume agent
/home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console

f) Generate a test syslog message
root@m1:/home/hadoop# echo "hello idoall.org from flume" | nc localhost 5140

g) Now log in to HBase and you can see that the new data has been inserted:
root@m1:/home/hadoop# /home/hadoop/hbase-0.96.2-hadoop2/bin/hbase shell
2014-08-10 16:09:48,984 INFO [main] Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.96.2-hadoop2, r1581096, Mon Mar 24 16:03:18 PDT 2014

hbase(main):001:0> list
TABLE
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/hbase-0.96.2-hadoop2/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
hbase2hive_idoall
hive2hbase_idoall
test_idoall_org
3 row(s) in 2.6880 seconds

=> ["hbase2hive_idoall", "hive2hbase_idoall", "test_idoall_org"]
hbase(main):002:0> scan "test_idoall_org"
ROW                           COLUMN+CELL
 10086                        column=name:idoall, timestamp=1406424831473, value=idoallvalue
1 row(s) in 0.0550 seconds

hbase(main):003:0> scan "test_idoall_org"
ROW                           COLUMN+CELL
 10086                        column=name:idoall, timestamp=1406424831473, value=idoallvalue
 1407658495588-XbQCOZrKK8-0   column=name:payload, timestamp=1407658498203, value=hello idoall.org from flume
2 row(s) in 0.0200 seconds

hbase(main):004:0> quit

Having worked through all of these Flume examples, you will find that Flume really is powerful: its components can be combined in all sorts of ways to accomplish what you need. As the saying goes, the master leads you through the door, but the practice is up to you. Go try it out yourself and see how Flume can best be applied to your own product and business.

