使用Flume 1.8 与 ES6 实现系统监控

2,240 阅读10分钟

著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

Flume 简介

Flume 是 Apache Software Foundation 的顶级项目,是一个分布式,可靠且可用的系统,是对大数据量的日志进行高效收集、聚集、移动的服务,Flume 只能在 Unix 环境下运行。 它具有基于流数据的简单灵活的架构,具有可靠的可靠性机制和许多故障转移和恢复机制,具有强大的容错性。它使用简单的可扩展数据模型,允许在线分析应用程序。可以有效地从许多不同的 Source 收集数据,便于聚合和移动大量日志数据到集中式数据存储。

Flume 原理

首先了解以下两种角色分别表示什么:

  • Flume Event 被定义为具有字节有效负载和可选字符串属性集的数据流单元。
  • Flume agent 是一个(JVM)进程,它承载 Event 从外部 Source 流向下一个目标(跃点)的组件。

下图为 Flume 的原理工作流程图:

在这里插入图片描述

从图可以看出,Source 监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个 Event 中,并 put 到 Channel 后 commit 提交,Channel 队列先进先出,Sink 去 Channel 队列中拉取数据,然后写入到 HDFS 中。

  • Source:用于采集数据,Source 是产生数据流的地方,同时 Source 会将产生的数据流传输到 Channel,这个有点类似于 Java IO 部分的 Channel。
  • Channel:用于桥接 Sources 和 Sinks,类似于一个队列。
  • Sink:从 Channel 收集数据,将数据写到目标源(可以是下一个 Source,也可以是 HDFS 或者 HBase)。
  • Event:传输单元,Flume 数据传输的基本单元,以事件的形式将数据从源头送至目的地。

一个简单的例子

这里给出一个示例配置文件,描述单节点 Flume 部署。此配置允许用户生成 Event,然后将其记录输出显示到控制台。

example.conf

# 给a1的sources、sinks、channels定义名称(自定义)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 设置数据源类型
a1.sources.r1.type = netcat
# 设置数据源绑定的 IP
a1.sources.r1.bind = localhost
# 设置数据源监听的端口
a1.sources.r1.port = 44444

# 设置采集的数据类型为logger
a1.sinks.k1.type = logger

# 设置管道类型为memory内存
a1.channels.c1.type = memory
# 设置管道容量大小
a1.channels.c1.capacity = 1000
# 设置管道吞吐量大小
a1.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

温馨提示:如果输出到本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。

此配置定义名为 a1 的单个 agent。a1 有一个监听端口 44444 上的数据的 Source,一个缓冲内存中 Event 数据的 Channel,以及一个将 Event 数据记录到控制台的 Sink。

1. 鉴于此配置文件,我们可以按如下方式启动 Flume:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

2. 开启一个单独的终端,我们可以通过 telnet 端口 44444,向 Flume 发送一个 Event:

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

3. 原 Flume 终端将在控制台中输出日志消息中输出 Event 内容。

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

关于更多使用内容可参考 Flume 官网:

Flume 1.8.0 User Guide

Flume1.8 与 ES6 集成

因为当前 (2019-09-01) Flume 的源码支持的 Elasticsearch 的客户端版本是 0.90.1,通过查看 Flume 的 pom.xml 文件内容可以知道。所以如果我们想要集成高版本的 Elasticsearch 就需要修改对应 pom.xml 文件中的 Elasticsearch 版本,对应修改内容如下:

显示的指定依赖中 Elasticsearch 依赖的版本,并加上和 Elasticsearch 相关的其它依赖 jar。

由于修改了 jar 的版本,项目中相关的类也会出现错误,需要修改的地方也不少,但主要修改有 2 个类,因为发起调用的就是这 2 个类,分别是:

  • ElasticSearchRestClient.class
  • ElasticSearchTransportClient.class

下图是修改过的源码所对应的文件夹目录,被红框所圈住的是单元测试的代码,不做重点讲解。

我们先看 flume-parent 下的 pom.xml 的文件变化,通过 IDEA 工具可以发现,修改过的 pom.xml 相比原 pom.xml 有 2 处改动的地方;

  1. 增加了 log4j-core.jar 用户日志打印,给 @Slf4j 使用
  2. 配置将依赖包一并打入到项目的 jar 包中的 Maven 插件(用于构建一个包含应用所有依赖 jar 的 jar 包,方便使用)

