[TOC]

1、日志收集所解决的问题

  1. ⽣产环境出现问题后,需要查看各种⽇志进⾏分析排错;
  2. 日常运维工作,日志文件分散,运维工作繁琐。可以实现日志聚合;
  3. 开发⼈员没有登陆服务器的权限。开发人员可以通过web界面查看日志;
  4. 面对大量的访问日志,可以统计出各项指标,比如PV UV。

    2、 Elastic Stack 组件介绍

  • Elasticsearch(简称ES)是一个分布式、RESTful风格的开源搜索和数据分析引擎,专为全文搜索、结构化搜索、分析、和数据可视化而设计。支持几乎实时的搜索,并且具有高扩展性,能够处理PB级别的数据。
    • Index(索引): 索引类似于关系型数据库中的表,它存储着相似结构的数据。在Elasticsearch中,每个索引包含一个或多个文档(Document)。
    • Document(文档): 文档是Elasticsearch的基本数据单位,相当于关系型数据库中的一行记录。文档以JSON格式存储,包含字段和值的对。
    • Shards(分片)和Replicas(副本): Elasticsearch将数据分为多个分片存储,每个索引可以包含一个或多个分片。分片可以提高并行查询的性能。副本是分片的备份,保证了数据的高可用性和容错性。
    • Cluster(集群): Elasticsearch集群由一个或多个节点(Node)组成。每个节点负责存储部分数据,并参与集群中的索引和搜索操作。集群有一个主节点,用于管理集群状态。
    • Node(节点): 节点是Elasticsearch的运行实例,每个节点属于某个集群。节点可以充当主节点、数据节点。
    • 使用场景:
      • 日志和事件数据分析: Elasticsearch常用于集中化日志管理和分析,帮助企业在海量日志中快速定位问题。
      • 电商网站的搜索功能: Elasticsearch被广泛用于电商网站的产品搜索,提供快速、相关性高的搜索结果。
      • 数据存储与检索: Elasticsearch能有效存储和检索结构化或非结构化数据,适用于大规模数据的管理。
  • Logstash
    • Logstash 是一个开源的数据收集、处理和传输引擎,主要用于实时的数据管道管理。它是 Elastic Stack(ELK Stack:Elasticsearch、Logstash、Kibana)的重要组成部分,负责将各种来源的数据收集起来,进行过滤和格式化处理,并最终将其输出到指定的目标,如 Elasticsearch、文件或其他存储系统。
  • Kibana
    • Kibana 提供强大的数据可视化能力,使用户可以在浏览器中实时查看、搜索和分析存储在 Elasticsearch 中的数据。它提供丰富的可视化选项,如图表、地图和表格,支持构建交互式仪表板。
  • Filebeat
    • Filebeat 是一个轻量级的日志收集和传输工具。它通常用于从各种数据源收集日志,并将这些日志传输到 Logstash 或 Elasticsearch 进行进一步处理和分析。Filebeat 是一个边车代理,安装在需要监控的服务器上,用于高效地读取和转发日志数据。它旨在减少资源消耗,提供可靠的日志传输,并确保在出现故障时不会丢失日志数据。

      一、安装JAVA环境(所有es节点,logstash节点,kinbana节点)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      #>>> 上传jdk1.8安装包
      [root@elk03 ~]# ll jdk-8u381-linux-x64.tar.gz
      -rw-r--r--. 1 root root 139273048 7月 31 10:23 jdk-8u381-linux-x64.tar.gz

      #>>> 解压jdk到指定目录
      [root@elk03 ~]# tar xf jdk-8u381-linux-x64.tar.gz -C /usr/local/

      #>>> 创建软链接
      [root@elk03 ~]# cd /usr/local/
      [root@elk03 local]# ln -s jdk1.8.0_381/ java

      #>>> 声明Java环境变量
      [root@elk03 local]# cat >> /etc/profile.d/jdk.sh <<-EOF
      #!/bin/bash
      export JAVA_HOME=/usr/local/java
      export PATH=\$PATH:\$JAVA_HOME/bin
      EOF
      #>>> 重新加载配置文件
      [root@elk03 local]# source /etc/profile.d/jdk.sh

      #>>> 测试java
      [root@elk03 local]# java -version
      java version "1.8.0_381"
      Java(TM) SE Runtime Environment (build 1.8.0_381-b09)
      Java HotSpot(TM) 64-Bit Server VM (build 25.381-b09, mixed mode)

      二、 安装ES集群

      1、单节点部署

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      #>>> 解压es安装包到指定目录
      [root@elk01 ~]# tar xf elasticsearch-7.17.11-linux-x86_64.tar.gz -C /usr/local

      #>>> 创建es软链接
      [root@elk01 ~]# cd /usr/local/
      [root@elk01 local]# ln -s elasticsearch-7.17.11/ es

      #>>> 声明ES环境变量
      [root@elk01 loacl]# cat >> /etc/profile.d/es.sh <<-EOF
      #! /bin/bash
      export ES_HOME=/usr/local/es
      export PATH=$PATH:\$ES_HOME/bin
      EOF

      #>>> 重新加载环境变量
      [root@elk01 local]# source /etc/profile.d/es.sh

      # 启动es实例
      [root@elk01 ~]# elasticsearch

      此时启动会有报错信息,记住详细看报错信息哟!嘿嘿。报错信息如下:此服务不能以超级管理员的身份运行,需要单独创建es普通用户。

解决方法如下操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#>>> 创建用于启动ES的用户
[root@elk01 local]# useradd es

#>>> 查看用户是否创建
[root@elk01 ~]# id es
uid=1000(es) gid=1000(es) 组=1000(es)

#>>> 修改elasticsearch属主和数组
[root@elk01 ~]# chown -R es.es /usr/local/elasticsearch-7.17.11/
#>>> 查看是否修改成功
[root@elk01 ~]# ll /usr/local/elasticsearch-7.17.11/
#>>> 切换es工作目录
[root@elk01 ~]# cd /usr/local/elasticsearch-7.17.11/config/

#>>> 备份es配置文件
[root@elk01 config]# cp elasticsearch.yml{,.bak}

#>>> 修改文件相关参数
[root@elk01 config]# egrep -v "^(#|$)" elasticsearch.yml
···
# es集群名称
cluster.name: study-elk-cluster
# 节点名称(一般以当前主机名一致)
node.name: elk01
# 监听地址,,配置IP则知允许基于IP地址进行访问
network.host: 0.0.0.0
# 自动发现节点IP
discovery.seed_hosts: ["192.168.100.160"]
# 初始化master节点
cluster.initial_master_nodes: ["192.168.100.160"]
# 添加以下参数关闭geoip数据库的更新,减少对带宽和存储的消耗,禁用自动更新。GeoIP根据IP地址确定地理位置的技术,用于日志分析、用户活动追踪等场景。
ingest.geoip.downloader.enabled: false
···

#>>> 测试启动ES
[root@elk01 ~]# su -c "elasticsearch" es

虽然es能够正常启动,但是启动日志会出下报错信息,需要我们修改启动的内核参数,第一个为修改系最大的文件描述符,第二个修改系统的虚拟内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#>>> 修改es需要的limits参数(重新连接会话框才能成功加载参数)
[root@elk01 logs]# cat >>/etc/security/limits.d/elk.conf <<EOF
* soft nofile 65535
* hard nofile 131070
EOF
# 参数解释:
soft nofile 65535表示将文件描述符的软限制设置为65535。
hard nofile 131070表示将文件描述符的硬限制设置为131070。


#>>> 查看limits参数是否加载
[root@elk01 ~]# ulimit -Sn
65535
[root@elk01 ~]# ulimit -Hn
131070

#>>> 修改内核参数
[root@elk01 config]# cat >> /etc/sysctl.d/elk.conf <<'EOF'
vm.max_map_count = 262144
EOF
# 参数解释
vm.max_map_count = 262144 用于设置单个进程可以拥有的内存映射的最大数量。

#>>> 加载内核参数
[root@elk01 config]# sysctl -f /etc/sysctl.d/elk.conf
vm.max_map_count = 262144
[root@elk01 config]# sysctl -q vm.max_map_count
vm.max_map_count = 262144


#>>> 测试启动ES
[root@elk01 ~]# su -c "elasticsearch" es
#>>> 或者使用一下的参数实现后台运行
[root@elk01 ~]# su -c "elasticsearch -d" es

