ELK 架构之 Logstash 和 Filebeat 配置使用(采集过滤)详解架构师

ELK 使用步骤:Spring Boot 日志输出到指定目录,Filebeat 进行采集,Logstash 进行过滤,Elasticsearch 进行存储,Kibana 进行展示。

Filebeat 示例配置(vi /etc/filebeat/filebeat.yml):

filebeat.prospectors: - input_type: log 
  paths: - /var/log/spring-boot-log4j2/*.log 
  document_type: "spring-boot-log4j2" # 定义写入 ES 时的 _type 值 
  multiline: #pattern: '^/s*(/d{4}|/d{2})/-(/d{2}|[a-zA-Z]{3})/-(/d{2}|/d{4})'   
  # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串) pattern: '^/s*("{)'                         
  # 指定匹配的表达式(匹配以 "{ 开头的字符串) negate: true                                
  # 是否匹配到 match: after                                
  # 合并到上一行的末尾 max_lines: 1000                             
  # 最大的行数 timeout: 30s                                
  # 如果在规定的时候没有新的日志事件就不等待后面的日志 
  fields: logsource: node1 logtype: spring-boot-log4j2 
 
- input_type: log 
  paths: - /var/log/messages #- /var/log/*.log 
  document_type: "syslog" # 定义写入 ES 时的 _type 值 
  fields: logsource: node1 logtype: syslog 
 
#output.elasticsearch: 
  #hosts: ["node1:9200"] 
 output.logstash: 
  hosts: ["node1:10515"]

上面的配置需要注意几点:

  • pattern:配置的正则表达式,是为了合并异常信息(而不是单行显示),匹配以"{开头的字符串(判断是否 Json 格式),如果匹配不到的话,就进行合并行。

  • document_type:配置的是 Elasticsearch 的 Type 值,方便 Elasticsearch 对日志数据的归类。

  • logtype:新增的字段,用于 Filebeat 和 Logstash 之间传递参数,进行过滤的判断逻辑。

Logstash 示例配置(vi /etc/logstash/conf.d/logstash.conf):

input { 
 beats { 
   port => 10515 
  } 
} 
filter { 
  if [fields][logtype] == "syslog" { 
    grok { 
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:/[%{POSINT:syslog_pid}/])?: %{GREEDYDATA:syslog_message}" } 
      add_field => [ "received_at", "[email protected]}" ] 
      add_field => [ "received_from", "%{host}" ] 
    } 
    syslog_pri { } 
    date { 
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ] 
    } 
  } 
  if [fields][logtype] == "spring-boot-log4j2" { 
    json { 
      source => "message" 
      target => "data" 
    } 
  } 
} 
output { 
  if [fields][logtype] == "spring-boot-log4j2"{ 
    elasticsearch { 
      hosts => ["127.0.0.1:9200"] 
      index => "spring-boot-log4j2-%{+YYYY.MM.dd}" 
    } 
  } 
 
  if [fields][logtype] == "syslog"{ 
    elasticsearch { 
      hosts => ["127.0.0.1:9200"] 
      index => "filebeat-%{+YYYY.MM.dd}" 
    } 
  } 
}

上面的配置需要注意几点:

  • logstash.conf:配置文件可以配置多个,input、filter和output可以单独文件配置。

  • fields logtype:就是上面 Filebeat 配置的字段,这边用来判断服务来源,然后进行单独的处理。

  • filter:过滤器做了两件事,一个是使用grok插件,匹配数据和增加字段值,另一个就是使用json插件,将字符串转换成 Json 对象(会创建data层级结构,如果不想新建层级的话,删掉target配置即可)。

  • output:根据logtype判断,输出到指定的 Elasticsearch 地址,以及创建指定的索引。

简单总结下, Filebeat 是客户端,一般部署在 Service 所在服务器(有多少服务器,就有多少 Filebeat),不同 Service 配置不同的input_type(也可以配置一个),采集的数据源可以配置多个,然后 Filebeat 将采集的日志数据,传输到指定的 Logstash 进行过滤,最后将处理好的日志数据,存储到指定的 Elasticsearch。


好了,下面我们测试下上面的配置,是否可行。

Logstash 和 Filebeat 配置好之后,重启一下:

$ systemctl restart logstash &&  systemctl restart filebeat

Spring Boot 中log4j2.xml中的配置:

<?xml version="1.0" encoding="UTF-8"?> 
<Configuration status="OFF" monitorInterval="30"> 
<Properties> 
<Property name="LOG_PATTERN">%m%n%ex</Property> 
<Property name="LOG_FILE_PATH">/Users/xishuai/Downloads/logs</Property> 
</Properties> 
<Appenders> 
<Console name="ConsoleAppender" target="SYSTEM_OUT" follow="true"> 
<PatternLayout pattern="${LOG_PATTERN}"/> 
</Console> 
<RollingFile name="FileAppender" fileName="${LOG_FILE_PATH}/spring-boot-log4j2-demo.log" filePattern="${LOG_FILE_PATH}/spring-boot-log4j2-demo-%d{yyyy-MM-dd}-%i.log"> 
<PatternLayout> 
<Pattern>${LOG_PATTERN}</Pattern> 
</PatternLayout> 
<Filters> 
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" /> 
</Filters> 
<Policies> 
<SizeBasedTriggeringPolicy size="10MB" /> 
<TimeBasedTriggeringPolicy interval="1" /> 
</Policies> 
<DefaultRolloverStrategy max="10"/> 
</RollingFile> 
</Appenders> 
<Loggers> 
<Root level="ERROR"> 
<AppenderRef ref="ConsoleAppender" /> 
<AppenderRef ref="FileAppender"/> 
</Root> 
</Loggers> 
</Configuration>

配置说明可以参考之前的文章,这边的LOG_PATTERN配置改为了%m%n%ex,直接输出日志信息或异常信息。

测试代码:

@Log4j2 @RestController @EnableDiscoveryClient @SpringBootApplication public class SpringBootLog4j2Application implements ApplicationRunner { 
 public static void main(String[] args) { 
        SpringApplication.run(SpringBootLog4j2Application.class, args); 
    } 
 @Override public void run(ApplicationArguments applicationArguments) throws Exception { 
        logger.debug("Debugging log"); 
        logger.info("Info log"); 
        logger.warn("Hey, This is a warning!"); 
        logger.error("jack! We have an Error. OK"); 
        logger.fatal("xishuai! Fatal error. Please fix me."); 
    } 
 
@RequestMapping("/log") public String log() { 
 
        log.error("{/"msg/":/"出现一个异常错误:请求连接失败/",/"level/":/"ERROR/",/"createTime/":/"2018-5-21 20:22:22/",/"provider/":/"xishuai/",/"ip/":/"192.168.1.11/",/"stackTrace/":/"java.lang.Exception//n//tat com.example.log_demo.LogDemoTests.logCustomField(LogDemoTests.java:33)//n//tat org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)//n//tat org.junit.runner.JUnitCore.run(JUnitCore.java:137)//n//tat com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)//n//tat com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)//n//tat com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)//n//tat com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)//n/",/"tag/":/"/",/"url/":/"/"}"); 
 return "Hello World ----spring-boot-log4j2"; 
    } 
}

启动服务,然后访问http://localhost:8280/log,手动产生一条日志数据(Json 格式)。

可以看到,有一条新的索引spring-boot-log4j2-2018.05.21产生。

接着,我们使用 Kibana,创建一个索引模版(“spring-boot-log4j2-*):

ELK 架构之 Logstash 和 Filebeat 配置使用(采集过滤)详解架构师

然后,我们就可以看到日志信息了:

ELK 架构之 Logstash 和 Filebeat 配置使用(采集过滤)详解架构师

第一条日志数据中,红框里面是我们输出的日志信息,测试代码中总共输出了 5 种日志级别的信息,因为配置文件中设置的日志级别是ERROR,所以这边ERROR以下的日志不会输出,另外,因为设置了日志匹配规则,两条数据都不是以"{开头,这边就将两条日志数据,合并成一条了。

我们再看一下自定义输出的日志信息(Json 格式):

ELK 架构之 Logstash 和 Filebeat 配置使用(采集过滤)详解架构师

测试代码中输出的是 Json 字符串,经过 Logstash 过滤处理之后,就转换成 Json对象了。

ELK 架构之 Logstash 和 Filebeat 配置使用(采集过滤)详解架构师

另外,我们还可以data.level:ERROR这样格式进行搜索,或者data.msg:(错误)格式进行模糊搜索。


还需要注意的是,上面多行合并的配置是在 Filebeat 中,如果每个服务都是一样的规则,那么每台服务器都需要配置,如果规则更改了,这样每台服务器的 Filebat 配置都需要更改,就比较不方便。

Logstash 也提供了多行合并的配置功能,我们只需要这样配置(使用codec/multiline):

input { 
 beats { 
   port => 10515 
   codec => multiline { 
            pattern => '^/s*({")' 
            negate => true 
            what => "previous" 
        } 
  } 
}

效果和 Filebeat 配置是一样的。

如果日志组件使用 Log4j/Log4j2,Logstash 还提供了另一种处理 Log4j 的方式:input/log4j。与codec/multiline不同,这个插件是直接调用了org.apache.log4j.spi.LoggingEvent处理 TCP 端口接收的数据。

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/6795.html

(0)
上一篇 2021年7月17日
下一篇 2021年7月17日

相关推荐

发表回复

登录后才能评论