在大数据采集端使用 Flume 时，虽然可以直接连接数据库，但其提供的数据采集方式较为简单。好在 Flume 允许我们自定义 Source 插件，下面为大家讲解一下具体的开发与使用过程。
Maven 项目构建
pom.xml配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>flume</groupId>
  <artifactId>com.source</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>com.source</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Single Flume release for every Flume artifact. Declaring flume-ng-sdk and
         flume-ng-core at different versions lets Maven's nearest-wins mediation put
         a mismatched sdk jar on the classpath next to core. -->
    <flume.version>1.9.0</flume.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.41</version>
    </dependency>
    <dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20180813</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>${flume.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>${flume.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <!-- Copies all dependency jars into target/classes/lib during the test phase. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>test</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>target/classes/lib</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <!-- Fully qualified class name; the separator must be an ASCII dot.
                   NOTE(review): ApiSource has no main method, and Flume loads it via
                   agent.sources.*.type, so this entry only matters for java -jar. -->
              <mainClass>flume.com.source.ApiSource</mainClass>
              <classpathPrefix>lib/</classpathPrefix>
            </manifest>
            <manifestEntries>
              <Class-Path>.</Class-Path>
            </manifestEntries>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
|
配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package flume.com.source;
import org.json.JSONObject;

/**
 * Configuration holder for the collection tool.
 *
 * <p>Every connection setting can be overridden with a JVM system property named
 * {@code flume.source.jdbc.<key>} (for example {@code -Dflume.source.jdbc.password=secret}).
 * This keeps real credentials out of the compiled plugin jar. When a property is not set,
 * the built-in default below is used, so existing deployments behave exactly as before.
 */
public class Configure {

    /** Prefix of the system properties that override the built-in defaults. */
    private static final String PROPERTY_PREFIX = "flume.source.jdbc.";

    /**
     * Returns the database connection parameters for the API interface center.
     *
     * @return a new {@link JSONObject} holding the keys {@code driver}, {@code url},
     *         {@code user} and {@code password}
     */
    public static JSONObject getApiConf() {
        JSONObject json = new JSONObject();
        json.put("driver", setting("driver", "com.mysql.jdbc.Driver"));
        json.put("url", setting("url", "jdbc:mysql://localhost:3306/tickapi"));
        json.put("user", setting("user", "root"));
        json.put("password", setting("password", "123456"));
        return json;
    }

    /** Reads {@code flume.source.jdbc.<key>} from the system properties, falling back to {@code defaultValue}. */
    private static String setting(String key, String defaultValue) {
        return System.getProperty(PROPERTY_PREFIX + key, defaultValue);
    }
}
|
数据库连接类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package flume.com.source;
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement;
import org.json.JSONObject;
public class MysqlCon { public static Connection getConn(){ //声明Connection对象 Connection con = null; Statement statement = null; //遍历查询结果集 try { //加载驱动程序 JSONObject conf = Configure.getApiConf(); Class.forName(conf.optString("driver")); //1.getConnection()方法,连接MySQL数据库!! con = DriverManager.getConnection(conf.optString("url"),conf.optString("user"),conf.optString("password")); if(!con.isClosed()){ System.out.println("采集目标数据库连接成功!"); } } catch(ClassNotFoundException e) { //数据库驱动类异常处理 System.out.println("Sorry,can`t find the Driver! 找不到数据库连接驱动! "); e.printStackTrace(); } catch(SQLException e) { //数据库连接失败异常处理 e.printStackTrace(); }catch (Exception e) { // TODO: handle exception e.printStackTrace(); }finally{ //System.out.println("数据连接结束!!"); } return con; } }
|
Flume 数据源（Source）类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| package flume.com.source;
import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Random; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.AbstractSource; import org.json.JSONObject;
/* * API客票接口数据库业务数据实时采集类 * */ public class ApiSource extends AbstractSource implements Configurable, PollableSource{ @Override public long getBackOffSleepIncrement() { // TODO Auto-generated method stub return 0; } @Override public long getMaxBackOffSleepInterval() { // TODO Auto-generated method stub return 0; } @Override public Status process() throws EventDeliveryException { try { while (true) { List<JSONObject> list = readData(); for(JSONObject obj : list) { long s=System.currentTimeMillis(); HashMap<String, String> header = new HashMap<String, String>(); header.put("id", Long.toString(s)); this.getChannelProcessor() .processEvent(EventBuilder.withBody(obj.toString(), Charset.forName("UTF-8"), header)); } Thread.sleep(5000); } } catch (InterruptedException e) { e.printStackTrace(); } return null; } @Override public void configure(Context arg0) { } /* * 读取数据库的数据信息 * */ public List<JSONObject> readData(){ List<JSONObject> list = new ArrayList<>(); try { //2.创建statement类对象,用来执行SQL语句!! Connection con = MysqlCon.getConn(); if(con != null){ Statement statement = con.createStatement(); //要执行的SQL语句 String sql = "select * from lt_api_getallstops limit 0,5 "; //3.ResultSet类,用来存放获取的结果集!! ResultSet rs = statement.executeQuery(sql); String job = null; String id = null; while(rs.next()){ JSONObject obj = new JSONObject(); //数据封装逻辑 obj.put("id", rs.getString("id")); obj.put("Area", rs.getString("Area")); obj.put("StopName", rs.getString("StopName")); list.add(obj); } rs.close(); con.close(); } } catch(SQLException e) { //数据库驱动类异常处理 System.out.println("Sorry,can`t find the Driver! 找不到数据库连接驱动! "); e.printStackTrace(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); }finally{ //System.out.println("数据连接结束 -- 业务 !!"); } return list; } }
|
Flume配置
- 配置较为简单：首先把打包生成的插件 jar 放入 Flume 安装目录下的 lib/ 文件夹（如 /flume/lib/）内。注意插件依赖的 MySQL JDBC 驱动和 org.json 的 jar 也需要一并放入该目录，否则运行时会找不到相应的类。
- 在 mysql-source.conf 中将 Source 的类型指定为自定义类的全限定名：agent.sources.s1.type = flume.com.source.ApiSource
启动
- /usr/local/flume/bin/flume-ng agent --conf-file /usr/local/flume/conf/mysql-source.conf -c conf/ --name agent -Dflume.root.logger=DEBUG,console
接收效果
1 2 3 4 5
| {"id":"1085","Area":"昆明西站","StopName":"大姚"} {"id":"1084","Area":"昆明西站","StopName":"楚雄"} {"id":"1083","Area":"昆明西站","StopName":"孟定"} {"id":"1082","Area":"昆明西站","StopName":"弥渡"} {"id":"1081","Area":"昆明西站","StopName":"苗尾"}
|