接下来以此讲解剩下的内容:

ContentBuilderUtil

ContentBuilderUtil 主要是在 createParser() 方法中新增了一个 NamedXContentRegistry.EMPTY 参数,具体说明请看图中注释部分:

ElasticSearchEventSerializer

相比官方源码的修改部分,把 BytesStream 替换为 XcontentBuilder。

ElasticSearchLogStashEventSerializer

相比官方源码的修改部分,追加 builder.endobject() 方法。

EventSerializerIndexRequestBuilderFactory

因为已经修改过 ElasticSearchEventSerializer 的方法返回类型,所以在 EventSerializerIndexRequestBuilderFactory 中只需要把 BytesStream 替换为 XcontentBuilder 即可。

Maps

这里其实没什么复杂逻辑,就是为了方便,封装了一个简单的 Map 工具类。

flume-ng-elasticsearch/pom.xml

ElasticSearchRestClient

相比官方源码修改部分,content.toBytesArray().toUtf8()); 修改为 content.utf8ToString():

ElasticSearchTransportClient

替换与导入类库。

GitHub 项目源码的修改部分与注释,下面附上整个修改好的项目代码地址,读者可以根据修改后的版本和官方原版本对比,查看出其中的差异。

集成高版本 ES 的 Flume:

flume-modify-es6

Docker 的环境准备

随着容器化技术的普及,越来越多的公司开始使用容器化技术,本文我们也紧跟潮流,使用 Docker 来部署我们的开发运行环境,但考虑部分人的需求,普通方式和容器方式都会介绍。

Flume 的 Linux 环境安装

参考:记一次 linux 上安装配置 flume1.8.0 过程

Flume 的 Docker 环境安装

1. 下载 Flume 镜像文件:

docker pull probablyfine/flume:latest

2. 基于 Flume 镜像文件创建一个容器示例:

# 创建一个名为flume且网络配置为mynetwork的容器,
# 并把容器中/opt/flume-config/flume.conf目录下的文件挂载到宿主机/Users/haha/Documents/flume-config目录下

docker run -d --name flume --net mynetwork -v /Users/haha/Documents/flume-config/: /opt/flume-config/flume.conf/ probablyfine/flume:latest

3. 查看 Docker 中网卡为 mynetwork 配置信息:

**温馨提示:**也可以不指定 --net mynetwork,则其使用默认的 bridge 网络模式。

docker inspect mynetwork

至此,关于 Flume 的环境准备与安装已经完成,接下来进行配置。

Flume 配置文件

因为我们使用的是 Docker 的方式安装,所以需要使用以下命令进入 Flume 容器:

docker exec -it flume bash

进入到容器后可以看到 Flume 是安装在 /opt/ 目录下的,这个位置是 dockerfile 文件中指定的。

如果有需要也可以自己写一份 dockerfile 文件,通过 dockerfile 文件来构建一个 Flume 的镜像,关于 Docker 的其它内容,平台已经有很多了,有兴趣的朋友可以订阅,当然也可以关注我,后期我会发布免费的 Docker 基础使用教程。

进入 flume.conf 目录中可以看到 2 个 .conf 配置文件,还记得之前启动容器时配置的 -v 参数吗?这 2 个文件是在宿主机中创建的,对应的同步到了容器中。

查看 flume-config 目录中后缀名为 .conf 配置文件的内容。

config.conf 输出内容到 ES

#定义sources,channel和sinks的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#配置source的详情
a1.sources.r1.type = exec
# 要读取的log路径
a1.sources.r1.command = tail -F /opt/flume-log/spring.log
# 自定义一个拦截器名 i1
a1.sources.r1.interceptors=i1
# 设置拦截器类型
a1.sources.r1.interceptors.i1.type=regex_extractor
# 设置拦截器参数内容
a1.sources.r1.interceptors.i1.regex =(Order(.*)) &(dataType(.*)&)
# 自定义声明拦截器中的序列化器 s1 s2 (2个)
a1.sources.r1.interceptors.i1.serializers = s1 s2
#a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = orderInfo
a1.sources.r1.interceptors.i1.serializers.s2.name = dataType
# 把source绑定到channel上
a1.sources.r1.channels = c1

# 设置管道类型为内存类型
a1.channels.c1.type = memory
# 设置管道容量大小
a1es.channels.c1.capacity = 10
#  设置管道吞吐量
a1es.channels.c1.transactionCapacity = 10
a1es.channels.c1.keep-alive = 10

