Logstash — Unstructured Multiple Log Lines as one Structured Record
Process multiple lines of unstructured logs and combine them into one structured record prior to stashing them
Prologue
Log centralisation plays a key role in modern application support, analysis, and monitoring. The ELK (Elasticsearch, Logstash, Kibana) stack has become one of the popular monitoring stacks for the log centralisation use case. In such a setup, Logstash is typically the component that receives log data, then collects, parses, and transforms it into structured, meaningful data before ingesting it into Elasticsearch for stashing.
Logstash, in my opinion, is equipped with a toolset that can turn any kind of machine-generated unstructured data into stash-able structured data. By machine-generated unstructured data I mean logs that follow some pattern, even if that pattern is complex and spans several lines.
Introduction
In this blog we will focus on parsing unstructured log data that spans two or more lines. We will discuss how to construct a Logstash data pipeline configuration to parse the following log file.
What is a Logstash Data Pipeline Configuration?
A Logstash data pipeline configuration is a .conf file that we feed to Logstash by placing it under the ‘pipeline’ folder in a Logstash setup. A configuration file contains three sections:
- Input
- Filter
- Output
The Input section instructs Logstash how to listen or query for logs. Received or obtained log data is passed to the Filter section, which is responsible for parsing, collecting, and transforming the log data. Finally, the Output section ingests the data into the stashing endpoint (in our case, Elasticsearch).
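The flow through the three sections can be pictured with a small plain-Ruby model. This is only a conceptual sketch: the method names, the sample lines, and the "LEVEL text" format are hypothetical and are not part of any Logstash API.

```ruby
# Conceptual model of a pipeline: input -> filter -> output.
# Logstash itself is configured in a .conf file; this only mirrors the idea.

# Input: wrap each raw line in an event hash, with the text under 'message'.
def input_stage(lines)
  lines.map { |line| { 'message' => line } }
end

# Filter: parse the raw message into named fields (hypothetical "LEVEL text" format).
def filter_stage(event)
  level, text = event['message'].split(' ', 2)
  event.merge('level' => level, 'text' => text)
end

# Output: hand the structured event to a sink (an array stands in for Elasticsearch).
def output_stage(event, sink)
  sink << event
end

# Run every line through the three stages and return what reached the sink.
def run_pipeline(lines)
  sink = []
  input_stage(lines).each { |event| output_stage(filter_stage(event), sink) }
  sink
end

run_pipeline(['INFO job started', 'ERROR job failed']).each { |e| puts e.inspect }
```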
For further details, refer to the official Logstash documentation: https://www.elastic.co/guide/en/logstash/7.x/introduction.html
Parse Unstructured Log Lines
As you can see in the sample log file above, the data is not structured; it looks like a plain text file with some useful information in it.
Unlike .csv, .json, and .xml files, for which dedicated Logstash parsing plugins are available, free-form text like this needs a different approach.
Grok Filter Plugin
So in this case the best bet we have is to use the grok filter plugin to extract the useful information from the logs. You can find the official documentation at plugins-filters-grok.
The grok plugin is a regex parsing tool. Fear not! You do not need to learn to write complex regexes, because common patterns are available at your disposal. You can use these common patterns directly in your grok filter as follows:
mutate {
  # Replace every "/" in the message with a space before grok runs
  gsub => ["message", "/", " "]
}
grok {
  match => {
    # Two alternatives: the first matches "started" lines, the second "ended" lines
    "message" => [
      "%{WORD:m_id}%{SPACE}%{NUMBER}%{SPACE}%{WORD:log_type}%{SPACE}Job%{SPACE}%{NUMBER:job_id}%{SPACE}%{WORD:user}%{SPACE}%{WORD:job_mem_name}%{SPACE}started%{SPACE}on%{SPACE}(?<start_date>\d\d \d\d \d\d)%{SPACE}at%{SPACE}%{TIME:start_time}%{SPACE}in%{SPACE}subsystem%{SPACE}%{WORD:subsystem_name}%{SPACE}in%{SPACE}%{WORD:mainsystem_name}%{GREEDYDATA}",
      "%{WORD:m_id}%{SPACE}%{NUMBER}%{SPACE}%{WORD:log_type}%{SPACE}Job%{SPACE}%{NUMBER:job_id}%{SPACE}%{WORD:user}%{SPACE}%{WORD:job_mem_name}%{SPACE}ended%{SPACE}on%{SPACE}(?<end_date>\d\d \d\d \d\d)%{SPACE}at%{SPACE}%{TIME:end_time};%{SPACE}%{NUMBER:elapsed_time_sec}%{SPACE}seconds used; end code%{SPACE}%{INT:return_code}%{SPACE}%{GREEDYDATA}"
    ]
  }
}
Above is the fairly complex grok filter that I constructed to parse the sample log file.
With Predefined Patterns:
To use a predefined pattern (you can find the predefined patterns at grok-patterns), use the following syntax:
%{SYNTAX:field_name}
e.g.
%{NUMBER:duration}%{SPACE}%{IP:client}
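To make the %{SYNTAX:field_name} idea concrete, here is a toy expansion in plain Ruby. It is a sketch under loud assumptions: TOY_PATTERNS holds simplified stand-ins rather than the real grok definitions, expand_toy_grok is a hypothetical helper, and the sample text is made up. It only illustrates that a named grok reference behaves like a named regex capture.

```ruby
# Simplified stand-ins for a few grok patterns (NOT the real definitions).
TOY_PATTERNS = {
  'NUMBER' => '\d+(?:\.\d+)?',
  'SPACE'  => '\s*',
  'WORD'   => '\w+'
}.freeze

# Turn "%{SYNTAX:field}" into a named capture and "%{SYNTAX}" into a plain group.
def expand_toy_grok(expression)
  source = expression.gsub(/%\{(\w+)(?::(\w+))?\}/) do
    pattern = TOY_PATTERNS.fetch(Regexp.last_match(1))
    field = Regexp.last_match(2)
    field ? "(?<#{field}>#{pattern})" : "(?:#{pattern})"
  end
  Regexp.new(source)
end

regex = expand_toy_grok('%{WORD:log_type}%{SPACE}Job%{SPACE}%{NUMBER:job_id}')
match = regex.match('INFO Job 12345') # hypothetical log fragment
puts match.named_captures.inspect if match
```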
With Custom Patterns:
If you can’t find a suitable predefined pattern, you can extract information with your own custom pattern using the following syntax:
(?<field_name>the pattern here)
e.g.
(?<start_date>\d\d \d\d \d\d)
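Ruby's regular expressions accept the same named-group syntax, so the custom pattern can be tried outside Logstash. The sketch below is illustrative only: the sample lines are hypothetical (the real log layout is not reproduced here), and extract_start_date is a made-up helper that first replaces "/" with a space, as the mutate/gsub step does.

```ruby
# Custom named capture, in the same (?<field_name>pattern) form used in grok.
CUSTOM_DATE = /started\s*on\s*(?<start_date>\d\d \d\d \d\d)/

# Normalise "/" to " " and then pull out the start date, or nil if absent.
def extract_start_date(message)
  normalised = message.gsub('/', ' ')
  match = CUSTOM_DATE.match(normalised)
  match && match[:start_date]
end

# Hypothetical sample lines, not taken from the real log file.
puts extract_start_date('Job 12345 started on 01/02/20 at 10:15:30').inspect
puts extract_start_date('Job 12345 ended on 01/02/20 at 10:20:00').inspect
```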
Now you know how to extract the useful information into named fields. The next concern, as you can see, is that there are two possible matches, and we want to combine them into one record before ingesting it into Elasticsearch.
Combine Multiple Lines into One Record
In use cases where you want to combine multiple lines into one record before ingesting into Elasticsearch, you can accomplish that using the aggregate filter plugin.
Aggregate Filter Plugin
In the documentation (plugins-filters-aggregate) you can find how to use the aggregate filter plugin in different use cases. Here I will point out several important facts that you must know to get the best out of the aggregate filter.
# Discard lines that matched neither grok pattern
if "_grokparsefailure" in [tags] {
  drop { }
}
# Start of a job: create the map and cancel the event
if [log_type] == "INFO" {
  aggregate {
    task_id => "%{job_id}"
    code => "
      map['job_id'] = event.get('job_id')
      map['data_center'] = event.get('[fields][logs_from]')
      map['application'] = event.get('subsystem_name')
      map['node_id'] = event.get('[fields][logs_from]')
      map['order_date'] ||= event.get('start_date')
      map['order_id'] ||= event.get('start_date')
      event.cancel()
    "
    map_action => "create"
  }
}
# End of a job: update the map, copy it onto the event, and close the task
if [log_type] == "COMPLETION" {
  aggregate {
    task_id => "%{job_id}"
    code => "
      map['order_date'] ||= event.get('end_date')
      map['order_id'] ||= event.get('end_date')
      map['job_status'] = event.get('return_code')
      map['elapsed_time_sec'] = event.get('elapsed_time_sec')
      items_to_remove = ['message', 'm_id', 'log_type', 'job_id', 'user', 'job_mem_name', 'end_date', 'end_time', 'elapsed_time_sec', 'return_code']
      items_to_remove.each{|item| event.remove(item)}
      map.each{|field, value| event.set(field, value)}
    "
    map_action => "update"
    end_of_task => true
    timeout => 120
  }
}
# Discard events generated by an aggregate timeout
if "_aggregatetimeout" in [tags] {
  drop { }
}
Conceptually, the aggregate filter maintains a map against a given task_id.
The term event here refers to the Logstash event. In other words, after log parsing, the result is held in the event object as key-value pairs.
One event should create this map against a task_id. This is achieved with map_action => "create". See the if [log_type] == "INFO" section.
Then another event should end the task by specifying end_of_task => true. See the if [log_type] == "COMPLETION" section.
In between the create and end_of_task events, you can have other events that update the map. For those events, specify map_action => "update".
In any of the map create, map update, and end_of_task events, you can change the map and the event by providing a code => "" section.
What goes in the code section is Ruby, because Logstash plugins such as this one are written in Ruby.
Logstash will ingest every event into Elasticsearch unless you explicitly cancel the event by calling event.cancel() in the code section. To make sure only combined records are ingested into Elasticsearch, you need to cancel every event except the end_of_task => true event.
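The create, update, and end_of_task steps described above can be modelled in plain Ruby. This is a simplified sketch, not the plugin's implementation: aggregate_events and SAMPLE_EVENTS are hypothetical, only a few fields are carried, and the combined map is emitted directly instead of being copied onto the final event.

```ruby
# Plain-Ruby model of the aggregate flow. Events are hashes; one map is kept
# per task id (here: job_id). Returns the events that would reach the output.
def aggregate_events(events)
  maps = {}    # task_id => accumulated map
  emitted = [] # events that are not cancelled

  events.each do |event|
    task_id = event['job_id']
    case event['log_type']
    when 'INFO' # models map_action => "create"
      next if maps.key?(task_id) # create only runs when no map exists yet
      maps[task_id] = { 'job_id' => task_id, 'order_date' => event['start_date'] }
      # models event.cancel(): the start event is never emitted on its own
    when 'COMPLETION' # models map_action => "update" with end_of_task => true
      map = maps.delete(task_id)
      if map.nil?
        emitted << event # no map to update: the event passes through unchanged
        next
      end
      map['job_status'] = event['return_code']
      map['elapsed_time_sec'] = event['elapsed_time_sec']
      emitted << map
    end
  end
  emitted
end

# Hypothetical parsed events for one job.
SAMPLE_EVENTS = [
  { 'log_type' => 'INFO', 'job_id' => '12345', 'start_date' => '01 02 20' },
  { 'log_type' => 'COMPLETION', 'job_id' => '12345',
    'return_code' => '0', 'elapsed_time_sec' => '4.2' }
].freeze

puts aggregate_events(SAMPLE_EVENTS).inspect
```

Feeding the same events in reverse order makes the COMPLETION event pass through unmerged, which shows how much the result depends on events arriving in sequence.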
What if no end_of_task => true event is ever encountered? Does that mean the map lives in memory forever?
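Here timeouts come to our rescue. You can specify, for example, timeout => 120 (in seconds), after which Logstash discards the map. If the filter is also configured with push_map_as_event_on_timeout => true and timeout_tags => ['_aggregatetimeout'], the expired map is emitted as a new event carrying the _aggregatetimeout tag, which is what the final conditional in the configuration above checks for.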
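Timeout handling can be pictured the same way. The sketch below is a plain-Ruby model rather than the plugin's code: evict_expired, its arguments, and the sample data are hypothetical, and it assumes expired maps are pushed out as tagged events (in the plugin this depends on options such as push_map_as_event_on_timeout and timeout_tags, so check the documentation for your version).

```ruby
# Evict every map older than `timeout` seconds and return the evicted maps,
# tagged so that they can be recognised (and dropped) downstream.
# maps:       task_id => accumulated map
# created_at: task_id => time (in seconds) at which the map was created
def evict_expired(maps, created_at, now, timeout)
  expired_ids = created_at.select { |_id, started| now - started > timeout }.keys
  expired_ids.map do |task_id|
    created_at.delete(task_id)
    maps.delete(task_id).merge('tags' => ['_aggregatetimeout'])
  end
end

# Hypothetical state: job "a" started at t=0, job "b" at t=100.
maps = { 'a' => { 'job_id' => 'a' }, 'b' => { 'job_id' => 'b' } }
created_at = { 'a' => 0, 'b' => 100 }

puts evict_expired(maps, created_at, 130, 120).inspect
puts maps.keys.inspect
```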
There are quite a number of variations on the behaviours described above. Once you understand these building blocks, you can refer to the documentation and find what works for you.
Note:
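Make sure the Logstash instance that runs your aggregate pipelines has only one pipeline worker (for example, pipeline.workers: 1 in logstash.yml, or the -w 1 command-line flag). Otherwise events may be processed out of sequence, and you will encounter discrepancies, inconsistencies, and unexpected behaviours.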
Conclusion
I hope you now have an overview of processing multiple log lines and combining them into a single record before stashing. My complete Logstash (version 6.8) config file follows. Cheers!