Event Tracking and Analytics via Ruby on Rails, DynamoDB (with Streams), Kinesis Firehose and Athena and CloudWatch Dashboard!
Andrew Brown 🇨🇦
Posted on August 11, 2019
P.S. This is not an end-to-end guide. I documented my journey and figured I would publish what I had time to write up instead of vaulting this knowledge in our private knowledge base. I then happened to put a tech talk together, so between the video and the content below I hope it helps you create your own event tracking and analytics on AWS.
AWS SDK Initializer
Since we only need DynamoDB, add this to your Gemfile:
gem 'aws-sdk-dynamodb'
To make it easier to work with the SDK I have an initializer at RAILS_ROOT/config/initializers/aws.rb
You will notice I am aggressively setting credentials. The SDK is supposed to pick up these environment variables implicitly, but I found in practice it did not when I wrote this. You may not need to be as verbose as I am here.
creds = Aws::Credentials.new(
  ENV['AWS_ACCESS_KEY_ID'],
  ENV['AWS_SECRET_ACCESS_KEY']
)
Aws.config.update credentials: creds
module DynamoDB
  def self.resource
    @@dynamodb ||= Aws::DynamoDB::Resource.new({
      region: 'us-east-1',
      credentials: Aws::Credentials.new(
        ENV['AWS_ACCESS_KEY_ID'],
        ENV['AWS_SECRET_ACCESS_KEY']
      )})
    @@dynamodb
  end
end
The region should probably be stored as an environment variable (for example via Figaro) rather than hardcoded.
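A minimal sketch of reading the region from the environment instead of hardcoding it. The variable name AWS_REGION and the us-east-1 fallback are assumptions for illustration; Figaro ultimately populates ENV, so a plain ENV.fetch is enough here.

```ruby
# Resolve the AWS region from the environment, falling back to a default.
# `AWS_REGION` is an assumed variable name; use whatever key your app defines.
def aws_region(env = ENV)
  env.fetch('AWS_REGION', 'us-east-1')
end

# Passing a hash instead of ENV makes the behaviour easy to check.
puts aws_region({ 'AWS_REGION' => 'ca-central-1' })
puts aws_region({})
```

In the initializer this would replace the literal, for example region: aws_region.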
When we want to use DynamoDB all we have to do is the following:
DynamoDB.resource.client.put_item({
  # ...
})
Primary and Sort
Highly unique IDs such as user IDs make good primary (partition) keys since they distribute better across partitions.
Dates make very good sort keys. When queried, your items are returned in ascending (ASC) order based on your sort key. Explore the DynamoDB table explorer so you have an idea of the limitations of how you can filter.
Notice that for the primary key we only have the ability to do =, while for the sort key we have many options.
There are more advanced filter options in the documentation if you can make sense of them.
Tracker
First I define how I want to use my tracker before writing a module.
So this would write to DynamoDB.
Putting data:
Tracker::Put.event({
  user_id: user.id,
  event_type: 'login',
  user_agent: request.user_agent,
  ip_address: request.remote_ip
})
Getting Data
@recent_activity = Tracker::Get.recent_activity @model.id
For putting data, you probably want to wrap this in an ActiveJob, since having these event calls littered throughout your application can cause the code to block, resulting in latency experienced by your users. As far as I can tell the DynamoDB call blocks while it waits for a response, even though we don't need one.
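To illustrate the idea of moving the write off the request path, here is a rough sketch that uses only Ruby's standard library. It is a stand-in for ActiveJob, not ActiveJob itself: the AsyncTracker class is hypothetical, the queue lives in memory, and anything still queued is lost if the process dies, which I can live with for event data.

```ruby
# Stand-in for a background job: events go onto an in-memory queue and a
# single worker thread performs the slow write, so callers never wait on it.
class AsyncTracker
  def initialize(&writer)
    @writer = writer # block that performs the real write
    @queue  = Queue.new
    @worker = Thread.new { drain }
  end

  # Returns immediately; the write happens on the worker thread.
  def event(attrs)
    @queue << attrs
    nil
  end

  # Signal the worker to stop and wait until queued events are written.
  def shutdown
    @queue << :stop
    @worker.join
  end

  private

  def drain
    while (attrs = @queue.pop) != :stop
      begin
        @writer.call(attrs)
      rescue StandardError => e
        # Tracking is best-effort: report the failure and keep going.
        warn "tracking failed: #{e.message}"
      end
    end
  end
end

written = []
tracker = AsyncTracker.new { |attrs| written << attrs }
tracker.event({ user_id: 1, event_type: 'login' })
tracker.shutdown
p written
```

In the app the block would call the tracker, for example AsyncTracker.new { |attrs| Tracker::Put.event(attrs) }. With ActiveJob the same shape applies: the controller enqueues the attributes and the job's perform method does the put.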
I created a new module in my lib directory, e.g. RAILS_ROOT/lib/tracker.rb
# This module is responsible for writing event data
# to various DynamoDB tables and fetching that data
# for display.
module Tracker
  class Entity
    include ActiveModel::Validations
    def initialize(opts={})
      opts.each { |k,v| instance_variable_set("@#{k}", v) }
    end
    attr_accessor :user_id,
                  :event_type,
                  :user_agent,
                  :ip_address,
                  :event_at,
                  :event_id
    validates :user_id    , presence: true, numericality: { only_integer: true }
    validates :event_type , presence: true, inclusion: { in: %w(
      login
      material-view
    )}
    validates :user_agent , presence: true
    validates :ip_address , presence: true
    validates :event_at   , presence: true
    # Default to the current time when no event_at was supplied
    def event_at
      @event_at || Time.now.iso8601
    end
  end
  class Put
    def self.event attrs={}
      entity = Tracker::Entity.new attrs
      unless entity.valid?
        raise ArgumentError, "Tracker Entity invalid: #{entity.errors.full_messages.join(', ')}"
      end
      DynamoDB.resource.client.put_item({
        item: {
          'user_id'    => entity.user_id,
          'ip_address' => entity.ip_address,
          'user_agent' => entity.user_agent,
          'event_id'   => entity.event_id,
          'event_type' => entity.event_type,
          # sort key; honours an event_at passed in by the caller
          'event_at'   => entity.event_at
        },
        # We don't care about returning consumed capacity.
        # We can handle losing event tracking data and
        # don't need to be alerted.
        return_consumed_capacity: 'NONE',
        table_name: 'exampro-events'
      })
    end
  end ## Put
  class Get
    def self.recent_activity user_id
      result =
      DynamoDB.resource.client.query({
        expression_attribute_values: {
          ":user_id" => user_id
        },
        # https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.KeyConditionExpressions
        key_condition_expression: "user_id = :user_id",
        limit: 50,
        projection_expression: 'ip_address,event_type,event_at,user_agent', # select statement
        scan_index_forward: false, # descending order
        table_name: 'exampro-events'
      }).items
      result.each do |t|
        t['event_at'] = DateTime.parse(t['event_at'])
        unless t['user_agent'].blank?
          t['user_agent'] = DeviceDetector.new(t['user_agent'])
        end
      end
      result
    end
    def self.logins user_id, event_type
      result =
      DynamoDB.resource.client.query({
        expression_attribute_values: {
          ":user_id" => user_id,
          ":event_type" => event_type
        },
        key_condition_expression: "user_id = :user_id",
        filter_expression: "event_type = :event_type",
        limit: 10,
        projection_expression: 'ip_address,event_type,event_at,user_agent', # select statement
        scan_index_forward: false, # descending order
        table_name: 'exampro-events'
      }).items
      result.each do |t|
        t['event_at'] = DateTime.parse(t['event_at'])
        unless t['user_agent'].blank?
          t['user_agent'] = DeviceDetector.new(t['user_agent'])
        end
      end
      result
    end
  end ## Get
end
Tracker::Entity
I have this Entity class. Its purpose is to validate the format of arguments. I would probably enrich this further in the future with a metadata attribute.
Tracker::Put
I have a Put class which is for writing to DynamoDB. Currently, I only have one method but may add more in the future.
Tracker::Get
I have another class called Get which queries data from DynamoDB.
DateTime as String
Another thing to note is that I am converting the time to a string with Time.now.iso8601. DynamoDB does not have a DateTime datatype.
This StackOverflow answer does a good job of explaining what to consider when choosing a format for your dates.
I care about readability so ISO 8601 is a good format.
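A small sketch of why ISO 8601 strings work as a sort key: when every timestamp uses the same offset, sorting the strings gives the same order as sorting the times. The sample dates are made up, and normalising to UTC is my assumption here, since Time.now.iso8601 on its own uses the server's local offset.

```ruby
require 'time'

# Three events created out of order, all expressed in UTC.
times = [
  Time.utc(2019, 6, 29, 16, 46, 23),
  Time.utc(2019, 6, 23, 16, 46, 24),
  Time.utc(2019, 6, 24, 16, 46, 25)
]
stamps = times.map(&:iso8601)

# Plain string sorting gives the same order as sorting the Time objects.
puts stamps.sort
puts stamps.sort == times.sort.map(&:iso8601)

# A stamp carrying a local offset no longer compares correctly as a string,
# so convert to UTC before formatting.
local = Time.new(2019, 6, 29, 12, 0, 0, '-04:00')
puts local.iso8601
puts local.getutc.iso8601
```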
I don't care about using TTL (Time to live) since I don't need to expire records from my DynamoDB to prune the DB.
You can have DynamoDB stream only TTL events, which is interesting.
What matters most is that when filtering by date I can use BETWEEN to filter between two dates.
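Here is a sketch of what a BETWEEN query on the sort key could look like. It only builds the parameter hash, so it runs without AWS access. The helper name events_between_params is hypothetical, the table and attribute names follow the Tracker module, and it assumes the stored event_at strings are all in UTC.

```ruby
require 'time'

# Build the parameters for a DynamoDB `query` restricted to a date range.
# `events_between_params` is a hypothetical helper, not part of the SDK.
def events_between_params(user_id, from, to)
  {
    table_name: 'exampro-events',
    # Partition key uses =, sort key uses BETWEEN
    key_condition_expression: 'user_id = :user_id AND event_at BETWEEN :from AND :to',
    expression_attribute_values: {
      ':user_id' => user_id,
      ':from'    => from.getutc.iso8601,
      ':to'      => to.getutc.iso8601
    },
    scan_index_forward: false # newest first
  }
end

params = events_between_params(1, Time.utc(2019, 6, 23), Time.utc(2019, 6, 30))
puts params[:key_condition_expression]
puts params[:expression_attribute_values]
```

The resulting hash would then be passed to DynamoDB.resource.client.query(params).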
scan_index_forward
We are using scan_index_forward: false to change the sort to be DESC instead of ASC.
projection_expression
We only want specific attributes returned from the database, so that is the purpose of:
projection_expression: 'ip_address,event_type,event_at,user_agent'
return_consumed_capacity
We are using return_consumed_capacity: 'NONE' because I don't care about getting a response back. If there was a capacity issue I have an alarm where I would take action. Since this is event data I don't care if some event tracking is dropped.
DeviceDetector
We are passing our user_agent through DeviceDetector gem eg.
DeviceDetector.new(t['user_agent'])
This is so that in the dashboard for our app I can get human-readable values such as whether they are on a phone/desktop, Windows/Mac, or using a specific web browser.
DynamoDB
Enabling DynamoDB Streams
We are going to need to turn on DynamoDB streams.
To have streams trigger a lambda under the Triggers tab we will add an existing function. You may need to click more to find this Triggers tab.
When a record is inserted into DynamoDB, Streams will allow us to pass the puts in batches to a Lambda function.
We only want New Images. The "New Image" is the item as it appears after the write, while the "Old Image" is the item as it appeared before it was modified (a fresh insert has no old image).
We will leave it for batches of 100. This doesn't mean Streams will wait until it has 100 records to send; it can send up to 100 at a time.
We can see our Lambda is attached. If an error occurs on this Lambda, sometimes it's smart to check here to find out at a glance if the Lambda is failing.
Here we can see the records in our DynamoDB table
We need to create a policy which allows Lambda to accept data from a specific DynamoDB Stream.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetShardIterator",
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:ACCOUNT-ID:table/exampro-events/stream/2019-06-30T11:17:05.770"
    },
    {
      "Sid": "VisualEditor1",
      "Effect": "Allow",
      "Action": "dynamodb:ListStreams",
      "Resource": "*"
    }
  ]
}
We need to allow our lambda function to stream data to our Kinesis Firehose
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecord",
        "firehose:PutRecordBatch"
      ],
      "Resource": "arn:aws:firehose:us-east-1:ACCOUNT-ID:deliverystream/exampro-events"
    }
  ]
}
Then I attach these two new policies to a role which is then attached to my Lambda function.
Lambda that streams data from DynamoDB to Firehose
Since DynamoDB Streams can deliver data in batches, we are going to use put_record_batch.
We need to supply the delivery_stream_name. This should probably live in an environment variable instead of being hardcoded as I do here.
Even though we are never going to update DynamoDB records, we are only going to publish events to the stream for INSERT.
require 'json'
require 'aws-sdk-firehose'
def lambda_handler(event:, context:)
  records = []
  event['Records'].each do |t|
    if t['eventName'] == 'INSERT'
      records.push({data: {
        user_id:    t['dynamodb']['NewImage']['user_id']['N'],
        event_at:   t['dynamodb']['NewImage']['event_at']['S'],
        event_id:   t['dynamodb']['NewImage']['event_id']['N'],
        event_type: t['dynamodb']['NewImage']['event_type']['S'],
        ip_address: t['dynamodb']['NewImage']['ip_address']['S'],
        user_agent: t['dynamodb']['NewImage']['user_agent']['S']
      }.to_json + "\n" })
    end
  end
  json = {records_size: records.size}.to_json
  puts json
  unless records.size.zero?
    firehose = Aws::Firehose::Resource.new
    resp = firehose.client.put_record_batch({
      delivery_stream_name: "exampro-events", # required
      records: records
    })
    json = {failed_put_count: resp.failed_put_count}.to_json
    puts json
  end
  return true
end
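The handler above digs into each attribute by hand because stream records arrive as typed maps, with numbers sent as strings. As a rough sketch, a small helper could flatten an image into a plain hash. The name unmarshal_image is hypothetical, the sample record is made up, and only the S, N and NULL types are handled.

```ruby
# Convert a DynamoDB Streams image (attribute name => { type => value })
# into a plain Ruby hash. Only the S, N and NULL types are handled here.
def unmarshal_image(image)
  image.each_with_object({}) do |(name, typed), out|
    type, value = typed.first
    out[name] =
      case type
      when 'S'    then value
      when 'N'    then value.include?('.') ? value.to_f : value.to_i
      when 'NULL' then nil
      else raise ArgumentError, "unsupported type #{type} for #{name}"
      end
  end
end

# A trimmed-down example of one entry in event['Records'].
record = {
  'eventName' => 'INSERT',
  'dynamodb'  => {
    'NewImage' => {
      'user_id'    => { 'N' => '1' },
      'event_type' => { 'S' => 'login' },
      'event_id'   => { 'NULL' => true }
    }
  }
}

p unmarshal_image(record['dynamodb']['NewImage'])
```

One design note: this turns user_id into an integer, whereas the handler above forwards the raw string from ['N'], so pick one and keep the Athena column type in line with it.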
Json records on newline
You will notice I am adding a newline at the end of our JSON string.
.to_json + "\n"
This is very important because when Athena reads our json files it expects each json record to be on its own line. If they are all on one line it will read only one record.
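A quick way to see the difference locally, using made-up events: Firehose writes the record payloads one after another, so the newline has to come from us.

```ruby
require 'json'

events = [
  { user_id: 1, event_type: 'login' },
  { user_id: 2, event_type: 'material-view' }
]

# Each payload carries its own trailing newline.
with_newlines    = events.map { |e| e.to_json + "\n" }.join
without_newlines = events.map(&:to_json).join

# One JSON document per line parses cleanly, line by line.
parsed = with_newlines.each_line.map { |line| JSON.parse(line) }
puts parsed.size

# Without the newline the output is a single line holding both documents.
puts without_newlines.lines.size
```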
Json Log Events
Notice that I am converting my hash to JSON and then using puts to log it. This is how you log JSON events so we can use a Metric Filter later. You cannot just puts a hash; you have to convert it to JSON.
json = {records_size: records.size}.to_json
puts json
SDK vs KPL
If you're wondering why I'm not using the KPL (Kinesis Producer Library): I could have, but I would have had to use a Java Lambda and its configuration is more complicated. The KPL is more efficient, but for our use-case we don't need it. You can read more about the KPL in the documentation.
Metric Filter
Based on the Filter and Pattern Syntax docs, under Publishing Numerical Values Found in Log Entries, we can select an attribute of a JSON log event and then publish its value as a metric.
So for the metric filter, we want to filter json log events with an attribute of records_size greater than 0
{ $.records_size > 0 }
For the metric value, we will supply the attribute we want it to then collect
$.records_size
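To make the two settings concrete, here is a local simulation in Ruby of what the filter pattern and the metric value do to a handful of log lines. This only mimics the behaviour for illustration; it is not the CloudWatch API, and the sample lines are made up.

```ruby
require 'json'

# Sample lines in the shape the Lambda above prints.
log_lines = [
  { records_size: 0 }.to_json,
  { records_size: 3 }.to_json,
  { failed_put_count: 0 }.to_json,
  { records_size: 5 }.to_json
]

# Mimic the pattern { $.records_size > 0 }: keep JSON events whose
# records_size attribute is greater than zero.
matching = log_lines.map { |line| JSON.parse(line) }
                    .select { |event| event['records_size'].to_i > 0 }

# Mimic the metric value $.records_size: one data point per matching event.
data_points = matching.map { |event| event['records_size'] }
puts data_points.inspect
puts data_points.sum
```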
Define Metric Filter
View created metric filter
You cannot add a Metric Filter to your CloudWatch Dashboard until data has been published to it.
How to find metric filter after its created
If you ever need to find this metric filter, it shows up under Logs as a column in the logs table.
Kinesis Firehose
Pricing
Kinesis Firehose is incredibly affordable at $0.029/GB, so 500 GB ≈ $14.50 USD. Other Kinesis services can have a very expensive base cost.
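The arithmetic, as a tiny sketch. The rate is the one quoted above, so check current pricing for your region; this ignores tiered pricing and any other charges.

```ruby
# Price per GB ingested as quoted in this post.
FIREHOSE_USD_PER_GB = 0.029

# Estimated ingestion cost in USD, rounded to cents.
def firehose_ingest_cost(gigabytes, rate = FIREHOSE_USD_PER_GB)
  (gigabytes * rate).round(2)
end

puts firehose_ingest_cost(500)
puts firehose_ingest_cost(5_000)
```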
But what about Kinesis Data Analytics?
You will see there is another AWS Kinesis service called Kinesis Data Analytics, and you may think you need this expensive service based on its name.
Kinesis Data Analytics lets you run queries (SQL) on incoming streaming data. I am thinking that Kinesis Data Analytics might be faster at proactively producing real-time analytics because it crunches data as it comes in.
Using Firehose we just dump our data to S3. When someone needs to see an up-to-date dashboard we can query Athena with a Lambda function, dump the results back into DynamoDB or maybe into a JSON file, and then display that to the user. We can decide to generate new analytics only if the last compiled version is out of date by, say, 5 minutes.
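A minimal sketch of the "only regenerate if older than 5 minutes" idea. The CachedReport class is hypothetical, the block stands in for the expensive step (an Athena query in our case), the returned numbers are placeholders, and the cache lives in memory rather than in DynamoDB or S3.

```ruby
# Recompute analytics only when the cached copy is older than `max_age`.
class CachedReport
  def initialize(max_age: 300, clock: -> { Time.now }, &compute)
    @max_age  = max_age # seconds; 300 = 5 minutes
    @clock    = clock   # injectable so staleness is easy to exercise
    @compute  = compute
    @value    = nil
    @built_at = nil
  end

  def fetch
    refresh if stale?
    @value
  end

  private

  def stale?
    @built_at.nil? || (@clock.call - @built_at) > @max_age
  end

  def refresh
    @value    = @compute.call
    @built_at = @clock.call
  end
end

runs = 0
now = Time.utc(2019, 6, 29, 12, 0, 0)
report = CachedReport.new(clock: -> { now }) do
  runs += 1
  { logins: 42 } # placeholder result
end

report.fetch # first call computes
now += 60
report.fetch # one minute later: served from cache
now += 301
report.fetch # more than five minutes after the build: recomputed
puts runs
```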
Creating Firehose
The dashboard is a bit confusing, so have a look at where I created my Firehose stream.
We could transform our data via Kinesis, but for us this is not necessary since we can apply our transformation in the prior Lambda, and we do. If you have data coming from multiple sources you may want this Lambda to normalize the data to guarantee it's consistent. Since we only ingest data from one Lambda function this is a minimal risk for us.
I have this option set to disabled, but I just wanted to show you that the data can be transformed by Glue into Parquet files, which are much more performant when using Athena. This is not a pain point for us currently, so we are going to leave the data as is, which is JSON. Also, I didn't feel like calculating the cost of Glue here at scale.
I had read somewhere in the docs that compression was needed for encryption in a specific use-case. When I used Glue to create a table using a crawler on Snappy-compressed data it produced a bizarre schema, so I rolled back on this and just encrypted using KMS.
Since I am storing IP addresses I consider this sensitive data. We run Macie, so I am uncertain whether it would alert on this if unencrypted.
The reason we collect IP addresses for our click event data is to detect abnormal user behaviour such as account sharing or scraping.
Athena
Database and Table via Glue Catalog and Glue Crawler
This is one way for you to create your Database and table.
So create a database. I am not going to recommend this way, but I am showing you it can be done.
We will also need a table. We could easily define the columns manually but if we already have data in our S3 bucket we can just use a crawler once to determine that schema for us. So you choose your datastore being the s3 bucket and it does the rest.
If we check our table it should have determined our schema.
Database and Athena via SQL
When using Glue via the automatic crawler, it would guess the wrong column types and did not partition based on date. We can just create what we need directly in Athena.
Create our database
CREATE DATABASE exampro_events
LOCATION 's3://exampro-events/';
And now create the table
CREATE EXTERNAL TABLE exampro_events.events (
  user_id INT,
  event_at STRING,
  event_id INT,
  event_type STRING,
  user_agent STRING,
  ip_address STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('paths' = 'user_id,event_at,event_id,event_type,user_agent,ip_address')
LOCATION 's3://exampro-events/';
Ensure the location ends with a forward slash or you'll get an error about the path.
ROW FORMAT SERDE tells it the data will be in JSON format.
A SerDe (Serializer/Deserializer) is a way in which Athena interacts with data in various formats.
Notice that for event_at I set it as STRING instead of TIMESTAMP. ISO 8601 is not the format Athena expects for a TIMESTAMP column. We could change all our code to comply, but since Athena has the SQL function from_iso8601_timestamp I'm not concerned unless I run into performance problems or limitations on the ability to query.
Athena expects this format: 2008-09-15 03:04:05.324
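If you did want a TIMESTAMP column, one option would be to reformat the string before it is sent to Firehose. A minimal sketch, assuming you normalise to UTC; the helper name athena_timestamp is hypothetical.

```ruby
require 'time'

# Reformat an ISO 8601 string into the "2008-09-15 03:04:05.324" layout.
def athena_timestamp(iso8601_string)
  Time.iso8601(iso8601_string).getutc.strftime('%Y-%m-%d %H:%M:%S.%L')
end

puts athena_timestamp('2019-06-29T16:46:23Z')
puts athena_timestamp('2019-06-29T12:46:23-04:00')
```

Both calls describe the same instant, so both lines print the same UTC value.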
Partitions
You can partition your tables on things such as date eg. Year 2020. This might be something I want to do in the future but for the time being, I am ignoring partitions.
Querying in Athena
To get started click on the ellipsis beside the table and choose Preview Table. It will create the query and show you some data, saving you the trouble of typing all this yourself.
Writing Athena queries can be a painful experience even with prior SQL knowledge. Read the docs to help you learn the SQL syntax.
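As a starting point, here is a sketch of a Ruby helper that builds a "logins per day" query against the table defined earlier. The helper name logins_per_day_sql is hypothetical, and I have not run the generated SQL against a live table, so treat it as a draft to paste into the Athena console and adjust.

```ruby
# Build a query that counts events of one type per day over the last N days.
# Table and column names come from the CREATE TABLE statement in this post.
def logins_per_day_sql(event_type: 'login', days: 7)
  # Only allow simple event names since the value is interpolated into SQL.
  unless event_type.match?(/\A[a-z-]+\z/)
    raise ArgumentError, "unexpected event type: #{event_type}"
  end

  <<~SQL
    SELECT date_trunc('day', from_iso8601_timestamp(event_at)) AS day,
           count(*) AS total
    FROM exampro_events.events
    WHERE event_type = '#{event_type}'
      AND from_iso8601_timestamp(event_at) > current_timestamp - interval '#{Integer(days)}' day
    GROUP BY 1
    ORDER BY 1
  SQL
end

puts logins_per_day_sql
```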
CloudWatch Dashboard
If something goes wrong we want to have a CloudWatch Dashboard to gain some insight.
We are going to add a widget
Here we can see our custom Metric. If you don't see it here it's because data has yet to be collected, so ensure data is being logged and your metric filter pattern is correct.
So there is our record-size. The other filter is just an old test one.
So here is my line graph. I don't know how useful it is but just getting something in here. Remember to Save dashboard !!!!
In DynamoDB there is the metric which could be useful to compare against the records which could be filtered in our Lambda.
Added a few more widgets.
We can see how many records are streaming, how many records the lambda passes to Firehose, how many incoming records were received, and how many were delivered to S3. Still missing Athena. We will get there.
Fake Data via Rake Command
I wanted some login data for the past 7 days so I can compose my Athena query to group logins per day for the week.
Rake commands are great for this. Also, I suppose you could test your read/write capacity using this method.
require 'faker'
namespace :track do
  namespace :put do
    task :login => :environment do
      50.times do
        ip_address = Faker::Internet.public_ip_v4_address
        user_agent = Faker::Internet.user_agent
        # random day within the past week
        event_at = rand(1..7).days.ago.iso8601
        Tracker::Put.event({
          user_id: 1,
          event_type: 'login',
          user_agent: user_agent,
          ip_address: ip_address,
          event_at: event_at
        })
        puts "#{ip_address} - #{user_agent} - #{event_at}"
        sleep 0.2 # sleep 1/5th of a second
      end # x.times
    end #login
  end #put
  namespace :get do
    task :logins => :environment do
      # logins takes both the user id and the event type
      results = Tracker::Get.logins 1, 'login'
      puts results
    end #logins
  end #get
end # track
So here I am running my rake command to create logins:
~/Sites/exampro-projects/exampro[master]: rake track:put:login
11.174.250.238 - Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A - 2019-06-29T16:46:23Z
143.251.23.90 - Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts) - 2019-06-23T16:46:24Z
57.161.250.74 - Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko - 2019-06-29T16:46:24Z
170.128.151.22 - Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts) - 2019-06-29T16:46:24Z
65.166.116.179 - Mozilla/5.0 (Windows NT x.y; Win64; x64; rv:10.0) Gecko/20100101 Firefox/10.0 - 2019-06-29T16:46:25Z
54.85.94.162 - Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko - 2019-06-24T16:46:25Z
56.33.98.190 - Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts) - 2019-06-23T16:46:25Z
139.173.42.58 - Mozilla/5.0 (Windows NT x.y; Win64; x64; rv:10.0) Gecko/20100101 Firefox/10.0 - 2019-06-29T16:46:25Z
107.234.132.121 - Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts