logstash实时采集log4j日志并存入elasticsearch案例实战

首先给大家推荐一下我老师大神的人工智能教学网站。教学不仅零基础,通俗易懂,而且非常风趣幽默,还时不时有内涵黄段子!点这里可以跳转到网站

Logstash实时采集log4j日志配置

Java应用端log4j配置

properties文件配置方式

#请使用该socket log4j.appender.socket=org.apache.log4j.net.SocketAppender#logstash服务主机端口号log4j.appender.socket.Port=4567#logstash服务主机log4j.appender.socket.RemoteHost=168.7.1.67log4j.appender.socket.ReconnectionDelay=10000#输出类名、行、文件名等字段信息log4j.appender.socket.LocationInfo=true

xml文件配置方式

#appender-ref 中引用 socketAppender<appender name="socketAppender" class="org.apache.log4j.net.SocketAppender">	<param name="remoteHost" value="168.7.1.67" /><!-- 远程主机地址 -->	<param name="port" value="4567" /><!-- 远程主机端口 -->	<!-- 请自定义该项 -->	<param name="Threshold" value="DEBUG" />	<param name="ReconnectionDelay" value="10000" />	<param name="LocationInfo" value="true" /></appender>

Logstash服务端配置

conf文件配置

input {    log4j {        mode => "server"        host => "168.37.1.67"        port => 4567		codec => plain { charset => "GB2312" }    }}filter { 	#判断trade.log	if [method] == "execute" and (![stack_trace]) {		grok {			match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" }			match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|" }			match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }				match => { "message" => "%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|" }			match => { "message" => "%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }			remove_field  => ["message","thread","class","file","method"]			add_field => [ "type", "tradelog" ]		}			} 		#判断error.log	else if [logger_name] == "error.except" and (![stack_trace]) {		kv {			source => "message"			field_split => "\|"			value_split => "="			remove_field  => ["message","thread","class","file","method"]			add_field => [ "type", "errorlog" ]		}	} 		#不合条件的多余的消息不往下执行	else {		drop {}	}		#解析时间字段	date {		match => ["timestamp","UNIX_MS"]		remove_field => "timestamp"	} }output { 	stdout{codec=> rubydebug}		if [type] == "tradelog" {		elasticsearch {			index => "log4j-tradelog"			hosts => ["168.37.1.67:9200"]						manage_template => true			template_overwrite => true			template => "/home/elk/myconf/tradelog_template.json"		}	}	if [type] == "errorlog" {		elasticsearch {			index => "log4j-errorlog"			hosts => ["168.37.1.67:9200"]						manage_template => true			template_overwrite => true			template => "/home/elk/myconf/errorlog_template.json"		}	}	}

input

使用log4j插件,开启一个服务,接收java应用端源源不断发送过来的数据
mode:可以是server 或者 client ,client模式会主动请求java应用端获取日志,server模式会接收java应用发过来的日志
host::启动logstash服务的主机,即localhost
port: 端口号
codec: 设置编码

filter

因为接收到的日志有多种不需要的日志,所以用条件判断进行过滤

trade日志

判断条件是method字段值是execute,并且没有stack_trace字段。

if [method] == "execute" and (![stack_trace]) {	grok {		match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" }		match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|" }		match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }			match => { "message" => "%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|" }		match => { "message" => "%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }		remove_field  => ["message","thread","class","file","method"]		add_field => [ "type", "tradelog" ]	}		}
grok插件

因为该日志中有5种格式,以最后几个字段为例说明:

交易名|登录名|编号|ip地址|mac地址|返回结果|异常信息
交易名|登录名|编号|ip地址|mac地址|返回结果|
交易名|登录名|编号|ip地址|mac地址| 
交易名|ip地址|mac地址|返回结果|
交易名|ip地址|mac地址| 

所以采用5种正则规则去匹配,logstash默认会从上到下按规则去匹配,直到匹配上为止。(注意5种正则规则的上下顺序,下面的规则放在上面会导致可能内容解析不全,比如源数据是:请求交易名|操作员登录名|操作员编号|ip地址|mac地址|返回结果|异常信息,如果按照“请求交易名|ip地址|mac地址|”规则去匹配,只能识别出3个字段,而且匹配成功,不继续往下执行,这样识别的内容就不全) remove_field:解析完成后移除一些字段:”message”,”path”,”thread”,”class”,”file”,”method”
add_field: 给type字段覆盖添加值tradelog,便于后面输出的时候做条件判断。

error日志

判断条件是logger_name字段值是error.except,并且没有stack_trace字段。

else if [logger_name] == "error.except" and (![stack_trace]) {	kv {		source => "message"		field_split => "\|"		value_split => "="		remove_field  => ["message","thread","class","file","method"]		add_field => [ "type", "errorlog" ]	}}
kv插件

kv插件用来解析key-value形式的内容
source:要解析的字段内容。
field_split:以“|”为分隔符切分键值对
value_split:以“=”为分割符分割出key和value,key作为新的字段名,value作为该字段的值。
remove_field:删除不需要的字段。
add_field:给type字段覆盖添加值errorlog,便于后面输出的时候做条件判断。

其他日志

drop插件

不符合条件的日志使用drop插件丢弃掉。

date插件

以UNIX_MS格式解析原始时间字段timestamp,解析成Date类型数据并赋值给@timestamp字段,并移除原始字段timestamp。

output

stdout插件

标准输出,便于排错监控。

elasticsearch插件

这里做了条件判断,根据type字段内容不同,输出到elasticsearch的不同索引里去。
index:要输入的es索引,如果索引不存在会自动创建索引
hosts:es地址,有多个节点配置多个节点
template:指定elasticsearch的mapping模板文件,如果该索引不存在,logstash会根据这个mapping模板去自动创建索引。

点这里可以跳转到人工智能网站

发表评论