Logstash — Unstructured Multiple Log Lines as one Structured Record

Prologue

Log centralisation plays a key role in modern application support, analysis, and monitoring. The ELK (Elasticsearch, Logstash, Kibana) stack has become one of the most popular monitoring stacks for the log centralisation use case. In such a setup, Logstash is typically the component that receives the log data, then collects, parses, and transforms it into structured, meaningful records before ingesting them into Elasticsearch for stashing.

Logstash, in my opinion, is equipped with a toolset capable of processing any kind of machine-generated unstructured data into stash-able structured data. Machine-generated in the sense that there should be some pattern in the logs, even if it is a complex one that spans several lines.

Introduction

In this blog we will focus on parsing unstructured log data that spans two or more lines. We will discuss how to construct a Logstash data pipeline configuration to parse the following log file.

sample log file

What is a Logstash Data Pipeline Configuration?

A Logstash data pipeline configuration is a .conf file that we feed to Logstash by placing it under the ‘pipeline’ folder of a Logstash setup. One configuration file contains three sections as follows,

  • Input
  • Filter
  • Output

The Input section instructs Logstash how to listen/query for logs. Received or obtained log data is passed to the Filter section. The Filter section is responsible for parsing, collecting, and transforming the log data. Finally, the Output section ingests the data into the stashing endpoint (in our case, Elasticsearch). A minimal sketch of such a configuration is shown below.
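To make the three sections concrete, here is a minimal sketch of a pipeline configuration. The file path, index name, and Elasticsearch host are hypothetical placeholders, not part of the setup described in this post.

input {
  # Read raw log lines from a file (hypothetical path)
  file {
    path => "/var/log/sample/jobs.log"
    start_position => "beginning"
  }
}

filter {
  # Parsing / transformation logic goes here (grok, mutate, aggregate, ...)
}

output {
  # Ship the structured events to Elasticsearch (hypothetical host and index)
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "jobs-logs"
  }
}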

For further details refer to Logstash official documentation https://www.elastic.co/guide/en/logstash/7.x/introduction.html

Parse Unstructured Log Lines

As you can see in the above sample log file, the data is not structured; it is just a text file with some useful information in it.

For .csv, .json, and .xml files there are dedicated Logstash plugins available for parsing, but our file is none of those.

Grok Filter Plugin

So in this case the best bet we have is to use the grok filter plugin to extract the useful information from the logs. You can find the official documentation at plugins-filters-grok.

The grok plugin is a regex-based parsing tool. Fear not! You do not need to learn to write complex regexes, because common patterns are available at your disposal. You can use these predefined patterns directly in your grok filter as follows,

mutate {
  # Replace every "/" in the message with a space (e.g. so that slash-separated
  # dates become "02 04 21" and can be matched by the "\d\d \d\d \d\d" patterns below)
  gsub => ["message", "/", " "]
}

grok {
  match => {
    "message" => [
      # Pattern 1: matches the "Job ... started on ..." line
      "%{WORD:m_id}%{SPACE}%{NUMBER}%{SPACE}%{WORD:log_type}%{SPACE}Job%{SPACE}%{NUMBER:job_id}%{SPACE}%{WORD:user}%{SPACE}%{WORD:job_mem_name}%{SPACE}started%{SPACE}on%{SPACE}(?<start_date>\d\d \d\d \d\d)%{SPACE}at%{SPACE}%{TIME:start_time}%{SPACE}in%{SPACE}subsystem%{SPACE}%{WORD:subsystem_name}%{SPACE}in%{SPACE}%{WORD:mainsystem_name}%{GREEDYDATA}",
      # Pattern 2: matches the "Job ... ended on ..." line
      "%{WORD:m_id}%{SPACE}%{NUMBER}%{SPACE}%{WORD:log_type}%{SPACE}Job%{SPACE}%{NUMBER:job_id}%{SPACE}%{WORD:user}%{SPACE}%{WORD:job_mem_name}%{SPACE}ended%{SPACE}on%{SPACE}(?<end_date>\d\d \d\d \d\d)%{SPACE}at%{SPACE}%{TIME:end_time};%{SPACE}%{NUMBER:elapsed_time_sec}%{SPACE}seconds used; end code%{SPACE}%{INT:return_code}%{SPACE}%{GREEDYDATA}"
    ]
  }
}

Above is a complex grok filter that I have constructed to filter the above sample log file.

With Predefined Patterns:

The most important thing here: if you want to use a predefined pattern (you can find the full list at grok-patterns), use the following syntax,

%{SYNTAX:field_name}

e.g.
%{NUMBER:duration}%{SPACE}%{IP:client}
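For instance, a minimal grok filter built only from predefined patterns could look like the following. The sample message and field names are hypothetical and only for illustration.

# Hypothetical input line:  "0.043 192.168.1.25"
grok {
  match => {
    "message" => "%{NUMBER:duration}%{SPACE}%{IP:client}"
  }
}
# Resulting fields on the event:  duration => "0.043", client => "192.168.1.25"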

With Custom Patterns:

In case you can’t find a suitable predefined pattern, you can extract information with your own custom pattern using the following syntax,

(?<field_name>the pattern here)

e.g.
(?<start_date>\d\d \d\d \d\d)
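As a sketch, a custom pattern sits directly inside the match string, side by side with predefined patterns. The sample message below is hypothetical.

# Hypothetical input line:  "started on 02 04 21"
grok {
  match => {
    "message" => "started%{SPACE}on%{SPACE}(?<start_date>\d\d \d\d \d\d)"
  }
}
# Resulting field on the event:  start_date => "02 04 21"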

Now you know how to extract the useful information into named fields. The next concern, as you can see, is that there are two possible matches (the ‘started’ line and the ‘ended’ line), and we want to combine them into one record before ingesting it into Elasticsearch.

Combine Multiple Lines into One Record

In use cases where you want to combine multiple lines into one record before ingesting into Elasticsearch, you can accomplish that using the aggregate filter plugin.

Aggregate Filter Plugin

In the documentation (plugins-filters-aggregate) you can find how to utilise the aggregate filter plugin in different use cases. Here I will point out several important facts that you must know to get the best out of the aggregate filter.

if "_grokparsefailure" in [tags] {
drop { }
}
if [log_type] == "INFO" {

aggregate {
task_id => "%{job_id}"
code => "
map['job_id'] = event.get('job_id')
map['data_center'] = event.get('[fields][logs_from]')
map['application'] = event.get('subsystem_name')
map['node_id'] = event.get('[fields][logs_from]')
map['order_date'] ||= event.get('start_date')
map['order_id'] ||= event.get('start_date')
event.cancel()
"
map_action => "create"
}
}if [log_type] == "COMPLETION" { aggregate {
task_id => "%{job_id}"
code => "
map['order_date'] ||= event.get('end_date')
map['order_id'] ||= event.get('end_date')
map['job_status'] = event.get('return_code')
map['elapsed_time_sec'] = event.get('elapsed_time_sec')
items_to_remove = ['message', 'm_id', 'log_type', 'job_id', 'user', 'job_mem_name', 'end_date', 'end_time', 'elapsed_time_sec', 'return_code']
items_to_remove.each{|item| event.remove(item)}
map.each{|field, value| event.set(field, value)}
"
map_action => "update"
end_of_task => true
timeout => 120
}
}
if "_aggregatetimeout" in [tags] {
drop { }
}

Conceptually, what happens inside the aggregate filter is that it maintains a map against a given task_id.

The term event here refers to the Logstash event. In other words, after parsing, the result is contained in the event object as key-value pairs.

One event should create this map against a task_id. This is achieved by having map_action => “create” . See the if [log_type] == "INFO" section.

Then another event should end this by specifying end_of_task => true . See the if [log_type] == "COMPLETION" section.

In between create and end_of_task you can have other events which update the map. For events that should update the map, specify map_action => "update" .

In any of map create, map update, and end_of_task events you can do changes to the map and to the event by providing a code => "" section.

What goes in the code section is Ruby syntax, because Logstash itself runs on (J)Ruby.

Logstash will ingest every event into Elasticsearch unless you explicitly cancel it by calling event.cancel() in the code section. To make sure only the combined records get ingested into Elasticsearch, you need to cancel every event except the end_of_task => true event.
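Putting those building blocks together, a stripped-down skeleton of the pattern could look like this. The START/END log types and the copied fields are hypothetical and only illustrate the create / cancel / update / end flow.

if [log_type] == "START" {
  aggregate {
    task_id => "%{job_id}"
    # first event creates the map and is cancelled so it is never ingested on its own
    map_action => "create"
    code => "
      map['started_at'] = event.get('start_time')
      event.cancel()
    "
  }
}
if [log_type] == "END" {
  aggregate {
    task_id => "%{job_id}"
    # later event updates the map and closes the task
    map_action => "update"
    end_of_task => true
    timeout => 120
    code => "
      map.each { |field, value| event.set(field, value) }
    "
  }
}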

What if no end_of_task => true event is ever encountered? Does that mean the map lives in memory forever?

This is where timeouts come to the rescue. You can specify, for example, timeout => 120, after which Logstash will emit the event with the _aggregatetimeout tag.
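If you want the half-built map pushed as its own event when the timeout fires (rather than silently discarded), the plugin exposes options along these lines. Treat this as a sketch and check the plugin documentation for the exact defaults in your version.

aggregate {
  task_id => "%{job_id}"
  code => "map['job_id'] ||= event.get('job_id')"
  timeout => 120
  # Push whatever has been aggregated so far as a new event when the timeout fires
  push_map_as_event_on_timeout => true
  # Tag that timed-out event so it can be identified (and, as above, dropped)
  timeout_tags => ["_aggregatetimeout"]
}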

There are quite a number of variations on the behaviour described above. You can refer to the documentation and find what works for you, provided you understand the building blocks above.

Note:

Make sure the Logstash instance that runs your aggregate pipelines has only one pipeline worker (see the sketch below). Otherwise you will encounter discrepancies, inconsistencies, and unexpected behaviour.
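For reference, the worker count can be restricted per pipeline in pipelines.yml or on the command line. The pipeline id and config path below are hypothetical placeholders.

# pipelines.yml (hypothetical pipeline id and path)
- pipeline.id: job-logs
  path.config: "/usr/share/logstash/pipeline/job-logs.conf"
  pipeline.workers: 1

# or, when starting Logstash directly:
#   bin/logstash -f job-logs.conf -w 1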

Conclusion

I hope you have got an overview of processing multiple log lines and combining them into a single record before stashing. Please find my complete Logstash (version: 6.8) config file below. Cheers!

logstash pipeline config file
