Logstash for ft_transcendence: Advanced Log Processing

What is Logstash?

Logstash is a server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to a destination such as Elasticsearch. In our ft_transcendence observability stack, Logstash sits between Filebeat and Elasticsearch, where it parses, normalizes, and enriches log events before they are indexed.

Current Logstash Version: 8.17.4 (matching our Elasticsearch version)

Why Logstash for ft_transcendence?

While Filebeat can send logs directly to Elasticsearch, Logstash provides several important benefits:

  1. Complex Processing: More advanced filtering and transformation than Filebeat alone
  2. Data Enrichment: Adding context and structure to logs
  3. Format Conversion: Standardizing log formats across services
  4. Error Handling: Robust processing of malformed or unexpected log entries

Logstash Pipeline Design

Our Logstash configuration uses multiple pipelines to process different types of logs efficiently.

Main Pipeline Configuration (pipelines.yml)

- pipeline.id: main
  path.config: '/usr/share/logstash/pipeline/main.conf'
  pipeline.workers: 1

- pipeline.id: nginx
  path.config: '/usr/share/logstash/pipeline/nginx.conf'
  pipeline.workers: 1

- pipeline.id: django
  path.config: '/usr/share/logstash/pipeline/django.conf'
  pipeline.workers: 1

- pipeline.id: nextjs
  path.config: '/usr/share/logstash/pipeline/nextjs.conf'
  pipeline.workers: 1

- pipeline.id: postgres
  path.config: '/usr/share/logstash/pipeline/postgres.conf'
  pipeline.workers: 1

- pipeline.id: redis
  path.config: '/usr/share/logstash/pipeline/redis.conf'
  pipeline.workers: 1

Main Pipeline Example (main.conf)

input {
  beats {
    port => 5044
    # "ssl" is deprecated in the 8.x beats input in favor of "ssl_enabled"
    ssl_enabled => false
  }
}

filter {
  # Extract log source information
  if [fields][service] {
    mutate {
      add_field => { "service" => "%{[fields][service]}" }
    }
  } else {
    mutate {
      add_field => { "service" => "unknown" }
    }
  }

  # Route to specific pipelines based on service
  if [service] == "nginx" {
    mutate { add_field => { "[@metadata][pipeline]" => "nginx" } }
  } else if [service] == "django" {
    mutate { add_field => { "[@metadata][pipeline]" => "django" } }
  } else if [service] == "nextjs" {
    mutate { add_field => { "[@metadata][pipeline]" => "nextjs" } }
  } else if [service] == "postgresql" {
    mutate { add_field => { "[@metadata][pipeline]" => "postgres" } }
  } else if [service] == "redis" {
    mutate { add_field => { "[@metadata][pipeline]" => "redis" } }
  }
}

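output {
  # send_to takes literal pipeline addresses, not field references,
  # so each route gets its own conditional
  if [@metadata][pipeline] == "nginx" {
    pipeline { send_to => ["nginx"] }
  } else if [@metadata][pipeline] == "django" {
    pipeline { send_to => ["django"] }
  } else if [@metadata][pipeline] == "nextjs" {
    pipeline { send_to => ["nextjs"] }
  } else if [@metadata][pipeline] == "postgres" {
    pipeline { send_to => ["postgres"] }
  } else if [@metadata][pipeline] == "redis" {
    pipeline { send_to => ["redis"] }
  } else {
    # Default output for logs that don't match a specific pipeline
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      user => "${ELASTIC_USER}"
      password => "${ELASTIC_PASSWORD}"
      ssl_enabled => true
      # Certificate verification is disabled for our self-signed setup;
      # do not carry this over to production
      ssl_verification_mode => "none"
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}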
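
The routing decision made by the filter and output blocks above amounts to a lookup from service name to pipeline address. The following Python sketch models that logic so it can be checked in isolation. The function name `route` and the `None` return value for the default output are illustrative assumptions, not part of Logstash.

```python
from typing import Optional

# Service name (set by Filebeat in fields.service) -> pipeline address.
# Note that the "postgresql" service maps to the "postgres" pipeline.
SERVICE_TO_PIPELINE = {
    "nginx": "nginx",
    "django": "django",
    "nextjs": "nextjs",
    "postgresql": "postgres",
    "redis": "redis",
}


def route(event: dict) -> Optional[str]:
    """Return the target pipeline address, or None for the default output."""
    # Mirror the filter: copy fields.service to service, else "unknown".
    service = event.get("fields", {}).get("service") or "unknown"
    event["service"] = service
    # Mirror the output: unmatched services fall through to the default index.
    return SERVICE_TO_PIPELINE.get(service)
```

An event without `fields.service` is labeled `unknown` and falls through to the default `logs-*` index.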

Service-Specific Pipeline Example (django.conf)

input {
  pipeline { address => "django" }
}

filter {
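  # Parse Django log format; (?m) lets log_message span the continuation
  # lines of a multi-line event
  grok {
    match => { "message" => "(?m)\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:log_level}\] \[%{DATA:module}\] %{GREEDYDATA:log_message}" }
  }

  # Handle traceback logs
  if [log_level] == "ERROR" {
    grok {
      # error_type stops at the first colon and error_message at the end of
      # its line, so the optional traceback group can capture what follows
      match => { "log_message" => "(?m)^(?<error_type>[^:\n]+): (?<error_message>[^\n]*)(\n%{GREEDYDATA:traceback})?" }
    }
  }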

  # Extract HTTP request details if present
  if [log_message] =~ /HTTP/ {
    grok {
      match => { "log_message" => ".*\"(?<http_method>[A-Z]+) (?<http_path>[^ ]+) HTTP/%{NUMBER:http_version}\" (?<http_status>[0-9]+)" }
    }
  }

  # Add timestamps
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }

  # Add severity level numeric value for easier filtering
  if [log_level] == "DEBUG" {
    mutate { add_field => { "severity_level" => 0 } }
  } else if [log_level] == "INFO" {
    mutate { add_field => { "severity_level" => 1 } }
  } else if [log_level] == "WARNING" {
    mutate { add_field => { "severity_level" => 2 } }
  } else if [log_level] == "ERROR" {
    mutate { add_field => { "severity_level" => 3 } }
  } else if [log_level] == "CRITICAL" {
    mutate { add_field => { "severity_level" => 4 } }
  }

  # add_field values are typically stored as strings; convert so that
  # numeric range queries on severity_level work in Elasticsearch
  mutate { convert => { "severity_level" => "integer" } }
}

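output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "${ELASTIC_USER}"
    password => "${ELASTIC_PASSWORD}"
    ssl_enabled => true
    # Certificate verification is disabled for our self-signed setup;
    # do not carry this over to production
    ssl_verification_mode => "none"
    index => "django-%{+YYYY.MM.dd}"
  }
}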
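
To sanity-check the Django pipeline's logic without starting Logstash, the same steps can be approximated in Python. This is a rough sketch under stated assumptions: the regular expressions are simplified stand-ins for the grok patterns, `datetime.fromisoformat` accepts only a subset of ISO 8601, and the sample log line in the usage note is hypothetical.

```python
import re
from datetime import datetime

# Simplified equivalent of the first grok expression (anchored at line start).
HEADER = re.compile(
    r"\[(?P<timestamp>[^\]]+)\] \[(?P<log_level>[A-Z]+)\] "
    r"\[(?P<module>.*?)\] (?P<log_message>.*)"
)
# Equivalent of the HTTP request grok expression.
HTTP = re.compile(
    r'"(?P<http_method>[A-Z]+) (?P<http_path>[^ ]+) '
    r'HTTP/(?P<http_version>[\d.]+)" (?P<http_status>[0-9]+)'
)
SEVERITY = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3, "CRITICAL": 4}


def parse_django_line(line: str) -> dict:
    """Turn one Django log line into a dict of structured fields."""
    match = HEADER.match(line)
    if match is None:
        # Logstash would tag the event instead of raising an error.
        return {"message": line, "tags": ["_grokparsefailure"]}
    event = match.groupdict()
    event["@timestamp"] = datetime.fromisoformat(event["timestamp"])
    if event["log_level"] in SEVERITY:
        event["severity_level"] = SEVERITY[event["log_level"]]
    http = HTTP.search(event["log_message"])
    if http:
        event.update(http.groupdict())
    return event
```

For example, `parse_django_line('[2025-03-01T12:00:00] [INFO] [django.server] "GET /api/users/ HTTP/1.1" 200 512')` yields `log_level`, `module`, `severity_level`, and the four `http_*` fields, while a line that does not match the header is returned with a `_grokparsefailure` tag.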

Advanced Processing Techniques

Grok Patterns

We use Grok patterns to parse unstructured log data into structured fields:

grok {
  match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:log_level}\] %{GREEDYDATA:log_message}" }
}
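
Under the hood, each `%{PATTERN:field}` reference expands to a regular expression wrapped in a named capture group. The Python sketch below illustrates that expansion for the expression above. The three pattern definitions are deliberately simplified assumptions (the stock definitions shipped with Logstash are more permissive), and `grok_to_regex` is a hypothetical helper, not a Logstash API.

```python
import re

# Simplified stand-ins for three stock grok patterns (assumptions).
PATTERNS = {
    "TIMESTAMP_ISO8601": r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?",
    "LOGLEVEL": r"DEBUG|INFO|WARN(?:ING)?|ERROR|CRITICAL|FATAL",
    "GREEDYDATA": r".*",
}


def grok_to_regex(expression: str) -> "re.Pattern[str]":
    """Expand %{NAME:field} references into named capture groups."""
    def expand(match: "re.Match[str]") -> str:
        name, field = match.group(1), match.group(2)
        return f"(?P<{field}>{PATTERNS[name]})"

    return re.compile(re.sub(r"%\{(\w+):(\w+)\}", expand, expression))


regex = grok_to_regex(
    r"\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:log_level}\] "
    r"%{GREEDYDATA:log_message}"
)
# Hypothetical sample line, used only to exercise the pattern.
fields = regex.search(
    "[2025-03-01 12:00:00,123] [WARNING] Slow query detected"
).groupdict()
```

Here `fields` holds the three keys `timestamp`, `log_level`, and `log_message`, which is the same shape of structured event that grok adds to the Logstash event.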

Data Enrichment

We add additional context to logs to make them more useful:

# Add request duration categorization
if [http_response_time] {
  ruby {
    code => "
      time = event.get('http_response_time').to_f
      if time < 0.1
        event.set('response_category', 'fast')
      elsif time < 0.5
        event.set('response_category', 'medium')
      elsif time < 2.0
        event.set('response_category', 'slow')
      else
        event.set('response_category', 'very_slow')
      end
    "
  }
}

# Add geographic data for IP addresses
if [client_ip] {
  geoip {
    source => "client_ip"
  }
}
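
The thresholds in the ruby filter define half-open intervals, so a value exactly on a boundary falls into the slower category. A small Python sketch of the same categorization makes the boundaries easy to unit test. The function name and the `None` result for unparseable input are assumptions made for this sketch.

```python
from typing import Optional


def response_category(raw_value) -> Optional[str]:
    """Classify a response time in seconds using the article's thresholds."""
    try:
        seconds = float(raw_value)
    except (TypeError, ValueError):
        # Unparseable input: report no category rather than guessing.
        return None
    if seconds < 0.1:
        return "fast"
    if seconds < 0.5:
        return "medium"
    if seconds < 2.0:
        return "slow"
    return "very_slow"
```

One difference is worth knowing: Ruby's `String#to_f` returns `0.0` for a non-numeric string, so the ruby filter as written would label a malformed `http_response_time` as `fast`, whereas this sketch returns `None`.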

Error Handling

We ensure robustness with error handling:

filter {
  # Attempt to parse the JSON. skip_on_invalid_json is left at its default
  # (false): when it is true, invalid JSON is skipped silently and the
  # _jsonparsefailure tag checked below is never added.
  json {
    source => "message"
    target => "parsed_json"
  }

  # Handle the case where JSON parsing fails
  if "_jsonparsefailure" in [tags] {
    mutate {
      add_field => { "parse_error" => "Failed to parse JSON log" }
      remove_tag => [ "_jsonparsefailure" ]
    }
  }
}
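
The intent of this filter is a parse-or-annotate fallback: a valid JSON message is stored under `parsed_json`, and anything else is kept and marked with `parse_error` instead of being lost. A minimal Python sketch of that behavior follows. The function name is hypothetical, and the sketch does not reproduce every edge case of the Logstash json filter.

```python
import json


def parse_json_message(event: dict) -> dict:
    """Parse event['message'] as JSON, or annotate the event on failure."""
    try:
        event["parsed_json"] = json.loads(event["message"])
    except (json.JSONDecodeError, TypeError, KeyError):
        # Keep the original message and record why it was not parsed.
        event["parse_error"] = "Failed to parse JSON log"
    return event
```

Either way the event continues through the pipeline, so malformed lines remain searchable by their raw `message`.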

Docker Compose Integration

Here's how we've integrated Logstash into our Docker Compose setup:

logstash:
  image: docker.elastic.co/logstash/logstash:8.17.4
  container_name: ft_logstash
  volumes:
    - ./config/logstash/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
    - ./config/logstash/pipeline:/usr/share/logstash/pipeline:ro
    - ./config/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
  environment:
    - ELASTIC_USER=elastic
    - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
    - 'LS_JAVA_OPTS=-Xms256m -Xmx256m'
  ports:
    - '5044:5044'
  networks:
    - elastic
  depends_on:
    - elasticsearch
  restart: unless-stopped

Performance Optimization

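Given our limited resources, we keep Logstash's footprint small: a fixed 256 MB heap, a single worker per pipeline, and the default batch size (125) and batch delay (50 ms):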

Memory Settings

# logstash.yml
pipeline.batch.size: 125
pipeline.batch.delay: 50
pipeline.workers: 1

Java Options

-Xms256m -Xmx256m

Pipeline Workers

We limit each pipeline to a single worker to reduce resource consumption:

pipeline.workers: 1
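
Worker count and batch size together bound how many events a pipeline holds in memory at once: the number of in-flight events is `pipeline.workers` multiplied by `pipeline.batch.size`. The arithmetic below applies that to the six pipelines from pipelines.yml, assuming they all inherit the batch size of 125 from logstash.yml.

```python
# Pipeline ids as listed in pipelines.yml.
PIPELINES = ["main", "nginx", "django", "nextjs", "postgres", "redis"]
WORKERS = 1        # pipeline.workers
BATCH_SIZE = 125   # pipeline.batch.size


def inflight_events(workers: int, batch_size: int) -> int:
    """Upper bound on events one pipeline processes at the same time."""
    return workers * batch_size


per_pipeline = inflight_events(WORKERS, BATCH_SIZE)  # 1 * 125 = 125
total_inflight = per_pipeline * len(PIPELINES)       # 125 * 6 = 750
```

At most 750 events are in flight across all pipelines, which is part of why a 256 MB heap can be workable for this setup, provided individual events stay small.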

Monitoring Logstash

To ensure smooth operation:

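  1. Monitor Logstash with its built-in API: http://localhost:9600/_node/stats (reachable from inside the container; the Compose file above publishes only port 5044, so add a 9600 port mapping to query it from the host)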
  2. Check pipeline performance via Kibana Monitoring
  3. Watch for dropped events or processing delays
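
The first and third checks can be combined in a small script that reads the node stats API and compares the events each pipeline received with the events it emitted. This is a sketch under assumptions: it relies on the `pipelines.<id>.events.in` and `events.out` counters in the stats response, the function names are hypothetical, and the difference between the two counters is only a rough indicator because filters may legitimately drop or clone events.

```python
import json
from urllib.request import urlopen

STATS_URL = "http://localhost:9600/_node/stats"


def summarize_pipelines(stats: dict) -> dict:
    """Return in/out counters and their difference for each pipeline."""
    summary = {}
    for pipeline_id, data in stats.get("pipelines", {}).items():
        events = data.get("events") or {}
        received = events.get("in", 0)
        emitted = events.get("out", 0)
        summary[pipeline_id] = {
            "in": received,
            "out": emitted,
            "backlog": received - emitted,
        }
    return summary


def fetch_stats(url: str = STATS_URL, timeout: float = 5.0) -> dict:
    """Fetch the node stats document from a running Logstash instance."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)
```

Running `summarize_pipelines(fetch_stats())` periodically and watching for a `backlog` value that keeps growing gives an early hint of processing delays in a specific pipeline.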

Next Steps

In the next article, we'll explore how to create useful dashboards in Kibana to visualize and analyze the data processed by our Elasticsearch stack.