#>>> 测试节点
[root@elk01 ~]# curl 192.168.100.160:9200
{
"name" : "elk01", # 节点的名称,一般以当前主机名命名。
"cluster_name" : "study-elk-cluster", # Elasticsearch集群的名称,多个节点可以组成一个集群,共同提供搜索和存储功能。
"cluster_uuid" : "EyOTgfNNSHOvUsQNLQ9f7Q", # 集群的唯一标识符(UUID)。每个Elasticsearch集群都有一个唯一的cluster_uuid,用于在集群内识别和管理节点之间的关系。
"version" : {
"number" : "7.17.11", # Elasticsearch 实例的版本
"build_flavor" : "default", # 示这个版本没有特殊的自定义修改或变化。
"build_type" : "tar", # Elasticsearch 是通过 tar 包安装的。
"build_hash" : "eeedb98c60326ea3d46caef960fb4c77958fb885", # Git提交哈希。
"build_date" : "2023-06-23T05:33:12.261262042Z", # 版本的构建日期和时间。
"build_snapshot" : false, # 此版本适用于生产
"lucene_version" : "8.11.1", # Lucene版本
"minimum_wire_compatibility_version" : "6.8.0", # lasticsearch实例能够与运行 6.8.0 及以上版本的其他节点进行通讯。
"minimum_index_compatibility_version" : "6.0.0-beta1" # Elasticsearch兼容的最低索引版本。
},
"tagline" : "You Know, for Search" # Elasticsearch的标语或宣传口号。
}

端口解释:

  • *9200 对外暴露端口,使用的HTTP协议, Elasticsearch 的 REST API 服务的默认端口。开发者和应用程序通过 HTTP 协议与 Elasticsearch 进行交互,发送查询、索引文档、管理集群等操作。 *
  • *9300 集群内部通讯端口。使用TCP协议。 用于集群内的节点间同步和协调。节点之间通过这个端口交换数据,包括集群管理信息、索引和搜索操作等。 *

    2、ES JAVA调优 堆(heap)内存大小

    1
    2
    3
    4
    5
    6
    7
    #>>> 查看运行的JAVA程序
    [root@elk01 ~]# jps
    14917 Jps
    14713 Elasticsearch

    #>>> 查看ES堆内存大小
    [root@elk01 ~]# jmap -heap 14713(pid)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    #>>> 修改堆内存大小(最大设置为32G,要么内存的一半)
    [root@elk01 config]# vim /usr/local/elasticsearch-7.17.11/config/jvm.options
    ···
    -Xms256m
    -Xmx256m
    ···

    #>>> 删除数据、日志
    [root@elk01 config]# rm -rf /usr/local/es/{data,logs}/*
    #>>> 删除缓存数据
    [root@elk01 config]# rm -rf /tmp/*

    #>>> 准备ES启动脚本并启动(加入systemd)
    [root@elk01 config]# cat >> /usr/lib/systemd/system/es.service <<EOF
    [Unit]
    # 描述服务的简短说明,这里描述为 "ELK"。
    Description=ELK
    # 指定服务的启动顺序。表示该服务在网络目标 (network.target) 之后启动,确保网络服务已启动并可用。
    After=network.target

    [Service]
    # 指定服务的启动类型为 forking,表示服务启动后会产生一个子进程,并且主进程会在启动完成后退出。这通常用于后台运行的守护进程。
    Type=forking
    # 指定启动服务的命令
    ExecStart=/usr/local/es/bin/elasticsearch -d
    # 指定服务在退出后不重启。可以根据需要将其更改为 always 或 on-failure,以确保服务在失败后自动重启。
    Restart=no
    # 指定以 es 用户的身份运行服务。
    User=es
    # 指定服务所属的组为 es。
    Group=es
    # 设置进程打开文件的最大数量(文件描述符限制)
    LimitNOFILE=131070

    [Install]
    # 指定服务的目标,表示该服务在多用户模式下可用。
    WantedBy=multi-user.target
    EOF

    # 重新加载systemd配置文件
    [root@elk01 config]# systemctl daemon-reload

    # 启动es
    [root@elk01 config]# systemctl restart es

3、部署ES集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#>>> 创建用于启动ES的用户
[root@elk01 opt]# useradd es
[root@elk01 opt]# id es
uid=1000(elasticsearch) gid=1000(elasticsearch) 组=1000(elasticsearch)

#>>> 创建ES数据目录和日志目录存放目录
[root@elk01 usr]# mkdir -p /opt/{data,logs}
[root@elk01 usr]# install -d /opt/{data,logs}/es -o es -g es

#>>> 解压es安装包到指定目录
[root@elk01 ~]# tar xf elasticsearch-7.17.11-linux-x86_64.tar.gz -C /opt/

#>>> 更改目录名
[root@elk01 ~]# cd /opt/ && mv elasticsearch-7.17.11 es

#>>> 创建ES环境变量
[root@elk01 opt]# vim >> /etc/profile.d/es.sh <<-EOF
#! /bin/bash
export ES_HOME=/opt/es
export PATH=\$PATH:\$ES_HOME/bin
EOF

#>>> 重新加载环境变量
[root@elk01 opt]# source /etc/profile.d/es.sh

#>>> 修改elasticsearch属主和数组
[root@elk01 opt]# chown -R es,es /opt/es

#>>> 修改es需要的limits参数(重新连接会话框才能成功加载参数)
[root@elk01 opt]# cat >> /etc/security/limits.d/elk.conf <<-EOF
* soft nofile 65535
* hard nofile 131070
EOF

#>>> 查看limits参数是否加载
[root@elk01 opt]# ulimit -Sn
65535
[root@elk01 opt]# ulimit -Hn
131070

#>>> 修改内核参数
[root@elk01 opt]# cat > /etc/sysctl.d/elk.conf <<EOF
vm.max_map_count = 262144
EOF

#>>> 加载内核参数
[root@elk01 opt]# sysctl -f /etc/sysctl.d/elk.conf
vm.max_map_count = 262144
[root@elk01 opt]# sysctl -q vm.max_map_count
vm.max_map_count = 262144

#>>> 修改堆内存大小(最大设置为32G,要么内存的一半)
[root@elk01 opt]# vim /opt/elasticsearch-7.17.11/config/jvm.options
···
-Xms256m
-Xmx256m
···
1
2
3
4
5
6
7
8
9
10
#>>> elk01修改配置文件
[root@elk01 opt]# egrep -v "^(#|$)" /opt/es/config/elasticsearch.yml
cluster.name: study-elk-cluster
node.name: elk01
path.data: /opt/data/es # 指定数据目录
path.logs: /opt/logs/es # 指定日志目录
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
1
2
3
4
5
6
7
8
9
10
#>>> elk02修改配置文件
[root@elk02 opt]# egrep -v "^(#|$)" /opt/es/config/elasticsearch.yml
cluster.name: study-elk-cluster
node.name: elk02
path.data: /opt/data/es
path.logs: /opt/logs/es
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
1
2
3
4
5
6
7
8
9
10
#>>> elk03修改配置文件
[root@elk03 opt]# egrep -v "^#|^$" /opt/es/config/elasticsearch.yml
cluster.name: study-elk-cluster
node.name: elk03
path.data: /opt/data/es
path.logs: /opt/logs/es
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#>>> 所有节点添加elk启动脚本
[root@elk03 opt]# cat > /usr/lib/systemd/system/es.service <<EOF
[Unit]
Description=ELK
After=network.target
[Service]
Type=forking
ExecStart=/opt/es/bin/elasticsearch -d
Restart=no
User=es
Group=es
LimitNOFILE=131070
[Install]
WantedBy=multi-user.target
EOF

#>>> 所有节点重新加载并启动
[root@elk01 config]# systemctl daemon-reload
[root@elk01 config]# systemctl restart es

#>>> 测试集群
[root@elk01 ~]# curl 192.168.100.160:9200/_cat/nodes
192.168.100.160 40 59 8 0.14 0.10 0.06 cdfhilmrstw * elk01
192.168.100.161 54 30 11 0.55 0.29 0.11 cdfhilmrstw - elk02
192.168.100.162 48 28 8 0.29 0.16 0.06 cdfhilmrstw - elk03
# 参数解释:
第一列:每个节点的IP地址;
第二列:每个节点的CPU使用率;
第三列:每个节点的内存的使用率;
第四列:每个节点的活跃分片数量;
第五列:每个节点的1分钟、5分钟、15分钟平均负载;
第六列:每个节点在集群中的角色;
第七列:*代表主节点;主节点负责管理集群的元数据、分片分配和集群状态等任务。
第八列:节点的名称

