Logstash — Unstructured Multiple Log Lines as one Structured Record

Process multiple lines of unstructured logs and combine them into one structured record prior to stashing them

Thilina Madumal
5 min read · Nov 6, 2020

Prologue

Log centralisation plays a key role in modern application support, analysis, and monitoring. The ELK (Elasticsearch, Logstash, Kibana) stack has become one of the trending monitoring tool stacks that caters to the log centralisation use case. In such a setup, Logstash is typically the component that receives log data, collecting, parsing, and transforming it into structured and meaningful data prior to ingesting it into Elasticsearch for stashing.

Logstash, in my opinion, is equipped with a toolset to process any kind of machine-generated unstructured data into stash-able structured data. By machine-generated unstructured data I mean that there should be some pattern in the logs, even if it is something complex that spans several lines.

Introduction

In this blog we will focus on parsing unstructured log data that spans two or more lines. We will discuss how to construct a Logstash data pipeline configuration to parse the following log file.

sample log file

What is a Logstash Data Pipeline Configuration?

A Logstash data pipeline configuration is a .conf file that we feed to Logstash by placing it under the ‘pipeline’ folder in a Logstash setup. One configuration file contains three sections as follows,

  • Input
  • Filter
  • Output

The Input section instructs Logstash how to listen/query for logs. Received or obtained log data is passed to the Filter section. The Filter section is responsible for parsing, collecting, and transforming the log data. Finally, the Output section ingests the data to the stashing endpoint (in our case, Elasticsearch).

For further details refer to Logstash official documentation https://www.elastic.co/guide/en/logstash/7.x/introduction.html
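For orientation, a minimal pipeline skeleton could look like the following. The file path and Elasticsearch host here are placeholders for illustration, not values from the actual setup,

input {
  file {
    # placeholder path; point this at your actual log file
    path => "/var/log/sample/jobs.log"
    start_position => "beginning"
  }
}
filter {
  # grok and aggregate filters (discussed below) go here
}
output {
  elasticsearch {
    # placeholder host
    hosts => ["http://localhost:9200"]
  }
}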

Parse Unstructured Log Lines

As you can see in the above sample log file, the data is not structured. It is essentially a text file with some useful information in it.

If the data came as .csv, .json, or .xml files, we could find Logstash plugins available for parsing them, but that is not the case here.
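For example, had each log line been a JSON document, the json filter plugin alone would have done the parsing (the source field below is the default message field),

filter {
  json {
    # parse the raw log line as JSON into top-level fields
    source => "message"
  }
}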

Grok Filter Plugin

So in this case the best bet we have is to use the grok filter plugin to extract the useful information from the logs. You can find the official documentation at plugins-filters-grok.

The grok plugin is a regex-based parsing tool. Fear not! You do not need to learn to write complex regexes, because common patterns are available at your disposal. You can use these common patterns directly in your grok filter as follows,

mutate {
  gsub => ["message", "/", " "]
}
grok {
  match => {
    "message" => [
      "%{WORD:m_id}%{SPACE}%{NUMBER}%{SPACE}%{WORD:log_type}%{SPACE}Job%{SPACE}%{NUMBER:job_id}%{SPACE}%{WORD:user}%{SPACE}%{WORD:job_mem_name}%{SPACE}started%{SPACE}on%{SPACE}(?<start_date>\d\d \d\d \d\d)%{SPACE}at%{SPACE}%{TIME:start_time}%{SPACE}in%{SPACE}subsystem%{SPACE}%{WORD:subsystem_name}%{SPACE}in%{SPACE}%{WORD:mainsystem_name}%{GREEDYDATA}",
      "%{WORD:m_id}%{SPACE}%{NUMBER}%{SPACE}%{WORD:log_type}%{SPACE}Job%{SPACE}%{NUMBER:job_id}%{SPACE}%{WORD:user}%{SPACE}%{WORD:job_mem_name}%{SPACE}ended%{SPACE}on%{SPACE}(?<end_date>\d\d \d\d \d\d)%{SPACE}at%{SPACE}%{TIME:end_time};%{SPACE}%{NUMBER:elapsed_time_sec}%{SPACE}seconds used; end code%{SPACE}%{INT:return_code}%{SPACE}%{GREEDYDATA}"
    ]
  }
}

Above is the (somewhat complex) grok filter that I have constructed to parse the above sample log file.

With Predefined Patterns:

The most important thing here is: if you want to use a predefined pattern (you can find the predefined patterns at grok-patterns), use the following syntax,

%{SYNTAX:field_name}

e.g.
%{NUMBER:duration}%{SPACE}%{IP:client}
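Wrapped in an actual grok filter, that predefined-pattern syntax would look like this (the message format here is assumed purely for illustration),

grok {
  # extract a numeric duration and a client IP from the message
  match => { "message" => "%{NUMBER:duration}%{SPACE}%{IP:client}" }
}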

With Custom Patterns:

If you can’t find a suitable predefined pattern, you can extract information using your own custom patterns with the following syntax,

(?<field_name>the pattern here)

e.g.
(?<start_date>\d\d \d\d \d\d)
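A custom pattern can be embedded directly in a grok match in the same way; the surrounding filter below is just a sketch,

grok {
  # capture a date written as three space-separated two-digit groups
  match => { "message" => "started%{SPACE}on%{SPACE}(?<start_date>\d\d \d\d \d\d)" }
}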

Now you know how to extract the useful information into field names. The next concern, as you can see, is that there are two possible matches and we want to combine them into one record before ingesting them to Elasticsearch.

Combine Multiple Lines into One Record

In use cases where you want to combine multiple lines into one record before ingesting to Elasticsearch, you can accomplish that using the aggregate filter plugin.

Aggregate Filter Plugin

In the documentation (plugins-filters-aggregate) you can find how to utilise the aggregate filter plugin in different use cases. Here I will point out several important facts that you must know to get the best out of the aggregate filter.

if "_grokparsefailure" in [tags] {
drop { }
}
if [log_type] == "INFO" {

aggregate {
task_id => "%{job_id}"
code => "
map['job_id'] = event.get('job_id')
map['data_center'] = event.get('[fields][logs_from]')
map['application'] = event.get('subsystem_name')
map['node_id'] = event.get('[fields][logs_from]')
map['order_date'] ||= event.get('start_date')
map['order_id'] ||= event.get('start_date')
event.cancel()
"
map_action => "create"
}
}if [log_type] == "COMPLETION" { aggregate {
task_id => "%{job_id}"
code => "
map['order_date'] ||= event.get('end_date')
map['order_id'] ||= event.get('end_date')
map['job_status'] = event.get('return_code')
map['elapsed_time_sec'] = event.get('elapsed_time_sec')
items_to_remove = ['message', 'm_id', 'log_type', 'job_id', 'user', 'job_mem_name', 'end_date', 'end_time', 'elapsed_time_sec', 'return_code']
items_to_remove.each{|item| event.remove(item)}
map.each{|field, value| event.set(field, value)}
"
map_action => "update"
end_of_task => true
timeout => 120
}
}
if "_aggregatetimeout" in [tags] {
drop { }
}

Conceptually, what the aggregate filter does is maintain a map against a given task_id.

The term event is used here to indicate the Logstash event. In other words, after log parsing the result is contained in the event object as key-value pairs.

One event should create this map against a task_id. This is achieved by having map_action => “create” . See the if [log_type] == "INFO" section.

Then another event should end this by specifying end_of_task => true . See the if [log_type] == "COMPLETION" section.

In between create and end_of_task you can have other events which update the map. In events where it should update the map you should specify map_action => "update" .

In any of the map-create, map-update, and end_of_task events, you can make changes to the map and to the event by providing a code => "" section.

What goes in the code section is Ruby, because Logstash runs on JRuby and exposes a Ruby event API.

Logstash will ingest every event into Elasticsearch unless you explicitly cancel the event by calling event.cancel() in the code section. To make sure only combined records get ingested to Elasticsearch, you need to cancel every event except the end_of_task => true event.

What if no end_of_task => true event is ever encountered? Does that mean the map lives in memory forever?

This is where timeouts come to our rescue. You can specify something like timeout => 120, after which Logstash will emit the timed-out event with the _aggregatetimeout tag (which the configuration above simply drops).
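If, instead of dropping the timed-out task, you want its partially aggregated map to be emitted as an event, the plugin offers timeout-related options for that; the following is a sketch under that assumption, not part of the pipeline above,

aggregate {
  task_id => "%{job_id}"
  code => "map['job_id'] ||= event.get('job_id')"
  # emit the map as a new event if no end event arrives within the timeout
  push_map_as_event_on_timeout => true
  timeout => 120
  timeout_tags => ["_aggregatetimeout"]
}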

There are quite a number of variations to the behaviours described above. You can refer to the documentation and find what works for you once you understand the above building blocks.

Note:

Make sure the Logstash pipeline that runs your aggregate filter has only one pipeline worker. Otherwise you will encounter discrepancies, inconsistencies, and unexpected behaviours.
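One way to pin this down is in pipelines.yml; the pipeline id and config path below are assumptions for illustration,

# pipelines.yml
- pipeline.id: job-aggregation                            # assumed pipeline name
  path.config: "/usr/share/logstash/pipeline/jobs.conf"   # assumed path
  pipeline.workers: 1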

Conclusion

I hope you now have an overview of processing multiple log lines and combining them into a single record before stashing. Please find my complete Logstash (version: 6.8) config file below. Cheers!

logstash pipeline config file
