Logstash for ft_transcendence: Advanced Log Processing
What is Logstash?
Logstash is a server-side data processing pipeline that ingests data from multiple sources, transforms it, and then sends it to a destination like Elasticsearch. In our ft_transcendence observability stack, Logstash sits between Filebeat and Elasticsearch, enhancing the value of our log data.
Current Logstash Version: 8.17.4 (matching our Elasticsearch version)
Why Logstash for ft_transcendence?
While Filebeat can send logs directly to Elasticsearch, Logstash provides several important benefits:
- Complex Processing: More advanced filtering and transformation than Filebeat alone
- Data Enrichment: Adding context and structure to logs
- Format Conversion: Standardizing log formats across services
- Error Handling: Robust processing of malformed or unexpected log entries
Logstash Pipeline Design
Our Logstash configuration uses multiple pipelines to process different types of logs efficiently.
Main Pipeline Configuration (pipelines.yml)
- pipeline.id: main
  path.config: '/usr/share/logstash/pipeline/main.conf'
  pipeline.workers: 1
- pipeline.id: nginx
  path.config: '/usr/share/logstash/pipeline/nginx.conf'
  pipeline.workers: 1
- pipeline.id: django
  path.config: '/usr/share/logstash/pipeline/django.conf'
  pipeline.workers: 1
- pipeline.id: nextjs
  path.config: '/usr/share/logstash/pipeline/nextjs.conf'
  pipeline.workers: 1
- pipeline.id: postgres
  path.config: '/usr/share/logstash/pipeline/postgres.conf'
  pipeline.workers: 1
- pipeline.id: redis
  path.config: '/usr/share/logstash/pipeline/redis.conf'
  pipeline.workers: 1
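Before restarting the stack, it's worth checking that each pipeline file actually parses. A minimal sketch, assuming the ft_logstash container from our Compose file is running; the throwaway --path.data directory avoids clashing with the running instance's data-directory lock:

# Syntax-check a single pipeline inside the running container
docker exec ft_logstash /usr/share/logstash/bin/logstash \
  --config.test_and_exit \
  --path.config /usr/share/logstash/pipeline/main.conf \
  --path.data /tmp/logstash-config-test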
Main Pipeline Example (main.conf)
input {
  beats {
    port => 5044
    ssl => false
  }
}
filter {
  # Extract log source information
  if [fields][service] {
    mutate {
      add_field => { "service" => "%{[fields][service]}" }
    }
  } else {
    mutate {
      add_field => { "service" => "unknown" }
    }
  }
  # Route to specific pipelines based on service
  if [service] == "nginx" {
    mutate { add_field => { "[@metadata][pipeline]" => "nginx" } }
  } else if [service] == "django" {
    mutate { add_field => { "[@metadata][pipeline]" => "django" } }
  } else if [service] == "nextjs" {
    mutate { add_field => { "[@metadata][pipeline]" => "nextjs" } }
  } else if [service] == "postgresql" {
    mutate { add_field => { "[@metadata][pipeline]" => "postgres" } }
  } else if [service] == "redis" {
    mutate { add_field => { "[@metadata][pipeline]" => "redis" } }
  }
}
output {
  # Pipeline-to-pipeline addresses must be static, so we route with explicit
  # conditionals (Logstash's "distributor" pattern) rather than a field reference
  if [@metadata][pipeline] == "nginx" {
    pipeline { send_to => ["nginx"] }
  } else if [@metadata][pipeline] == "django" {
    pipeline { send_to => ["django"] }
  } else if [@metadata][pipeline] == "nextjs" {
    pipeline { send_to => ["nextjs"] }
  } else if [@metadata][pipeline] == "postgres" {
    pipeline { send_to => ["postgres"] }
  } else if [@metadata][pipeline] == "redis" {
    pipeline { send_to => ["redis"] }
  } else {
    # Default output for logs that don't match a specific pipeline
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      user => "${ELASTIC_USER}"
      password => "${ELASTIC_PASSWORD}"
      ssl => true
      ssl_certificate_verification => false
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
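Each service pipeline receives these events by declaring the matching virtual address as its input. The django.conf example below shows a full pipeline; the receiving end itself is just:

input {
  pipeline { address => "nginx" }   # must match one of the addresses used in send_to
}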
Service-Specific Pipeline Example (django.conf)
input {
  pipeline { address => "django" }
}
filter {
  # Parse Django log format
  grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:log_level}\] \[%{DATA:module}\] %{GREEDYDATA:log_message}" }
  }
  # Handle traceback logs
  if [log_level] == "ERROR" {
    grok {
match => { "log_message" => "(?m)%{GREEDYDATA:error_type}: %{GREEDYDATA:error_message}(\n%{GREEDYDATA:traceback})?" }
}
}
  # Extract HTTP request details if present
  if [log_message] =~ /HTTP/ {
    grok {
      match => { "log_message" => ".*\"(?<http_method>[A-Z]+) (?<http_path>[^ ]+) HTTP/%{NUMBER:http_version}\" (?<http_status>[0-9]+)" }
    }
  }
  # Add timestamps
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
  # Add severity level numeric value for easier filtering
  if [log_level] == "DEBUG" {
    mutate { add_field => { "severity_level" => 0 } }
  } else if [log_level] == "INFO" {
    mutate { add_field => { "severity_level" => 1 } }
  } else if [log_level] == "WARNING" {
    mutate { add_field => { "severity_level" => 2 } }
  } else if [log_level] == "ERROR" {
    mutate { add_field => { "severity_level" => 3 } }
  } else if [log_level] == "CRITICAL" {
    mutate { add_field => { "severity_level" => 4 } }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "${ELASTIC_USER}"
    password => "${ELASTIC_PASSWORD}"
    ssl => true
    ssl_certificate_verification => false
    index => "django-%{+YYYY.MM.dd}"
  }
}
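To make the pattern concrete, here is a hypothetical Django log line and the fields the first grok filter would extract from it:

# Input (hypothetical):
#   [2025-01-15T10:23:45+00:00] [ERROR] [django.request] Internal Server Error: /api/game/42
# Extracted fields:
#   timestamp   => "2025-01-15T10:23:45+00:00"
#   log_level   => "ERROR"
#   module      => "django.request"
#   log_message => "Internal Server Error: /api/game/42"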
Advanced Processing Techniques
Grok Patterns
We use Grok patterns to parse unstructured log data into structured fields:
grok {
  match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:log_level}\] %{GREEDYDATA:log_message}" }
}
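When a line doesn't match, grok tags the event with _grokparsefailure by default; adding a tag of our own (the name below is just an example) makes unparsed lines easy to find in Kibana:

grok {
  match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:log_level}\] %{GREEDYDATA:log_message}" }
  tag_on_failure => ["_grokparsefailure", "unparsed_log"]
}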
Data Enrichment
We add additional context to logs to make them more useful:
# Add request duration categorization
if [http_response_time] {
  ruby {
    code => "
      time = event.get('http_response_time').to_f
      if time < 0.1
        event.set('response_category', 'fast')
      elsif time < 0.5
        event.set('response_category', 'medium')
      elsif time < 2.0
        event.set('response_category', 'slow')
      else
        event.set('response_category', 'very_slow')
      end
    "
  }
}
# Add geographic data for IP addresses
if [client_ip] {
  geoip {
    source => "client_ip"
  }
}
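One caveat: everything grok captures is a string, so numeric fields such as the HTTP status or a response time are worth converting before indexing, otherwise range filters and averages in Kibana behave oddly. A small sketch using the field names from the examples above:

mutate {
  convert => {
    "http_status"        => "integer"
    "http_response_time" => "float"
  }
}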
Error Handling
We ensure robustness with error handling:
filter {
  # Attempt to parse the JSON; invalid JSON tags the event with _jsonparsefailure
  json {
    source => "message"
    target => "parsed_json"
  }
  # Handle the case where JSON parsing fails
  if "_jsonparsefailure" in [tags] {
    mutate {
      add_field => { "parse_error" => "Failed to parse JSON log" }
      remove_tag => [ "_jsonparsefailure" ]
    }
  }
}
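The filter above deals with malformed log lines, but events can also fail later, at the output stage, when Elasticsearch rejects a document (for example because of a mapping conflict). Logstash can park such events in a dead letter queue instead of dropping them, and they can be replayed later with the dead_letter_queue input plugin. A sketch of the relevant logstash.yml settings; the path shown is the default and an assumption for our setup:

# logstash.yml
dead_letter_queue.enable: true
path.dead_letter_queue: /usr/share/logstash/data/dead_letter_queue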
Docker Compose Integration
Here's how we've integrated Logstash into our Docker Compose setup:
logstash:
  image: docker.elastic.co/logstash/logstash:8.17.4
  container_name: ft_logstash
  volumes:
    - ./config/logstash/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
    - ./config/logstash/pipeline:/usr/share/logstash/pipeline:ro
    - ./config/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
  environment:
    - ELASTIC_USER=elastic
    - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
    - 'LS_JAVA_OPTS=-Xms256m -Xmx256m'
  ports:
    - '5044:5044'
  networks:
    - elastic
  depends_on:
    - elasticsearch
  restart: unless-stopped
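Optionally, a healthcheck lets services that depend on Logstash wait until its monitoring API (port 9600, covered below) responds, rather than merely until the container starts. A sketch to nest under the logstash service, assuming curl is available in the image:

  healthcheck:
    test: ['CMD-SHELL', 'curl -sf http://localhost:9600/_node/stats || exit 1']
    interval: 30s
    timeout: 10s
    retries: 5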
Performance Optimization
Given our limited resources, we've optimized Logstash:
Memory Settings
# logstash.yml
pipeline.batch.size: 125
pipeline.batch.delay: 50
pipeline.workers: 1
Java Options
-Xms256m -Xmx256m
Pipeline Workers
We limit each pipeline to a single worker to reduce resource consumption:
pipeline.workers: 1
Monitoring Logstash
To ensure smooth operation:
- Monitor Logstash with its built-in API at http://localhost:9600/_node/stats (see the example after this list)
- Check pipeline performance via Kibana Monitoring
- Watch for dropped events or processing delays
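Since our Compose file only publishes port 5044, the quickest way to query the stats API is from inside the container (assuming curl is available in the Logstash image):

# Per-pipeline event counts and plugin timings
docker exec ft_logstash curl -s 'http://localhost:9600/_node/stats/pipelines?pretty'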
Next Steps
In the next article, we'll explore how to create useful dashboards in Kibana to visualize and analyze the log data our pipelines store in Elasticsearch.