Shipping Events to Logstash (Part 1)

2023-10-21

For your reference, below is a list of the articles in this series.

  1. Introduction to Logstash
  2. Overview of Logstash plugins
  3. Shipping Events to Logstash (Part 1) (this article)
  4. Shipping Events to Logstash (Part 2)

For this blog post, we’ll be sourcing our data from Loghub. Loghub is a valuable resource that curates a collection of system logs, freely available for AI-driven log analytics research.

In this project, we’ll focus on downloading the Linux logs from the Loghub repository.
These logs, collected from Linux systems, provide a rich dataset that reflects the activities, errors, and events occurring on those machines.
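If you want to pull the same structured sample used below directly from the repository, something along these lines should work (the raw path is an assumption based on the logpai/loghub layout on GitHub and may change):

# Fetch the 2k-line structured Linux sample from the Loghub GitHub repo
# (exact path assumed from the repository layout; adjust if it has moved)
curl -O https://raw.githubusercontent.com/logpai/loghub/master/Linux/Linux_2k.log_structured.csv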

Ingest Linux logs into Elasticsearch

We’ll kick off our journey by configuring Logstash and loading our log data into the Elasticsearch cluster, specifically under the linux index.

Start the Elasticsearch cluster

cd elasticsearch-7.15.0
bin/elasticsearch
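Once Elasticsearch has started, a quick request against the default port confirms the cluster is reachable before we wire up Logstash:

# Should return the cluster name and version information
curl -X GET "http://localhost:9200/"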

Create a file logstash-7.15.0/pipelines/es_data.conf

input {
  file {
    path => "/path/to/your/Linux_2k.log_structured.csv"
    start_position => "beginning"
    sincedb_write_interval => "60 seconds"
  }
}

filter {
  csv {
    separator => ","
    columns => ["LineId", "Month", "Date", "Time", "Level", "Component", "PID", "Content", "EventId", "EventTemplate"]
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "linux"
  }
  stdout {
    codec => rubydebug
  }
}

  1. Input Configuration:

    • The input section defines the source of data that Logstash should read. In this case, it’s a file input.
    • path: Specifies the path to the input file, which is a CSV file containing log data.
    • start_position: Sets the position from which Logstash should start reading the file. “beginning” indicates it should start from the beginning of the file.
    • sincedb_write_interval: This parameter defines how often Logstash should write to the sincedb, a file that keeps track of the current position in the file. In this case, it’s set to write every 60 seconds.
  2. Filter Configuration:

    • The filter section is where data transformation and parsing take place.
    • The csv filter plugin is used to parse the CSV data. It specifies the delimiter (,) and the expected columns in the CSV.
    • The columns option lists the names of the columns in the CSV file. Logstash will use these names to parse and structure the data (see the sketch just after this list for skipping the CSV header row).
  3. Output Configuration:

    • The output section defines where Logstash should send the processed data. In this configuration, it’s set to send data to both Elasticsearch and the standard output (stdout).
    • The elasticsearch output plugin is used to send the data to an Elasticsearch cluster.
      • hosts: Specifies the Elasticsearch cluster’s address. In this case, it’s set to http://localhost:9200.
      • index: Sets the index name in Elasticsearch where the data will be stored. Here, it’s named “linux.”
    • The stdout output plugin is used for debugging and displays the data in a human-readable format using the rubydebug codec.
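One practical detail about the filter: the first line of Linux_2k.log_structured.csv is a header row, so with the configuration above it gets indexed as a regular event (likely why the sample output below reports 2001 hits for a 2,000-line dataset). A minimal sketch of how it could be skipped, assuming the header’s first column literally reads “LineId”:

filter {
  csv {
    separator => ","
    columns => ["LineId", "Month", "Date", "Time", "Level", "Component", "PID", "Content", "EventId", "EventTemplate"]
  }
  # After csv parsing, the header row's LineId field holds the literal string
  # "LineId", so that event can be dropped before it reaches Elasticsearch.
  if [LineId] == "LineId" {
    drop { }
  }
}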

Run the pipeline

bin/logstash -f pipelines/es_data.conf

Once the pipeline has processed the file, query the index to confirm the documents were ingested:

curl -X GET "http://localhost:9200/linux/_search?size=1"

Output:

{
  "took": 64,
  "timed_out": false,
  "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 },
  "hits": {
    "total": { "value": 2001, "relation": "eq" },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "linux",
        "_type": "_doc",
        "_id": "oa9IVIsBNUfRC-kwHddF",
        "_score": 1.0,
        "_source": {
          "path": "/path/logstash-7.15.0/es_datapoints/Linux_2k.log_structured.csv",
          "@timestamp": "2023-10-21T22:08:12.559Z",
          "PID": "20898",
          "Level": "combo",
          "EventId": "E18",
          "@version": "1",
          "EventTemplate": "authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=<*> user=root",
          "Time": "02:04:59",
          "Component": "sshd(pam_unix)",
          "Date": "15",
          "LineId": "13",
          "message": "13,Jun,15,02:04:59,combo,sshd(pam_unix),20898,authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=220-135-151-1.hinet-ip.hinet.net user=root,E18,authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=<*> user=root\r",
          "host": "oluchi",
          "Month": "Jun",
          "Content": "authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=220-135-151-1.hinet-ip.hinet.net user=root"
        }
      }
    ]
  }
}
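As an extra sanity check, the _count API reports the total number of documents in the index; it should line up with the hits.total value shown above:

curl -X GET "http://localhost:9200/linux/_count"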


To view your index in Kibana

Kibana is a browser-based user interface that can be used to search, analyze, and visualize the data stored in Elasticsearch indices.

curl -O https://artifacts.elastic.co/downloads/kibana/kibana-7.15.0-darwin-x86_64.tar.gz
tar -xzf kibana-7.15.0-darwin-x86_64.tar.gz
cd kibana-7.15.0
bin/kibana

(Screenshots: the Elastic UI home page and the Kibana Dev Console.)
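Once Kibana is running (it listens on http://localhost:5601 by default), you can query the index from the Dev Console under Dev Tools. A small example, using the Component value from the sample document above:

GET linux/_search
{
  "size": 3,
  "query": {
    "match": { "Component": "sshd(pam_unix)" }
  }
}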

What’s Next?

We will:

  1. Explore data transformation techniques to prepare the logs for analysis.
  2. Store the transformed data in Elasticsearch, enabling rapid querying and analysis.