flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统
核心概念:
Agent:一个agent就是一个JVM进程,一个agent中包含多个sources和sinks。
Client:生产数据。
Source:从Client收集数据,传递给Channel。
Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
Sink:从Channel收集数据,运行在一个线程。
Events:可以是日志记录、avro对象等。
基本结构:
Flume以agent为最小的运行单位。单agent由Source、Sink和Channel三大组件构成,如下图:
复杂结构:
Flume agent可以根据需求配置成多种形式的拓扑结构
Agent顺序连接
Agent联合
多路复用
网络流:
Flume支持Avro、Thrift、Syslog、Netcat流数据采集
Source
表1 Source类型
| 类型 | 描述 |
| Avro Source | 监听Avro端口并接收Avro Client的流数据 |
| Thrift Source | 监听Thrift端口并接收Thrift Client的流数据 |
| Exec Source | 基于Unix的command在标准输出上生产数据 |
| JMS Source | 从JMS(Java消息服务)采集数据 |
| Spooling Directory Source | 监听指定目录 |
| Twitter 1% firehose Source (experimental) | 通过API持续下载Twitter数据(实验阶段) |
| Kafka Source | 采集Kafka topic中的message |
| NetCat Source | 监听端口(要求所提供的数据是换行符分隔的文本) |
| Sequence Generator Source | 序列产生器,连续不断产生event,用于测试 |
| Syslog Sources | 采集syslog日志消息,支持单端口TCP、多端口TCP和UDP日志采集 |
| HTTP Source | 接收HTTP POST和GET数据 |
| Stress Source | 用于source压力测试 |
| Legacy Sources | 向下兼容,接收低版本Flume的数据 |
| Custom Source | 自定义source的接口 |
| Scribe Source | 从 Scribe采集数据 |
接收Avro Client的流数据,客户端与采集端需要统一序列化模板
主要参数配置
其中threads决定最多有多少个线程来处理RPC请求
源码分析
通过org.jboss.netty实现,启动一个Netty Server来提供RPC服务
socketChannelFactory:主要负责生产网络通信相关的Channel和ChannelSink实例,NIO Server端一般使用NioServerSocketChannelFactory,用户也可以定制自己的ChannelFactory。
pipelineFactory:主要用来对传输数据的处理,由于对数据的处理属于业务相关,用户应自己实现ChannelPipelineFactory,然后往ChannelPipelineFactory添加自定义的Handler
启动Netty的Server端时都会设置两个ExecutorService对象,我们都习惯用boss,worker两个变量来引用这两个对象。在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。
Spooling Directory Source
监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到spool目录下的文件不可以再打开编辑;不具备目录递归功能。另外需要一个程序对监测目录定期维护,如:备份文件、删除错误文件等。
主要参数配置
spoolDir:监听目录
deletePolicy:当文件中数据全部读取到channel后,源文件处理(立即删除或者重命名)
batchSize:source缓存大小
deserializer:序列化,默认按行划分event,可自定义序列化规则
Channel
表2 Channel类型
| 类型 | 说明 |
| Memory Channel | Event数据存储在内存中 |
| JDBC Channel | Event数据存储在持久化存储中,当前Flume Channel内置支持Derby |
| File Channel | Event数据存储在磁盘文件中 |
| Spillable Memory Channel | Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用) |
| Pseudo Transaction Channel | 测试用途 |
| Custom Channel | 自定义Channel实现 |
MemoryChannel: 所有的events被保存在内存中。优点是高吞吐。缺点是容量有限并且Agent死掉时会丢失内存中的数据。
FileChannel: 所有的events被保存在文件中。优点是容量较大且死掉时数据可恢复。缺点是速度较慢。
上述两种Channel,优缺点相反,分别有自己适合的场景。然而,对于大部分应用来说,我们希望Channel可以同提供高吞吐和大缓存。美团开发DualChannel:
DualChannel:基于 MemoryChannel和 FileChannel开发。当堆积在Channel中的events数小于阈值时,所有的events被保存在MemoryChannel中,Sink从MemoryChannel中读取数据; 当堆积在Channel中的events数大于阈值时, 所有的events被自动存放在FileChannel中,Sink从FileChannel中读取数据。这样当系统正常运行时,我们可以使用MemoryChannel的高吞吐特性;当系统有异常时,我们可以利用FileChannel的大缓存的特性。
多路复用
一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将最前端的数据源复制多份,分别传递到多个channel中,每个channel接收到的数据都是相同的。Multiplexing方式是根据特定类型分流,每个channel中数据是不同的,适合做类型区分。
Sink
表3 Sink类型
| 类型 | 说明 |
| HDFS Sink | 数据写入HDFS |
| Logger Sink | 数据写入日志文件 |
| Avro Sink | 数据被转换成Avro Event,然后发送到配置的RPC端口上 |
| Thrift Sink | 数据被转换成Thrift Event,然后发送到配置的RPC端口上 |
| IRC Sink | 数据在IRC上进行回放 |
| File Roll Sink | 存储数据到本地文件系统 |
| Null Sink | 丢弃到所有数据 |
| Hive Sink | 数据写入Hive |
| HBase Sink | 数据写入HBase数据库 |
| Morphline Solr Sink | 数据发送到Solr搜索服务器(集群) |
| ElasticSearch Sink | 数据发送到Elastic Search搜索服务器(集群) |
| Kite Dataset Sink | 写数据到Kite Dataset,试验性质的 |
| Kafka Sink | 数据写到Kafka Topic |
| Custom Sink | 自定义Sink实现 |
把events写入HDFS中,目前支持文本文件和SequenceFile,同时支持数据压缩。文件可以根据运行时间、消息数目和文件大小同时作用来控制文件关闭和新建。
主要参数配置
hdfs.path:路径
hdfs.rollInterval:多长时间新建文件
hdfs.rollSize:多大时新建文件
hdfs.rollCount:有多少条消息时新建文件
hdfs.batchSize:批量写入hdfs的event个数
hdfs.fileType:文件类型,支持SequenceFile、DataStream和CompressedStream
hdfs.threadsPoolSizeflume:操作hdfs的线程数(包括新建,写入等)
Kafka Sink
可以把数据发布到一个Kafka topic,有效得整合Flume与Kafka
主要参数配置
topic:对应Kafka的topic
batchSize:批量写入Kafka的event个数
Other Kafka Producer Properties:其它Kafka参数
内部负载均衡
负载均衡片处理器提供在多个Sink之间负载平衡的能力。实现支持通过round_robin(轮询)或者random(随机)参数来实现负载分发,默认情况下使用round_robin,但可以通过配置覆盖这个默认值。还可以通过集成AbstractSinkSelector类来实现用户自己的选择机制。
故障转移
FailoverSink Processor会通过配置维护了一个优先级列表。保证每一个有效的事件都会被处理。
故障转移的工作原理是将连续失败sink分配到一个池中,在那里被分配一个冷冻期,在这个冷冻期里,这个sink不会做任何事。一旦sink成功发送一个event,sink将被还原到live 池中。
在这配置中,要设置sinkgroups processor为failover,需要为所有的sink分配优先级,所有的优先级数字必须是唯一的,这个得格外注意。此外,failover time的上限可以通过maxpenalty 属性来进行设置。
flume集群管理
flume监控
Flume本身提供了http, ganglia的监控服务,而目前主要使用zabbix做监控。另一方面,净化Flume的metrics。只将我们需要的metrics发送给zabbix,避免 zabbix server造成压力。
表4 可监控指标(metrics)
| Source | |
| Type | 类型(SOURCE) |
| OpenConnectionCount | 当前有效的连接数 |
| AppendBatchAcceptedCount | source端刚刚追加放入channel的批量数 |
| AppendBatchReceivedCount | source端刚刚追加的批量的数量,比如一批100,该度量为2,就是source端收到了200个events |
| EventAcceptedCount | source端目前成功放入channel的event数量 |
| EventReceivedCount | source端已经收到的event数量 |
| AppendReceivedCount | source端刚刚追加的目前收到的event数量 |
| AppendAcceptedCount | source端刚刚追加放入channel的event数量 |
| StartTime | 开始时间 |
| StopTime | 停止时间 |
| Channel | |
| Type | 类型(CHANNEL) |
| EventPutAttemptCount | 正在放进通道的event数量 |
| EventPutSuccessCount | 成功放入通道的event数量 |
| EventTakeSuccessCount | 从通道中成功取出event的数量 |
| EventTakeAttemptCount | 正在从通道中取event的数量 |
| ChannelSize | Queue(take与put)队列的大小之和 |
| ChannelCapacity | 容量 |
| ChannelFillPercentage | 通道使用比例 |
| StartTime | 开始时间 |
| StopTime | 停止时间 |
| Sink | |
| Type | 类型(SINK) |
| ConnectionFailedCount | sink端连接失败的次数 |
| ConnectionCreatedCount | sink端连接数 |
| ConnectionClosedCount | 连接关闭的次数 |
| BatchEmptyCount | 批量取空的次数 |
| BatchCompleteCount | 成功完成输出的批量事件个数 |
| BatchUnderflowCount | 没有达到batchsize的批量event数目,也就是这一批没有达到batchsize就处理了,根据这个值可调整batchsize |
| EventDrainSuccessCount | 成功处理的event数量 |
| EventDrainAttemptCount | 试图消耗的事件数量,从通道中拿来消耗 |
| StartTime | 开始时间 |
| StopTime | 停止时间 |