Flume自定义数据源

大数据采集终端使用Flume时可以直接连接数据库,但是提供的数据采集方式较为简单。还好我们可以自定义flume的source插件,下面为大家讲解一下使用的过程。

maven项目构建

pom.xml配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Coordinates of the custom Flume source plugin. -->
  <groupId>flume</groupId>
  <artifactId>com.source</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>com.source</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>

    <!-- JDBC driver used to read the source database. -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.41</version>
    </dependency>

    <!-- JSON library used to build the configuration and the event bodies. -->
    <dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20180813</version>
    </dependency>

    <!-- Flume SDK; kept on the same version as flume-ng-core so both artifacts match. -->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.9.0</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.9.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <!-- Copies all dependency jars into target/classes/lib during the test phase. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>test</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>
                target/classes/lib
              </outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- Writes the manifest; the class name must use ASCII dots as package separators. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <mainClass>
                flume.com.source.ApiSource
              </mainClass>
              <classpathPrefix>lib/</classpathPrefix>
            </manifest>
            <manifestEntries>
              <Class-Path>.</Class-Path>
            </manifestEntries>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package flume.com.source;

import org.json.JSONObject;
/**
 * Configuration holder for the collection tool.
 */
public class Configure {

    /**
     * Builds the connection settings for the API database.
     *
     * @return a new JSON object with the keys {@code driver}, {@code url},
     *         {@code user} and {@code password}
     */
    public static JSONObject getApiConf() {
        // JSONObject.put returns the object itself, so the entries can be chained.
        return new JSONObject()
                .put("driver", "com.mysql.jdbc.Driver")
                .put("url", "jdbc:mysql://localhost:3306/tickapi")
                .put("user", "root")
                .put("password", "123456");
    }

}

数据库连接类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package flume.com.source;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.json.JSONObject;

/**
 * Opens JDBC connections using the settings returned by {@code Configure.getApiConf()}.
 */
public class MysqlCon {

    /**
     * Loads the configured JDBC driver and opens a connection.
     *
     * @return the opened connection, or {@code null} when the driver cannot be
     *         loaded or the connection attempt fails (the failure is printed)
     */
    public static Connection getConn() {
        Connection connection = null;
        try {
            JSONObject settings = Configure.getApiConf();

            // Load the driver class named in the configuration.
            String driverClass = settings.optString("driver");
            Class.forName(driverClass);

            // Open the connection with the configured URL and credentials.
            String jdbcUrl = settings.optString("url");
            String user = settings.optString("user");
            String password = settings.optString("password");
            connection = DriverManager.getConnection(jdbcUrl, user, password);

            boolean open = !connection.isClosed();
            if (open) {
                System.out.println("采集目标数据库连接成功!");
            }
        } catch (ClassNotFoundException e) {
            // The configured driver class is not on the classpath.
            System.out.println("Sorry,can`t find the Driver! 找不到数据库连接驱动! ");
            e.printStackTrace();
        } catch (Exception e) {
            // Connection failures and any other error: report and fall through to return.
            e.printStackTrace();
        }
        return connection;
    }
}

flume数据源类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package flume.com.source;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.json.JSONObject;

/**
 * Flume source that polls the ticket API database and forwards each row as an event.
 *
 * <p>Every call to {@link #process()} performs one poll: it reads the rows via
 * {@link #readData()}, emits one event per row, waits {@code POLL_INTERVAL_MS}
 * and reports a status to the caller.
 */
public class ApiSource extends AbstractSource implements Configurable, PollableSource {

    /** Pause between two consecutive database polls, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 5000L;

    /** Charset used to encode the JSON event bodies. */
    private static final Charset BODY_CHARSET = Charset.forName("UTF-8");

    /** Query executed on every poll. */
    private static final String QUERY = "select * from lt_api_getallstops limit 0,5 ";

    /**
     * @return 0; the polling pause is applied inside {@link #process()} itself
     */
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    /**
     * @return 0; the polling pause is applied inside {@link #process()} itself
     */
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Performs a single poll cycle and hands the rows to the channel processor.
     *
     * <p>The method returns after one cycle instead of looping forever, so the
     * caller regains control between polls and always receives a non-null status.
     *
     * @return {@code Status.READY} when at least one event was emitted,
     *         {@code Status.BACKOFF} when nothing was read or the wait was interrupted
     * @throws EventDeliveryException declared by the interface; not thrown directly here
     */
    @Override
    public Status process() throws EventDeliveryException {
        List<JSONObject> rows = readData();
        for (JSONObject row : rows) {
            // The header "id" carries the emission timestamp in milliseconds.
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("id", Long.toString(System.currentTimeMillis()));
            getChannelProcessor()
                    .processEvent(EventBuilder.withBody(row.toString(), BODY_CHARSET, headers));
        }

        try {
            Thread.sleep(POLL_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe the stop request.
            Thread.currentThread().interrupt();
            return Status.BACKOFF;
        }
        return rows.isEmpty() ? Status.BACKOFF : Status.READY;
    }

    /**
     * No configuration properties are read.
     *
     * @param arg0 the source context (unused)
     */
    @Override
    public void configure(Context arg0) {

    }

    /**
     * Reads rows from the database and converts each one into a JSON object
     * with the keys {@code id}, {@code Area} and {@code StopName}.
     *
     * <p>Connection, statement and result set are closed on every path,
     * including when an exception is raised while reading.
     *
     * @return the rows read so far; empty when no connection is available or
     *         the query fails before the first row
     */
    public List<JSONObject> readData() {
        List<JSONObject> list = new ArrayList<>();
        // try-with-resources skips close() for a null resource, so a failed
        // MysqlCon.getConn() (which returns null) is safe here.
        try (Connection con = MysqlCon.getConn()) {
            if (con == null) {
                return list;
            }
            try (Statement statement = con.createStatement();
                 ResultSet rs = statement.executeQuery(QUERY)) {
                while (rs.next()) {
                    JSONObject obj = new JSONObject();
                    obj.put("id", rs.getString("id"));
                    obj.put("Area", rs.getString("Area"));
                    obj.put("StopName", rs.getString("StopName"));
                    list.add(obj);
                }
            }
        } catch (SQLException e) {
            // Query or connection failure (not a missing driver).
            System.out.println("Sorry, failed to read data from the database! 读取数据库数据失败! ");
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return list;
    }
}

Flume配置

  • 配置较为简单:首先把打包生成的插件 jar 包放入 /flume/lib/ 文件夹内
  • 然后在 mysql-source.conf 中把数据源类型配置为自定义的类:agent.sources.s1.type = flume.com.source.ApiSource

启动

  • /usr/local/flume/bin/flume-ng agent --conf-file /usr/local/flume/conf/mysql-source.conf -c conf/ --name agent -Dflume.root.logger=DEBUG,console

接收效果

1
2
3
4
5
{"id":"1085","Area":"昆明西站","StopName":"大姚"}
{"id":"1084","Area":"昆明西站","StopName":"楚雄"}
{"id":"1083","Area":"昆明西站","StopName":"孟定"}
{"id":"1082","Area":"昆明西站","StopName":"弥渡"}
{"id":"1081","Area":"昆明西站","StopName":"苗尾"}

×

谢谢客官

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. maven项目构建
    1. 1.1. pom.xml配置
    2. 1.2. 配置类
    3. 1.3. 数据库连接类
    4. 1.4. flume数据源类
  2. 2. Flume配置
  3. 3. 启动
  4. 4. 接收效果
,