Watch AWS resource logs in Kibana

It’s easy to manage Amazon services that don’t require any special operations skill: set up a load balancer, speed up content delivery with Cloudfront, or store enormous amounts of data in S3 in a couple of clicks. But what if we want to get to the bottom of how they actually work? Do they work correctly in our particular case?

To answer that, AWS provides the option to enable logging in its HTTP-based products: ELB, S3 and Cloudfront. We can also get logs from RDS database instances. But you’re unlikely to keep track of these huge files manually. Leave that job to the ELK log management stack and save yourself for the analysis.

What we’re going to use


  1. Working AWS resources:
  • ELB (Elastic Load Balancer);
  • S3 bucket (log events source);
  • one more S3 bucket (logging destination; could be the same bucket as the source);
  • Cloudfront distribution;
  • RDS DB instance.

They must have received at least some requests, otherwise there will be no events to look at.

  2. Elastic log management stack:

  • Filebeat (on client node where logfiles are stored);
  • Logstash (pattern management on server);
  • Elasticsearch (search engine && document-oriented database on server);
  • Kibana (Node.js dashboard on server).

To learn how to install all of this, you can follow the DigitalOcean tutorial.

  3. GeoIP engine attached to the ELK stack (again, DigitalOcean has a tutorial for it).

  4. Python 3 and the Boto3 module as the AWS SDK.

Pick your poison here and swap Python for any other language: Amazon has SDKs for almost every popular programming language.

  5. Terraform (if you know it and want to use it).

Enable logging


First we have to get some logs out of the services. Amazon makes this pretty simple, a couple of clicks, as explained in the AWS documentation: ELB, Cloudfront, S3. For easier future maintenance you can do the same with Terraform. Below are just the code snippets of the resources with logging set up.

resource "aws_elb" "example-com" {
...
    access_logs {
        bucket = "destination-bucket"
    }
...
}

resource "aws_cloudfront_distribution" "my-distribution" {
...
    logging_config {
        include_cookies = false
        bucket          = "destination-bucket.s3.amazonaws.com"
        prefix          = "cloudfront_distribution_logs"
    }
...
}

resource "aws_s3_bucket" "my-source-bucket" {
...
    logging {
        target_bucket = "destination-bucket"
        target_prefix = "source_bucket_logs/"
    }
...
}

RDS is a bit different: it exposes log files through the console (and API) by default. To control what gets logged, switch the corresponding variables in the instance parameter group. PostgreSQL instances, for example, have this split across several variables with the “log_” prefix.
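
If you prefer to manage this from code rather than the console, here is a minimal Boto3 sketch of switching a couple of those variables. The parameter group name and the chosen parameters are my own examples, so adjust them to your instance (and use 'pending-reboot' for static parameters).

import boto3

rds = boto3.client('rds', region_name='us-west-2')                      #same region as the instance

rds.modify_db_parameter_group(
    DBParameterGroupName='mydb-postgres-params',                        #hypothetical parameter group attached to the instance
    Parameters=[
        {'ParameterName': 'log_min_duration_statement',                 #log queries slower than 500 ms
         'ParameterValue': '500',
         'ApplyMethod': 'immediate'},
        {'ParameterName': 'log_connections',                            #log every new connection
         'ParameterValue': '1',
         'ApplyMethod': 'immediate'},
    ]
)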

Pull the data to a local machine


You can ship records to Logstash either directly from S3 (for example, with the Logstash S3 input plugin) or from any machine running Filebeat. I took the second route. At first glance it seems pointless, but I find it handy to keep the data on the server for fast direct access in an emergency: it’s quicker than downloading small batches from S3 and trying to analyze them there.

The main trouble with Amazon logging is the enormous number of files. You may get hundreds of files per hour in your bucket with just half a dozen records in each. So the goal is to fetch all the files for the last time interval (say, an hour) from S3, merge them and store the result on the machine; the machine in this example is an EC2 instance. We also need to build the log prefix correctly so that only the files for that period are downloaded.

Another trouble is Cloudfront logging: the files arrive gzip-compressed, which makes processing slightly more involved (the script below handles this with Python’s gzip module).

I did this in Python. After connecting to S3 through Boto3, I list the latest logs, download them, append their content to a single file and remove the downloaded pieces. Take a look at the code; I also posted it in my Github repository.

import boto3
import os
import gzip
from datetime import datetime, timedelta

os.environ["BOTO_CONFIG"] = "/etc/boto.cfg"                             #set config path
previous_hour = datetime.now() - timedelta(hours=1)                     #count previous hour
previous_hour_default = previous_hour.strftime("%Y-%m-%d-%H")           #check what time was one hour before
previous_hour_elb_prefix = previous_hour.strftime("%Y/%m/%d")           #calculate different time string for ELB prefix
previous_hour_elb_log = previous_hour.strftime("%Y%m%dT%H")             #calculate different string for ELB log prefix

client = boto3.client('s3')                                             #connect to s3 to get info
resource = boto3.resource('s3')                                         #connect to s3 to download files

directory = 'tmp/'                                                      #set path for directory
logs_directory = '/var/log/aws/'
if not os.path.exists(directory):        os.makedirs(directory)
if not os.path.exists(logs_directory):   os.makedirs(logs_directory)

def get_log(bucket, search_prefix, main_log_file, label=None):          #main function for logs collection
    files_list = []
    response = client.list_objects(Bucket=bucket,Prefix=search_prefix)  #look for logs recorded for previous hour
    file_keys_dict = 'Contents'                                         #get list of files under response
    try:
        for i in range(0, len(response[file_keys_dict])):               #parse array
            file = response[file_keys_dict][i]['Key']                   #get log name
            path = directory + file                                     #set local path to download the file to
            if label == 'ELB':
                local_dir = 'tmp/elb/'                                  #change temporary path for ELB logs
                if not os.path.exists(local_dir):   os.makedirs(local_dir)
                path = local_dir + file.split('/')[-1]
            if not os.path.exists(os.path.dirname(path)):               #make sure the local subdirectory exists
                os.makedirs(os.path.dirname(path))
            resource.meta.client.download_file(bucket, file, path)      #download at last
            files_list.append(path)                                             #add file name to list
        with open(main_log_file, 'a') as outfile:                       #append to the main log file (created if missing)
            for log_file in files_list:
                if label == 'Cloudfront':
                    with gzip.open(log_file) as infile:                 #decompress Cloudfront gzip logs
                        for line in infile:
                            outfile.write(line.decode())                #append decoded log content to the main file
                else:
                    with open(log_file) as infile:
                        outfile.write(infile.read())
                os.remove(log_file)                                     #remove the small downloaded file
    except KeyError:                                                    #no 'Contents' key means no logs for the last hour
        pass

bucket = 'destination-bucket' 

##S3 logs
#customize prefixes for each kind of AWS resources
prefix = 'source_bucket_logs/'
search_prefix = prefix + previous_hour_default
main_log_file = logs_directory + 's3.log'                                            #set path for log files
get_log(bucket, search_prefix, main_log_file)                           #call main function

## Cloudfront logs
distribution_name = 'cloudfront_distribution'
distribution_id = 'E3KLSDFK78ADS56RLE' #check this from file keys of your logs
bucket = 'destination-bucket'
prefix = 'cloudfront_' + distribution_name + '/' + distribution_id
search_prefix = prefix + '.' + previous_hour_default
tmp_dir = 'tmp/' + distribution_name
if not os.path.exists(tmp_dir):        os.makedirs(tmp_dir)
main_log_file = logs_directory + 'cloudfront_' + distribution_name + '.log'
get_log(bucket, search_prefix, main_log_file, label='Cloudfront')

## ELB logs
region = 'us-west-2' #or another region you’ve got
account = '18342893241209' #better as string, not as integer
subprefix = 'AWSLogs/' + account + '/elasticloadbalancing/' + region + '/'
prefix = subprefix + previous_hour_elb_prefix
balancer = 'example-com'
# subprefix.split('AWSLogs/')[1].replace('/', '_') means subprefix conversion for appropriate log name
search_prefix = prefix + '/' + subprefix.split('AWSLogs/')[1].replace('/', '_') + balancer + '_' + previous_hour_elb_log
main_log_file = logs_directory + 'elb.log'
get_log(bucket, search_prefix, main_log_file, label='ELB')
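
One caveat about the listing: list_objects returns at most 1,000 keys per response. An hourly prefix usually keeps you well under that, but if you ever hit the limit, a Boto3 paginator is an easy workaround. A minimal sketch reusing the S3 client from above (list_all_keys is a hypothetical helper of mine):

def list_all_keys(bucket, search_prefix):                               #collect every key under a prefix, however many there are
    paginator = client.get_paginator('list_objects')                    #handles the 1,000-key page limit
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=search_prefix):
        keys.extend(obj['Key'] for obj in page.get('Contents', []))     #pages with no logs have no 'Contents'
    return keys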

RDS is different. Its log files are already single, ready-to-analyze files; all you need to do is download and store them locally, talking to the RDS API instead of S3.

import boto3                                                                           
import os
import io
from datetime import datetime, timedelta

os.environ["BOTO_CONFIG"] = "/etc/boto.cfg"                                             #set config path
client = boto3.client('rds', region_name='us-west-2')                                   #connect to Oregon RDS service by default

dbid = 'mydb'       #set db identifier
log_date = (datetime.now() - timedelta(hours=1)).strftime('%Y-%m-%d-%H')                #match the log file date (previous hour)

log_file_name = 'error/postgresql.log.' + log_date                                      #calculate the name of postgresql log file in RDS instance
log_file_ec2 = '/var/log/postgresql-rds/postgresql.log.' + log_date                     #calculate the name of new log file in ec2 instance

response = client.download_db_log_file_portion(DBInstanceIdentifier=dbid,   LogFileName=log_file_name)    #main API request to get the last log
buf = io.StringIO(response['LogFileData'])                                              #IO module to properly process log data

with open(log_file_ec2, 'w+') as tmp_file:                                                   #write everything to primary log
    tmp_file.write(buf.read())
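
Note that download_db_log_file_portion returns only a portion of the file (roughly 1 MB per call), so on a chatty database the snippet above may truncate the log. The API supports paging via Marker and AdditionalDataPending; here is a rough sketch of fetching the whole file, reusing the RDS client from above (download_full_log is a hypothetical helper of mine):

def download_full_log(dbid, log_file_name, destination):
    marker = '0'                                                        #'0' means start from the beginning of the file
    with open(destination, 'w') as out:
        while True:
            portion = client.download_db_log_file_portion(
                DBInstanceIdentifier=dbid,
                LogFileName=log_file_name,
                Marker=marker,
                NumberOfLines=10000)
            out.write(portion.get('LogFileData', ''))
            if not portion.get('AdditionalDataPending'):                #nothing left to fetch
                break
            marker = portion['Marker']                                  #continue from where we stopped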

Set up the Filebeat config


OK, now we have some logs and the Python scripts scheduled in cron. Let’s tell Filebeat where we keep our files and what type they are. Remember, the Filebeat client must run on the same machine where the logs are stored.

Here is the full config, so there is no confusion about missing pieces.

filebeat:
  prospectors:
    - paths:
        - /var/log/aws/s3.log
      input_type: log
      document_type: s3

    - paths:
        - /var/log/aws/elb.log
      input_type: log
      document_type: elb

    - paths:
        - /var/log/aws/cloudfront_*.log
      input_type: log
      document_type: cloudfront

    - paths:
        - /var/log/postgresql-rds/postgresql.log.*
      input_type: log
      document_type: postgresql

  registry_file: /var/lib/filebeat/registry

output:
  logstash:
    hosts: ["{{ your_elk_server_ip }}:5044"]
    bulk_max_size: 1024
    tls:
      certificate_authorities: ["{{ ssl_cert }}"]
      insecure: true
shipper:

logging:
  files:
    rotateeverybytes: 10485760 # = 10MB

Think about Grok patterns


AWS logging gives a lot of information about each request, but the raw lines are hard to read. Fortunately, Logstash ships with built-in patterns for S3 and ELB access logs, so we can skip this step for those two. But we still have to deal with RDS and Cloudfront. PostgreSQL logging is straightforward enough…

POSTGRESQL %{TIMESTAMP_ISO8601:postgresql_timestamp} %{TZ:timezone}:%{IPV4:postgresql_clientip}\(%{INT:pid}\):%{USERNAME:user}@%{WORD:database}:\[%{INT:group_id}\]:%{DATA:log_level}:  %{GREEDYDATA:log_message}

…while Cloudfront looks nasty. Fortunately, a Cloudfront pattern already exists on the web, so you can drop it in and be confident it works.

CLOUDFRONT_ACCESS_LOG %{DATE_EU:date}\t%{TIME:time}\t%{WORD:x_edge_location}\t(?:%{NUMBER:sc_bytes}|-)\t%{IPORHOST:c_ip}\t%{WORD:cs_method}\t%{HOSTNAME:cs_host}\t%{NOTSPACE:cs_uri_stem}\t%{NUMBER:sc_status}\t%{GREEDYDATA:referrer}\t%{GREEDYDATA:User_Agent}\t%{GREEDYDATA:cs_uri_query}\t%{GREEDYDATA:cookies}\t%{WORD:x_edge_result_type}\t%{NOTSPACE:x_edge_request_id}\t%{HOSTNAME:x_host_header}\t%{URIPROTO:cs_protocol}\t%{INT:cs_bytes}\t%{GREEDYDATA:time_taken}\t%{GREEDYDATA:x_forwarded_for}\t%{GREEDYDATA:ssl_protocol}\t%{GREEDYDATA:ssl_cipher}\t%{GREEDYDATA:x_edge_response_result_type}

Let’s save both patterns in a single file at /etc/logstash/patterns/aws.

Set up the Logstash config


At this step we already have four types of logs with patterns prepared; let’s wire them into Logstash. You can append the rules to an existing config or create a new one in the /etc/logstash/conf.d directory. If the code is hard to read in this post, go to my repo.

filter {

  if [type] == "postgresql" {

    grok {
      add_tag => [ "valid" ]
      patterns_dir => "/etc/logstash/patterns"
      match => { "message" => "%{POSTGRESQL}" }
      named_captures_only => true
    }

    date {
      match => [ "postgresql_timestamp", "yyyy-MM-dd HH:mm:ss" ]
    }

    if "valid" not in [tags] {
      drop { }
    }

    mutate {
      remove_tag => [ "valid" ]
    }
  }

  if [type] == "s3" {

    grok {
      add_tag => [ "valid" ]
      patterns_dir => "/etc/logstash/patterns"
      match => { "message" => "%{S3_ACCESS_LOG}" }
      named_captures_only => true
    }

    date {
      match => [ "timestamp", "d/MMM/YYYY:HH:mm:ss Z" ]
    }

    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }

    if "valid" not in [tags] {
      drop { }
    }

    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
      remove_tag => [ "valid" ]
    }
  }

  if [type] == "elb" {

    grok {
      add_tag => [ "valid" ]
      patterns_dir => "/etc/logstash/patterns"
      match => { "message" => "%{ELB_ACCESS_LOG}" }
      named_captures_only => true
    }

    date {
      # the built-in ELB_ACCESS_LOG pattern captures an ISO8601 timestamp
      match => [ "timestamp", "ISO8601" ]
    }

    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }

    if "valid" not in [tags] {
      drop { }
    }

    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
      remove_tag => [ "valid" ]
    }
  }

  if [type] == "cloudfront" {

    grok {
      add_tag => [ "valid" ]
      patterns_dir => "/etc/logstash/patterns"
      match => { "message" => "%{CLOUDFRONT_ACCESS_LOG}" }
      named_captures_only => true
    }

    if "valid" not in [tags] {
      drop { }
    }

    geoip {
      source => "c_ip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }

    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
      add_field => [ "listener_timestamp", "%{date} %{time}" ]
      remove_tag => [ "valid" ]
    }

    date {
      match => [ "listener_timestamp", "yy-MM-dd HH:mm:ss" ]
    }
  }
}

Here I drop records that fail to parse and set up GeoIP processing on the client IP fields. Postgres logs also include a client IP, but I believe the database should be closed off and reachable only from the application node, so there is no point in geolocating a single address.

And don’t forget to test the config after every change and, finally, restart Logstash.

Check the new fields


After restarting Logstash, go to Kibana and filter the logs by type. Let’s filter by the “elb” type and check whether we see anything.

You will see new fields. Explore what they mean: you now have plenty of useful data for everyday analysis. Don’t forget to refresh the filebeat index pattern in Kibana so the new fields show up properly.


Save Kibana discoveries


Pick the fields you need most, combine them into one search and save this discovery in Kibana.


Make a GeoIP map


I left the most delicious step for the end. Once you understand how to build a GeoIP map from a saved discovery search, you can watch your users on a map.


Conclusion


Just look at the idea: we can watch AWS services based on near real-time events and set up alarms on any actions we care about. And we did it with open source technologies such as the Elastic products. If I had seen this earlier, I would have thought it was something out of a movie about hackers. But it’s real and easy to build.
