fluent-plugin-datahub
DataHub基本介绍
DataHub服务是阿里云提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能,让您可以轻松构建基于流式数据的分析和应用。DataHub服务可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到DataHub的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。
DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点。DataHub与阿里云流计算引擎StreamCompute无缝连接,用户可以轻松使用SQL进行流数据分析。
DataHub服务也提供流式数据归档的功能,支持流式数据归档进入MaxCompute(原ODPS)。
环境要求
使用此插件,需要具备如下环境:
- Ruby 2.1.0 或更新
- Gem 2.4.5 或更新
- Fluentd-0.12 或更新 (Home Page)
- Ruby-devel
安装部署
安装部署Fluentd可以选择以下两种方式之一。
- 一键安装包适用于第一次安装Ruby&Fluentd环境的用户或局域网用户,一键安装包包含了所需的Ruby环境以及Fluentd。目前一键安装包仅支持Linux环境。
- 通过网络安装适用于对Ruby有了解的用户,需要提前确认Ruby版本,若低于2.1.0则需要升级或安装更高级的Ruby环境,然后通过RubyGem安装Fluentd。
注:
gem install --local fluent-plugin-datahub-0.0.1.gem
安装方式一:一键安装包安装
- 下载解压 fluentd-with-datahub-0.12.23.tar.gz
- 可以修改install.sh中$DIR为你想安装ruby的路径,默认会安装在当前路径下面
- 执行如下命令,提示“Success”表示安装成功
bash install.sh
- fluentd程序会被安装在当前目录的bin目录下面
安装方式二:通过网络安装
- Ruby安装(已经存在Ruby 2.1.0以上环境可忽略此步骤):
wget https://cache.ruby-lang.org/pub/ruby/2.3/ruby-2.3.0.tar.gz
tar xzvf ruby-2.3.0.tar.gz
cd ruby-2.3.0
./configure --prefix=DIR
make
make install
2 Fluentd以及插件安装
$ gem install fluent-plugin-datahub
插件使用示例
示例一 上传csv文件中的数据
配置
<source>
@type tail
path ${DIR}/csv_sample.csv
tag test1
format csv
keys id,name,gender,salary,my_time
</source>
<match test1>
@type datahub
access_id
access_key
endpoint
project_name test_project
topic_name fluentd_out_7
# shard_id 6
column_names ["id", "name", "gender", "salary", "my_time"]
flush_interval 10s
dirty_data_continue true
dirty_data_file ${DIR}/dirty.data
retry_times 3
</match>
示例二 上传日志文件中的数据
配置
source>
@type tail
path ${DIR}/log_sample.log
tag test
format /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
</source>
<match test>
@type datahub
access_id yourAccessId
access_key yourAccessKey
endpoint yourEndpoint
project_name test_project
topic_name datahub_fluentd_out_1
column_names ["thread_id", "log_level", "class"]
</match>
1、source标签中的keys为源数据,会根据key对应fields中字段<br>
2、match标签中的column_names为要写入datahub的字段
3、具体数据样例可参见gem包中的sample文件
参数说明
access_id :阿里云access_id.
access_key :阿里云access key.
endpoint :DataHub Endpoint
project_name :datahub project name
topic_name :datahub topic name
retry_times :重试次数, 默认1
retry_interval :重试周期,下一次重试的间隔,单位为秒, 默认3
column_names :提交的列名,用户可以配置topic的列,采集部分列或者全部列,默认为空数组,表示按照topic的顺序及全字段提交,另外:列的配置不用保序,但是要求该字段在topic的schema中存在
source_keys :指定源头采集的keys, record 按照这些keys 获取数据, 写入datahub, 默认空数组, 此时record使用column_names 获取数据, 写入datahub
dirty_data_continue :当出现脏数据时,是否继续写入,当开启该开关,必须指定@dirty_data_file文件
dirty_data_file :脏数据文件名称,当数据文件名称,在@dirty_data_continue开启的情况下,需要指定该值,特别注意:脏数据文件将被分割成两个部分.part1和.part2,part1作为更早的脏数据,part2作为更新的数据
shard_id :写入指定的 shard_id,默认轮询发送
shard_keys :按照指定字段的值计算hash,依据于该hash值落某个shard
retry_limit :fluentd自带的 retry次数, 由于可能导致数据重写,该参数默认设置为0
put_data_batch_size :多少条数据 写一次datahub, 默认100条,请不要超出1000条。
data_encoding :默认使用源数据标示的encode方式,根据string.encoding获取,如果需要指定源数据编码方式,请设置该值,支持的类型:"US-ASCII", "ASCII-8BIT", "UTF-8",