1. Logstash Overview

Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize it into destinations of your choice. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases.

While Logstash originally drove innovation in log collection, its capabilities extend well beyond that use case. Any type of event can be enriched and transformed with a broad array of input, filter, and output plugins, and many native codecs further simplify ingestion. Logstash accelerates your insights by harnessing a greater volume and variety of data.

Logstash to Elastic Cloud Serverless
Use the Logstash Elasticsearch output plugin to send data to Elastic Cloud Serverless. Note the following differences between Elastic Cloud Serverless and both Elasticsearch Service and self-managed Elasticsearch:

  • Access Elastic Cloud Serverless from Logstash with an API key. Any user-based security settings in the Elasticsearch output plugin configuration are ignored and may cause errors.

  • Elastic Cloud Serverless uses data stream lifecycle management (DLM) rather than index lifecycle management (ILM). Any ILM settings in the Elasticsearch output plugin configuration are ignored and may cause errors.

  • Logstash monitoring is available through the Logstash Integration in Elastic Observability on Elastic Cloud Serverless.

Known issue for Logstash to Elasticsearch Serverless: the logstash-output-elasticsearch settings default to port 9200. Set the value to port 443 instead.
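Putting these differences together, a minimal Elasticsearch output for a serverless endpoint might look like the sketch below (the host URL and API key are placeholders, not real values):

output {
    elasticsearch {
        # hypothetical serverless endpoint -- note port 443, not the default 9200
        hosts => ["https://my-project.es.us-east-1.aws.elastic.cloud:443"]
        # API key authentication; user/password settings would be ignored
        api_key => "my_key_id:my_key_secret"
        # serverless writes to data streams (DLM), so no ILM settings here
        data_stream => true
    }
}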

2. How It Works

The Logstash event processing pipeline has three stages: inputs → filters → outputs

The inputs stage collects the data, the filters stage can format, filter, and lightly process the collected data, and the outputs stage ships the data to its destination. Logstash processing works like a pipe: data flows in at one end and out at the other.

Inputs and outputs support codecs, which let you encode or decode data as it enters or leaves the pipeline without needing a separate filter.

Tip: inputs/filters/outputs are all extended through a plugin mechanism.

inputs: collect data from many kinds of sources. Common sources include (see the sketch after this list):

  • file: reads data from files on disk, e.g. scanning log files.

  • syslog: listens on port 514 for syslog messages and parses them according to RFC 3164.

  • mysql: reads table data from MySQL.

  • redis: reads from a Redis server, using Redis channels and Redis lists.

    • Redis is often used as a "broker" in a centralized Logstash installation, queueing Logstash events from remote Logstash "shippers".
  • Filebeat: a lightweight file shipper that can replace the file input.

  • Message queues such as Kafka and RabbitMQ: data can be read from a variety of message queues.
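As a sketch, an input block that tails a log file and also consumes a Kafka topic might look like this (the path, broker address, and topic name are hypothetical):

input {
    file {
        path => ["/var/log/app/*.log"]          # hypothetical log path
        start_position => "beginning"
    }
    kafka {
        bootstrap_servers => "kafka1:9092"      # hypothetical broker
        topics => ["app-logs"]                  # hypothetical topic
    }
}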

filters: an optional stage that can format, filter, and lightly process data before it is shipped to its destination. Commonly used filters (a conditional example follows this list):

  • grok: the most commonly used plugin in Logstash for parsing and structuring logs.

    • grok combines multiple predefined regular expressions to match and split text and map the pieces to named fields.
  • mutate: transforms events, e.g. rename, remove, replace, and modify fields.

  • drop: drops an event entirely.

  • clone: makes a copy of an event.

  • geoip: adds geographical information about an IP address.
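Filters can also be wrapped in conditionals. The sketch below drops any event whose loglevel field (a hypothetical field name) is "debug" and lets everything else through:

filter {
    # hypothetical field: discard noisy debug events outright
    if [loglevel] == "debug" {
        drop {}
    }
}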

outputs: the final stage of the Logstash pipeline; outputs ship the data to its destination. Common destinations include:

  • elasticsearch: sends event data to Elasticsearch.
  • file: writes event data to a file on disk.
  • graphite: sends event data to Graphite, a popular open source tool for storing and graphing metrics.
  • statsd: sends event data to statsd, a service that "listens for statistics, like counters and timers, sent over UDP and sends aggregates to one or more pluggable backend services".

Codecs: codecs are encoders/decoders that handle serializing and deserializing event data; the two main ones are the json and plain-text codecs.
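A codec is declared inside an input or output rather than as a separate filter. In this sketch the file input decodes each line as JSON on the way in, and stdout pretty-prints events on the way out (the path is hypothetical):

input {
    file {
        path => ["/var/log/app/events.json"]    # hypothetical path
        codec => json                           # decode each line as JSON
    }
}
output {
    stdout {
        codec => rubydebug                      # pretty-print events
    }
}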

3. Installation and Configuration

  • See the official Logstash website.

  • Downloads for each release: https://www.elastic.co/cn/downloads/past-releases#logstash

| Hostname | CPU/Disk | IP |
| --- | --- | --- |
| logstash-server | 2c2g/20GB | 192.168.221.140 |

1. Installation (two methods)

  • Method 1:
[root@logstash-server ~]# curl -OL https://artifacts.elastic.co/downloads/logstash/logstash-8.12.2-linux-x86_64.tar.gz		#this may take a while, since it downloads from servers abroad

[root@logstash-server ~]# tar -xzf logstash-8.12.2-linux-x86_64.tar.gz  -C /usr/local/

[root@logstash-server ~]# mv /usr/local/logstash-8.12.2/ /usr/local/logstash
  • Method 2: install via yum
#Download and install the public signing key:
[root@logstash-server ~]# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

#In the yum repos directory, add the following to a file with the .repo suffix
[root@logstash-server ~]# vim /etc/yum.repos.d/logstash.repo
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

#Reload the yum repositories
[root@logstash-server ~]# yum clean all && yum makecache
Loaded plugins: fastestmirror
Cleaning repos: epel extras logstash-8.x os updates
Cleaning up list of fastest mirrors
Other repos take up 36 M of disk space (use --verbose for details)
Loaded plugins: fastestmirror
Determining fastest mirrors
epel                                                     | 4.7 kB     00:00
extras                                                   | 2.9 kB     00:00
logstash-8.x                                             | 1.3 kB     00:00
os                                                       | 3.6 kB     00:00
updates                                                  | 2.9 kB     00:00
(1/19): epel/7/x86_64/group_gz                             | 100 kB   00:00
(2/19): epel/7/x86_64/updateinfo                           | 1.0 MB   00:00
(3/19): epel/7/x86_64/prestodelta                          | 2.5 kB   00:00
(4/19): epel/7/x86_64/filelists_db                         |  12 MB   00:00
(5/19): epel/7/x86_64/primary_db                           | 7.0 MB   00:00
(6/19): epel/7/x86_64/other_db                             | 3.4 MB   00:00
(7/19): extras/7/x86_64/filelists_db                       | 303 kB   00:00
(8/19): extras/7/x86_64/primary_db                         | 250 kB   00:00
(9/19): extras/7/x86_64/other_db                           | 150 kB   00:00
(10/19): logstash-8.x/primary                              | 369 kB   00:01
(11/19): os/7/x86_64/group_gz                              | 153 kB   00:00
(12/19): os/7/x86_64/primary_db                            | 6.1 MB   00:00
(13/19): logstash-8.x/other                                |  47 kB   00:00
(14/19): os/7/x86_64/filelists_db                          | 7.2 MB   00:00
(15/19): os/7/x86_64/other_db                              | 2.6 MB   00:00
(16/19): updates/7/x86_64/primary_db                       |  25 MB   00:00
(17/19): updates/7/x86_64/other_db                         | 1.5 MB   00:00
(18/19): updates/7/x86_64/filelists_db                     |  14 MB   00:00
(19/19): logstash-8.x/filelists                            |  53 MB   00:04
logstash-8.x                                                          1086/1086
logstash-8.x                                                          1086/1086
logstash-8.x                                                          1086/1086
Metadata Cache Created

#Start the installation
[root@logstash-server ~]# yum -y install logstash
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
Resolving Dependencies
--> Running transaction check
---> Package logstash.x86_64 1:8.12.2-1 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

================================================================================
 Package          Arch           Version             Repository            Size
================================================================================
Installing:
 logstash         x86_64         1:8.12.2-1          logstash-8.x         333 M

Transaction Summary
================================================================================
Install  1 Package

Total download size: 333 M
Installed size: 579 M
Downloading packages:
logstash-8.12.2-x86_64.rpm                                 | 333 MB   00:12
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
  Installing : 1:logstash-8.12.2-1.x86_64                                  1/1
  Verifying  : 1:logstash-8.12.2-1.x86_64                                  1/1

Installed:
  logstash.x86_64 1:8.12.2-1

Complete!
#Note: the repositories do not work with older rpm-based distributions that still use RPM v3, such as CentOS 5.
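When installed from the RPM, Logstash also ships with a systemd unit, so (assuming a systemd-based distribution like the CentOS 7 host used here) it can be managed as a service instead of being started by hand; a minimal sketch:

[root@logstash-server ~]# systemctl enable --now logstash		#start now and on every boot
[root@logstash-server ~]# systemctl status logstash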

2. Test Run

Run the most basic Logstash pipeline to verify the Logstash installation.

A Logstash pipeline has two required elements, input and output, and one optional element, filter.

Input plugins consume data from a source, filter plugins modify the data as you specify, and output plugins write the data to a destination.

From the Logstash installation directory, run:

[root@logstash-server logstash]# bin/logstash -e ''
Using bundled JDK: /usr/local/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
  • The -e option sets the input and output Logstash uses to process data

  • -e '' is equivalent to -e 'input { stdin { type => stdin } } output { stdout { codec => rubydebug } }'

  • input { stdin { type => stdin } } means the data Logstash processes comes from the standard input device

  • output { stdout { codec => rubydebug } } means Logstash writes the processed data to the standard output device

After a moment, when you see output like the following on screen, try typing hello:

[2024-03-14T11:21:21,651][INFO ][logstash.agent           ] Pipelines running {:count=>1,:running_pipelines=>[:main], :non_running_pipelines=>[]}

Typing hello immediately prints the formatted event data:

The stdin plugin is now waiting for input:
hello
{
         "event" => {
        "original" => "hello"
    },
          "host" => {
        "hostname" => "logstash-server"
    },
       "message" => "hello",
      "@version" => "1",
    "@timestamp" => 2024-03-14T03:21:43.519434493Z,
          "type" => "stdin"
}


  • The message field holds the complete line of data that Logstash received
  • @version is version information and can be used when building indices
  • @timestamp is the timestamp of when this data was processed; useful for indexing and searching
  • type is the value set in the input above; you can set it to any value there, but type itself is a built-in field name that cannot be changed; it is used for building indices, conditionals, and so on
  • host indicates which machine the data came from

An example that changes type to nginx (this mainly matters for distinguishing indices later; changing it here has no practical effect):

[root@logstash-server logstash]# ./bin/logstash -e "input { stdin { type => nginx } } output { stdout { codec => rubydebug } }"

#Wait a moment; once you see Pipeline started, startup succeeded
[2024-03-14T11:24:27,247][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-03-14T11:24:27,261][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

#Type hello
{
      "@version" => "1",
    "@timestamp" => 2024-03-14T03:24:33.458038810Z,
       "message" => "hello",
          "host" => {
        "hostname" => "logstash-server"
    },
         "event" => {
        "original" => "hello"
    },
          "type" => "nginx"
}


3. Configuring Input and Output

In production, Logstash pipelines are more complex: they typically have one or more input, filter, and output plugins.

In this part, we create a Logstash pipeline that takes Apache web logs from standard input as its input, parses those logs to create specific named fields from them, and writes the parsed data to standard output (the screen).

And this time, instead of defining the pipeline configuration on the command line, we define it in a configuration file.

Create a file with the following content to serve as the Logstash pipeline configuration file:

[root@logstash-server logstash]# vim /usr/local/logstash/config/first-pipeline.conf
input { 
    stdin {} 
} 
output { 
    stdout {} 
}

Test the configuration file syntax:

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf --config.test_and_exit
Using bundled JDK: /usr/local/logstash/jdk
/usr/local/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_int
/usr/local/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_f
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2024-03-14T11:39:36,651][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/local/logstash/config/log4j2.properties
[2024-03-14T11:39:36,653][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.12.2", "jruby.version"=>"jruby 9.4.5.0 (3.1.4) 2023-11-02 1abae2700f OpenJDK 64-Bit Server VM 17.0.10+7 on 17.0.10+7 +indy +jit [x86_64-linux]"}
[2024-03-14T11:39:36,655][INFO ][logstash.runner          ] JVM bootstrap flags: [-XX:+HeapDumpOnOutOfMemoryError, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, -Djruby.regexp.interruptible=true, --add-opens=java.base/java.security=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11, -Dlog4j2.isThreadContextMapInheritable=true, -Xms1g, -Dlogstash.jackson.stream-read-constraints.max-string-length=200000000, -Djdk.io.File.enableADS=true, -Dfile.encoding=UTF-8, --add-opens=java.base/java.io=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, -Djruby.compile.invokedynamic=true, -Xmx1g, -Djava.security.egd=file:/dev/urandom, -Djava.awt.headless=true, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED]
[2024-03-14T11:39:36,656][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-string-length` configured to `200000000`
[2024-03-14T11:39:36,657][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-number-length` configured to `10000`
[2024-03-14T11:39:36,823][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2024-03-14T11:39:37,020][INFO ][org.reflections.Reflections] Reflections took 105 ms to scan 1 urls, producing 132 keys and 468 values
/usr/local/logstash/vendor/bundle/jruby/3.1.0/gems/amazing_print-1.5.0/lib/amazing_print/formatter.rb:37: warning: previous definition of cast was here
[2024-03-14T11:39:37,148][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
Configuration OK
[2024-03-14T11:39:37,148][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
#Seeing OK means the test passed

-f specifies the pipeline configuration file.

Run the following command to start Logstash:

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf
Using bundled JDK: /usr/local/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-05-04T10:40:09,455][INFO ][logstash.runner          ] Log4j 
.......
The stdin plugin is now waiting for input:

After startup, paste the following content into the terminal and press Enter:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

You will see output like the following:

{
      "@version" => "1",
       "message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
    "@timestamp" => 2024-03-14T03:41:48.984091377Z,
          "host" => {
        "hostname" => "logstash-server"
    },
         "event" => {
        "original" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
    }
}

4. Parsing Web Logs with the Grok Filter Plugin

We now have a working pipeline, but the format of the log messages is not ideal. You want to parse the log messages to create specific named fields from the logs. To do this, use the grok filter plugin.

With the grok filter plugin, you can parse unstructured log data into something structured and queryable.

grok assigns field names to the content you are interested in and binds that content to the corresponding field names.

How does grok know which content you are interested in? It recognizes fields of interest through predefined patterns, which you select through configuration.

The pattern used here is %{COMBINEDAPACHELOG}.

%{COMBINEDAPACHELOG} is a predefined grok pattern for parsing the Apache HTTP server's "combined" log format.

%{COMBINEDAPACHELOG} constructs the following fields from Apache log lines:

| Original information | New field name |
| --- | --- |
| IP address | clientip |
| User ID | ident |
| User authentication | auth |
| Timestamp | timestamp |
| HTTP request method | verb |
| Requested URL | request |
| HTTP version | httpversion |
| Response code | response |
| Response body size | bytes |
| Referrer (like the anti-hotlinking referer in nginx) | referrer |
| Client agent (browser) | agent |

For more on grok usage, see the grok reference documentation; a generic example follows.
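Beyond predefined composites like %{COMBINEDAPACHELOG}, grok patterns follow the %{SYNTAX:SEMANTIC} form, where SYNTAX is a predefined pattern name and SEMANTIC is the field name to map the match into. A minimal sketch, with field names of our own choosing:

filter {
    grok {
        # parses lines like: 55.3.244.1 GET /index.html 15824 0.043
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
    }
}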

Note that to have Logstash reload the configuration file automatically after edits, the input cannot be stdin.
So here we use file instead. Create a sample log file:

[root@logstash-server ~]# vim /var/log/httpd.log
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

Make sure there is no cached state:

[root@logstash file]# pwd
/usr/local/logstash/data/plugins/inputs/file
[root@logstash file]# ls -a
.  ..  .sincedb_aff270f7990dabcdbd0044eac08398ef
[root@logstash file]# rm -rf .sincedb_aff270f7990dabcdbd0044eac08398ef

#On the first run there is nothing to remove; the plugins directory does not even exist under data yet
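As an aside, the automatic reload mentioned above is opt-in: you enable it by passing --config.reload.automatic (or the -r shorthand) when starting Logstash, e.g.:

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf --config.reload.automatic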

The modified pipeline configuration file looks like this:

[root@logstash-server logstash]# vim /usr/local/logstash/config/first-pipeline.conf

#comments are written with the # character#####
input {
    file {
        path => ["/var/log/httpd.log"]
        start_position => "beginning"
    }
}

filter {
    grok {  # filter the web log and emit structured data
    	  # look for a COMBINEDAPACHELOG match in the value of the message field
         match => { "message" => "%{COMBINEDAPACHELOG}" }   
         }
}

output {
    stdout {}
}

match => { "message" => "%{COMBINEDAPACHELOG}" } means:
match the value of the message field against the pattern %{COMBINEDAPACHELOG} and map the captured pieces to fields.

After configuring, verify again:

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf

The output follows:

#It takes a moment before the following is printed:
[2024-03-14T11:49:56,399][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-03-14T11:49:56,443][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
{
    "@timestamp" => 2024-03-14T03:49:56.438442963Z,
      "@version" => "1",
    "user_agent" => {
        "original" => "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
    },
          "host" => {
        "name" => "logstash-server"
    },
           "log" => {
        "file" => {
            "path" => "/var/log/httpd.log"
        }
    },
          "http" => {
         "request" => {
              "method" => "GET",
            "referrer" => "http://semicomplete.com/presentations/logstash-monitorama-2013/"
        },
         "version" => "1.1",
        "response" => {
            "scode" => 200,
                   "body" => {
                "bytes" => 203023
            }
        }
    },
     "timestamp" => "04/Jan/2015:05:13:42 +0000",
           "url" => {
        "original" => "/presentations/logstash-monitorama-2013/imageskibana-search.png"
    },
       "message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
         "event" => {
        "original" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
    },
        "source" => {
        "address" => "83.149.9.216"
    }
}

You will find that the previously unstructured data has become structured.

If you look closely, you will notice the original message field is still there. If you do not need it, you can use one of grok's common options, remove_field, to remove it.
remove_field can remove arbitrary fields and accepts an array as its value.

rename renames fields.

Mutate filter configuration options (a combined sketch follows the table):

| Option | Purpose |
| --- | --- |
| add_field | add a new field to the event |
| remove_field | remove an arbitrary field from the event |
| add_tag | add an arbitrary tag to the event |
| remove_tag | remove a tag from the event, if present |
| convert | convert a field's value to another data type |
| id | add a unique ID to the event |
| lowercase | convert a string field to its lowercase form |
| replace | replace a field with a new value |
| strip | remove leading and trailing whitespace |
| uppercase | convert a string field to its uppercase equivalent |
| update | update an existing field with a new value |
| rename | rename a field in the event |
| gsub | find and replace text in string fields |
| merge | merge arrays or hashes in events |
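A sketch combining a few of these options (the field names come from the COMBINEDAPACHELOG table above; the gsub rule is purely illustrative):

filter {
    mutate {
        convert   => { "bytes" => "integer" }      # string -> integer
        lowercase => [ "verb" ]                    # GET -> get
        strip     => [ "referrer" ]                # trim surrounding whitespace
        gsub      => [ "request", "//", "/" ]      # collapse double slashes
    }
}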

The modified pipeline configuration file follows:

[root@logstash-server logstash]# vim config/first-pipeline.conf

input {
    file {
        path => ["/var/log/httpd.log"]
        start_position => "beginning"
    }
}

filter {
    grok {
         match => { "message" => "%{COMBINEDAPACHELOG}" }    
    }
    
    mutate {
            #rename the field
            rename => {
                "status_code" => "scode"
            }
    }
    
    mutate {
             #remove unneeded fields
             remove_field => ["message","input_type","@version","fields"]
    }
}


output {
    stdout {}
}

Test again, and you will find message is gone and status_code has been renamed to scode:

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf
Using bundled JDK: /usr/local/logstash/jdk
......
[2024-03-14T11:54:43,296][INFO ][filewatch.observingtail  ][main][17f9be0d29f2eb1e2fd3e943d4672f5fc989db530509b86b731852814b0e0a46] START, creating Discoverer, Watch with file and sincedb collections
[2024-03-14T11:54:43,304][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-03-14T11:54:43,315][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
#If you see Pipeline started but still no data is printed, the data directory holds cached state; delete the cache, then repeat this step

#Delete the cache
[root@logstash-server logstash]# cd data/
[root@logstash-server data]# ls
dead_letter_queue  plugins  queue  uuid
[root@logstash-server data]# ll
total 4
drwxr-xr-x 2 root root  6 Mar 14 11:19 dead_letter_queue
drwxr-xr-x 3 root root 20 Mar 14 11:49 plugins
drwxr-xr-x 2 root root  6 Mar 14 11:19 queue
-rw-r--r-- 1 root root 36 Mar 14 11:19 uuid
[root@logstash-server data]# rm -rf plugins/

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf
...
{
           "log" => {
        "file" => {
            "path" => "/var/log/httpd.log"
        }
    },
          "http" => {
         "version" => "1.1",
         "request" => {
            "referrer" => "http://semicomplete.com/presentations/logstash-monitorama-2013/",
              "method" => "GET"
        },
        "response" => {
                   "body" => {
                "bytes" => 203023
            },
            "scode" => 200
        }
    },
    "user_agent" => {
        "original" => "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
    },
     "timestamp" => "04/Jan/2015:05:13:42 +0000",
          "host" => {
        "name" => "logstash-server"
    },
    "@timestamp" => 2024-03-14T03:58:41.236243588Z,
           "url" => {
        "original" => "/presentations/logstash-monitorama-2013/imageskibana-search.png"
    },
         "event" => {
        "original" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
    },
        "source" => {
        "address" => "83.149.9.216"
    }
}
#You will find message is gone and status_code has been renamed to scode

5. Enriching Data with the Geoip Filter Plugin

Note: this plugin is temporarily unavailable after 8.1; the experiment below uses logstash-7.13.2.

geoip: short for geographic IP; an IP geolocation database.

Besides parsing log data for better search, filter plugins can derive supplementary information from existing data. For example, the geoip plugin looks up an IP address, finds the corresponding geographic location in its bundled database, and adds that location information to the log.

The geoip plugin configuration requires you to specify the name of the source field that contains the IP address to look up. In this example, the clientip field contains the IP address.

    geoip {
        source => "clientip"
    }

Since filters are evaluated in order, make sure the geoip section comes after the grok section of the configuration file, and that both the grok and geoip sections are nested inside the filter section.

The finished pipeline configuration file follows:

[root@logstash-server logstash]# vim config/first-pipeline.conf

input {
    file {
        path => ["/var/log/httpd.log"]
        start_position => "beginning"
    }
}

filter {
    grok {
         match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip { source => "clientip" }
}

output {
    stdout {}
}

Enter the earlier log content again and you will see output like the following:

#Remember to delete the cache first
[root@logstash-server logstash]# rm -rf data/plugins

[root@logstash-server logstash]# bin/logstash -f  config/first-pipeline.conf
[2023-05-04T11:30:41,667][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
{
           "host" => "logstash-server",
           "verb" => "GET",
          "geoip" => {
          "country_name" => "Russia",
         "country_code2" => "RU",
              "location" => {
            "lat" => 55.7527,
            "lon" => 37.6172
        },
             "longitude" => 37.6172,
           "region_name" => "Moscow",
           "region_code" => "MOW",
              "timezone" => "Europe/Moscow",
         "country_code3" => "RU",
        "continent_code" => "EU",
                    "ip" => "83.149.9.216",
             "city_name" => "Moscow",
              "latitude" => 55.7527,
           "postal_code" => "129223"
    },
          "ident" => "-",
       "clientip" => "83.149.9.216",
           "auth" => "-",
     "@timestamp" => 2023-05-04T03:30:42.063Z,
        "message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
      "timestamp" => "04/Jan/2015:05:13:42 +0000",
       "@version" => "1",
           "path" => "/var/log/httpd.log",
        "request" => "/presentations/logstash-monitorama-2013/imageskibana-search.png",
          "bytes" => "203023",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
    "httpversion" => "1.1",
       "response" => "200",
       "referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\""
}

For details, see the grok and geoip documentation; for more on using filter plugins, see the filter plugins reference.

#List installed plugins (install/update commands follow the list)
[root@logstash-server logstash]# ./bin/logstash-plugin list
Using bundled JDK: /usr/local/logstash/jdk
logstash-codec-avro
logstash-codec-cef
logstash-codec-collectd
logstash-codec-dots
logstash-codec-edn
logstash-codec-edn_lines
logstash-codec-es_bulk
logstash-codec-fluent
logstash-codec-graphite
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-msgpack
logstash-codec-multiline
logstash-codec-netflow
logstash-codec-plain
logstash-codec-rubydebug
logstash-filter-aggregate
logstash-filter-anonymize
logstash-filter-cidr
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
logstash-filter-de_dot
logstash-filter-dissect
logstash-filter-dns
logstash-filter-drop
logstash-filter-elasticsearch
logstash-filter-fingerprint
logstash-filter-geoip
logstash-filter-grok
logstash-filter-http
logstash-filter-json
logstash-filter-kv
logstash-filter-memcached
logstash-filter-metrics
logstash-filter-mutate
logstash-filter-prune
logstash-filter-ruby
logstash-filter-sleep
logstash-filter-split
logstash-filter-syslog_pri
logstash-filter-throttle
logstash-filter-translate
logstash-filter-truncate
logstash-filter-urldecode
logstash-filter-useragent
logstash-filter-uuid
logstash-filter-xml
logstash-input-azure_event_hubs
logstash-input-beats
└── logstash-input-elastic_agent (alias)
logstash-input-couchdb_changes
logstash-input-dead_letter_queue
logstash-input-elastic_serverless_forwarder
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jms
logstash-input-pipe
logstash-input-redis
logstash-input-snmp
logstash-input-snmptrap
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix
logstash-integration-aws
 ├── logstash-codec-cloudfront
 ├── logstash-codec-cloudtrail
 ├── logstash-input-cloudwatch
 ├── logstash-input-s3
 ├── logstash-input-sqs
 ├── logstash-output-cloudwatch
 ├── logstash-output-s3
 ├── logstash-output-sns
 └── logstash-output-sqs
logstash-integration-elastic_enterprise_search
 ├── logstash-output-elastic_app_search
 └── logstash-output-elastic_workplace_search
logstash-integration-jdbc
 ├── logstash-input-jdbc
 ├── logstash-filter-jdbc_streaming
 └── logstash-filter-jdbc_static
logstash-integration-kafka
 ├── logstash-input-kafka
 └── logstash-output-kafka
logstash-integration-logstash
 ├── logstash-input-logstash
 └── logstash-output-logstash
logstash-integration-rabbitmq
 ├── logstash-input-rabbitmq
 └── logstash-output-rabbitmq
logstash-output-csv
logstash-output-elasticsearch
logstash-output-email
logstash-output-file
logstash-output-graphite
logstash-output-http
logstash-output-lumberjack
logstash-output-nagios
logstash-output-null
logstash-output-pipe
logstash-output-redis
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-webhdfs
logstash-patterns-core
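Besides listing, the logstash-plugin tool can also install and update plugins; for example (the geoip filter is used here purely as an illustration):

[root@logstash-server logstash]# ./bin/logstash-plugin install logstash-filter-geoip		#install a plugin
[root@logstash-server logstash]# ./bin/logstash-plugin update logstash-filter-geoip		#update an installed plugin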

6. Configuring Input from Beats

# Listen on port 5044 to receive input from filebeat; run this on the logstash server
[root@logstash-server logstash]# vim config/first-pipeline.conf
input {
    beats {
      port => 5044
   }
}

filter {
    grok {
         match => { "message" => "%{COMBINEDAPACHELOG}" } 
    }
    # geoip { source => "clientip" }
}

output {
    stdout {}
}
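This pipeline still prints to stdout for inspection; in a real deployment you would typically point the output at Elasticsearch instead. A minimal sketch (the host address and index name are placeholders):

output {
    elasticsearch {
        hosts => ["http://192.168.221.138:9200"]	#hypothetical Elasticsearch address
        index => "nginx-log-%{+YYYY.MM.dd}"		#hypothetical daily index name
    }
}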

After starting Logstash, change the output target in filebeat's yml file as follows:

# On the filebeat server:
[root@filebeat-server filebeat]# vim filebeat.yml
...
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.221.140:5044"]	#IP是logstash的IP
 ...
 
#Remove output.elasticsearch and put output.logstash here as shown

Clear the cache directory on the filebeat machine:

[root@filebeat-server filebeat]# rm -rf /usr/local/filebeat/data/

Run filebeat:

[root@filebeat-server filebeat]# systemctl restart filebeat.service
[root@filebeat-server filebeat]# systemctl status filebeat.service
● filebeat.service - Filebeat sends log files to Logstash or directly to Elasticsearch.
   Loaded: loaded (/usr/lib/systemd/system/filebeat.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2024-03-14 15:29:16 CST; 6s ago
 Main PID: 1418 (filebeat)
   CGroup: /system.slice/filebeat.service
           └─1418 /usr/local/filebeat/filebeat -c /usr/local/filebeat/filebea...

Mar 14 15:29:16 filebeat-server systemd[1]: Stopped Filebeat sends log file....
Mar 14 15:29:16 filebeat-server systemd[1]: Started Filebeat sends log file....
Hint: Some lines were ellipsized, use -l to show in full.

Run Logstash:

[root@logstash-server logstash]# rm -rf data/plugins
[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf
Using bundled JDK: /usr/local/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
.......
.......
#You can see that Logstash picks up the log entries collected by filebeat
{
         "input" => {
        "type" => "log"
    },
        "source" => {
        "address" => "123.127.39.50"
    },
          "http" => {
         "request" => {
            "referrer" => "http://81.68.233.173/",
              "method" => "GET"
        },
         "version" => "1.1",
        "response" => {
                   "body" => {
                "bytes" => 14137
            },
            "status_code" => 200
        }
    },
           "ecs" => {
        "version" => "1.12.0"
    },
           "log" => {
        "offset" => 0,
          "file" => {
            "path" => "/opt/nginx/log/nginx/access.log"
        }
    },
         "agent" => {
                  "id" => "afbbf9f5-d7f7-4057-a70d-fa4e3a4741fc",
             "version" => "8.12.2",
                "type" => "filebeat",
        "ephemeral_id" => "28cf958a-d735-43d4-88c0-19d4460a39f2",
                "name" => "filebeat-server"
    },
      "@version" => "1",
          "host" => {
        "containerized" => false,
         "architecture" => "x86_64",
                 "name" => "filebeat-server",
                  "mac" => [
            [0] "00-0C-29-40-59-B2"
        ],
                   "id" => "4746d2ecb7c945cdbc93de5d156817a0",
                   "ip" => [
            [0] "192.168.221.139",
            [1] "fe80::4ee8:bb9d:ef6c:9934"
        ],
             "hostname" => "filebeat-server",
                   "os" => {
            "codename" => "Core",
            "platform" => "centos",
                "name" => "CentOS Linux",
                "type" => "linux",
             "version" => "7 (Core)",
              "kernel" => "3.10.0-1062.el7.x86_64",
              "family" => "redhat"
        }
    },
    "user_agent" => {
        "original" => "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36"
    },
       "service" => {
        "type" => "nginx"
    },
    "@timestamp" => 2024-03-14T07:30:51.531Z,
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
           "url" => {
        "original" => "/logo.jpg"
    },
       "fileset" => {
        "name" => "access"
    },
       "message" => "123.127.39.50 - - [04/Mar/2021:10:50:28 +0800] \"GET /logo.jpg HTTP/1.1\" 200 14137 \"http://81.68.233.173/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36\" \"-\"",
     "timestamp" => "04/Mar/2021:10:50:28 +0800",
         "event" => {
          "module" => "nginx",
        "original" => "123.127.39.50 - - [04/Mar/2021:10:50:28 +0800] \"GET /logo.jpg HTTP/1.1\" 200 14137 \"http://81.68.233.173/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36\" \"-\"",
        "timezone" => "+08:00",
         "dataset" => "nginx.access"
    }
}
{
         "input" => {
        "type" => "log"
    },
           "ecs" => {
        "version" => "1.12.0"
    },
           "log" => {
        "offset" => 0,
          "file" => {
            "path" => "/opt/nginx/log/nginx/error.log"
        }
    },
         "agent" => {
                  "id" => "afbbf9f5-d7f7-4057-a70d-fa4e3a4741fc",
                "type" => "filebeat",
             "version" => "8.12.2",
        "ephemeral_id" => "28cf958a-d735-43d4-88c0-19d4460a39f2",
                "name" => "filebeat-server"
    },
      "@version" => "1",
          "host" => {
        "containerized" => false,
         "architecture" => "x86_64",
                 "name" => "filebeat-server",
                  "mac" => [
            [0] "00-0C-29-40-59-B2"
        ],
                   "id" => "4746d2ecb7c945cdbc93de5d156817a0",
                   "ip" => [
            [0] "192.168.221.139",
            [1] "fe80::4ee8:bb9d:ef6c:9934"
        ],
             "hostname" => "filebeat-server",
                   "os" => {
            "codename" => "Core",
              "family" => "redhat",
                "name" => "CentOS Linux",
                "type" => "linux",
             "version" => "7 (Core)",
              "kernel" => "3.10.0-1062.el7.x86_64",
            "platform" => "centos"
        }
    },
       "service" => {
        "type" => "nginx"
    },
    "@timestamp" => 2024-03-14T07:30:51.531Z,
          "tags" => [
        [0] "beats_input_codec_plain_applied",
        [1] "_grokparsefailure"
    ],
       "fileset" => {
        "name" => "error"
    },
       "message" => "2021/03/04 10:50:28 [error] 11396#0: *5 open() \"/farm/bg.jpg\" failed (2: No such file or directory), client: 123.127.39.50, server: localhost, request: \"GET /bg.jpg HTTP/1.1\", host: \"81.68.233.173\", referrer: \"http://81.68.233.173/\"",
         "event" => {
          "module" => "nginx",
        "original" => "2021/03/04 10:50:28 [error] 11396#0: *5 open() \"/farm/bg.jpg\" failed (2: No such file or directory), client: 123.127.39.50, server: localhost, request: \"GET /bg.jpg HTTP/1.1\", host: \"81.68.233.173\", referrer: \"http://81.68.233.173/\"",
         "dataset" => "nginx.error",
        "timezone" => "+08:00"
    }
}