http://pastebin.com/A5pRDv2P
###INPUT###
# Inputs: a syslog relay listening on TCP and UDP port 514, plus a GELF
# listener on UDP 12201. The "type" set here drives every filter below.
input {
tcp {
port => 514
type => "syslog-relay"
}
udp {
port => 514
type => "syslog-relay"
# read-buffer size in bytes for the UDP listener
buffer_size => 16384
}
gelf {
port => 12201
type => "gelf"
}
}
filter {
# --- Classification --------------------------------------------------
# Tag every syslog-relay event as either Cisco ASA ("got_syslog_cisco")
# or plain syslog ("got_syslog_standard"), keyed on the "%ASA-" marker.
grep {
type => "syslog-relay"
match => [ "@message", ":\s\%ASA-" ]
add_tag => "got_syslog_cisco"
drop => false
}
grep {
# same pattern, negated: everything that is NOT a Cisco ASA message
type => "syslog-relay"
match => [ "@message", ":\s\%ASA-" ]
add_tag => "got_syslog_standard"
drop => false
negate => true
}
# strip the syslog PRI part
grok {
type => "syslog-relay"
pattern => [ "(?m)<%{POSINT:syslog_pri:int}>(?:%{SPACE})%{GREEDYDATA:message_remainder}" ]
add_tag => "got_syslog_pri"
# keep an untouched copy of the original line for troubleshooting
add_field => [ "syslog_raw_message", "%{@message}" ]
}
# decode facility/severity from the syslog_pri value extracted above
syslog_pri {
type => "syslog-relay"
tags => [ "got_syslog_pri" ]
}
# drop the PRI prefix from @message, then delete the scratch field
mutate {
type => "syslog-relay"
tags => [ "got_syslog_pri" ]
replace => [ "@message", "%{message_remainder}" ]
}
mutate {
type => "syslog-relay"
tags => [ "got_syslog_pri" ]
remove => [ "message_remainder" ]
}
# strip the syslog timestamp and force event timestamp to be the same.
# the original string is saved in field %{syslog_timestamp}.
# the original logstash input timestamp is saved in field %{received_at}.
grok {
# put cisco log timestamp in cisco_syslog_timestamp as ES can't store 2 format of dates in the same field
# also parse the hostname if present....
# NOTE(review): SYSLOGTIMESTAMPWITHYEAR is a custom pattern defined in a
# patterns directory not shown here.
type => "syslog-relay"
tags => [ "got_syslog_cisco" ]
pattern => [ "(?m)%{SYSLOGTIMESTAMPWITHYEAR:cisco_syslog_timestamp}(\s+%{SYSLOGHOST:syslog_hostname}\s+\:|\:)?\s+%{GREEDYDATA:message_remainder}" ]
add_tag => "got_syslog_timestamp"
add_field => [ "received_at", "%{@timestamp}" ]
}
grok {
# put log timestamp in syslog_timestamp
# (RFC3339 first, then the classic year-less syslog timestamp;
# SYSLOGTIMESTAMPWITHOUTYEAR is a custom pattern not shown here)
type => "syslog-relay"
tags => [ "got_syslog_standard" ]
pattern => [ "(?m)%{TIMESTAMP_RFC3339:syslog_timestamp}%{SPACE}%{GREEDYDATA:message_remainder}", "(?m)%{SYSLOGTIMESTAMPWITHOUTYEAR:syslog_timestamp}%{SPACE}%{GREEDYDATA:message_remainder}" ]
add_tag => "got_syslog_timestamp"
add_field => [ "received_at", "%{@timestamp}" ]
}
# drop the timestamp prefix from @message, then delete the scratch field
mutate {
type => "syslog-relay"
tags => [ "got_syslog_timestamp" ]
replace => [ "@message", "%{message_remainder}" ]
}
mutate {
type => "syslog-relay"
tags => [ "got_syslog_timestamp" ]
remove => [ "message_remainder" ]
}
date {
# parse the cisco_syslog_timestamp
# (old pre-1.2 date-filter syntax: "fieldname => [ formats ]")
type => "syslog-relay"
tags => [ "got_syslog_timestamp" , "got_syslog_cisco" ]
cisco_syslog_timestamp => [ "MMM dd yyyy HH:mm:ss", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
}
date {
# parse the syslog_timestamp
type => "syslog-relay"
tags => [ "got_syslog_timestamp", "got_syslog_standard" ]
syslog_timestamp => [ "MMM dd yyyy HH:mm:ss", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
}
# strip the host field from the syslog line.
# the extracted host field becomes the logstash %{@source_host} metadata
# and is also available in the field %{syslog_hostname}.
# the original logstash source_host is saved in field %{logstash_source}.
grok {
type => "syslog-relay"
tags => [ "got_syslog_standard" ]
pattern => [ "(?m)%{SYSLOGHOST:syslog_hostname}%{SPACE}%{GREEDYDATA:message_remainder}" ]
add_tag => "got_syslog_host"
add_field => [ "logstash_source", "%{@source_host}" ]
}
mutate {
# swap in the parsed hostname and strip it from @message in one pass
type => "syslog-relay"
tags => [ "got_syslog_host" ]
replace => [ "@source_host", "%{syslog_hostname}", "@message", "%{message_remainder}" ]
#replace => [ "@message", "%{message_remainder}" ]
}
mutate {
type => "syslog-relay"
tags => [ "got_syslog_host" ]
remove => [ "message_remainder" ]
}
# strip the app name and set it in syslog_file_name field to compute the local log file name
grok {
# do the strip multiline for standard syslog
# program can still be like "program_main/program_param"
type => "syslog-relay"
tags => [ "got_syslog_standard" ]
pattern => [ "(?m)%{SYSLOGPROG:syslog_program}\:%{SPACE}%{GREEDYDATA:message_remainder}" ]
# the program name itself is also added as a tag, so later filters can
# match on it (e.g. "JBOSSserver", "Tomcat", "synapse/main")
add_tag => [ "got_syslog_program", "%{program}" ]
add_field => [ "syslog_file_name", "%{program}" ]
}
grok {
# split the main and param part of the program
# NOTE(review): MULTIPROG is a custom pattern defined in a patterns
# directory not shown here; presumably it splits on "/" — confirm.
type => "syslog-relay"
tags => [ "got_syslog_program" ]
match => ["program", "%{MULTIPROG}" ]
add_tag => [ "got_syslog_program_param", "%{program_main}", "%{program_param}" ]
}
grok {
# do the strip single line for cisco syslog
type => "syslog-relay"
tags => [ "got_syslog_cisco" ]
pattern => [ "\%%{SYSLOGPROG:syslog_program}\:%{SPACE}%{GREEDYDATA:message_remainder}" ]
add_tag => [ "got_syslog_program", "%{program}" ]
add_field => [ "syslog_file_name", "%{program}" ]
}
mutate {
# drop the "program:" prefix from @message
type => "syslog-relay"
tags => [ "got_syslog_program" ]
replace => [ "@message", "%{message_remainder}" ]
}
mutate {
# delete the scratch field once the program part has been stripped
# from @message.
# Fix: this stanza previously matched on tag "got_syslog_timestamp",
# while the paired replace just above fires on "got_syslog_program" —
# an event carrying the program tag but not the timestamp tag kept a
# stale message_remainder field. Aligned the tag with the paired
# replace, as every other replace/remove pair in this file does.
type => "syslog-relay"
tags => [ "got_syslog_program" ]
remove => [ "message_remainder" ]
}
#############################################################
#
# Jboss logs = tag JBOSSserver
#
#############################################################
# try to get multilines back
multiline {
# a new event starts on a line beginning with one of three date shapes:
#   30-Jul-2012 / 2012-07-30 10:29:55 / Jul 30, 2012
# Fix: the first alternative previously hard-coded the literal year
# "2012", so dd-MMM-yyyy dated lines from any other year were glued to
# the previous event; generalized to \d{4}.
type => "syslog-relay"
tags => "JBOSSserver"
pattern => "^([0-9][0-9]-[a-zA-Z][a-zA-Z][a-zA-Z]-\d{4}|\d{4}-\d{2}-\d{2}\s\d{2}\:\d{2}\:\d{2}|[a-zA-Z]{3}\s\d{2},\s\d{4})"
negate => true
what => "previous"
}
# remove logs which are malformed stacktraces
grep {
# tag the malformed stacktrace (event is kept, only tagged)
type => "syslog-relay"
tags => [ "JBOSSserver" ]
match => [ "@message", "java\.lang\.Throwable" ]
add_tag => "got_syslog_stacktrace"
drop => false
negate => false
}
# Parse jboss messages
# NOTE(review): JBOSSSERVERLOG is a custom pattern defined in a patterns
# directory not shown here.
grok {
type => "syslog-relay"
tags => [ "JBOSSserver" ]
pattern => [ "(?m)%{JBOSSSERVERLOG}" ]
}
mutate {
# remove the timestamp at the beginning of the message;
# doing this completely removes the timestamp of errors in the file output module
type => "syslog-relay"
tags => [ "JBOSSserver" ]
replace => [ "@message", "%{jboss_loglevel} [%{jboss_class}] %{jboss_caller}: %{jboss_message}" ]
}
mutate {
type => "syslog-relay"
tags => [ "JBOSSserver" ]
remove => [ "jboss_message" ]
}
# set the date to the Jboss error date
date {
type => "syslog-relay"
tags => [ "JBOSSserver" ]
# season to taste for your own syslog format(s)
jboss_timestamp => [ "yyyy-MM-dd HH:mm:ss,SSS" ]
}
#############################################################
#
# Tomcat
#
#############################################################
# define multiline messages starting at the date
# Feb 28, 2012 2:07:33 PM org.apache.jk.common.ChannelSocket processConnection
# WARNING: processCallbacks status 2
# 2012-02-28 14:10:27,723 DEBUG [shq.servlet.GetResourceFlex] - <Ressource demandee : /sde/>
multiline {
# Fix: the first alternative previously hard-coded the literal year
# "2012", so dd-MMM-yyyy dated lines from any other year were glued to
# the previous event; generalized to \d{4}.
type => "syslog-relay"
tags => "Tomcat"
pattern => "^([0-9][0-9]-[a-zA-Z][a-zA-Z][a-zA-Z]-\d{4}|\d{4}-\d{2}-\d{2}\s\d{2}\:\d{2}\:\d{2}|[a-zA-Z]{3}\s\d{2},\s\d{4})"
negate => true
what => "previous"
}
#############################################################
#
# OUD
#
#############################################################
# OUD logs are XML inside <record> </record>
multiline {
# fold lines into the previous event until the closing </record> tag
type => "syslog-relay"
tags => "OUDSERVER"
pattern => "\<\/record\>"
negate => false
what => "previous"
}
#############################################################
#
# SHQ Synapse
#
#############################################################
# synapse (and oud) tagged logs are XML inside <record> </record>
# (comment previously said "OUD logs" — copy/paste from the OUD section)
multiline {
type => "syslog-relay"
tags => "synapse"
pattern => "\<\/record\>"
negate => false
what => "previous"
}
multiline {
type => "syslog-relay"
tags => "oud"
pattern => "\<\/record\>"
negate => false
what => "previous"
}
# synapse/main tagged logs
# 2012-06-21 13:04:25,024 [10.100.64.74-qxpsbp01] [HttpServerWorker-9] INFO
multiline {
# Fix: the first alternative previously hard-coded the literal year
# "2012"; generalized to \d{4} so dd-MMM-yyyy dates keep matching in
# later years.
type => "syslog-relay"
tags => "synapse/main"
pattern => "^([0-9][0-9]-[a-zA-Z][a-zA-Z][a-zA-Z]-\d{4}|\d{4}-\d{2}-\d{2}\s\d{2}\:\d{2}\:\d{2}|[a-zA-Z]{3}\s\d{2},\s\d{4})"
negate => true
what => "previous"
}
multiline {
# same year fix as above
type => "syslog-relay"
tags => "synapse/service"
pattern => "^([0-9][0-9]-[a-zA-Z][a-zA-Z][a-zA-Z]-\d{4}|\d{4}-\d{2}-\d{2}\s\d{2}\:\d{2}\:\d{2}|[a-zA-Z]{3}\s\d{2},\s\d{4})"
negate => true
what => "previous"
}
# synapse main log (parsed with the same custom pattern as service.log;
# SYNAPSESERVICELOG is defined in a patterns directory not shown here)
grok {
type => "syslog-relay"
tags => [ "synapse/main" ]
pattern => [ "(?m)%{SYNAPSESERVICELOG}" ]
}
# synapse service.log
grok {
type => "syslog-relay"
tags => [ "synapse/service" ]
pattern => [ "(?m)%{SYNAPSESERVICELOG}" ]
}
# synapse wrapper.log
grok {
type => "syslog-relay"
tags => [ "synapse/wrapper" ]
pattern => [ "(?m)%{SYNAPSESERVICELOG}" ]
}
# synapse trace.log
grok {
type => "syslog-relay"
tags => [ "synapse/trace" ]
pattern => [ "(?m)%{SYNAPSETRACELOG}" ]
}
# set the date to the SYNAPSE error date
# NOTE(review): these date filters reference "jboss_timestamp" —
# presumably a copy/paste from the JBoss section; confirm that
# SYNAPSESERVICELOG actually captures into that field name.
# NOTE(review): no date filter exists for "synapse/trace" — intentional?
date {
type => "syslog-relay"
tags => [ "synapse/main" ]
# season to taste for your own syslog format(s)
jboss_timestamp => [ "yyyy-MM-dd HH:mm:ss,SSS" ]
}
date {
type => "syslog-relay"
tags => [ "synapse/service" ]
# season to taste for your own syslog format(s)
jboss_timestamp => [ "yyyy-MM-dd HH:mm:ss,SSS" ]
}
date {
type => "syslog-relay"
tags => [ "synapse/wrapper" ]
# season to taste for your own syslog format(s)
jboss_timestamp => [ "yyyy-MM-dd HH:mm:ss,SSS" ]
}
#############################################################
#
# Other messages
#
#############################################################
# rebuild multiline messages
multiline {
# gelf events only: continuation lines start with whitespace
type => "gelf"
pattern => "^\s"
what => "previous"
}
}
output {
# stdout {
# }
# gelf {
# chunksize => 1420
# facility => "logstash-gelf" #########Default Setting ##########
# host => "qxplog02.corp.shq.local"
# level => "INFO" #########Default Setting ##########
# port => 12201
# sender => "%{@source_host}"
# }
# index every event into the local (external, non-embedded) Elasticsearch
elasticsearch {
host => "localhost"
embedded => false
}
# also write standard syslog events to per-day/per-host/per-program files
file {
flush_interval => 10
tags => ["got_syslog_standard"]
path => "/opt/data/syslog/%{+YYYY}/%{+MM}/%{+dd}/%{@source_host}/%{syslog_file_name}.log"
message_format => "%{@timestamp} %{@source_host} %{@message}"
}
file {
# NOTE(review): this path uses %{program} while the standard-syslog
# output above uses %{syslog_file_name}; the filters set both from
# %{program}, so presumably equivalent — confirm.
flush_interval => 10
tags => ["got_syslog_cisco"]
path => "/opt/data/syslog/%{+YYYY}/%{+MM}/%{+dd}/%{@source_host}/%{program}.log"
message_format => "%{@timestamp} %{@source_host} %{@message}"
}
}
Nice article, concisely describes how to use logstash grok stuff. I was wondering how large your log volume is. And if you had any issues setting up elasticsearch to handle the volume. We have a stupid amount of log volume and had issues getting elasticsearch to handle it without building an elasticsearch cluster dedicated to logs.
Elasticsearch occupies a fairly large amount of space to store these logs. Unfortunately, the easiest solution is to build an Elasticsearch cluster. You can also look at your Logstash processing and leave out unnecessary logs — for example, load-balancer health checks, etc.
can you expand your example in order to put the ssl_chiper and request_time in additional fields too?
my logformat looks like this:
'$host $remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" '
'$ssl_cipher $request_time '
'$gzip_ratio $upstream_addr $upstream_response_time';
I am not able to catch any field after http_user_agent :/
This is my logstash filter
"%{IPORHOST:host} %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) \"(?:%{URI:referrer}|-)\" %{QS:agent} %{QS:x_forwarded_for} %{USER:ssl_chiper} %{NUMBER:request_time} %{NUMBER:gzip_rato} %{IPORHOST:upstream} %{NUMBER:upstream_request_time}"
Based on my understanding of the above log format, "%{USER:ssl_chiper}" has to be replaced by "%{QUOTEDSTRING:ssl_chiper}". If you are still having issues, please post a sample line from the log file for which it is failing.
How do I create a custom field and fill it with dynamic data from the log message?
A sample log message is given below. I want to add a "Client IP" field filled with the client IP address, an "Event ID" field filled with the event ID number (in the example below, "675"), a "Username" field filled with the username, and a "Service Name" field filled with the service name from the log.
Your help on this is highly appreciated.
Log:
MSWinEventLog 1 Security 15596139 Mon Aug 06 11:21:48 2012 675 Security SYSTEM User Failure Audit XXXXX1 Account Logon Pre-authentication failed: User Name: xxxxxxxxx User ID: %{S-1-5-21-1058243859-2292328430-792808483-12929} Service Name: krbtgt/XXX Pre-Authentication Type: 0x0 Failure Code: 0x19 Client Address: 10.X.X.X 15534664