Migrate log analysis from AWS ElasticSearch to BigQuery
Jinwook Baek
Posted on January 29, 2021
Introduction
I have been using AWS Elasticsearch for near-real-time analysis of API server logs. AWS provides a built-in Elasticsearch subscription filter for CloudWatch, so with less than an hour of effort I can spin up an Elasticsearch cluster to visualize and analyze server logs. However, AWS Elasticsearch is not cheap in a production setup, and as logs accumulate it needs maintenance (lifecycle policies, JVM pressure, etc.). The dev team was also not fully utilizing the Elastic stack. I had been meaning to decommission Elasticsearch and find an alternative for several months, and I finally decided it was time to migrate to another solution for the following reason.
Amazon: NOT OK - why we had to change Elastic licensing
After some research, I listed the requirements for a substitute:
- Near-real-time analysis (no batch loads)
- Simple filtering and aggregation by time range, API status, etc.
- Access management
- Support for a visualization tool (to substitute for Kibana)
- No extra setup in server code (no Splunk, Fluentd, etc.)
Since CloudWatch logs are the inception point of all the logs (nginx, API logs, etc.), there were a couple of alternatives I could think of.
- Vanilla CloudWatch (manually searching through logs, with no visualization or query support → no need to implement anything else)
- CloudWatch → S3(firehose) → Athena
- CloudWatch → S3(firehose) → Redshift Spectrum
- CloudWatch → Kinesis Stream → Kinesis Analysis
- CloudWatch → Lambda → BigQuery
I was initially inclined to choose something from AWS, since I prefer fewer moving pieces and less coding. However, I am already using BigQuery as a data warehouse, and I concluded that if I was going to stash data somewhere, I might as well centralize all of it in a single place. So I chose Lambda + BigQuery. For this post I will use the nginx log as an example, since its format is familiar to most people.
Load vs. Stream
Before going into the actual setup, I must acknowledge a couple of things, such as the distinction between load and streaming. A load job loads a batch of data once or on a recurring schedule. In BigQuery, load jobs are free; you are only charged for storage. Streaming, however, incurs a charge of $0.010 per 200 MB inserted. For example, streaming roughly 1 GB of logs per day works out to about $0.05 per day, or around $1.50 per month.
Prerequisite
- GCP Account with billing enabled (streaming is disabled on free tier)
- Nginx logs in CloudWatch
- SAM environment setup + Lambda role (refer to the previous blog post on the Lambda environment setup): Migrate python async worker to asynchrounous Lambda
Setup
I am going to cover the following components:
- Inspect CloudWatch logs
- BigQuery table
- Lambda code
- CloudWatch subscription
- BigQuery + Data Studio
BigQuery Table and Schema
First of all, I need to create a BigQuery table for the nginx logs to be stored in. To get the schema right, let's take a look at how the logs will be sent from CloudWatch to Lambda through the subscription filter.
Nginx log
The default nginx log format looks like this. (My log sample has the remote IP appended at the end.)
log_format combined '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent"';
# sample
172.11.1.1111 - - [28/Jan/2021:16:24:03 +0000] "GET /api/healthcheck/ HTTP/1.1" 200 2 "-" "CRAZY_USER_AGENT/2.0" "111.111.111.111"
I need to transform this log in order to insert it into the BigQuery table in a certain format. Thankfully, the CloudWatch subscription filter provides a filter pattern.
If I provide a filter pattern like [internal_ip, identity, auth_user, date, request, status, bytes, referer, useragent, remote_addr], the subscription filter automatically breaks the log down into JSON as extractedFields. You can even select only certain logs with a filter like [internal_ip, identity, auth_user, date, request !="**/api/healthcheck/**", status, bytes, referer, useragent, remote_addr]
Now that we know the log will arrive in a certain format, I will create a table with the following schema.
id:string,
timestamp:timestamp,
remote_addr:string,
useragent:string,
referer:string,
bytes:numeric,
status:string,
request:string,
auth_user:string,
date:string,
internal_ip:string,
identity:string
If I need more fields later, I will add them. Make sure the table is partitioned by timestamp for query performance, and consider expiring partitions after a certain time to reduce cost. (BigQuery moves data untouched for 90 days into cheaper long-term storage, so you might also choose to just keep it.)
Create BigQuery table
Let's go to the GCP console and create a table with the schema above.
You can use Cloud Shell if you prefer the CLI.
bq mk \
--table \
--description description \
--time_partitioning_field timestamp \
--time_partitioning_type DAY \
--label key:value \
<project_id>:<dataset>.<table> \
id:string,timestamp:timestamp,remote_addr:string,\
useragent:string,referer:string,bytes:numeric,status:string,\
request:string,auth_user:string,date:string,internal_ip:string,\
identity:string
Once the table is created, copy the table ID from the details tab: <project_id>.<dataset>.<table-name>
Service Account
Before going back to AWS to create a Lambda function, we need to create a service account that gives the Lambda permission to insert rows into BQ.
Go to Service Accounts under the IAM menu and click + Create Service Account.
service account name: lambda_bq_stream
permissions: BigQuery Data Editor
You can also define a custom role restricted to only the permissions you need:
- bigquery.tables.list
- bigquery.tables.get
- bigquery.tables.updateData
Once you have created the service account, you can download a JSON key: click Add key → Create new key. Download it and stash it somewhere safe; we will be using this key later.
Json key sample
{
"type": "service_account",
"project_id": "****",
"private_key_id": "****",
"private_key": "****",
"client_email": "****",
"client_id": "****",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/lambda-bigquery-stream%40hangfive-26bb4.iam.gserviceaccount.com"
}
I will use SAM for the Lambda deployment; please refer to the previous post for the SAM setup.
Migrate python async worker to asynchrounous Lambda
Lambda Function
OK, now let's focus on the code. This function will receive a stream of CloudWatch logs and stream-insert rows into the BQ table using the Google BigQuery SDK for Python. I will break the code down into pieces to explain it.
Library
All you need is a single pip library.
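Since the code below uses the google.cloud.bigquery client, the package in question is google-cloud-bigquery; install it locally (or add it to the Lambda's requirements.txt) with pip:
pip install google-cloud-bigquery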
Authentication
The client requires authentication, and there are two ways to authenticate: from a file or from a JSON dictionary. There are a couple of options for doing this.
google-auth
- JSON file
  - upload the file to S3 and load it from the Lambda
  - package the JSON with the Lambda code → (x)
- dictionary
  - environment variables → I will use this for the purpose of simplicity
  - load the key info from Secrets Manager or Parameter Store
Environment Variable
Although it would be better to manage the key with Secrets Manager, I will use environment variables for the simplicity of this exercise. I will supply the following environment variables from the JSON dictionary downloaded in the previous section, and build the client from them as sketched after the list.
"project_id":
"private_key_id":
"private_key":
"client_email":
"client_id":
Decompress and decode cloudwatch logs
When you add a Lambda subscription filter to CloudWatch, the log events are delivered as gzipped, base64-encoded data wrapped in {"awslogs": {"data": "<base64 blob>"}}. (Don't bother trying to decode the sample data by hand.)
You can decompress and decode the message with the following code.
from base64 import b64decode
import json, zlib

compressed_payload = b64decode(data)  # data = event['awslogs']['data']
cloudwatch_payload = zlib.decompress(compressed_payload, 16 + zlib.MAX_WBITS)  # gunzip
json_payload = json.loads(cloudwatch_payload)
The resulting message is decoded into the following format. Note logEvents, the list of log events. Each message is the raw log line, already broken down into extractedFields according to the filter pattern supplied: [internal_ip, identity, auth_user, date, request, status, bytes, referer, useragent, remote_addr]
{
'messageType': 'DATA_MESSAGE',
'owner': '12341244',
'logGroup': '/ecs/nginx',
'logStream': 'ecs/nginx/65366ed9299f4554a2cf0dbd4c49ee08',
'subscriptionFilters': ['nginx_filter_sample'],
'logEvents': [{
'id': '35945479414329140852051181308733124824760966176424001536',
'timestamp': 1611851043287,
'message': '172.11.1.1111 - - [28/Jan/2021:16:24:03 +0000] "GET /api/healthcheck/ HTTP/1.1" 200 2 "-" "CRAZY_USER_AGENT/2.0" "111.111.111.111"',
'extractedFields': {
'date': '28/Jan/2021:16:24:03 +0000',
'request': 'GET /api/healthcheck/ HTTP/1.1',
'referer': '-',
'remote_addr': '-',
'bytes': '2',
'ip': '172.11.1.111',
'useragent': 'ELB-HealthChecker/2.0',
'identity': '-',
'auth_user': '-',
'status': '200'
}
}]
}
Skip control messages
If a control message is passed, we skip the insert:
if json_payload['messageType'] == 'CONTROL_MESSAGE':
return
Create payload
We will create the payload for the nginx logs. Note that I divide the timestamp by 1000: CloudWatch provides it in milliseconds, and dividing by 1000 yields Unix epoch seconds (with a fractional part), which BigQuery accepts for TIMESTAMP columns.
rows_to_insert = []
for row in json_payload['logEvents']:
item = {}
item['id'] = row['id']
item['timestamp'] = row['timestamp'] / 1000
if 'extractedFields' in row:
for k, v in row['extractedFields'].items():
item[k] = v
rows_to_insert.append(item)
Inserting rows
Before inserting into BQ, let's take a quick look at the docstring of the insert_rows_json method. Make sure to read the row_ids param: it provides a unique identifier per row that BigQuery uses for deduplication.
"""Insert rows into a table without applying local type conversions.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
Args:
table (Union[ \
google.cloud.bigquery.table.Table \
google.cloud.bigquery.table.TableReference, \
str \
]):
The destination table for the row data, or a reference to it.
json_rows (Sequence[Dict]):
Row data to be inserted. Keys must match the table schema fields
and values must be JSON-compatible representations.
row_ids (Optional[Sequence[Optional[str]]]):
Unique IDs, one per row being inserted. An ID can also be
``None``, indicating that an explicit insert ID should **not**
be used for that row. If the argument is omitted altogether,
unique IDs are created automatically.
skip_invalid_rows (Optional[bool]):
Insert all valid rows of a request, even if invalid rows exist.
The default value is ``False``, which causes the entire request
to fail if any invalid rows exist.
ignore_unknown_values (Optional[bool]):
Accept rows that contain values that do not match the schema.
The unknown values are ignored. Default is ``False``, which
treats unknown values as errors.
template_suffix (Optional[str]):
Treat ``name`` as a template table and provide a suffix.
BigQuery will create the table ``<name> + <template_suffix>``
based on the schema of the template table. See
https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
Returns:
Sequence[Mappings]:
One mapping per row with insert errors: the "index" key
identifies the row, and the "errors" key contains a list of
the mappings describing one or more problems with the row.
Raises:
TypeError: if `json_rows` is not a `Sequence`.
"""
The following code inserts the rows into the BQ table.
table_id = '<project_id>.<dataset>.<table-name>'
errors = client.insert_rows_json(table_id, rows_to_insert) # Make an API request.
if errors == []:
print("New rows have been added.")
else:
print("Encountered errors while inserting rows: {}".format(errors))
Full Lambda Code
Now that we have covered all the parts, let's combine them and create the function.
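Here is a minimal sketch of how the pieces above fit together, assuming the environment variable names from earlier and a placeholder table ID (adjust both to your setup):

import json
import os
import zlib
from base64 import b64decode

from google.cloud import bigquery
from google.oauth2 import service_account

TABLE_ID = "<project_id>.<dataset>.<table-name>"  # placeholder: your BQ table ID

# Build the client once per container, outside the handler
service_account_info = {
    "type": "service_account",
    "project_id": os.environ["project_id"],
    "private_key_id": os.environ["private_key_id"],
    "private_key": os.environ["private_key"].replace("\\n", "\n"),  # restore newlines if needed
    "client_email": os.environ["client_email"],
    "client_id": os.environ["client_id"],
    "token_uri": "https://oauth2.googleapis.com/token",
}
credentials = service_account.Credentials.from_service_account_info(service_account_info)
client = bigquery.Client(credentials=credentials, project=service_account_info["project_id"])


def lambda_handler(event, context):
    # CloudWatch Logs delivers the payload gzipped and base64 encoded
    compressed_payload = b64decode(event["awslogs"]["data"])
    cloudwatch_payload = zlib.decompress(compressed_payload, 16 + zlib.MAX_WBITS)
    json_payload = json.loads(cloudwatch_payload)

    # Skip control messages
    if json_payload["messageType"] == "CONTROL_MESSAGE":
        return

    # One row per log event; extractedFields map 1:1 to the table columns
    rows_to_insert = []
    for row in json_payload["logEvents"]:
        item = {"id": row["id"], "timestamp": row["timestamp"] / 1000}
        for k, v in row.get("extractedFields", {}).items():
            item[k] = v
        rows_to_insert.append(item)

    errors = client.insert_rows_json(TABLE_ID, rows_to_insert)
    if errors == []:
        print("Successfully streamed to the table!")
    else:
        print("Encountered errors while inserting rows: {}".format(errors))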
Test the function with sample events
Let's test the code with a sample event payload. As a reminder, you can invoke the function locally with the following command:
sam local invoke --env-vars env.json -e events/payload.json
Once you see the following success message, go to the BigQuery table and check whether the data was inserted successfully.
Successfully streamed to the table!
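If you don't already have an events/payload.json, here is a hypothetical helper (not part of the original setup) that builds one by gzipping and base64-encoding a CloudWatch-style log batch, i.e. the inverse of what the handler does:

import base64
import gzip
import json

# Illustrative sample batch, shaped like the decoded CloudWatch payload shown earlier
sample = {
    "messageType": "DATA_MESSAGE",
    "owner": "12341244",
    "logGroup": "/ecs/nginx",
    "logStream": "ecs/nginx/sample",
    "subscriptionFilters": ["nginx_filter_sample"],
    "logEvents": [{
        "id": "35945479414329140852051181308733124824760966176424001536",
        "timestamp": 1611851043287,
        "message": '172.11.1.1111 - - [28/Jan/2021:16:24:03 +0000] "GET /api/healthcheck/ HTTP/1.1" 200 2',
        "extractedFields": {
            "date": "28/Jan/2021:16:24:03 +0000",
            "request": "GET /api/healthcheck/ HTTP/1.1",
            "status": "200",
            "bytes": "2",
            "useragent": "CRAZY_USER_AGENT/2.0",
        },
    }],
}

# sam local invoke expects the Lambda event envelope: {"awslogs": {"data": "<gzip+base64>"}}
data = base64.b64encode(gzip.compress(json.dumps(sample).encode("utf-8"))).decode("utf-8")
with open("events/payload.json", "w") as f:
    json.dump({"awslogs": {"data": data}}, f)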
CloudWatch Subscription filter
It's time to attach the Lambda subscription to the log group. Use the following CLI command or the console. (If you use the CLI, also make sure CloudWatch Logs has permission to invoke the function, e.g. via aws lambda add-permission.)
put-subscription-filter - AWS CLI 1.18.221 Command Reference
aws logs put-subscription-filter \
--log-group-name <value> \
--filter-name nginxToBQ \
--filter-pattern '[internal_ip, identity, auth_user, date, request, status, bytes, referer, useragent, remote_addr]' \
--destination-arn <lambdaFUNCTIONARN>
Check Lambda Log
Let's make sure everything is working as expected before setting it on cruise mode. Check the CloudWatch logs of the Lambda function attached to the subscription filter. If you don't see any error logs, you're all set!
If you are not certain that the logs will be delivered as expected, there are a couple of ways to get notified of abnormal errors.
- Create another Lambda to detect errors in the logs and trigger an SNS notification. Refer to the post for the detailed setup.
- Set up AWS DevOps Guru with SNS notifications. This ML-powered DevOps support service will automatically notify you when there are errors in your Lambda function. (It's really useful!)
Data in BigQuery
After a while, if everything is working as expected, you will see streams of data in your table.
It's time to make a visualization with the data we have. I will make a heat map of user requests. First, using aggregation, create a query to get the count for each valid remote_addr:
SELECT remote_addr ip, count(*) c
FROM `<project_id>.<dataset>.<table>`
WHERE DATE(timestamp) = "2021-01-29" and remote_addr not like "-"
group by 1
Then join this info with the geolite2 table. (Since this query scans the large geolite2 table, it will incur some cost.)
WITH source_of_ip_addresses AS (
SELECT remote_addr ip, count(*) c
FROM `hangfive-26bb4.cloudwatch.hangfive-nginx`
WHERE DATE(timestamp) = "2021-01-29" and remote_addr not like "-"
group by 1
)
SELECT city_name, SUM(c) c, ST_GeogPoint(AVG(longitude), AVG(latitude)) point
FROM (
SELECT ip, city_name, c, latitude, longitude, geoname_id
FROM (
SELECT *, NET.SAFE_IP_FROM_STRING(ip) & NET.IP_NET_MASK(4, mask) network_bin
FROM source_of_ip_addresses, UNNEST(GENERATE_ARRAY(9,32)) mask
WHERE BYTE_LENGTH(NET.SAFE_IP_FROM_STRING(ip)) = 4
)
JOIN `fh-bigquery.geocode.201806_geolite2_city_ipv4_locs`
USING (network_bin, mask)
)
WHERE city_name IS NOT null
GROUP BY city_name, geoname_id
ORDER BY c DESC
LIMIT 5000
This query returns the count of users in each city.
Visualize
Since we no longer have Kibana, I will visualize the data with Data Studio.
Data studio
Visualize with Data Studio or your favorite BI tool.
Conclusion
We have looked at a bunch of different services to get here. At the core, as long as you know how to write a Lambda function, you can stream and ingest app logs anywhere, regardless of cloud platform. It feels different knowing that your pipeline spans cloud platforms. I hope this post helped you, and thank you for reading.
Extra
Concurrency
After letting the Lambda work for a while, my DevOps Guru setup actually kicked in, warning me of concurrency issues. If traffic spikes, the Lambda is bound to be throttled without a concurrency configuration.
{
'AccountId': '1111111111',
'Region': 'us-east-1',
'MessageType': 'NEW_RECOMMENDATION',
'InsightId': 'AGp3byn_PgRryjAItOHdAlcAAAAAAAAAAV8rYO_CKM4aynEOZWZiTd2_OV_0bsqS',
'Recommendations': [{
'Name': 'Troubleshoot errors and set up automatic retries in AWS Lambda',
'Description': 'Your Lambda function is throwing a high number of errors. To learn about common Lambda errors, their causes, and mitigation strategies, see this link.',
'Reason': 'The Errors metric in AWS::Lambda::FunctionName breached a high threshold. ',
'Link': 'https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html',
'RelatedEvents': [],
'RelatedAnomalies': [{
'SourceDetails': {
'CloudWatchMetrics': [{
'MetricName': 'Errors',
'Namespace': 'AWS::Lambda::FunctionName'
}]
},
'Resources': [{
'Name': 'CloudwatchToBQ',
'Type': 'AWS::Lambda::FunctionName'
}]
}]
}, {
'Name': 'Configure provisioned concurrency for AWS Lambda',
'Description': 'Your Lambda function is having trouble scaling. To learn how to enable provisioned concurrency, which allows your function to scale without fluctuations in latency, see this link.',
'Reason': 'The Duration metric in AWS::Lambda::FunctionName breached a high threshold. ',
'Link': 'https://docs.aws.amazon.com/lambda/latest/dg/configuration-concurrency.html#configuration-concurrency-provisioned',
'RelatedEvents': [],
'RelatedAnomalies': [{
'SourceDetails': {
'CloudWatchMetrics': [{
'MetricName': 'Duration',
'Namespace': 'AWS::Lambda::FunctionName'
}]
},
'Resources': [{
'Name': 'CloudwatchToBQ',
'Type': 'AWS::Lambda::FunctionName'
}]
}]
}]
}
Managing concurrency for a Lambda function
As a remedial action, I inspected the Lambda metrics and updated the reserved concurrency. You should inspect your own Lambda metrics and update concurrency accordingly, as in the example below. If that is not enough, I recommend adding a queue or pub/sub service such as SQS to absorb the traffic.
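For example, a sketch of reserving concurrency for the function named in the DevOps Guru insight above (the limit of 20 is illustrative; pick a value based on your own metrics):

# Reserve up to 20 concurrent executions for the streaming function
aws lambda put-function-concurrency \
    --function-name CloudwatchToBQ \
    --reserved-concurrent-executions 20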
Streaming Quota
I also need to remind you that Google BigQuery has a streaming quota (500,000 rows per second).
Quotas and limits | BigQuery | Google Cloud
Reference
Streaming data into BigQuery | Google Cloud
Streaming insert | BigQuery | Google Cloud
Geolocation with BigQuery: De-identify 76 million IP addresses in 20 seconds | Google Cloud Blog