Shipping Events to Logstash (Part 2)

2023-10-22

For your reference, below is a list of the articles in this series.

  1. Introduction to Logstash
  2. Overview of Logstash plugins
  3. Shipping Events to Logstash (Part 1)
  4. Shipping Events to Logstash (Part 2) (this article)

In this follow-up post, we will build upon our previous post: Shipping Events to Logstash.
Our objective now is to filter and extract valuable information from log data, specifically authentication failures, and identify the IP addresses making these requests. Additionally, we will save the filtered data both to a CSV file and an Elasticsearch index for further analysis and monitoring.

Create a file named logstash-7.15.0/pipelines/es_transform with the following pipeline configuration:

input {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "linux"
    query => '{"query": {"match": {"message": "authentication failure"}}}'
    scroll => "5m"
    size => 1000
  }
}

filter {
  grok {
    match => { "message" => "tty=%{DATA:span_process} rhost=%{IP:host_name}" }
  }

  if [span_process] == "" or [host_name] == "" {
    drop { }
  }
}

output {
  csv {
    path => "/path/logstash-7.15.0/es_datapoints/auth_failures.csv"
    fields => ["span_process", "host_name"]
  }

  elasticsearch {
    hosts => "http://localhost:9200"
    index => "auth_failures"
    manage_template => true
  }
}



Logstash Configuration

Input

To begin our analysis, we need to fetch log data from our Elasticsearch server. We can set up an Elasticsearch input plugin in our Logstash configuration to do this. Here’s the input configuration:

input {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "linux"
    query => '{"query": {"match": {"message": "authentication failure"}}}'
    scroll => "5m"
    size => 1000
  }
}

In this configuration:

  • hosts specifies the Elasticsearch server’s URL.
  • index defines the index from which we want to fetch data, in our case, the “linux” index.
  • query narrows the search to documents whose “message” field matches “authentication failure”. Note that a match query analyzes the text and, by default, matches documents containing either word; a match_phrase query would require the exact phrase.
  • scroll sets how long Elasticsearch keeps the search context alive between scroll requests, here 5 minutes.
  • size specifies the maximum number of documents to retrieve per scroll request.
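
If only documents containing the exact phrase are wanted, the query can be switched to match_phrase. The following is a minimal sketch of that variant, assuming the same host and index as above; everything except the query type is unchanged from the original configuration.

```
input {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "linux"
    # match_phrase requires both words to appear next to each other, in this order.
    # This is a suggested variant, not the query used in the rest of this article.
    query => '{"query": {"match_phrase": {"message": "authentication failure"}}}'
    scroll => "5m"
    size => 1000
  }
}
```

With the original match query, a document mentioning only “failure” or only “authentication” can also be returned, so the phrase variant usually gives a tighter result set.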

Filter

Once we’ve fetched the data, we need to filter and structure it for our analysis. Logstash provides the powerful Grok filter for this purpose. Here’s how we configure the filter:

filter {
  grok {
    match => { "message" => "tty=%{DATA:span_process} rhost=%{IP:host_name}" }
  }

  if [span_process] == "" or [host_name] == "" {
    drop { }
  }
}

In this filter configuration:

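  • We use Grok to extract two fields from the “message” field: “span_process” (the text that follows tty=) and “host_name” (the IP address of the remote host, rhost, from which the authentication was attempted). Because the pattern expects “ rhost=” immediately after the capture, “span_process” also picks up the intervening “ruser=” text, for example “NODEVssh ruser=”.
  • We use conditional logic to drop the event if either “span_process” or “host_name” is an empty string. This step is meant to exclude irrelevant or incomplete data. Note that when the Grok pattern does not match at all (for example, when rhost is a hostname rather than an IP address), the fields are never created, so this check does not drop the event; it passes through tagged with _grokparsefailure.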
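
A field that Grok never created is not equal to an empty string, so events that fail to parse are not caught by the comparison above. One possible alternative, shown here as a sketch rather than the configuration used in this article, is to drop events that carry the _grokparsefailure tag, which Grok adds by default when no pattern matches. The sketch also narrows the tty capture with NOTSPACE; it assumes that every log line has a non-empty tty value followed by a “ruser=” entry before “rhost=”, as in the sample data in this article.

```
filter {
  grok {
    # NOTSPACE stops at the first space, so span_process holds only the tty value.
    # The unnamed %{DATA} skips whatever follows "ruser=" (possibly nothing).
    match => { "message" => "tty=%{NOTSPACE:span_process} ruser=%{DATA} rhost=%{IP:host_name}" }
  }

  # Grok tags events it could not parse; drop those instead of testing for "".
  if "_grokparsefailure" in [tags] {
    drop { }
  }
}
```

With this variant, lines whose rhost is a hostname rather than an IP address are dropped, which matches the goal of collecting IP addresses but also means those attempts are not recorded.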

Output

With the data filtered and structured, we can now save it for future analysis and visualization. We’ll save it in two ways: to a CSV file and to a new Elasticsearch index for historical data.

Here’s the output configuration:

output {
  csv {
    path => "/path/logstash-7.15.0/es_datapoints/auth_failures.csv"
    fields => ["span_process", "host_name"]
  }

  elasticsearch {
    hosts => "http://localhost:9200"
    index => "auth_failures"
    manage_template => true
  }
}

In this output configuration:

  • The csv output plugin writes each event to a CSV file at the specified path and includes only the “span_process” and “host_name” fields.
  • The elasticsearch output plugin stores the full events in a new index called “auth_failures” on our Elasticsearch server. The manage_template option (true is also the plugin’s default) lets Logstash install its index template in Elasticsearch at startup.
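
While testing the pipeline, it can help to see the events before they reach the file or the index. A minimal sketch, assuming you are running Logstash in a terminal, is to add a stdout output with the rubydebug codec next to the other outputs; this block is an optional addition and not part of the original configuration.

```
output {
  # Print every event to the console in a readable form.
  # Intended for testing only; remove it once the pipeline behaves as expected.
  stdout { codec => rubydebug }
}
```

Each printed event shows whether “span_process” and “host_name” were extracted and whether any failure tags were added.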

To confirm that our data was saved in the right index, query it:

curl -X POST "http://localhost:9200/auth_failures/_search" -H "Content-Type: application/json" -d '{
  "query": {
    "match_all": {}
  }
}'

Output:

{"took":2305,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1611,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"auth_failures","_type":"_doc","_id":"z68KWIsBNUfRC-kwet3W","_score":1.0,"_source":{"Date":"15","Month":"Jun","Component":"sshd(pam_unix)","host":"oluchi","tags":["_grokparsefailure"],"@version":"1","EventId":"E16","path":"/path/logstash-7.15.0/es_datapoints/Linux_2k.log_structured.csv","Time":"14:53:32","@timestamp":"2023-10-21T22:08:12.567Z","Level":"combo","EventTemplate":"authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=<*>","LineId":"48","message":"48,Jun,15,14:53:32,combo,sshd(pam_unix),23664,authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=061092085098.ctinets.com,E16,authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=<*>\r","Content":"authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=061092085098.ctinets.com","source_message":"48,Jun,15,14:53:32,combo,sshd(pam_unix),23664,authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=061092085098.ctinets.com,E16,authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=<*>\r","PID":"23664"}},{"_index":"auth_failures","_type":"_doc","_id":"Bq8KWIsBNUfRC-kwd90g","_score":1.0,"_source":{"Date":"21","Month":"Jun","Component":"sshd(pam_unix)","host":"oluchi","span_process":"NODEVssh ruser=","host_name":"217.60.212.66","@version":"1","EventId":"E17","path":"/path/logstash-7.15.0/es_datapoints/Linux_2k.log_structured.csv","Time":"08:56:36","@timestamp":"2023-10-21T22:08:12.599Z","Level":"combo","EventTemplate

Conclusion

In this blog post, we’ve demonstrated how to use Logstash and Elasticsearch to efficiently analyze authentication failures from system logs. By configuring Logstash to fetch, filter, and store the data, we’ve taken the first steps toward proactive log analysis and security monitoring.