Flume ElasticSearch Sink 配置参数与Agent配置文件示例


1. ElasticSearch Sink 介绍

ElasticSearchSink 可以向elasticsearch 集群中写入数据。默认情况下写入event类型数据以便 Kibana 能够以图形化方式展示他们,就像 logstash 那样。

在使用时需要将elasticsearch和lucence 所需jar文件放进flume 的lib目录下。elasticsearch 要求客户端jar版本与服务器版本一致,并且运行在相同的JVM 版本中。如果版本对应不上将会出现 SerializationExceptions 异常。要选择所需的版本,请首先确定elasticsearch的版本以及目标群集正在运行的JVM版本。然后选择与主要版本匹配的elasticsearch客户端库。0.19.x客户端可以与0.19.x群集通信; 0.20.x可以与0.20.x对话,0.90.x可以与0.90.x对话。 确定elasticsearch版本后,读取pom.xml文件以确定要使用的正确lucene-core JAR版本。运行ElasticSearchSink的Flume agent 程序也应该与目标集群运行的次要版本的JVM匹配。

Events 每天都会写进新的index,新的索引的名称是 -yyyy-MM-dd 是索引名称参数。sink 会在午夜将数据写进新的index。

默认通过ElasticSearchLogStashEventSerializer将 events序列化。可以使用 serializer 参数指定序列化方式。此参数接受 org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializerorg.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory 的实现。为了支持更强大的ElasticSearchIndexRequestBuilderFactory,不t推荐实现ElasticSearchEventSerializer。

类型: org.apache.flume.sink.elasticsearch.ElasticSearchSink

2. ElasticSearch Sink 配置参数

必须的配置用粗体显示

配置名称 默认值 说明
channel
type 组件名称, org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames 使用逗号分隔 hostname:port 列表,如果不输入端口号默认使用 9300
indexName flume 追加日期的索引名称. 如‘flume’ 追加日期为 ‘flume-yyyy-MM-dd’。
支持任意的标头替换, 例如 %{header} 替换为命名事件头的值。
indexType logs 索引文档类型,默认为log,支持任意标头替换,
如:%{header}替换为命名事件标头的值。
clusterName elasticsearch 要连接的ElasticSearch集群的名称
batchSize 100 每个txn要写入的事件数。
ttl TTL in days, when set will cause the expired documents to be deleted automatically,
if not set documents will never be automatically deleted.
TTL is accepted both in the earlier form of integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute), h (hour), d (day) and w (week).
Example a1.sinks.k1.ttl = 5d will set TTL to 5 days.
Followhttp://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.
serializer org.apache.flume.sink.
elasticsearch.
ElasticSearchLogStashEventSerializer
要使用的ElasticSearchIndexRequestBuilderFactoryElasticSearchEventSerializer。 接受任一类的实现,但首选ElasticSearchIndexRequestBuilderFactory
serializer.* 要传递给序列化程序的属性。

使用标头替换可以方便地使用事件标头的值来动态决定在存储事件时要使用的indexName和indexType。使用此功能时应谨慎,因为事件提交者现在可以控制indexName和indexType。此外,如果使用elasticsearch REST客户端,则事件提交者可以控制使用的URL路径。

3. 测试 Elasticsearch Sink

3.1 配置 agent

从kafka中取出数据保存到es中

#定义source名称
a1.sources = kafka-source
#定义channel名称
a1.channels = memory-channel
#定义sink名称
a1.sinks = es-sink

#配置source
#指定source类型
a1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
#指定zookeeper
a1.sources.kafka-source.zookeeperConnect = node1.com:2181,node2.com:2181,node3.com:2181
#指定kafka topic
a1.sources.kafka-source.topic = es_test
#指定 group
a1.sources.kafka-source.groupId = flume
#设置超时时间
a1.sources.kafka-source.kafka.consumer.timeout.ms = 100
a1.sources.kafka-source.batchSize = 20

a1.sources.kafka-source.interceptors=i1
a1.sources.kafka-source.interceptors.i1.type=regex_extractor
a1.sources.kafka-source.interceptors.i1.regex = (\\w.*):(\\w.*):(\\w.*)\\s 
a1.sources.kafka-source.interceptors.i1.serializers = s1 s2 s3
a1.sources.kafka-source.interceptors.i1.serializers.s1.name = name1
a1.sources.kafka-source.interceptors.i1.serializers.s2.name = name2 
a1.sources.kafka-source.interceptors.i1.serializers.s3.name = name3

#配置channel
a1.channels.memory-channel.type = memory
#通道容量
a1.channels.memory-channel.capacity = 1000
#a1.channels.memory-channel.keep-alive = 100
#a1.channels.memory-channel.capacity = 1000
#通道事物容量
a1.channels.memory-channel.transactionCapacity = 100

#配置sink
a1.sinks.es-sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
#配置es服务地址
a1.sinks.es-sink.hostNames = node3:9300,node3:9300
#配置es集群名称
a1.sinks.es-sink.clusterName = SHANGHAI_XCLOUD_CLUSTER
a1.sinks.es-sink.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
#配置索引名称
a1.sinks.es-sink.indexName = kafka_es
a1.sinks.es-sink.indexType = bar_type
a1.sinks.es-sink.batchSize = 10

#指定source 数据发送到哪个channel
a1.sources.kafka-source.channels = memory-channel
#指定sink 取哪个channel的数据
a1.sinks.es-sink.channel = memory-channel

3.2 启动agent

bin/flume-ng agent -c . -f conf/es-sink-agent.properties -n a1 -Dflume.root.logger=INFO,console

3.3 异常处理


java.lang.NoClassDefFoundError: org/elasticsearch/common/io/BytesStream
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.configure(ElasticSearchSink.java:294)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:453)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.io.BytesStream
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 14 more
  • 解决问题

需要将 lucene-core 与 elasticsearch jar 包放进flume_home/lib目录下

3.4 重新启动agent

bin/flume-ng agent -c . -f conf/es-sink-agent.properties -n a1 -Dflume.root.logger=INFO,console

3.5 通过kafka生产者发送数据

website:weblog:transaction_page weblog data3
website:weblog:docs_page weblog data4
syslog:syslog:sysloggroup syslog data1
syslog:syslog:sysloggroup syslog data2
syslog:syslog:sysloggroup syslog data3
syslog:syslog:sysloggroup syslog data4
syslog:syslog:sysloggroup syslog data5

3.6 到es对应index查看数据


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Table 'SYSTEM.CATALOG' was not found, got Table 'SYSTEM.CATALOG' was not found, got
1. 异常日志 Error: Table 'SYSTEM.CATALOG' was not found, got: IP. (state=08000,code=101) org.apache.phoenix.exception.Phoeni
2017-05-20
下一篇 
Flume HBase Sink 配置参数与Agent配置文件示例 Flume HBase Sink 配置参数与Agent配置文件示例
1. HBaseSink这个Sink将数据写入 HBase 。Hbase 配置是从类路径中遇到的第一个hbase-site.xml中获取的。由配置指定的 HbaseEventSerializer 的类用于将事件转换为 HBase put 或
2017-05-19
  目录