欢迎观看我们关于如何在 Rocky Linux 8 上安装 ELK Stack 的演示。
ELK 是三个开源项目的首字母缩写:Elasticsearch、Logstash 和 Kibana。 Elasticsearch 是一个搜索和分析引擎。 Logstash 是一个服务器端数据处理管道,它同时从多个来源获取数据,对其进行转换,然后将其发送到像 Elasticsearch 这样的“stash”。 Kibana 允许用户使用图表和图形在 Elasticsearch 中可视化数据。
在 Rocky Linux 8 上安装 ELK Stack
Elastic Stack 组件的安装顺序非常重要。 它通常需要订单 Elasticsearch > Kibana > Logstash > Beats. 另请注意,所有组件都应具有相同的版本。
要在 Rocky Linux 8 系统上安装 Elastic Stack 组件,您可以选择创建 Elastic RPM 存储库或使用各自的 RPM 二进制文件安装每个组件。
我们在本指南中使用 Elastic 存储库。
在 Rocky Linux 8 上创建 Elastic Stack RPM 存储库
可以通过运行以下命令来创建 Elastic Stack 版本 7.x 存储库。
cat > /etc/yum.repos.d/elasticstack.repo << EOL [elasticsearch] name=Elasticsearch repository for 7.x packages baseurl=https://artifacts.elastic.co/packages/7.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md EOL
运行系统包更新。
dnf update
在 Rocky Linux 8 上安装 Elasticsearch
您可以从创建的 Elastic RPM 存储库在 Rocky Linux 8 上安装 Elasticsearch
dnf install elasticsearch
配置 Elasticsearch
开箱即用,Elasticsearch 可以很好地与默认配置选项配合使用。 但是,在此演示中,我们将根据重要的 Elasticsearch 配置进行一些更改。
如果您需要从 Kibana 或 Logstash 或从 Beats 启用远程访问,请将 Elasticsearch 绑定地址设置为特定 IP。 将 IP 192.168.56.154 替换为您相应的服务器 IP 地址.
sed -i 's/#network.host: 192.168.0.1/network.host: 192.168.60.19/' /etc/elasticsearch/elasticsearch.yml
您也可以保留默认设置,只允许本地访问 Elasticsearch。
当配置为侦听非环回接口时,Elasticsearch 期望在启动时加入集群。 但是由于我们设置的是单节点 Elastic Stack,您需要在 ES 配置中定义这是单节点设置,通过输入以下行, discovery.type: single-node
, 在发现配置选项下。 但是,如果您的 ES 正在侦听环回接口,则可以跳过此步骤。
vim /etc/elasticsearch/elasticsearch.yml
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
# Single Node Discovery
discovery.type: single-node
接下来,将 JVM 堆大小配置为不超过内存大小的一半。 在这种情况下,我们的测试服务器有 2G RAM,最大和最小大小的堆大小都设置为 512M。
vim /etc/elasticsearch/jvm.options
...
################################################################
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
-Xms512m
-Xmx512m
...
启动并启用 ES 以在系统启动时运行。
systemctl daemon-reload
systemctl enable --now elasticsearch
验证 Elasticsearch 是否按预期运行。
curl -XGET 192.168.60.19:9200
{ "name" : "localhost.localdomain", "cluster_name" : "elasticsearch", "cluster_uuid" : "Ga4BA59FTcqPtAPzaUaezw", "version" : { "number" : "7.13.2", "build_flavor" : "default", "build_type" : "rpm", "build_hash" : "4d960a0733be83dd2543ca018aa4ddc42e956800", "build_date" : "2021-06-10T21:01:55.251515791Z", "build_snapshot" : false, "lucene_version" : "8.8.2", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }
在 Rocky Linux 8 上安装 Kibana
下一个要安装的 Elastic Stack 组件是 Kabana。 由于我们已经创建了 Elastic Stack 存储库,您只需运行以下命令即可安装它。
dnf install kibana
配置 Kibana
首先,您需要配置 Kibana 以允许远程访问。 默认情况下,它通常只允许在端口 5601/tcp 上进行本地访问。 因此,打开 Kibana 配置文件进行编辑和取消注释并更改以下行;
vim /etc/kibana/kibana.yml
...
#server.port: 5601
...
# To allow connections from remote users, set this parameter to a non-loopback address.
#server.host: "localhost"
...
# The URLs of the Elasticsearch instances to use for all your queries.
#elasticsearch.hosts: ["https://localhost:9200"]
如下图所示:
相应地替换 Kibana 和 Elasticsearch 的 IP 地址。 请注意,在此演示中,所有 Elastic Stack 组件都在同一主机上运行。
...
server.port: 5601
...
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "192.168.60.19"
...
# The URLs of the Elasticsearch instances to use for all your queries.
elasticsearch.hosts: ["https://192.168.60.19:9200"]
启动并启用 Kibana 以在系统引导时运行。
systemctl enable --now kibana
检查状态;
● kibana.service - Kibana Loaded: loaded (/etc/systemd/system/kibana.service; disabled; vendor preset: disabled) Active: active (running) since Thu 2021-07-01 20:06:15 EAT; 9s ago Docs: https://www.elastic.co Main PID: 3594 (node) Tasks: 14 (limit: 4938) Memory: 114.1M CGroup: /system.slice/kibana.service ├─3594 /usr/share/kibana/bin/../node/bin/node /usr/share/kibana/bin/../src/cli/dist --logging.dest=/var/log/kibana/kibana.log --pid.file=/run/kibana/kibana.pid └─3606 /usr/share/kibana/node/bin/node --preserve-symlinks-main --preserve-symlinks /usr/share/kibana/src/cli/dist --logging.dest=/var/log/kibana/kibana.log --p> Jul 01 20:06:15 localhost.localdomain systemd[1]: Started Kibana.
在 FirewallD 上打开 Kibana 端口,如果它正在运行;
firewall-cmd --add-port=5601/tcp --permanent
firewall-cmd --reload
访问 Kibana 接口
您现在可以使用 URL 从浏览器访问 Kibana, https://kibana-server-hostname-OR-IP:5601
.
在 Kibana Web 界面上,您可以选择尝试示例数据,因为还没有数据发送到 Elasticsearch。 您也可以选择探索自己的数据,当然是在将数据发送到 ES 之后。
在 Rocky Linux 8 上安装 Logstash
顺便说一下,这是可选的
Logstash 是 Elastic Stack 的组件,它在将事件数据发送到 Elasticsearch 数据存储之前对其进行进一步处理。 例如,您可以开发自定义正则表达式、grok 模式以从事件数据中提取特定字段。
也可以将数据直接发送到 Elasticsearch,而不是通过 Logstash。
在 Rocky Linux 8 上安装 Logstash。
dnf install logstash
测试 Logstash
Logstash 安装完成后,您可以通过运行如下所示的基本管道命令来验证它是否已准备好处理事件数据;
/usr/share/logstash/bin//logstash -e 'input { stdin { } } output { stdout {} }'
按 ENTER 执行命令并等待 Pipeline 准备好接收输入数据(stdin 插件现在正在等待输入:)。
...
[INFO ] 2021-07-01 20:50:03.470 [LogStash::Runner] agent - No persistent UUID file found. Generating new UUID {:uuid=>"99d0b8b2-3130-413d-bb95-5be99ac6e138", :path=>"/usr/share/logstash/data/uuid"}
[INFO ] 2021-07-01 20:50:05.404 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
...
...
[INFO ] 2021-07-01 20:50:09.164 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2021-07-01 20:50:09.226 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
The stdin plugin is now waiting for input:
输入任何字符串,例如, 测试 Logstash 管道 并按 ENTER。
Logstash 处理输入数据并将时间戳和主机地址信息添加到消息中。
The stdin plugin is now waiting for input:
testing Logstash pipeline << PRESS ENTER AFTER THIS LINE
{
"@version" => "1",
"@timestamp" => 2021-07-01T18:06:31.822Z,
"host" => "elk.kifarunix-demo.com",
"message" => "testing Logstash pipeline"
}
您可以按 Ctrl+D 停止 Logstash 管道。
配置 Logstash 收集事件并将其发送到 Elasticsearch
Logstash 现在已准备好接收和处理数据。 在这个演示中,我们将学习如何配置 Logstash 管道以从本地系统收集事件。
Logstash 管道由三个部分组成;
- 这 输入: 从不同来源收集数据
- 这 筛选:(可选)对数据进行进一步处理。
- 这 输出:将接收到的数据存储到目标数据存储中,例如 Elasticsearch。
配置 Logstash 输入插件
您可以配置 beats 将数据发送到 Logstash 或简单地读取系统上的本地文件。 为了从本地系统收集事件,我们将使用文件输入插件。
您可以使用多个输入插件,请在 Logstash 输入插件上查看它们。
在这个演示中,我们使用单个配置文件来定义管道组件; 输入,过滤器,输出。
vim /etc/logstash/conf.d/local-ssh-events.conf
## Collect System Authentication events from /var/log/secure
input {
file {
path => "/var/log/secure"
type => "ssh_auth"
}
配置 Logstash 过滤器插件
配置 Logstash 过滤器以仅提取相关事件,例如从 /var/log/secure
文件。
Jul 1 21:45:34 localhost sshd[13707]: Failed password for invalid user gentoo from 192.168.60.18 port 53092 ssh2
Jul 1 21:45:50 localhost sshd[13768]: Failed password for invalid user KIFARUNIX from 192.168.60.18 port 53094 ssh2
Jul 1 21:47:14 localhost sshd[13949]: Failed password for root from 192.168.60.18 port 53112 ssh2
Jul 1 21:47:17 localhost sshd[13949]: Accepted password for root from 192.168.60.18 port 53112 ssh2
我们正在使用 Grok 过滤器来处理从日志文件中提取这些行;
vim /etc/logstash/conf.d/local-ssh-events.conf
## Collect System Authentication events from /var/log/secure input { file { path => "/var/log/secure" type => "ssh_auth" } } filter { if [type] == "ssh_auth" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>.+)s+fors+%{USER:auth_user}s+froms+%{SYSLOGHOST:src_host}.*" } add_field => { "activity" => "SSH Logins" } add_tag => "linux_auth" } grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>.+)s+fors+invalids+users%{USER:auth_user_nonexist}s+froms+%{SYSLOGHOST:src_host}.*" } add_field => { "activity" => "SSH Logins" } add_tag => "linux_auth" } } # Drop any message that doesn't contain the keywords below if [message] !~ /(Failed password|Accepted password|Accepted publickey|for invalid)/ { drop { } } }
请注意,上面使用的 grok 过滤器只会捕获基于密码和公钥的 SSH 登录。 您可以使用 Kibana 上的 Grok Debugger 来测试您的模式, 开发工具 > Grok 调试器.
另外,请注意过滤器;
if [message] !~ /(Failed password|Accepted password|Accepted publickey|for invalid)/ { drop { }
出于演示的目的并仅捕获我们在此演示中需要的日志,该行基本上删除了不包含上述指定关键字的任何其他事件日志。
配置 Logstash 输出插件
接下来,我们要将处理后的数据发送到在本地主机上运行的 Elasticsearch。
定义 Elasticsearch 输出。
vim /etc/logstash/conf.d/local-ssh-events.conf
## Collect System Authentication events from /var/log/secure input { file { path => "/var/log/secure" type => "ssh_auth" } } filter { if [type] == "ssh_auth" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>.+)s+fors+%{USER:auth_user}s+froms+%{SYSLOGHOST:src_host}.*" } add_field => { "activity" => "SSH Logins" } add_tag => "linux_auth" } grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>.+)s+fors+invalids+users%{USER:auth_user_nonexist}s+froms+%{SYSLOGHOST:src_host}.*" } add_field => { "activity" => "SSH Logins" } add_tag => "linux_auth" } } # Drop any message that doesn't contain the keywords below if [message] !~ /(Failed password|Accepted password|Accepted publickey|for invalid)/ { drop { } } } ## Send data to Elasticsearch on the localhost output { elasticsearch { hosts => ["192.168.60.19:9200"] manage_template => false index => "ssh_auth-%{+YYYY.MM}" } }
确保 Logstash 可以读取被监控的文件, /var/log/secure
. 默认情况下,文件归 root 所有。 因此,为了使logstash 能够读取文件,首先将组所有权更改为adm,将logstash 添加到组adm,并将读取权限分配给logstash。
chown :adm /var/log/secure
usermod -aG adm logstash
chmod g+r /var/log/secure
验证 Logstash Grok 过滤器
在将数据发送到 Elasticsearch 之前,您需要验证 grok 过滤器。 按照下面的指南学习如何调试 grok 模式。
如何调试 Logstash Grok 过滤器
验证 Logstash 配置
配置完成后,运行以下命令验证Logstash配置,然后才能启动它。
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
...
[2021-07-01T21:50:50,644][INFO ][org.reflections.Reflections] Reflections took 36 ms to scan 1 urls, producing 24 keys and 48 values
Configuration OK
[2021-07-01T21:50:52,005][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
...
配置OK 确认配置文件没有错误。
如果需要调试特定的 Logstash 管道配置文件,可以执行下面的命令。 将配置文件的路径替换为您的文件路径。 执行此命令时,确保 logstash 未运行。
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/local-ssh-events.conf --path.settings /etc/logstash/
运行 Logstash
您可以启动并启用 Logstash 在系统引导时运行。
systemctl enable --now logstash
如果由于某些奇怪的原因 Logstash 没有生成具有以下症状的 systemd 服务文件:(Failed to restart logstash.service: Unit logstash.service not found.)
只需执行命令即可生成服务文件;
/usr/share/logstash/bin/system-install /etc/logstash/startup.options systemd
您可以启动并启用它在启动时运行,如上所示。
检查状态;
systemctl status logstash
● logstash.service - logstash Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled) Active: active (running) since Thu 2021-07-01 21:53:56 EAT; 6s ago Main PID: 15139 (java) Tasks: 15 (limit: 23673) Memory: 311.8M CGroup: /system.slice/logstash.service └─15139 /usr/share/logstash/jdk/bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Djava.a> Jul 01 21:53:56 elk.kifarunix-demo.com systemd[1]: Started logstash. Jul 01 21:53:56 elk.kifarunix-demo.com logstash[15139]: Using bundled JDK: /usr/share/logstash/jdk Jul 01 22:03:31 elk.kifarunix-demo.com logstash[16724]: OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be remove> Jul 01 22:03:48 elk.kifarunix-demo.com logstash[16724]: Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties Jul 01 22:03:48 elk.kifarunix-demo.com logstash[16724]: [2021-07-01T22:03:48,814][INFO ][logstash.runner ] Log4j configuration path used is: /etc/logstash/log4j2.> Jul 01 22:03:48 elk.kifarunix-demo.com logstash[16724]: [2021-07-01T22:03:48,824][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.13.2", "jruby> Jul 01 22:03:50 elk.kifarunix-demo.com logstash[16724]: [2021-07-01T22:03:50,369][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
确认 Elasticsearch Index 上的数据接收
对您的系统执行一些身份验证,然后返回 Kibana 界面 > 管理 > 堆栈管理 > 数据 > 索引管理.
如果您的事件已被接收并转发到 Elasticsearch,那么现在应该已经创建了定义的索引。
在 Kibana 上可视化数据
要在 Kibana 中可视化和探索数据,您需要创建索引模式以从 Elasticsearch 检索数据。
在这个演示中,我们的索引模式是 ssh_auth-*
,如 Logstash Elasticsearch 输出插件中所定义, index => "ssh_auth-%{+YYYY.MM}"
.
在 Kibana 仪表板上,导航到 管理 > 堆栈管理 > Kibana > 索引模式 > 创建索引模式.
索引模式可以匹配单个索引的名称,或包含通配符
匹配多个索引。 单击下一步并选择 @时间戳 作为时间过滤器并单击创建索引模式
. 之后,点击 发现选项卡
在左侧窗格中查看数据。 适当扩大时间范围。
您也可以根据您的 grok 模式选择要查看的字段。 在下面的屏幕截图中,时间范围是过去 15 分钟。
这就是如何在 Rocky Linux 8 上安装 ELK Stack 的全部内容。如果使用 Logstash 作为数据处理引擎,请确保根据自己的喜好自定义 Grok 模式。
参考;
安装弹性堆栈
将 Wazuh Manager 与 ELK Stack 集成
使用 ElastAlert 配置 ELK 堆栈警报
.tdi_4.td-a-rec{text-align:center}.tdi_4 .td-element-style{z-index:-1}.tdi_4.td-a-rec-img{text-align:left}.tdi_4 .td-a-rec-img img{margin:0 auto 0 0}@media(max-width:767px){.tdi_4.td-a-rec-img{text-align:center}}