三、Kibana安装(elk03节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#>>> 安装kibana
[root@elk03 ~]# yum localinstall -y kibana-7.17.11-x86_64.rpm

#>>> 备份配置文件
[root@elk03 ~]# cd /etc/kibana/
[root@elk03 kibana]# cp kibana.yml kibana.yml.bak

#>>> 修改kibana配置文件
[root@elk03 kibana]# egrep -v "^(#|$)" kibana.yml
# 服务端口
server.port: 5601
# 主机地址或者主机名
server.host: "0.0.0.0"
# 服务名称
server.name: "study-elk-kibana"
# ES主机组地址
elasticsearch.hosts: ["http://192.168.100.160:9200","http://192.168.100.161:9200","http://192.168.100.162:9200"]
# 修改语言
i18n.locale: "zh-CN"

#>>> 启动Kibana
[root@elk03 kibana]# systemctl enable --now kibana

#>>> 游览器IP+5601访问

四、Logstash安装(elk02)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#>>> 安装Logstash
[root@elk02 ~]# yum localinstall -y logstash-7.17.11-x86_64.rpm

#>>> 创建软连接
[root@elk02 local]# ln -s /usr/share/logstash/bin/logstash /usr/bin/logstash

#>>> 编写测试文件
[root@elk02 ~]# mkdir conf-logstash
[root@elk02 ~]# cat conf-logstash/01-stdin-to-stdout.conf
input {
stdin {}
}
output{
stdout {}
}


#>>> 测试Logstash
[root@elk02 ~]# logstash -f ~/conf-logstash/01-stdin-to-stdout.conf

image.png

五、Filebeat安装(elk01)

帮助文档:https://www.elastic.co/guide/en/beats/filebeat/7.17/index.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#>>> 安装Filebeat
[root@elk01 ~]# yum localinstall -y filebeat-7.17.11-x86_64.rpm

#>>> 测试Filebeat
[root@elk01 opt]# cd filebeat
[root@elk01 filebeat]# mkdir filebeat-config
[root@elk01 filebeat]# cp filebeat.yml filebeat-config/01-test.yml
[root@elk01 filebeat]# cat filebeat-config/01-test.yml
filebeat.inputs:
- type: stdin
enabled: true

output.console:
pretty: true # 启动美观格式输出

# 启动Filebeat实例
[root@elk01 filebeat]# filebeat -e -c /opt/filebeat/filebeat-config/01-test.yml

1、Filebeat架构

架构图:
Input(数据源)收集方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/configuration-filebeat-options.html
Output(输出地)推送方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/configuring-output.html

六、Filebeat日志收集

Input方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-stdin.html

1、Filebeat标准输入和输出

​ 标准输入官方文档:https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-stdin.html
​ 标准输出官方文档:https://www.elastic.co/guide/en/beats/filebeat/7.17/console-output.html
简介:使用终端输入从标准输入读取事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 创建Filebeat配置文件测试目录
[root@elk01 ~]# mkdir filebeat-config

# 创建配置文件
[root@elk01 ~]# vim filebeat-config/01-stdin-on-stdout.yml
# 指定输入类型
filebeat.inputs:
# 终端标准输入
- type: stdin
# 标准输出类型:终端输出

output.console:
#如果设置为 true,则写入 stdout 的事件将采用良好的格式。默认值为 false。
pretty: true

# 测试
[root@elk01 ~]# filebeat -e -c /root/filebeat-config/01-stdin-on-stdout.yml

2、 Filebeat 基于Log类型进行输入

Log方式类型输入:https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-log.html#input-paths
简介Log类型输入将日志文件发送到输出。使用输入从日志文件中读取行。此类型[ 7.16.0 ]版本以后废弃,推荐使用filestream input类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#>>> 编写Log类型日志输入测试文件
[root@elk01 ~]# vim filebeat-config/02-log-on-stdout.yml
# 指定输入类型
filebeat.inputs:
# 日志输入类型
- type: log
# 日志存放路径
paths:
- /var/log/messages
- /var/log/*.log


#>>> 设置标准终端输出至屏幕
output.console:
#如果设置为 true,则写入stdout的事件将采用良好的格式。默认值为 false。
pretty: true

#>>> 测试
[root@elk01 ~]# filebeat -e -c /root/filebeat-config/02-log-on-stdout.yml

2.1 Filebeat 基于Log类型进行输入并且自定义字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[root@elk01 ~]# vim  filebeat-config/02-log-on-stdout.yml 
filebeat.inputs:
- type: log
paths:
- /var/log/messages
# 开启自定义字段
fields:
# 添加自定义字段
log_type: system
fields_under_root: true

- type: log
paths:
- /var/log/nginx/access.log
# 开启自定义字段
fields:
# 开启自定义字段
log_type: nginx_access
fields_under_root: true
output.console:
pretty: true

#>>> 清除偏移量
[root@elk01 ~]# rm -rf /var/lib/filebeat/*

#>>> 启动Filebeat实例
[root@elk01 filebeat]# filebeat -e -c /root/filebeat-config/02-log-on-stdout.yml

注意:
如果fields_under_root设置为true,则自定义字段将作为顶级字段存储在输出文档中,而不是分组在字段子字典下。如果自定义字段名称与Filebeat添加的其他字段名称冲突,则自定义字段将覆盖其他字段。效果图如下:

2.2 Filebeat 基于Log类型进行输入并且自定义Tags
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[root@elk01 ~]# vim  filebeat-config/02-log-on-stdout.yml 
filebeat.inputs:
- type: log
paths:
- /var/log/messages
# 对当前的输入日志文件,指定独有的标签
tags: ["system logs","",""]
fields:
log_type: system

- type: log
paths:
- /var/log/nginx/access.log
# 对当前的输入日志文件,指定独有的标签
tags: ["nginx access_log"]
fields:
log_type: nginx_access

output.console:
pretty: true

# 清除偏移量
[root@elk01 ~]# > /var/lib/filebeat/registry/filebeat/log.json

# 启动Filebeat实例
[root@elk01 filebeat]# filebeat -e -c /root/filebeat-config/02-log-on-stdout.yml

对于Tags的设定,将Filebeat收集到的日志转发至ES集群时,可以针对不同的Tag设置不同的索引,方便查阅。

2.3 offset偏移量概念

简介:偏移量(offset)在 Filebeat 中指的是文件读取位置的标记,用于追踪文件中的读取进度。
在某些情况下,可能需要手动操作偏移量。例如:

  • 假如想要重新读取一个日志文件,可以删除对应的注册表条目或者编辑偏移量值。注意:请确保 Filebeat 暂停运行期间进行这些操作以防止冲突。

RPM方式安装的Filebeat offset存放路径:

1
2
3
4
5
6
7
8
9
# 切换目录
[root@elk01 ~]# cd /var/lib/filebeat/registry/filebeat/

# 查看文件
[root@elk01 filebeat]# ls
log.json meta.json

# 清除偏移量文件,启动Filebeat重新读取日志文件
[root@elk01 registry]# > log.json

log.json为偏移量存放文件。清空偏移量后,再次启动Filebeat会重新读取日志文件


思考题:为什么Filebeat需要设置这个偏移量?

2.4 Filebeat 基于Log类型进行输入实现日志删除

支持正则表达式:https://www.elastic.co/guide/en/beats/filebeat/7.17/regexp-support.html
简介exclude_lines与要Filebeat排除的行匹配的正则表达式列表。Filebeat将删除列表中与正则表达式匹配的所有行。默认情况下,不会删除任何行。空行将被忽略。如果还指定了多行设置,则在通过exclude_lines过滤行之前,每条多行消息都将合并为一行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 编写测试日志文件
[root@elk01 ~]# cat >> /tmp/test.log << EOF
hi cloud
hello world
Hello cloud
EOF

# 编写Filebeat配置文件
[root@elk01 ~]# vim filebeat-config/03-log-exclude-stdin.yml
filebeat.inputs:
- type: log
paths:
- /tmp/test.log
tags: ["test"]
fields:
log_type: test
fields_under_root: true
# 删除以hello开头的行,注意区分大小写
exclude_lines: ['^hello','^haha','^lala']

output.console:
pretty: true

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c /root/filebeat-config/03-log-exclude-stdin.yml


查看offset偏移量

查看文件字符

此参数是在该日志文件阅读完成以后,在进行过滤删除,故在offset文件中还是有所体现。

2.5 Filebeat 基于Log类型进行输入实现匹配日志输出

支持正则表达式:https://www.elastic.co/guide/en/beats/filebeat/7.17/regexp-support.html
简介include_lines希望Filebeat包含的行相匹配的正则表达式列表。Filebeat仅导出与列表中正则表达式匹配的行。默认情况下,将导出所有行。空行将被忽略。如果还指定了多行设置,则在通过include_lines过滤行之前,每条多行消息将合并为一行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 准备测试日志文件
[root@elk01 ~]# cat >>/tmp/test02.log <<EOF
ERR haha
WARN haha
info haha
EOF

# 编写Filebeat测试文件
[root@elk01 ~]# vim filebeat-config/04-log-include-stdin.yml
filebeat.inputs:
- type: log
paths:
- /tmp/test02.log
tags: ["test"]
fields:
log_type: test
fields_under_root: true
# 只过滤出以ERR WARN的行,区分大小写
include_lines: ['^ERR','^WARN']

output.console:
pretty: true

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c /root/filebeat-config/04-log-include-stdin.yml

2.6 include_lines和exclude_lines优先级

​ 如果同时定义了include_linesexclude_lines,则Filebeat首先执行include_lines,然后执行exclude_line。定义这两个选项的顺序无关紧要。include_lines选项将始终在exclude_lines选项之前执行,即使exclude_line出现在配置文件中的include_lines之前也是如此。

2.7 Nginx error日志过滤处理实战

简介:在公司中,对于应用的错误日志一般只关心包含error等相关内容的行,其它的我们都需要对其进行过滤处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#>>> 编写测试日志文件
[root@elk01 ~]# vim /var/log/nginx/error.log
2024/05/30 01:55:13 [notice] 967#967: worker process 976 exited with code 0
2024/05/30 01:55:13 [notice] 967#967: signal 29 (SIGIO) received
2024/05/30 01:55:13 [notice] 967#967: signal 17 (SIGCHLD) received from 977
2024/05/30 01:55:13 [notice] 967#967: worker process 977 exited with code 0
2024/05/30 01:55:13 [notice] 967#967: signal 17 (SIGCHLD) received from 975
2024/05/30 01:55:13 [notice] 967#967: worker process 975 exited with code 0
2024/05/30 01:55:13 [notice] 967#967: exit
2024/05/30 01:55:33 [emerg] 2359#2359: unexpected "}" in /etc/nginx/nginx.conf:11
2024/05/30 01:55:16 [emerg] 2384#2384: "access_log" directive is not allowed here in /etc/nginx/nginx.conf:22
2024/05/30 01:55:41 [notice] 2396#2396: using the "epoll" event method
2024/05/30 01:55:41 [notice] 2396#2396: nginx/1.24.0
2024/05/30 01:55:41 [notice] 2396#2396: built by gcc 4.8.5 20150623 (Red Hat 4.8.5-44) (GCC)
2024/05/30 01:55:41 [notice] 2396#2396: OS: Linux 3.10.0-1160.el7.x86_64
2024/05/30 01:55:41 [notice] 2396#2396: getrlimit(RLIMIT_NOFILE): 1024:4096

#>>> 编写Filebeat测试文件
[root@elk01 ~]# vim filebeat-config/05-log-nginx-error-stdin.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/error.log
tags: ["nginx_error"]
fields:
log_type: nginx_error
fields_under_root: true
include_lines: ['\[emerg\]']

output.console:
pretty: true

#>>> 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c /root/filebeat-config/05-log-nginx-error-stdin.yml

2.8 Filebeat输出数据至Elasticsearch集群

官方链接:https://www.elastic.co/guide/en/beats/filebeat/7.17/elasticsearch-output.html#elasticsearch-output
简介:Elasticsearch输出使用Elasticsearch HTTP API将事件直接发送到Elasticearch。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 编写Filebeat配置文件
[root@elk01 ~]# vim filebeat-config/06-log-nginx-access-es.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
tags: ["nginx_access"]
fields:
log_type: nginx_access
fields_under_root: true

# 输出至ES集群
output.elasticsearch:
# ES集群地址
hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c /root/filebeat-config/06-log-nginx-access-es.yml


Kibana查看ES数据



通过索引模式来匹配索引

![](https://hjmimage.oss-cn-zhangjiakou.aliyuncs.com/202405300059590.png#id=b38Ds&originHeight=1417&originWidth=2488&originalType=binary&ratio=1&rotation=0&showTitle=false&status=done&style=none)
2.9 Filebeat输出数据至Elasticsearch集群并自定义单个索引

官网:https://www.elastic.co/guide/en/beats/filebeat/7.17/elasticsearch-output.html#index-option-es
简介:使用每日索引时要将事件写入的索引名称。默认值为“filebeat-%{[agent.version]}-%{+yyyy.MM.dd}”,例如“filebeat-7.17.21-2024-05-22”。在公司中一般会根据日志的类型设置不同的索引。但是如果需要自定以索引,需要把索引的生命周期管理给禁用掉setup.ilm.enabled: false,否则自定以索引无法生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 编写Filebeat测试文件
[root@elk01 ~]# vim filebeat-config/07-log-nginx-access-es.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
tags: ["nginx_access"]
fields:
log_type: nginx_access
fields_under_root: true

output.elasticsearch:
hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]
index: "web-nginx-access-%{+yyyy.MM.dd}"
# 禁用索引生命周期管理
setup.ilm.enabled: false
# 设置索引模板的名称
setup.template.name: "web"
# 设置索引模板的匹配模式
setup.template.pattern: "web*"

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c /root/filebeat-config/07-log-nginx-access-es.yml

参数解释:
setup.ilm.enabled: false # 禁用了 Filebeat 的 索引生命周期管理, ILM 是 Elasticsearch 提供的一种功能,用于自动管理索引的生命周期。通过 ILM,管理员可以定义索引的创建、滚动(如每天创建一个新索引)、迁移(如将数据从热节点迁移到冷节点)、归档和删除等操作。
setup.template.name: “web” # 指定了 Filebeat创建的索引模板的名称。索引模板定义了 Elasticsearch 中新创建的索引的默认设置和映射。每当一个新索引与指定的模板模式匹配时,Elasticsearch 会应用该模板中的设置和映射规则。
setup.template.pattern: web # 定义索引模板的 匹配模式。 web:表示 Filebeat创建的所有索引名称中,只要符合 web的模式,都会使用名为 web的模板。 是通配符,表示任意字符。


kibana操作和查看








2.10 Filebeat输出数据至Elasticsearch集群并自定义多个索引

官网:https://www.elastic.co/guide/en/beats/filebeat/7.17/elasticsearch-output.html
简介:每个规则都指定用于与该规则匹配的事件的索引。在发布过程中,Filebeat使用数组中的第一个匹配规则。规则可以包含条件、基于字符串的字段格式和名称映射。如果缺少索引设置或没有匹配的规则,则使用索引设置。与索引类似,定义自定义索引将禁用索引生命周期管理(ILM)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#>>> 编写Filebeat配置文件
[root@elk01 ~]# vim filebeat-config/08-log-nginx-log-es.yml
- /var/log/nginx/.log
tags: ["nginx_access"]
fields:
log_type: nginx_access
fields_under_root: true

- type: log
paths:
- /var/log/nginx/error.log
tags: ["nginx_error"]
fields:
log_type: nginx_error
fields_under_root: true
include_lines: ['\[emerg\]']

output.elasticsearch:
hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]
indices:
# 索引名称
- index: "web-nginx-access-%{+yyyy.MM.dd}"
# 匹配标签
when.contains:
tags: "nginx_access"
# 索引名称
- index: "web-nginx-error-%{+yyyy.MM.dd}"
# 匹配标签
when.contains:
tags: "nginx_error"
# 禁索引命周期管理
setup.ilm.enabled: false
# 设置索引模板的名称
setup.template.name: "web"
# 设置索引模板的匹配模式
setup.template.pattern: "web*"

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c /root/filebeat-config/08-log-nginx-log-es.yml


kibana操作和查看




注意:日志是以索引的方式写入的!

2.11 Filebeat输出数据至Elasticsearch集群并设定副本分片

官网:https://www.elastic.co/guide/en/beats/filebeat/7.17/configuration-template.html

简介

  • index.number_of_shards: 3:主分片(primary shard)是将数据分割成较小部分以便分布存储和基础单位。设置主分片数为3意味着每个索引将分成3个主分片。主分片的数量在索引创建后无法更改。
  • index.number_of_replicas: 1:副本分片(replica shard)是主分片的副本,用于提供高可用性和故障恢复能力。如果主分片所在的节点故障,副本分片可以作为备份继续提供数据服务。设置副本分片数为1意味着每个主分片将有1个副本分片。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    # 编写Filebeat测试文件
    [root@elk01 ~]# vim filebeat-config/09-log-nginx-log-shard-es.yml
    filebeat.inputs:
    - type: log
    paths:
    - /var/log/nginx/access.log
    tags: ["nginx_access"]
    fields:
    log_type: nginx_access
    fields_under_root: true

    - type: log
    paths:
    - /var/log/nginx/error.log
    tags: ["nginx_error"]
    fields:
    log_type: nginx_error
    fields_under_root: true
    include_lines: ['\[emerg\]']

    output.elasticsearch:
    hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]
    indices:
    # 索引名称
    - index: "web-nginx-access-%{+yyyy.MM.dd}"
    # 匹配标签
    when.contains:
    tags: "nginx_access"
    # 索引名称
    - index: "web-nginx-error-%{+yyyy.MM.dd}"
    # 匹配标签
    when.contains:
    tags: "nginx_error"
    # 禁索引命周期管理
    setup.ilm.enabled: false
    # 设置索引模板的名称
    setup.template.name: "web"
    # 设置索引模板的匹配模式
    setup.template.pattern: "web*"
    # 配置索引模板
    setup.template.settings:
    # 设置分片数量
    index.number_of_shards: 3
    # 设置副本数量,要求小于集群的数量
    index.number_of_replicas: 1

    # 启动Filebeat实例
    [root@elk01 ~]# filebeat -e -c /root/filebeat-config/09-log-nginx-error-shard-es.yml

    注意:三分片一副本,表示每个主分片都会有一个副本分片,分片数量一旦确定,不能修改(缩容/扩容)。会导致数据无法查找。副本分片可以扩容。

记得把原有的模板给删除掉,否则分片和副本数量无法修改成功

2.12 FIlebeat 收集JSON格式的nginx访问日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 安装nginx
[root@elk01 ~]# vim /etc/yum.repos.d/nginx.repo
[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=0
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true
[root@elk01 ~]# yum install -y nginx
# 启动nginx
[root@elk01 ~]# systemctl enable --now nginx

# 修改nginx访问日志格式
[root@elk-01 ~]# vim /etc/nginx/nginx.conf
···
log_format test_nginx_json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"SendBytes":$body_bytes_sent,'
'"responsetime":$request_time,'

'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"uri":"$uri",'
'"domain":"$host",'
'"xff":"$http_x_forwarded_for",'
'"referer":"$http_referer",'
'"tcp_xff":"$proxy_protocol_addr",'
'"http_user_agent":"$http_user_agent",'
'"status":"$status"}';
access_log /var/log/nginx/access.log test_nginx_json;
···

# 检查语法格式
[root@elk-01 ~]# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful

# 重启nginx
[root@elk-01 ~]# systemctl reload nginx

# 测试
[root@elk01 ~]# curl localhost
[root@elk01 ~]# cat /var/log/nginx/access.log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 修改Filebeat配置(识别json格式日志,生成相关字段)
[root@elk01 ~]# vim filebeat-config/10-log-nginx-log-shard-json-es.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log*
tags: ["access"]
# 启动json格式
json.keys_under_root: true

output.elasticsearch:
hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]
index: "web-nginx-access-%{+yyyy.MM.dd}"
# 禁索引命周期管理
setup.ilm.enabled: false
# 设置索引模板的名称
setup.template.name: "web"
# 设置索引模板的匹配模式
setup.template.pattern: "web*"
# 覆盖已有的索引模板,如果为true,则会直接覆盖现有的索引模板,如果为false则不覆盖!
setup.template.overwrite: true
# 配置索引模板
setup.template.settings:
# 设置分片数量
index.number_of_shards: 3
# 设置副本数量,要求⼩于集群的数量
index.number_of_replicas: 1


# 清除filebeat的指针偏移,用于从第一行重新读取日志文件
[root@elk-01 log]# rm -rf /var/lib/filebeat/*

# 清楚以前的ES nginx索引,防止出现脏数据


# 重启filebeat
[root@elk01 ~]# filebeat -e -c ~/filebeat-config/10-log-nginx-log-shard-json-es.yml

七、Logstash 日志收集

官网:https://www.elastic.co/guide/en/logstash/7.17/
简介:Logstash是一个具有实时流水线功能的开源数据收集引擎。Logstash可以动态地统一来自不同来源的数据,并将数据规范化为您选择的目的地。为各种高级下游分析和可视化用例清理和民主化您的所有数据。
虽然Logstash最初推动了日志收集的创新,但其功能远远超出了该用例。任何类型的事件都可以通过广泛的输入、过滤和输出插件进行丰富和转换。

1、Logstash安装

1
2
3
4
5
6
7
8
# 安装
[root@elk03 ~]# yum install -y logstash-7.17.11-x86_64.rpm

# 声明软连接
[root@elk03 ~]# ln -s /usr/share/logstash/bin/logstash /sbin/

# 创建Logstash 测试文件目录
[root@elk03 ~]# mkdir logstash-config

2、Logstash 标准输入输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 编写Logstash配置文件
[root@elk03 ~]# cd logstash-config/
[root@elk03 logstash-config]# vim 01_test.conf
input {
stdin {}
}


output {
stdout {}
}

# 检查配置文件语法
[root@elk03 logstash-config]# logstash -tf ~/logstash-config/01_test.conf

1
2
# 启动Logstash
[root@elk03 logstash-config]# logstash -f ~/logstash-config/01_test.conf

3、Logstash基于File的形式进行input

官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-file.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 编写配置文件
[root@elk03 logstash-config]# vim 02-file-input.conf
input {
file {
# 指定日志文件收集的路径
path => ["/tmp/*.txt"]
# 从日志文件的第一行进行读取,只会在第一次启动时从头读取
start_position => "beginning"
# 从日志文件的尾行读取日志,且偏移指针文件对原日志文件偏移量未记录
# start_position => "end"
}
}

output {
stdout {}
}

# 准备测试日志文件
[root@elk01 stduy-logstash-config]# ech0 1111 > /tmp/1.txt

# 启动Logstash实例
[root@elk01 stduy-logstash-config]# logstash -f /root/stduy-logstash-config/02_input_file.conf

1
2
3
4
5
6
7
# 查看偏移量
[root@elk03 logstash-config]# cat /usr/share/logstash/data/plugins/inputs/file/.sincedb_820ddbbd098cfece4b56f4fcbf67a9bb
16789540 0 64768 4 1717082577.147625 /tmp/1.txt

# 查看日志文件
[root@elk03 logstash-config]# ll -i /tmp/1.txt
16789540 -rw-r--r-- 1 root root 4 5月 30 23:20 /tmp/1.txt

4、Logstash基于Filebeat形式进行input

Filebeat output Logstash官网:https://www.elastic.co/guide/en/beats/filebeat/7.17/logstash-output.html
Logstash input Fliebeat官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-beats.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 修改Filebeat配置文件
[root@elk01 filebeat-config]# vim /root/filebeat-config/output-logstash.yml

filebeat.inputs:
- type: log
paths:
- /tmp/test.log
tags: ["test"]
fields:
log_type: test
fields_under_root: true

output.logstash:
# Logstash主机地址
hosts: ["192.168.174.142:5044"]


#启动Filebeat实例
[root@elk01 test]# filebeat -e -c /root/filebeat-conifg/output-logstash.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 修改Logstash配置文件
[root@elk01 stduy-logstash-config]# vim input-beats.conf
input {
beats {
# 通讯端口号,默认5044
port => 5044
}
}

output {
stdout {}
}

# 启动Logstash
[root@elk01 logstash-config]# logstash -rf /root/logstash-config/input-beats.conf

5、Logstash基于ES形式进行output

官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-outputs-elasticsearch.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 修改Logstash配置文件
[root@elk03 logstash-config]# cat 08_output_es.conf
input {
beats {
# 通讯端口号,默认5044
port => 5044
}
}

output {
elasticsearch {
# ES集群IP
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
# 索引模式名称
index => "test-log-%{+yyyy.MM.dd}"
}
}

# 启动Logstash
[root@elk03 logstash-config]# logstash -f /root/stduy-logstash-config/08_output_es.conf

6、Logstash grok插件使用

官网: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
简介:解析任意文本并对其进行结构化。Grok 是将非结构化日志数据解析为结构化和可查询内容的好方法。该工具非常适合系统日志,apache和其他Web服务器日志,mysql 日志,以及通常为人类编写的任何日志格式。

6.1 Logstash 内置正则使用

简介:将非结构化的日志格式通过Grok内置的正则转化为结构化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 日志格式
192.168.174.1 - - [01/Jun/2024:10:37:16 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0" "-"
192.168.174.1 - - [01/Jun/2024:10:37:16 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0" "-"

# 编写Filebeat配置文件
[root@elk01 ~]# vim filebeat-out-logstash.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
tags: ["access"]
output.logstash:
hosts: ["192.168.174.142:5044"]

# 清除偏移量
[root@elk01 ~]# rm -rf /var/lib/filebeat/*
# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c ~/filebeat-out-logstash.yml


# 编写Logstash配置文件
[root@elk03 ~]# vim filter_grok.yml
input {
beats {
port => 5044
}
}

filter {
grok {
match => {
"message" => "%{HTTPD_COMMONLOG}"
}
}
}

output {
stdout {}
elasticsearch {
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
index => "test-%{+yyyy.MM.dd}"
}
}

# 启动Logstash
[root@elk03 ~]# logstash -rf ~/filter_grok.yml

6.2 Logstash基于内置正则实现日志结构化

官网: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
Logstash内置变量:https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/httpd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 测试日志准备
55.3.244.1 GET /index.html 15824 0.043
# 参数说明
55.3.244.1 # 客户端IP
GET # 请求方式
/index.html # 请求路径
15824 # 发送字节大小
0.043 # 相应时常

# Logstash 配置文件编写
[root@elk03 ~]# vim stdin_grok_stdout.conf
input {
stdin {}
}

filter {
grok {
match => { "message" => "%{IP:client_ip} %{WORD:Request_method} %{URIPATHPARAM:request_uri} %{NUMBER:bytes} %{NUMBER:response_time}" }

}
}

output {
stdout {}
}

# 启动Logstash实例
[root@elk03 ~]# logstash -rf ~/stdin_grok_stdout.conf

提示:
%{IP:client} 变量解释: IP为内置变量,client为字段名称或者叫标识符。可以自行修改,例如:%{IP:req-client}

6.4 Logstash删除指定字段

简介:如果此筛选器成功,请从此事件中删除任意字段。

未删除字段终端打印如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# Filebeat配置文件编写
[root@elk01 ~]# vim filebeat-out-logstash.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
tags: ["access"]
output.logstash:
hosts: ["192.168.174.142:5044"]

# 删除偏移量
[root@elk01 ~]# rm -rf /var/lib/filebeat/*

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c ~/filebeat-out-logstash.yml

# Logstash配置文件编写
[root@elk03 ~]# vim filter_grok.yml
input {
beats {
port => 5044
}
}

filter {
grok {
match => {
"message" => "%{HTTPD_COMMONLOG}"
}
# You can also remove multiple fields at once:
remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]

}
}

output {
stdout {}

elasticsearch {
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
index => "test-%{+yyyy.MM.dd}"
}
}

# 启动Logstash实例
[root@elk03 ~]# logstash -rf ~/filter_grok.yml

删除指定字段后终端数据显示如下:

6.5 Logstash 添加指定字段

简介:如果此筛选成功,则向此事件添加任意字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Filebeat配置文件编写
[root@elk01 ~]# vim filebeat-out-logstash.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
tags: ["access"]
output.logstash:
hosts: ["192.168.174.142:5044"]

# 删除偏移量
[root@elk01 ~]# rm -rf /var/lib/filebeat/*

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c ~/filebeat-out-logstash.yml

# Logstash配置文件编写
[root@elk03 ~]# vim filter_grok.yml
input {
beats {
port => 5044
}
}

filter {
grok {
match => {
"message" => "%{HTTPD_COMMONLOG}"
}
# You can also remove multiple fields at once:
remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]
# You can also add multiple fields at once:
add_field => {
"host-name" => "%{host}"
"service" => "ELK stack"
}
}
}

output {
stdout {}

elasticsearch {
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
index => "test-%{+yyyy.MM.dd}"
}
}

# 启动Logstash实例
[root@elk03 ~]# logstash -rf ~/filter_grok.yml

6.6 Logstash 自定义Tags

简介:如果此筛选成功,请向事件添加任意标记。 标记可以是动态的,并使用语法包含事件的某些部分。%{field}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# Filebeat配置文件编写
[root@elk01 ~]# vim filebeat-out-logstash.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
tags: ["access"]
output.logstash:
hosts: ["192.168.174.142:5044"]

# 删除偏移量
[root@elk01 ~]# rm -rf /var/lib/filebeat/*

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c ~/filebeat-out-logstash.yml

# Logstash配置文件编写
[root@elk03 ~]# vim filter_grok.yml
input {
beats {
port => 5044
}
}

filter {
grok {
match => {
"message" => "%{HTTPD_COMMONLOG}"
}
# You can also remove multiple fields at once:
remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]
# You can also add multiple fields at once:
add_field => {
"host-name" => "%{host}"
"service" => "ELK stack"
}
# You can also add multiple tags at once:
add_tag => [ "Filebeat", "Logstash","Nginx","Tomcat","Kibana"]
}
}

output {
stdout {}

elasticsearch {
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
index => "test-%{+yyyy.MM.dd}"
}
}

# 启动Logstash实例
[root@elk03 ~]# logstash -rf ~/filter_grok.yml

7、Logstash geoip插件分析用户IP所属地

官方:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-geoip.html
简介:GeoIP 过滤器添加有关 IP 地址地理位置的信息, 基于 MaxMind GeoLite2 数据库的数据。 MaxMind GeoLite2 数据库:开源IP地址定位数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Filebeat配置文件编写
[root@elk01 ~]# vim filebeat-out-logstash.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
tags: ["access"]
output.logstash:
hosts: ["192.168.174.142:5044"]

# 删除偏移量
[root@elk01 ~]# rm -rf /var/lib/filebeat/*

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c ~/filebeat-out-logstash.yml

# Logstash配置文件编写
[root@elk03 ~]# vim filter_grok.yml
input {
beats {
port => 5044
}
}

filter {
grok {
match => {
"message" => "%{HTTPD_COMMONLOG}"
}
# You can also remove multiple fields at once:
remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]
}
# 指定geoip插件
geoip {
# 配置客户端IP字段名称
source => "clientip"
}
}

output {
stdout {}

elasticsearch {
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
index => "test-%{+yyyy.MM.dd}"
}
}

# 启动Logstash实例
[root@elk03 ~]# logstash -rf ~/filter_grok.yml

8、Logstash useragent插件分析用户客户端类型

​ 官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-useragent.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# Filebeat配置文件编写
[root@elk01 ~]# vim filebeat-out-logstash.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
tags: ["access"]
json.keys_under_root: true

output.logstash:
hosts: ["192.168.174.142:5044"]

# 删除偏移量
[root@elk01 ~]# rm -rf /var/lib/filebeat/*

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c ~/filebeat-out-logstash.yml

# Logstash配置文件编写
[root@elk03 ~]# vim filter_grok.yml
input {
beats {
port => 5044
}
}

filter {
grok {
remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]
}
# 指定geoip插件
geoip {
# 配置客户端IP字段名称
source => "clientip"
}
useragent {
source => "http_user_agent"
target => "test_user_agent"
remove_field => [ "agent","@version","tags","ecs","log","offset","type" ]
}
}

output {
stdout {}

elasticsearch {
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
index => "test-%{+yyyy.MM.dd}"
}
}

# 启动Logstash实例
[root@elk03 ~]# logstash -rf ~/filter_grok.yml

9、Logstash Filter多if分支

官网:https://www.elastic.co/guide/en/logstash/7.17/event-dependent-configuration.html#metadata
简介: 针对不同的日志类型。filter去做不同的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# Filebeat配置文件
[root@elk01 ~]# vim filebeat-out-logstash.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
fields:
log_type: nginx_access
fields_under_root: true
json.keys_under_root: true

- type: log
paths:
- /var/log/nginx/error.log
fields:
log_type: nginx_error
fields_under_root: true

- type: log
paths:
- /var/log/messages
fields:
log_type: system_log
fields_under_root: true



output.logstash:
hosts: ["192.168.174.142:5044"]

# 删除偏移量
[root@elk01 ~]# rm -rf /var/lib/filebeat/*

# 启动Filebeat实例
[root@elk01 ~]# filebeat -e -c ~/filebeat-out-logstash.yml


# Logstash 配置文件准备
[root@elk03 ~]# vim filter_grok.yml
input {
beats {
port => 5044
}
}

filter {
mutate {
add_field => {
"name" => "ELK stack"
}
remove_field => ["agent","ephemeral_id","ecs","@version","tags","input","log","offest"]
}
if [log_type] == "nginx_access" {
geoip {
source => "clientip"
}
useragent {
source => "http_user_agent"
target => "study_user_agent"
}
}
}

output {
stdout {}

if [log_type] == "nginx_access" {
elasticsearch {
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
index => "web-access-%{+yyyy.MM.dd}"
}
} else if [log_type] == "nginx_error" {
elasticsearch {
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
index => "web-error-%{+yyyy.MM.dd}"
}
} else if [log_type] == "system_log" {
elasticsearch {
hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
index => "system-log-%{+yyyy.MM.dd}"
}
}
}

# 启动Logstash
[root@elk03 ~]# logstash -rf ~/filter_grok.yml

八、Kibana操作使用

1、kibana手动创建索引模板




1
2
3
4
{
"number_of_replicas":1,
"number_of_shards":3
}

九、kafka简介和使用

1、MQ两种模式

1.1 P2P模式

1.1.1 架构图

1.1.2 简介

​ 在消息队列(MQ)系统中,==点对点模式==(Point-to-Point Model)是最基本的消息通信模式之一。在这种模式下,消息生产者发送消息到一个队列(Queue),而消息消费者从这个队列中接收和处理消息。每条消息只能被一个消费者消费一次,这与发布/订阅模式(Pub/Sub Model)中的多播机制不同。

1.1.3 工作流程
  1. 生产者发送消息:生产者创建一条消息,并将其发送到指定的队列(Queue)。
  2. 队列暂存消息:队列接收到消息并存储,等待消费者来接收。
  3. 消费者接收消息:消费者从队列检索消息进行处理。如果有多个消费者,每条消息只会被其中一个消费者接收。
  4. 消息确认:消费者处理完消息后,向队列发送确认,表示消息已经成功处理。MQ删除消费数据如果消费者没有确认消息,消息可能会被重新投递到队列中供其他消费者处理。

1.2 发布/订阅模式

1.2.1 架构图

1.2.2 简介

​ 在消息队列(MQ)系统中,发布-订阅模式(Publish-Subscribe Model,简称Pub/Sub)是一种常见的消息通信模式。在这种模式下,消息生产者(发布者)将消息发布到一个主题(Topic),而消息消费者(订阅者)则订阅该主题,从而接收发布者发布的消息。与点对点模式不同,发布到主题的每条消息可以被多个订阅者接收和处理

1.2.3 工作流程
  1. 发布者发布消息:发布者创建一条消息,并将其发布到指定的主题。
  2. 主题广播消息:主题接收到消息,并将其广播给所有订阅了该主题的订阅者。
  3. 订阅者接收消息:订阅者接收或者拉取主题广播的消息并进行处理。
  • Pull方式
    • 优点:消费者根据自身消费数据的能力去队列中拉取数据。
    • 缺点:消费者需要长期打开一个进程来监视队列是否有数据产生。
  • Push方式
    • 优点:消息队列主动推送数据,例如:公众号话题推送,APP更新推送
    • 缺点:消息队列自身需要维护一张订阅者名单。当订阅者过多时,特别消耗资源。

2、kafka相关概念

官网https://kafka.apache.org/

2.1 介绍

​ ==Apache Kafka==是一个开源的分布式事件流平台,由数以千计的公司提供高性能数据管道、流分析、 数据集成和任务关键型应用程序。

  • 高吞吐量:使用延迟低至2ms的机器集群以网络有限的吞吐量传递消息。

  • 可伸缩:将生产集群扩展到1000个代理、每天数万亿条消息、PB的数据和数十万个分区。弹性扩展和收缩存储和处理。

  • 永久存储器:将数据流安全地存储在分布式、持久、容错的集群中。

  • 高可用性:在可用性区域上高效地扩展集群,或者跨地理区域连接单独的集群。

    2.2 Kafka相关专业术语

  • 主题(Topic):主题是Kafka中用于对消息进行分类的逻辑分组,每个主题可以看作是消息的分类器。主题是多订阅者模式,即一个主题可以有多个消费者订阅。

  • 分区(Partition):每个主题被分成一个或多个分区。分区是消息存储的基本单元。分区内的消息是有序的,但不同分区之间无序。每个分区可以分布在不同的Kafka服务器上,从而实现水平扩展。

    • leader partition 负责对kafka集群的读写操作,和客户端进行交互
    • follower partition 负责去leader partition 同步数据,不可以和客户端进行交互
  • 偏移量(Offset):偏移量是分区中每条消息的唯一标识符。它是一个递增的整数,记录了消息在分区中的位置。消费者使用偏移量来跟踪读取消息的位置。

  • 生产者(Producer):生产者是负责向Kafka主题发布消息的客户端应用程序。生产者将消息发送到指定的主题和分区。

  • 消费者(Consumer):消费者是负责从Kafka主题读取消息的客户端应用程序。消费者通过订阅一个或多个主题来读取消息,并使用偏移量来跟踪读取进度。

  • 消费者组(Consumer Group):消费者组是一组消费者实例,共同消费一个或多个主题的消息。

  • 代理(Broker):代理是Kafka集群中的一个服务器节点,负责存储和传输消息数据。一个Kafka集群由多个代理组成,每个代理可以处理多个分区。

  • 复制(Replication):Kafka中的复制机制将分区数据复制到多个代理上,以确保数据的高可用性和容错性。每个分区有一个领导副本(Leader)和若干个跟随副本(Follower)。所有的读写操作都由领导副本处理,跟随副本只需同步领导副本的数据。但是Leader和Follwer都属于副本。创建时不能副本数不能为0。

3、Zookeeper使用和配置

官网链接:https://dlcdn.apache.org/zookeeper/

下载链接:https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz

3.1 简介:

​ Zookeeper 是 Apache 旗下的一个开源分布式协调服务,用于管理和协调分布式应用程序中的各种服务和组件。它提供了一系列高效且可靠的分布式数据一致性和协调机制。

3.2 zookeeper集群部署:三节点😀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#>>> 解压安装包到指定目录
$ tar xf apache-zookeeper-3.8.2-bin.tar.gz -C /opt/ && cd /opt/

#>>> 更改目录名称
$ mv apache-zookeeper-3.8.2-bin/ zookeeper

#>>> 配置zk环境变量
$ cat >> /etc/profile.d/zookeeper.sh <<-EOF
#!/bin/bash
export ZK_HOME=/opt/zookeeper
export PATH=\$PATH:\$ZK_HOME/bin
EOF

#>>> 重新加载环境变量
$ source /etc/profile.d/zookeeper.sh

# 创建zk数据存放目录
$ mkdir -p /opt/data/zk

#>>> 修改zk的配置文件(三节点)
$ cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg

$ egrep -v "^(#|$)" /opt/zookeeper/conf/zoo.cfg
# Zookeeper的心跳间隔时间。服务器和客户端会通过心跳包维持连接状态。
tickTime=2000
# 从节点最多可以等待10 个tickTime(即 10 * 2000 毫秒 = 20 秒)来与领导者同步数据状态。否则它将被认为是不可用的。
initLimit=10
# 定义Zookeeper领导者和从节点之间的心跳和同步请求的最大允许延迟时间。领导者与从节点之间的响应时间如果超过了这个限制,从节点将被认为失去同步状态。(即 5 * 2000 毫秒 = 10 秒)。
syncLimit=5
# 定义zk数据目录
dataDir=/opt/data/zk
# 定义Zookeeper集群的客户端连接端口
clientPort=2181
server.140=192.168.174.140:2888:3888
server.141=192.168.174.141:2888:3888
server.142=192.168.174.142:2888:3888
# 参数解释:
# dataDir zk数据目录
# clientPort 端口号
# server.140 zk节点唯一标识
# 192.168.100.160 zk节点主机地址
# 2888 集群内部通讯端口
# 3888 leader选举端口

# 创建ID文件(位置存放在zk的数据存放路径下)
101节点
$ echo "140" > /opt/data/zk/myid
102节点
$ echo "141" > /opt/data/zk/myid
103节点
$ echo "142" > /opt/data/zk/myid

#>>> 所有节点启动zk
$ zkServer.sh start

#>>> 查看zk状态
$ zkServer.sh status

myid文件:在Zookeeper集群中,myid文件是每个Zookeeper服务器节点的重要配置文件之一,用于唯一标识集群中的每个服务器。

3.3 Zookeeper角色划分

在Zookeeper集群中,不同的服务器节点可以承担不同的角色,以确保集群的高可用性、数据一致性和故障恢复能力。主要角色包括:==server.140=192.168.174.140:2888:3888:角色==

  1. 领导者(Leader)

    • 处理所有写操作(如创建、更新、删除节点)并确保数据的一致性。

    • 管理并协调集群中其他节点的活动。

    • 负责为客户端请求生成唯一的事务ID(zxid)。

    • 在集群启动或领导者失效时,会进行领导者选举,选出新的领导者。

    • 定期发送心跳给跟随者,确保自己处于活动状态。

  2. 跟随者(Follower)

    • 处理所有读取操作(如读取节点数据)。
    • 接收并转发客户端的写请求给领导者进行处理。
    • 将领导者的事务日志同步到本地,确保数据一致性。
    • 参与领导者选举。
    • 与领导者保持同步,接收并应用领导者的事务。
  3. 观察者(Observer)

    • 处理读取操作,减轻领导者和跟随者的负担。
    • 不参与领导者选举和事务投票,只同步领导者的事务日志。
    • 提高集群的读取扩展能力,适合需要高读取吞吐量的场景。
  4. 客户端(Client)

    • 连接到集群中的任一节点进行读取或写入操作。
    • 自动处理节点故障并重新连接到其他可用节点。
    • 通过客户端API与Zookeeper集群进行交互。

      3.4 zookeeper配置内存堆栈

      1
      2
      3
      4
      5
      6
      7
      8
      #>>> 查看zk进程
      $ jps
      1367 Elasticsearch
      5400 QuorumPeerMain
      5593 Jps

      #>>> 查看zk的堆栈大小
      $ jmap -heap 5400

zookeeper默认堆内存大小为1GB,一般设置为2GB或者4GB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#>>> 调节zookeeper推内存大小为256MB
$ vim /opt/zookeeper/conf/java.env
#! /bin/bash
# 指定JDK安装路径
export JAVA_HOME=/usr/local/java
# 指定zookeeper的堆内存大小
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS"

#>>> 同步文件
$ scp /opt/zookeeper/conf/java.env elk02:/opt/zookeeper/conf/
$ scp /opt/zookeeper/conf/java.env elk03:/opt/zookeeper/conf/

#>>> 所有节点重启zk
$ zkServer.sh restart

#>>> 验证堆内存
$ jmap -heap `jps | grep QuorumPeerMain | awk '{print $1}'`

4、kafka集群安装:三节点😀

​ 下载链接:https://kafka.apache.org/downloads
​ 官网:https://kafka.apache.org/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#>>> 解压安装包到指定目录
$ tar xf kafka_2.13-3.2.1.tgz -C /opt/ && cd /opt/

#>>> 修改目录名称
$ mv kafka_2.13-3.2.1/ kafka

#>>> 配置kafka环境变量
$ cat >> /etc/profile.d/kafka.sh <<-EOF
#!/bin/bash
export KAFKA_HOME=/opt/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF

#>>> 重新加载环境变量
$ source /etc/profile.d/kafka.sh

#>>> 创建数据目录
$ mkdir /opt/data/kafka -p

#>>> 修改kafka配置文件

# 1.kafka101节点配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties
broker.id=140
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka

# 2.kafka102节点配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties
broker.id=141
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka # /kafka为zk中的znode

# 3.kafka103节点配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties
broker.id=142
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka

#>>> 所有节点启动kafka
$ kafka-server-start.sh -daemon /opt/kafka/config/server.properties

#>>> zk节点查看kafka注册信息
$ zkCli.sh ls /kafka/brokers/ids | grep "^\["
[140, 141, 142]

#>>> kafka停止脚本
$ kafka-server-stop.sh

5、Kafka Topic日常操作

5.1 查看Topic相关信息

1
2
3
4
5
6
7
8
#>>> 查看集群中所有Topic
$ kafka-topics.sh --bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 --list

#>>> 查看所有Topic详细信息
$ kafka-topics.sh --bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 --describe

#>>> 查看某个Topic信息
$ kafka-topics.sh --bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 --describe --topic Topice名称

5.2 创建Topic

1
2
3
4
5
6
7
8
9
10
#>>> 创建test-elk Topic
$ kafka-topics.sh --bootstrap-server 192.168.174.140:9092 --create --topic test-elk
Created topic test-elk.

#>>> 创建Topic,并指定副本数量
$ kafka-topics.sh --bootstrap-server 192.168.174.140:9092 --create --topic test-elk-02 --partitions 10 --replication-factor 1
Created topic test-elk.

#>>> 查看Topic的详细信息
$ kafka-topics.sh --bootstrap-server 192.168.174.140:9092 --describe --topic test-elk-02

1
2
3
4
#>>> 创建Topic,并指定副本数量
$ kafka-topics.sh --bootstrap-server 192.168.174.140:9092 --create --topic test-elk-03 --partitions 10 --replication-factor 2

$ kafka-topics.sh --bootstrap-server 192.168.174.140:9092 --describe --topic test-elk-03

注意:副本数量不能大于Broker的数量

6、Filebeat收集日志至Kafka

官网:https://www.elastic.co/guide/en/beats/filebeat/7.17/kafka-output.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
[root@elk01 ~]# cat filbeat-out-kafka.yml 
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
fields:
log_type: nginx_access
fields_under_root: true
json.keys_under_root: true

#- type: log
# paths:
# - /var/log/nginx/error.log
# fields:
# log_type: nginx_error
# fields_under_root: true



output.kafka:
# initial brokers for reading cluster metadata
hosts: ["192.168.174.140:9092", "192.168.174.141:9092", "192.168.174.142:9092"]

# message topic selection + partitioning
topic: "test-kafka-topic"

[root@elk01 ~]# filebeat -e -c ~/filbeat-out-kafka.yml

# kafka测试拉取数据
[root@elk03 opt]# kafka-console-consumer.sh --topic test-kafka-topic --bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 --from-beginning

7、Logstash收集Kafka Topic日志

官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-kafka.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
input {
kafka {
bootstrap_servers => "192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092" # Kafka集群IP
topics => ["test-kafka-topic"] # 指定Topic进行消费数据
group_id => "test-kafka" # 指定消费者组
codec => json { # 指定消费的数据是JSON格式。
charset => "UTF-8"
}
}
}

filter {
mutate {
add_field => {
"name" => "ELK stack"
}
remove_field => ["agent","ephemeral_id","ecs","@version","tags","input","log","offest"]
}
if [log_type] == "nginx_access" {
geoip {
source => "clientip"
}
useragent {
source => "http_user_agent"
target => "study_user_agent"
}
}
}

output {
stdout {}

#将下面解开注释即可将数据存入es集群中,分类添加索引到kibana界面上然后呈现数据
# if [log_type] == "nginx_access" {
# elasticsearch {
# hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
# index => "web-access-%{+yyyy.MM.dd}"
# }
# } else if [log_type] == "nginx_error" {
# elasticsearch {
# hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
# index => "web-error-%{+yyyy.MM.dd}"
# }
# } else if [log_type] == "system_log" {
# elasticsearch {
# hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
# index => "system-log-%{+yyyy.MM.dd}"
# }
# }
}

# 启动Logstash实例
[root@elk03 ~]# logstash -rf ~/filter_grok.yml

十、Elasticsearch Restful风格API实战

1、ES集群状态

  1. 绿色(Green)
  • 含义:集群健康状态正常,所有的主分片和副本分片都已分配。
  • 解释:绿色状态表示集群中的所有数据都可以访问,所有分片(包括主分片和副本分片)都分配到集群中的节点上。
  • 示例:假设集群有3个主分片,每个主分片有1个副本分片,那么绿色状态下这6个分片都已成功分配且正常运行。
  1. 黄色(Yellow)
  • 含义:集群健康状态部分正常,所有的主分片都已分配,但有一个或多个副本分片未分配。
  • 解释:黄色状态表示集群中的所有主分片都可以访问,但一些副本分片由于某种原因(例如节点故障或资源不足)未能分配。这意味着数据是安全的,但没有高可用性,因为如果某个节点失败,它可能会导致数据无法访问。
  • 示例:假设集群有3个主分片和每个主分片1个副本分片,如果有3个主分片和2个副本分片已分配,但1个副本分片未能分配,则集群为黄色状态。
  1. 红色(Red)
  • 含义:集群健康状态不正常,有一个或多个主分片未分配。
  • 解释:红色状态表示集群中有一些数据不可访问,因为主分片未能分配。此时,可能存在数据丢失的风险,需要立即采取措施来修复问题。
  • 示例:假设集群有3个主分片和每个主分片1个副本分片,如果有2个主分片和所有副本分片未能分配,则集群为红色状态。

2、Elasticsearch术语

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Document
文档,用户存储在ES的数据,ES最小单元,文档不可被拆分。文档使用JSON的对象存储类型。

filed
相当于数据表的字段,对文档数据根据不同属性进行分类标识。

index
索引,一个索引就是拥有相似特征文档的集合。

shard
分片,存储数据的地方,每个底层对应的使一个Lucene库,一个索引至少有一个或多个分片。

replica
副本,一个分片可以有0个或者多个副本。作用是对数据进行备份,一旦副本数量不为0,就会引入主分片(primary shard)和副本分片(replica shard)的概念。
主分片(primary shard)
实现数据的读写操作。
副本分片(replica shard)
可以实现数据的读操作,需要主分片同步数据,当主分片挂掉时,副本分片会变为主分片。

Allocation
分配。将分片分配给某个节点的过程,包括主分片和副本分片。如果副本分片,还包含从主分片复制数据的过程,此过程由Master节点调度完成。

2.1、Elasticsearch分片分配的基本策略

​ ES使用数据分片(shard)来提高服务的可用性,将数据分散保存在不同的节点上以降低当单个节点发生故障时对数据完整性的影响,同时使用副本(repiica)来保证数据的完整性。关于分片的默认分配策略,在7.x之前,默认5个primary shard,每个primary shard默认分配一个replica,即5主1副,而7.x之后,默认1主1副ES在分配单个索引的分片时会将每个分片尽可能分配到更多的节点上。但是,实际情况取决于集群拥有的分片和索引的数量以及它们的大小,不一定总是能均匀地分布。

​ Paimary只能在索引创建时配置数量,而replica可以在任何时间分配,并且primary支持读和写操作,而replica只支持客户端的读取操作,数据由es自动管理,从primary同步。ES不允许Primary和它的Replica放在同一个节点中,并且同一个节点不接受完全相同的两个Replica,同一个节点允许多个索引的分片同时存在。