filebeat + kafka + logstash for centralized log collection and management


0. Environment architecture

192.168.162.111    logstash, kafka
192.168.162.112    filebeat

1. Configure the yum repository (all servers)

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

cat > /etc/yum.repos.d/elasticsearch.repo << EOF
[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
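
To confirm the new repository is usable before going further, a quick check along these lines should work (the package names are the ones installed in the later steps):

yum clean all && yum makecache
yum list filebeat logstash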

2. Install the JDK (all servers)

yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
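
A quick sanity check that the JDK is on the PATH (the exact build string will vary by system):

java -version    # should report an openjdk 1.8.0 build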

3. Install and configure filebeat

yum install filebeat -y

cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
    - /var/log/*.log

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 1

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

tags: ["test1"]

output.kafka:
  hosts: ["192.168.162.111:9092"]
  topic: "get_logs"
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
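
Before relying on it, the configuration can be validated with filebeat's built-in test subcommands; a minimal sketch, assuming the default config path (the output test only succeeds once the Kafka broker from step 5 is up):

filebeat test config -c /etc/filebeat/filebeat.yml    # validate the YAML
filebeat test output -c /etc/filebeat/filebeat.yml    # check connectivity to the Kafka broker
systemctl enable --now filebeat                       # start shipping logs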

4. Install and configure logstash

yum install logstash -y

mkdir -p /data/logs/
chown -R logstash:logstash /data/logs

cat /etc/logstash/conf.d/nginx.conf
input {
  kafka {
    bootstrap_servers => "192.168.162.111:9092"
    group_id => "test"
    client_id => "test"
    auto_offset_reset => "latest"
    topics => ["get_logs"]
    codec => json { charset => "UTF-8" }
  }
}

filter {
  ruby {
    code => 'event.set("filename", event.get("[log][file][path]").split("/")[-1])'
  }
}

output {
  # debugging output
  stdout { codec => rubydebug }

  if "nginx" in [log][file][path] and "access" in [log][file][path] {
    file {
      path => "/data/logs/live-test-nginx/live-test_%{+YYYYMMdd}_%{filename}"
      flush_interval => 3
      codec => line { format => "%{[tags][0]} %{message}" }
    }
    # also output access logs to elasticsearch
    elasticsearch {
      hosts => ["localhost:9200"]
      # index name
      index => "%{[tags][0]}-api-%{+YYYY.MM.dd}"
    }
  }

  if "nginx" in [log][file][path] and "error" in [log][file][path] {
    file {
      path => "/data/logs/live-test-nginx/live-test_%{+YYYYMMdd}_%{filename}"
      flush_interval => 3
      codec => line { format => "%{[tags][0]} %{message}" }
    }
  }
}
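
The pipeline can be syntax-checked before (re)starting the service; with the RPM install the binary lives under /usr/share/logstash:

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx.conf --config.test_and_exit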

5. Configure Kafka and ZooKeeper

wget -c -P /tmp "https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz"

cd /opt/ ; tar xvf /tmp/kafka_2.13-3.6.2.tgz
mv kafka_2.13-3.6.2 kafka
vim kafka/config/server.properties    (only set the two lines below, replacing the IP with this host's internal IP)
listeners=PLAINTEXT://192.168.162.111:9092
advertised.listeners=PLAINTEXT://192.168.162.111:9092

Configure the Kafka systemd unit file
cat /usr/lib/systemd/system/kafka.service
[Unit]
Description=Kafka server daemon

[Service]
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
KillMode=process
Restart=on-failure
RestartSec=42s

[Install]
WantedBy=multi-user.target
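
Since this Kafka setup still depends on ZooKeeper, the unit above can optionally declare the startup ordering; a sketch, assuming the zkp.service name defined in the next step:

# add to the [Unit] section of kafka.service
After=zkp.service
Requires=zkp.service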

6. Configure the ZooKeeper systemd unit file

cat /usr/lib/systemd/system/zkp.service
[Unit]
Description=Zookeeper server daemon

[Service]
Type=simple
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
KillMode=process
Restart=on-failure
RestartSec=42s

[Install]
WantedBy=multi-user.target

systemctl daemon-reload
systemctl restart zkp kafka logstash
systemctl enable zkp kafka logstash
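
With everything running, an end-to-end check from the Kafka side confirms that filebeat is producing into the get_logs topic and that logstash is writing files (with default broker settings the topic is auto-created on the first publish):

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.162.111:9092 --list
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.162.111:9092 --topic get_logs --from-beginning --max-messages 5
ls /data/logs/live-test-nginx/    # files appear once matching nginx events arrive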
