Real-Time Log Analysis System (Single Machine)
The goal of this tutorial is to use Flume to collect Apache access.log data and forward it through the Kafka messaging service to a log analysis program. The required environment consists of: a Linux system, the Java JDK, ZooKeeper, Flume, Kafka, and a Java program that receives the data.
1. Preparing the Environment
Software Downloads
```sh
wget http://www.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
wget http://www.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
```
Result (note the mismatch: the listing contains apache-flume-1.8.0-bin.tar.gz, and Flume 1.8.0 is the version actually installed below, so download the release you intend to use):
```
-rw-r--r--. 1 root root  58688757 Oct 27  2018 apache-flume-1.8.0-bin.tar.gz
-rw-r--r--. 1 root root 286821827 Apr  4 14:13 flink-1.8.0-bin-scala_2.12.tgz
-rw-r--r--. 1 root root  63999924 Mar 23 08:57 kafka_2.11-2.2.0.tgz
-rw-r--r--. 1 root root  28678231 Mar  4  2016 scala-2.11.8.tgz
-rw-r--r--. 1 root root  37676320 Apr  1 22:44 zookeeper-3.4.14.tar.gz
```
Environment Setup
Installing the Java JDK
```sh
yum -y install java-1.8.0-openjdk*
```
Verify the installation:
```
java -version
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-b04)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)
```
Installing ZooKeeper, Flume, and Kafka
ZooKeeper Installation and Configuration
```sh
cd /usr/local
tar -zxvf zookeeper-3.4.14.tar.gz
mv ./zookeeper-3.4.14 ./zookeeper
# zoo.cfg does not exist in a fresh unpack; create it from the shipped sample first
cp ./zookeeper/conf/zoo_sample.cfg ./zookeeper/conf/zoo.cfg
vim ./zookeeper/conf/zoo.cfg
```
Edit the configuration:
```properties
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
```
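Once ZooKeeper is started (see the startup section below), its health can be sanity-checked via the four-letter-word interface, which is enabled by default in the 3.4 line:
```sh
echo ruok | nc 127.0.0.1 2181    # prints "imok" if the server is running
```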
Flume Installation and Configuration
```sh
cd /usr/local
tar -zxvf apache-flume-1.8.0-bin.tar.gz
# the bin tarball unpacks to apache-flume-1.8.0-bin
mv ./apache-flume-1.8.0-bin ./flume
vim ./flume/conf/flume-conf.properties
```
Edit the configuration:
```properties
# Name the agent's components
agent.sources = s1
agent.channels = c1
agent.sinks = k1

# Source: tail the Apache access log
agent.sources.s1.type = exec
agent.sources.s1.command = tail -f /home/wwwlogs/access.log
agent.sources.s1.channels = c1

# Channel: in-memory buffer between source and sink
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 100

# Sink: forward events to the Kafka topic "test"
# (brokerList/topic are legacy property names; Flume 1.8 prefers
# kafka.bootstrap.servers and kafka.topic but still accepts these)
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList = 10.1.1.35:9092
agent.sinks.k1.topic = test
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent.sinks.k1.channel = c1
```
Kafka Installation and Configuration
```sh
cd /usr/local
tar -zxvf kafka_2.11-2.2.0.tgz
mv ./kafka_2.11-2.2.0 ./kafka
# Kafka keeps its configuration under config/, not conf/
vim ./kafka/config/server.properties
```
Edit the configuration. advertised.listeners must be the address clients use to reach the broker, which throughout this tutorial is 10.1.1.35:
```properties
broker.id=1
advertised.listeners=PLAINTEXT://10.1.1.35:9092
zookeeper.connect=127.0.0.1:2181
```
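Kafka auto-creates topics by default, but the `test` topic used throughout this tutorial can also be created explicitly; a single partition with a single replica matches the single-machine setup. A sketch using the stock CLI tool:
```sh
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 \
  --replication-factor 1 --partitions 1 --topic test
```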
Starting the Services
Note: the services must be started in the following order, since Kafka depends on ZooKeeper and the Flume sink needs a running Kafka broker (a quick end-to-end check follows the list):
- Start ZooKeeper: `/usr/local/zookeeper/bin/zkServer.sh start &`
- Start Kafka: `/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >/dev/null 2>&1 &`
- Start Flume: `/usr/local/flume/bin/flume-ng agent --conf-file /usr/local/flume/conf/flume-conf.properties -c /usr/local/flume/conf --name agent -Dflume.root.logger=DEBUG,console &`
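Before writing any Java, the pipeline can be sanity-checked end to end with the stock console consumer; appending a line to the tailed log should make it appear on the topic. (The sample log entry below is illustrative, shaped like the output shown at the end of this tutorial.)
```sh
# watch the topic
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.1.1.35:9092 \
  --topic test --from-beginning

# in another shell, generate a test entry in the tailed access log
echo '10.1.3.165 - - [17/May/2019:14:11:52 +0800] "GET /doc/list HTTP/1.1" 200 512 "http://10.1.1.35/doc" "Mozilla/5.0"' >> /home/wwwlogs/access.log
```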
2. Developing the Receiver Program
Develop the program in IDEA: create a new Maven project.
pom.xml
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com</groupId>
    <artifactId>yang.flink</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <description>flink test</description>
    <packaging>jar</packaging>

    <name>yang.flink</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>test</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>target/classes/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <!-- must match the package of FlinkKafka.java below -->
                            <mainClass>com.yang.flink.FlinkKafka</mainClass>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                        <manifestEntries>
                            <Class-Path>.</Class-Path>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
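During development the class is usually run straight from IDEA, but a command-line build also works; copy-dependencies (bound to the test phase above) places the jars under target/classes/lib, so the program can be launched with a wildcard classpath. A sketch, assuming a Unix shell:
```sh
mvn clean package
java -cp "target/classes:target/classes/lib/*" com.yang.flink.FlinkKafka
```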
FlinkKafka.java
```java
package com.yang.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkKafka {

    public static void main(String[] args) throws Exception {

        // Set up the Flink StreamExecutionEnvironment and enable checkpointing
        // every 1000 ms (Flink's state & checkpointing mechanism).
        // NOTE: this version never submits a Flink job; the environment is
        // scaffolding for a Flink-native variant (see the sketch after this
        // listing), and consumption happens via the plain Kafka consumer client.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.1.1.35:9092");
        // zookeeper.connect is ignored by the modern consumer API; kept from the original config
        properties.setProperty("zookeeper.connect", "10.1.1.35:2181");
        // disable auto-commit: offsets are committed manually after each partition batch
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("session.timeout.ms", "30000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "test");
        System.out.printf("Start listener:%n");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        List<String> subscribedTopics = new ArrayList<String>();
        // to subscribe to multiple topics, add them here
        //subscribedTopics.add(topic1);
        subscribedTopics.add("test");
        consumer.subscribe(subscribedTopics);

        // Matches the Apache "combined" log format:
        // IP, ident, user, [timestamp], "request", status, bytes, "referer", "user-agent"
        Pattern p3 = Pattern.compile(
                "^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");

        try {
            int index = 0;
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        //System.out.println(record.value() + " --> raw record");
                        Matcher matcher = p3.matcher(record.value());
                        if (matcher.find()) {
                            System.out.println("Client IP    --> " + matcher.group(1));
                            System.out.println("Access time  --> " + matcher.group(4));
                            System.out.println("Request line --> " + matcher.group(5));
                            System.out.println("HTTP status  --> " + matcher.group(6));
                            System.out.println("Referer      --> " + matcher.group(8));
                            System.out.println("User-Agent   --> " + matcher.group(9));
                        }
                        index++;
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    // database writes would go here
                    //....................
                    // commit the offset of the next record to read (last processed + 1)
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    System.out.println("Records received so far --> [" + index + "]");
                }
            }
        } finally {
            consumer.close();
        }
    }

    // Watermark assigner intended for the Flink-native variant; unused by the
    // plain-consumer loop above.
    public static final class LineSplitter implements AssignerWithPunctuatedWatermarks<String> {

        private static final long serialVersionUID = 1L;

        @Nullable
        public Watermark checkAndGetNextWatermark(String arg0, long arg1) {
            if (null != arg0 && arg0.contains(",")) {
                String[] parts = arg0.split(",");
                return new Watermark(Long.parseLong(parts[0]));
            }
            return null;
        }

        public long extractTimestamp(String arg0, long arg1) {
            if (null != arg0 && arg0.contains(",")) {
                String[] parts = arg0.split(",");
                return Long.parseLong(parts[0]);
            }
            return 0;
        }
    }
}
```
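Since the pom already pulls in flink-streaming-java and the Kafka 0.11 connector, here is a minimal sketch (my addition, not part of the original program) of how the same topic could be consumed as a proper Flink job via FlinkKafkaConsumer011; the class name FlinkKafkaJob and the plain print sink are illustrative only:
```java
package com.yang.flink;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class FlinkKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000); // checkpoint every second, as in the original

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.1.1.35:9092");
        props.setProperty("group.id", "test");

        // Flink tracks offsets through its checkpoints; no manual commitSync needed
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), props));

        // the regex parsing from the listing above would go into a map/flatMap here
        stream.print();

        env.execute("access-log-analysis");
    }
}
```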
Run the Project and Check the Results
```
Client IP    --> 10.1.3.165
Access time  --> 17/May/2019:14:11:52 +0800
Request line --> GET /doc/list HTTP/1.1
HTTP status  --> 200
Referer      --> http://10.1.1.35/doc
User-Agent   --> Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36 OPR/58.0.3135.132
```