Log Analysis System (ZooKeeper + Flume + Kafka): Receiving Data in Real Time

Real-Time Log Analysis System (Single Machine)

This tutorial shows how to use Flume to collect Apache access.log data and forward it through Kafka's publish/subscribe messaging to a log analysis program. The required environment: a Linux system, the Java JDK, ZooKeeper, Flume, Kafka, and a Java program that receives the data.

Preparing the Environment

Downloading the Software

# Download the software (Flume 1.8.0 is what the rest of this tutorial installs)
wget http://www.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
wget http://www.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz

Result:

-rw-r--r--. 1 root root  58688757 Oct 27  2018 apache-flume-1.8.0-bin.tar.gz
-rw-r--r--. 1 root root 286821827 Apr 4 14:13 flink-1.8.0-bin-scala_2.12.tgz
-rw-r--r--. 1 root root 63999924 Mar 23 08:57 kafka_2.11-2.2.0.tgz
-rw-r--r--. 1 root root 28678231 Mar 4 2016 scala-2.11.8.tgz
-rw-r--r--. 1 root root 37676320 Apr 1 22:44 zookeeper-3.4.14.tar.gz

Installing the Environment

Installing the Java JDK

yum -y install java-1.8.0-openjdk*
# Installation result:
java -version
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-b04)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)

Installing ZooKeeper + Flume + Kafka

ZooKeeper Installation and Configuration

cd /usr/local
tar -zxvf zookeeper-3.4.14.tar.gz
mv ./zookeeper-3.4.14 ./zookeeper
# zoo.cfg does not exist by default; create it from the shipped sample first
cp ./zookeeper/conf/zoo_sample.cfg ./zookeeper/conf/zoo.cfg
vim ./zookeeper/conf/zoo.cfg
Edit the configuration:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
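
Once ZooKeeper has been started (see the startup section below), it is worth a quick health check before layering Flume and Kafka on top. A minimal sketch, assuming the default clientPort 2181 from zoo.cfg and that nc is installed:

# Report whether the server is running and in which mode
/usr/local/zookeeper/bin/zkServer.sh status

# The four-letter-word command "ruok" should answer "imok"
echo ruok | nc 127.0.0.1 2181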

Flume Installation and Configuration

cd /usr/local
tar -zxvf apache-flume-1.8.0-bin.tar.gz
mv ./apache-flume-1.8.0-bin ./flume
# Only flume-conf.properties.template ships by default; copy it first
cp ./flume/conf/flume-conf.properties.template ./flume/conf/flume-conf.properties
vim ./flume/conf/flume-conf.properties

# Configuration changes

agent.sources = s1
agent.channels = c1
agent.sinks = k1

# Read data from the specified file (tail -F would also survive log rotation)
agent.sources.s1.type = exec
agent.sources.s1.command = tail -f /home/wwwlogs/access.log
agent.sources.s1.channels = c1

# Configure the transfer channel
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 100

# Configure the Kafka sink that receives the data
# (brokerList/topic are the legacy property names; newer Flume releases
# prefer kafka.bootstrap.servers / kafka.topic, but these still work)
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList = 10.1.1.35:9092
agent.sinks.k1.topic = test
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent.sinks.k1.channel = c1
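
With the agent running, the whole pipeline can be exercised by appending a line to the watched file: the exec source tails it, the memory channel buffers it, and the Kafka sink publishes it to the test topic. A sketch with a made-up log entry (the path comes from the source config above; the fields are assumptions):

# Append a fake combined-format entry; the exec source should forward it to Kafka
echo '10.1.3.165 - - [17/May/2019:14:11:52 +0800] "GET /doc/list HTTP/1.1" 200 512 "http://10.1.1.35/doc" "Mozilla/5.0"' >> /home/wwwlogs/access.log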

Kafka Installation and Configuration

cd /usr/local
tar -zxvf kafka_2.11-2.2.0.tgz
mv ./kafka_2.11-2.2.0 ./kafka
vim ./kafka/config/server.properties
Edit the configuration (advertised.listeners must match the address the Flume sink and the consumer connect to, 10.1.1.35 here):

broker.id=1
advertised.listeners=PLAINTEXT://10.1.1.35:9092
zookeeper.connect=127.0.0.1:2181
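
Before starting the Java consumer it helps to create the test topic explicitly and confirm that messages actually arrive. A sketch using the scripts bundled with Kafka 2.2 (addresses taken from the configs above):

# Create the topic used by the Flume sink and the consumer
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 \
  --replication-factor 1 --partitions 1 --topic test

# Tail the topic from the beginning to confirm that data flows
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.1.1.35:9092 \
  --topic test --from-beginning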

Starting the Services

Note: the services must be started in this order. Once all three are up, you can confirm they are running, as sketched after this list.
  • Start ZooKeeper: /usr/local/zookeeper/bin/zkServer.sh start &
  • Start Kafka: /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >/dev/null 2>&1 &
  • Start Flume: /usr/local/flume/bin/flume-ng agent --conf-file /usr/local/flume/conf/flume-conf.properties -c conf/ --name agent -Dflume.root.logger=DEBUG,console &
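
All three services run on the JVM, so the JDK's jps tool gives a quick sanity check (the process names below are the usual main classes; treat this as a sketch):

jps
# Expected, roughly:
#   QuorumPeerMain   <- ZooKeeper
#   Kafka            <- the Kafka broker
#   Application      <- the Flume agent (org.apache.flume.node.Application)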

Developing the Receiver Program

Create a new Maven project in IDEA

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com</groupId>
<artifactId>yang.flink</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>flink test</description>
<packaging>jar</packaging>

<name>yang.flink</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
<!-- <scope>provided</scope>-->
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.6.1</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>

<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>


<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>test</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>
target/classes/lib
</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>
com.yang.flink.FlinkKafka
</mainClass>
<classpathPrefix>lib/</classpathPrefix>
</manifest>
<manifestEntries>
<Class-Path>.</Class-Path>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
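
With this pom, a plain Maven build compiles the consumer and, through the dependency plugin, copies every dependency jar into target/classes/lib. One way to build and run it from there (a sketch; the classpath layout follows from the plugin configuration above):

# Build; dependencies are copied to target/classes/lib during the test phase
mvn clean package

# Run the consumer with the copied dependencies on the classpath
cd target/classes
java -cp '.:lib/*' com.yang.flink.FlinkKafka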

FlinkKafka.java

package com.yang.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


public class FlinkKafka {
    public static void main(String[] args) throws Exception {

        // Obtain the Flink StreamExecutionEnvironment
        // (not used by the plain consumer loop below; kept for a later Flink job)
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint interval in ms (what the docs call state & checkpointing)
        env.enableCheckpointing(1000);

        // Configure the Kafka and ZooKeeper addresses
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.1.1.35:9092");
        // zookeeper.connect is only read by the old consumer API; the new consumer ignores it
        properties.setProperty("zookeeper.connect", "10.1.1.35:2181");
        // Disable auto-commit: offsets are committed manually via commitSync below
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("session.timeout.ms", "30000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "test");
        System.out.printf(" Start listener: \n");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        List<String> subscribedTopics = new ArrayList<>();
        // To subscribe to more than one topic, add the others here as well
        //subscribedTopics.add(topic1);
        subscribedTopics.add("test");
        consumer.subscribe(subscribedTopics);

        // Compile the Apache combined-log-format pattern once, outside the poll loop
        Pattern p3 = Pattern.compile("^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");

        try {
            int index = 0;
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        //System.out.println(record.value().toString()+" --> Content ");
                        Matcher matcher = p3.matcher(record.value());
                        if (matcher.find()) {
                            System.out.println("Source IP --> " + matcher.group(1));
                            System.out.println("Access time --> " + matcher.group(4));
                            System.out.println("Request method + path --> " + matcher.group(5));
                            System.out.println("HTTP response status --> " + matcher.group(6));
                            System.out.println("Referer URL --> " + matcher.group(8));
                            System.out.println("User-Agent --> " + matcher.group(9));
                        }

                        index++;
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    // database operations
                    //....................
                    // Commit the offset of the next record to consume (last offset + 1)
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    System.out.println("Records received --> running total [" + index + "]");
                }
            }
        } finally {
            consumer.close();
        }
    }

    // Punctuated-watermark assigner, kept for a Flink streaming variant of this job
    public static final class LineSplitter implements AssignerWithPunctuatedWatermarks<String> {

        private static final long serialVersionUID = 1L;

        @Nullable
        public Watermark checkAndGetNextWatermark(String arg0, long arg1) {
            if (null != arg0 && arg0.contains(",")) {
                String[] parts = arg0.split(",");
                return new Watermark(Long.parseLong(parts[0]));
            }
            return null;
        }

        public long extractTimestamp(String arg0, long arg1) {
            if (null != arg0 && arg0.contains(",")) {
                String[] parts = arg0.split(",");
                return Long.parseLong(parts[0]);
            }
            return 0;
        }
    }

}
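
The pom pulls in slf4j-log4j12 and log4j, so without a configuration file the program starts with a "no appenders" warning. A minimal, assumed log4j.properties (created here from the shell) keeps the console output readable:

cat > src/main/resources/log4j.properties <<'EOF'
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c - %m%n
EOF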

Run the Project and Check the Results

Source IP --> 10.1.3.165
Access time --> 17/May/2019:14:11:52 +0800
Request method + path --> GET /doc/list HTTP/1.1
HTTP response status --> 200
Referer URL --> http://10.1.1.35/doc
User-Agent --> Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36 OPR/58.0.3135.132
