7、Logstash
7.1、简介
用途
7.2、部署安装
#检查jdk环境,要求jdk1.8+ java -version #上传压缩包 sudo rz #解压安装包 tar -xvf logstash-6.5.4.tar.gz #第一个logstash示例 bin/logstash -e 'input { stdin { } } output { stdout {} }'
执行效果如下:
7.3、配置详解
Logstash的配置有三部分,如下:
input { #输入 stdin { ... } #标准输入 } filter { #过滤,对数据进行分割、截取等处理 ... } output { #输出 stdout { ... } #标准输出 }
7.3.1、输入
- 采集各种样式、大小和来源的数据,数据往往以各种各样的形式,或分散或集中地存在于很多系统中。
- Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方* 式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
7.3.2、过滤
- 实时解析和转换数据
- 数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
7.3.3、输出
Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
7.4、读取自定义日志
7.4.1、日志结构
2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002
可以看到,日志中的内容是使用“|”进行分割的,使用,我们在处理的时候,也需要对数据做分割处理。
7.4.2、编写配置文件
#vim itcast-pipeline.conf input { file { path => "/itcast/logstash/logs/app.log" #读取日志的入口 start_position => "beginning" #从头开始 } } filter { mutate { split => {"message"=>"|"} #分割的符号 } } output { stdout { codec => rubydebug } #输出在控制台 }
7.4.3、启动测试
#启动 ./bin/logstash -f ./itcast-pipeline.conf #写日志到文件 echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002" >> app.log #输出的结果 { "@timestamp" => 2019-03-15T08:44:04.749Z, "path" => "/itcast/logstash/logs/app.log", "@version" => "1", "host" => "node01", "message" => [ [0] "2019-03-15 21:21:21", [1] "ERROR", [2] "读取数据出错", [3] "参数:id=1002" ] }
可以看到,数据已经被分割了。
7.4.5、输出到Elasticsearch
input { file { path => "/itcast/logstash/logs/app.log" #type => "system" start_position => "beginning" } } filter { mutate { split => {"message"=>"|"} } } output { elasticsearch { hosts => [ "master:9200","salve1:9200","salve2.135:9200","salve3.135:9200"] } } #启动 ./bin/logstash -f ./itcast-pipeline.conf #写入数据 echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1003" >> app.log
测试:
8、综合练习
将前面所学习到的Elasticsearch + Logstash + Beats + Kibana整合起来做一个综合性的练习。
8.1、流程说明
* 应用APP生产日志,用来记录用户的操作
[INFO] 2019-03-15 22:55:20 [cn.itcast.dashboard.Main] - DAU|5206|使用优惠券|2019-03-15
03:37:20
[INFO] 2019-03-15 22:55:21 [cn.itcast.dashboard.Main] - DAU|3880|浏览页面|2019-03-15 07:25:09
* 通过Filebeat读取日志文件中的内容,并且将内容发送给Logstash,原因是需要对内容做处理
* Logstash接收到内容后,进行处理,如分割操作,然后将内容发送到Elasticsearch中
* Kibana会读取Elasticsearch中的数据,并且在Kibana中进行设计Dashboard,最后进行展示
8.2、APP介绍
APP在生产环境应该是真实的系统,为了简化操作,所以就做数据的模拟生成即可。
业务代码如下:
package cn.itcast.dashboard; import org.apache.commons.lang3.RandomUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Main { private static final Logger LOGGER = LoggerFactory.getLogger(Main.class); public static final String[] VISIT = new String[]{"浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券", "领取优惠券", "搜索", "查看订单"}; public static void main(String[] args) throws Exception { while(true){ Long sleep = RandomUtils.nextLong(200, 1000 * 5); Thread.sleep(sleep); Long maxUserId = 9999L; Long userId = RandomUtils.nextLong(1, maxUserId); String visit = VISIT[RandomUtils.nextInt(0, VISIT.length)]; DateTime now = new DateTime(); int maxHour = now.getHourOfDay(); int maxMillis = now.getMinuteOfHour(); int maxSeconds = now.getSecondOfMinute(); String date = now.plusHours(-(RandomUtils.nextInt(0, maxHour))) .plusMinutes(-(RandomUtils.nextInt(0, maxMillis))) .plusSeconds(-(RandomUtils.nextInt(0, maxSeconds))) .toString("yyyy-MM-dd HH:mm:ss"); String result = "DAU|" + userId + "|" + visit + "|" + date; LOGGER.info(result); } } }
运行结果:
代码在资料中可以找到,itcast-dashboard-generate.zip。也可以自己生成导出jar包。
部署:
#打包成jar包,在linux上运行 java -jar itcast-dashboard-generate-1.0-SNAPSHOT.jar #运行之后,就可以将日志写入到/itcast/logs/app.log文件中
8.3、Filebeat
#vim itcast-dashboard.yml filebeat.inputs: - type: log enabled: true paths: - /itcast/logs/*.log setup.template.settings: index.number_of_shards: 3 output.logstash: hosts: ["master:5044"] #启动 ./filebeat -e -c itcast-dashboard.yml
8.4、Logstash
#vim itcast-dashboard.conf input { beats { port => "5044" } } filter { mutate { split => {"message"=>"|"} } mutate { add_field => { "userId" => "%{message[1]}" "visit" => "%{message[2]}" "date" => "%{message[3]}" } } mutate { convert => { "userId" => "integer" "visit" => "string" "date" => "string" } } } #output { # stdout { codec => rubydebug } #} output { elasticsearch { hosts => [ "master:9200","salve1:9200","salve2.135:9200","salve3.135:9200"] } } #启动 ./bin/logstash -f itcast-dashboard.conf
8.5、Kibana
启动Kibana:
#启动 ./bin/kibana #通过浏览器进行访问 http://master/app/kibana
注意:如果启动Kibana后服务器变得非常卡,可以换一台虚拟机启动Kibana,配置和之前一样。
添加Logstash索引到Kibana中:
8.5.1、时间间隔的柱形图
说明:x轴是时间,以天为单位,y轴是count数
保存:(my-dashboard-时间间隔的柱形图)
8.5.2、各个操作的饼图分布
统计各个操作的数量,形成饼图。
保存:(my-dashboard-各个操作的饼图)
8.5.3、数据表格
在数据探索中进行保存,并且保存,将各个操作的数据以表格的形式展现出来。
保存:(my-dashboard-表格)
8.5.4、制作Dashboard
8.5.5、其他操作
- 可以根据自己的需求,绘制不同的统计图
- 可以点击share,导出链接代码,可以直接使用
- 可以将制作的表格,或者监控到的数据导出为 csv
- 可以自己调整统计图的样式,和实时监控刷新的频率