#配置sink的详情
# 设置sink发送到elasticsearch
a1.sinks.k1.type=elasticsearch
# 设置sink的具体执行类
#a1.sinks.k1.type=org.apache.flume.sink.elasticsearch.ElasticSearchSink
# 设置批处理大小10
a1.sinks.k1.batchSize=10
a1.sinks.k1.hostNames=172.18.0.11:9300
a1.sinks.k1.indexName=order_index
a1.sinks.k1.indexType=order
a1.sinks.k1.clusterName=elasticsearch
# 设置sinks序列化的执行类
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
#a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
# 绑定sinks到channel上
a1.sinks.k1.channel = c1

flume-console-config.conf 输出到控制台

#定义sources,channel和sinks的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置源类型
a1.sources.r1.type = exec
# 配置源命令
a1.sources.r1.command = tail -f /opt/flume-log/spring.log
# 配置源命令脚本
a1.sources.r1.shell = /bin/bash -c
# 设置sink类型为logger
a1.sinks.k1.type = logger
# 使用一个通道来缓冲内存中的事件
a1.channels.c1.type = memory
# 将源sources和接收器sinks绑定到通道channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

关于 Flume 更多参数描述参考:

FlumeUserGuide

在本次示例当中,宿主机中的 log 日志文件也同步到了 Flume 容器当中,虽然看起来操作有点骚,但是你可以学到更多东西,对吧!其实 Fluem Docker 容器是无法对宿主机进行 IO 读写操作的,为了解决这个问题才把 log 文件同步到容器中便于 Flume 操作,以下是图片展示:

此时已经接近配置的尾声了,但是各位应该还记得我们之前修改过的 Flume 中关于 ES 的源码,到现在都没有用到,现在正式开始介绍如何让 Flume 把 log 数据发送到高版本的 Elasticsearch 中。

Flume 集成自定义版本 ES

使用 IDEA 打开从我的 GitHub 下载的源代码,打开 flume-ng-elasticsearch-sink 工程进行 mvn install,然后找到 mvn install 成功后生成的 flume-ng-elasticsearch-sink-1.8.0.jar, 因为我为 maven plugin 配置了把项目依赖的 jar 包都一并打包,所以只要替换 Flume 容器中对应 jar 包即可。

执行命令:

docker cp flume-ng-elasticsearch-sink-1.8.0.jar flume:/opt/lib/

最后通过如下命令启动 Flume 即可。

$ cd /opt/flume
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/flume-config/flume.conf/config.conf -Dflume.root.logger==INFO,console

ElasticSearch 安装与使用

本文的 ElasticSearch 也是基于 Docker 环境搭建,所以读者可执行如下命令:

# 下载对镜像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# 创建容器并运行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1

docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine

在浏览器中输入 http://localhost:9100/ 可以看到 Flume 已经在 ES 中创建了 index 索引,并将 log 内容也发送了过来。

elasticsearch-head 无法访问 Elasticsearch

ES 与 es-head 是两个独立的进程,当 es-head 访问 ES 服务时,会存在一个跨域问题。所以我们需要修改 ES 的配置文件,增加一些配置项来解决这个问题,如下:

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml  
# 文件末尾加上如下配置
http.cors.enabled: true
http.cors.allow-origin: "*"

修改完配置文件后需重启 ES 服务。

elasticsearch-head 查询报 406 Not Acceptable

解决方法:

1. 进入 head 安装目录:

cd _site/

2. 编辑 vendor.js 共有两处:

#6886行   contentType: "application/x-www-form-urlencoded

改成

contentType: "application/json;charset=UTF-8"
#7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" &&

改成

var inspectData = s.contentType === "application/json;charset=UTF-8" &&

Granfa 的配置与使用

本文的 Granfa 也是基于 Docker 环境搭建,所以读者可执行如下命令:

# 下载对镜像
docker pull grafana/grafana:6.3.2
# 创建容器并运行
docker run -d --name grafana --net mynetwork --ip 172.18.0.12 -p 3000:3000 grafana/grafana:6.3.2

打开浏览器输入以下地址 localhost:3000:

在这里插入图片描述

首页配置数据源 add data source

在这里插入图片描述

在这里插入图片描述

点击 Learn more 可以跳转到 Granfa 使用指南

至此整个环境的部署连调过程完毕!