Flume增量采集Mysql数据

由于业务系统使用的数据库是Mysql,需要对业务进行大数据分析这就要求我们实时采集MySQL的数据。使用flume采集MySQL数据配置较简单,下面是配置的过程。

插件下载

需要的插件

  • mysql-connector-java-5.1.46-bin.jar
  • flume-ng-sql-source-1.4.1.jar
  • 这两个软件需要拷贝到 /usr/local/flume/lib

flume配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
agent.sources.s1.type=org.keedio.flume.source.SQLSource
agent.sources.s1.hibernate.connection.url=jdbc:mysql://localhost:3306/tickapi?useOldAliasMetadataBehavior=true
agent.sources.s1.hibernate.connection.user=root
agent.sources.s1.hibernate.connection.password=123456
agent.sources.s1.hibernate.connection.autocommit=true
agent.sources.s1.hibernate.connection.driver_calss=com.mysql.jdbc.Driver
agent.sources.s1.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
agent.sources.s1.hibernate.provider_class=org.hibernate.connection.C3P0ConnectionProvider
agent.sources.s1.run.query.delay=5000

# 增量配置
# agent.sources.s1.table=lt_api_getallstops
# agent.sources.s1.columns.to.select=*
# agent.sources.s1.incremental.column.name=id
# agent.sources.s1.incremental.value=0
agent.sources.s1.custom.query=select * from lt_api_getallstops order by id desc
# where id > $@$ order by id 注意加了此段SQL会报错:SQL语句异常。
agent.sources.s1.start.from=0 #增量列的初始值
agent.sources.s1.status.file.path=/home/flume
agent.sources.s1.status.file.name=sql-source.status

采集到的效果

1
2
3
4
5
# kafka发送的数据
"1121","大理州","99999999","CES","测试","","7214","大理兴盛汽车客运站","826f023cfa2a50feb92a958bcb16cdf0"
"1120","大理州","75010101","BS","保山","","7214","大理兴盛汽车客运站","826f023cfa2a50feb92a958bcb16cdf0"
"1119","大理州","71010101","KM","昆明","","7213","大理汽车客运北站","826f023cfa2a50feb92a958bcb16cdf0"
"1118","昆明西站","4659","","保山","","71010101","昆明西部客运站","826f023cfa2a50feb92a958bcb16cdf0"

×

谢谢客官

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 插件下载
    1. 1.1. 需要的插件
  2. 2. flume配置
  3. 3. 采集到的效果
,