Saving Particle Events to Azure Table Storage - A Deep Dive Into Building a Pool Bot
Jason C
Posted on May 5, 2020
This is the fourth article in a multipart series. In Part 1 we talked about the idea for this pool bot, and Part 2 covered the hardware behind it. In Part 3 we pushed data up to the Particle Cloud.
Particle makes it super easy to publish events that Subscriptions or Integrations can pick up and process, but Particle does not store these events; we'll need to build that part ourselves.
This article will cover:
- Azure IoT Hub
- Particle Webhook
- Azure Functions
- Azure Table Storage
- Aggregate Historic Sensor Data
- Time Triggers / CRON Job
Azure IoT Hub
When I first built this, I saw that Particle had partnered with Microsoft and offers first-class support for connecting to an Azure IoT Hub.
So, I spun up an Azure IoT Hub, connected it to a Service Bus and used an Azure Function with a Queue Trigger to store the data into Azure Table Storage.
This worked great! But a free-tier IoT Hub can only process 8,000 messages a day, so I had to upgrade to the Basic tier, which costs $10 a month. (Sending a message every 5 seconds is over 17,000 messages a day.)
All of this turned out to be overkill for my needs, so I simplified it to use a webhook instead.
Particle Webhook
Particle supports a few different types of integrations, including webhooks and the Azure IoT Hub integration mentioned above.
Setting up the webhook on the Particle side is straightforward. Particle has a nice web editor that lets you enter these settings into a form; I'm showing the template view here just so you can see all the settings at once:
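A webhook template is just a JSON document. As a rough sketch of what the SensorData hook's template might look like (the url below is a placeholder for your own endpoint, and the property names inside "json" are illustrative choices; the {{{...}}} mustache variables are Particle's built-ins):

{
    "event": "SensorData",
    "url": "https://<your-function-app>.azurewebsites.net/api/SensorData/PostData?code=<function-key>",
    "requestType": "POST",
    "noDefaults": true,
    "mydevices": true,
    "json": {
        "deviceId": "{{{PARTICLE_DEVICE_ID}}}",
        "event": "{{{PARTICLE_EVENT_NAME}}}",
        "data": "{{{PARTICLE_EVENT_VALUE}}}",
        "publishedAt": "{{{PARTICLE_PUBLISHED_AT}}}"
    }
}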
A few quick call outs:
- The event name must match the name you publish from the device. You will need to configure a webhook for each event name you want to capture. In my case, I set up two hooks and pointed them both at the same URL.
- In the "json" property you may notice some mustache variables, You can read more about that here
- Even though my device is publishing a JSON string, it arrives as just a string in the PARTICLE_EVENT_VALUE variable. There are fancy ways to parse this out before sending to the webhook, but we'll just pass it along and deal with the string of JSON in the backend later.
Azure Functions
If you haven't worked with Azure Functions, I suggest checking them out. They are intended to be small pieces of code that can be triggered by different events (HTTP requests, queues, timers, etc.). They can also have other Azure resources bound as inputs or outputs.
We are going to build a function that is triggered by HTTP and has a Table Storage input binding.
[FunctionName("SensorData_PostData")]
public static async Task PostData(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "SensorData/PostData")] HttpRequestMessage req,
[Table("Data", Connection = "AzureWebJobsStorage")] CloudTable dataTable,
ILogger logger
)
{
try
{
var body = await req.Content.ReadAsStringAsync();
logger.LogInformation(body);
var incomingValues = ParseItem(body);
if (incomingValues.Event == "SensorData")
{
var data = JsonConvert.DeserializeObject<SensorData>(incomingValues.Data.ToString());
await TableStorageRepo.SaveSensorData(dataTable, data);
}
else
{
var newData = new EventData(incomingValues.DeviceId, incomingValues.Event, incomingValues.DataDictionary);
await TableStorageRepo.SaveEvent(dataTable, newData);
}
}
catch (System.Exception ex)
{
logger.LogError("ERROR Posting Pool Bot Service Data", ex);
}
}
The function above will be triggered whenever an HTTP POST is sent to the route SensorData/PostData. Azure will automatically bind a CloudTable for us based on the connection string stored in the function's settings.
ParseItem is a helper function I wrote that turns the request body into an object implementing this interface:
public interface IDeviceMessage
{
    string DeviceId { get; set; }
    string Event { get; set; }
    DateTimeOffset? Date { get; set; }
    object Data { get; set; }
    Dictionary<string, object> DataDictionary { get; }
}
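ParseItem itself isn't shown above, but a minimal sketch might look like this, assuming the webhook body uses the deviceId/event/data/publishedAt property names from the template sketch earlier and a concrete DeviceMessage class:

using System;
using System.Collections.Generic;
using Newtonsoft.Json;

public class DeviceMessage : IDeviceMessage
{
    [JsonProperty("deviceId")]
    public string DeviceId { get; set; }

    [JsonProperty("event")]
    public string Event { get; set; }

    [JsonProperty("publishedAt")]
    public DateTimeOffset? Date { get; set; }

    [JsonProperty("data")]
    public object Data { get; set; }

    // Expose the raw data as a dictionary: parse it when it's a JSON object,
    // otherwise wrap the plain string (e.g. an "ALERT ..." message).
    public Dictionary<string, object> DataDictionary
    {
        get
        {
            var raw = Data?.ToString();
            if (string.IsNullOrEmpty(raw)) return new Dictionary<string, object>();
            try { return JsonConvert.DeserializeObject<Dictionary<string, object>>(raw); }
            catch (JsonException) { return new Dictionary<string, object> { { "message", raw } }; }
        }
    }
}

public static IDeviceMessage ParseItem(string body)
{
    // The webhook posts a JSON body shaped by the template's "json" property.
    return JsonConvert.DeserializeObject<DeviceMessage>(body);
}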
We will then peek at the event name to know whether this is a sensor data event or a string message event like "ALERT Low water! Turning pump OFF". We then pass the data to the TableStorageRepo, which we'll talk about below.
Azure Table Storage
Azure Table Storage is a NoSQL key-value store for massive semi-structured datasets. It's made for simple retrieval and fast inserts. Records do not need to have the same structure and can be up to 1 MB per row. While each row can be a different shape, you can't store complex objects; one workaround is to store complex objects as JSON strings.
In order to use Table Storage effectively, you'll need to get out of the relational mindset! If you are looking to join rows of different types and store normalized data structures, this isn't the place for you.
Partitioning Data
A quick sidestep to talk about partitioning data (this could be a whole article in itself, but I'll try to keep it brief here). Every record must have a PartitionKey and a RowKey (think composite primary key), and each partition key and row key combination must be unique (just like a PK). Batch operations can only deal with a single partition key at a time (but can span row keys). All keys are strings and will be compared as strings when filtering data, which can make selecting ranges a bit awkward (as a string, "9" sorts after "10").
Think about how you want to get the data before saving it. In my case, I want to find the current/latest sensor data and maybe find historic records by a date/time range.
For sensor data, my partitioning strategy will be two partitions: SensorData_Latest and SensorData_Historic. The Latest partition will only have one row at any given time, so its row key will be a constant value. The Historic partition will have a row key based on the timestamp of the data. If I were building this as a product I might also partition by device ID or customer, but I only have one device and one pool.
Saving Data
Since we are running this code in an Azure Function, we'll want to include the Microsoft.Azure.WebJobs.Extensions.Storage NuGet package and use the Microsoft.WindowsAzure.Storage.Table namespace. This package gives us a nice wrapper around the storage APIs and keeps our code pretty simple.
public static async Task SaveSensorData(CloudTable dataTable, SensorData data)
{
    data.PartitionKey = "SensorData_Latest";
    data.RowKey = "SensorData_Latest";
    var saveOperation = TableOperation.InsertOrMerge(data);
    await dataTable.ExecuteAsync(saveOperation);

    data.PartitionKey = "SensorData_Historic";
    data.RowKey = "SensorData_Historic_" + data.Time;
    var saveHistOperation = TableOperation.InsertOrReplace(data);
    await dataTable.ExecuteAsync(saveHistOperation);
}
In the code above, we save the same data twice: once in the Latest partition and once in the Historic partition. Since we crossed partitions, these need to be two separate table operations. Remember, store data in a way that makes it easy to read, not in a relational way. Storage is cheap!
Another thing to note: in order to pass the SensorData class to the table operations, it needs to inherit from Microsoft.WindowsAzure.Storage.Table.TableEntity. This base class defines the PartitionKey and RowKey properties.
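The SensorData class itself isn't shown here; a minimal sketch, assuming the property names used by the aggregation code later in this article (and plain numeric types), might look like:

using Microsoft.WindowsAzure.Storage.Table;

// Inheriting from TableEntity supplies PartitionKey, RowKey, Timestamp and ETag.
public class SensorData : TableEntity
{
    public long Time { get; set; }            // device timestamp, Unix epoch seconds
    public double GroundTemp { get; set; }
    public double IntakeTemp { get; set; }
    public double ReturnTemp { get; set; }
    public double Ph { get; set; }
    public double FilterPresure { get; set; } // spelling matches the aggregation code below
    public double Level { get; set; }
    public int PumpStatus { get; set; }       // 1 = pump on, 0 = pump off
}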
Now we can see the data stored in the table:
Retrieving Data
Getting a single row of data back out of Table Storage is just as easy as saving it; we just create a retrieve operation with our partition and row key.
public static async Task<SensorData> GetLatestSensorData(CloudTable dataTable)
{
    string partition = "SensorData_Latest";
    string rowId = "SensorData_Latest";
    var retrieveOperation = TableOperation.Retrieve<SensorData>(partition, rowId);
    var currentRecord = await dataTable.ExecuteAsync(retrieveOperation);
    var data = currentRecord.Result as SensorData;
    return data;
}
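As a hypothetical example of how this could be consumed (the function name and route here are illustrative, not from the real project), an HTTP GET function could hand the latest reading to whatever front end asks for it:

using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.WindowsAzure.Storage.Table;
using System.Threading.Tasks;

[FunctionName("SensorData_GetLatest")]
public static async Task<IActionResult> GetLatest(
    [HttpTrigger(AuthorizationLevel.Function, "get", Route = "SensorData/Latest")] HttpRequest req,
    [Table("Data", Connection = "AzureWebJobsStorage")] CloudTable dataTable)
{
    // Read the single row in the SensorData_Latest partition and return it as JSON.
    var data = await TableStorageRepo.GetLatestSensorData(dataTable);
    return data != null ? (IActionResult)new OkObjectResult(data) : new NotFoundResult();
}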
Aggregate Historic Sensor Data
I'm sending data every 5 seconds; that's 17,280 records a day! Yeah, sure, storage is cheap, but in 90 days we'd have over 1.5 million rows! I don't necessarily need to know every single temperature reading every 5 seconds, so I decided to aggregate historic data into 15-minute buckets.
Why every 15 minutes?
- If we want to build a weekly chart, pulling raw historical data for a week would be nearly 121,000 rows!
- If we put data into 15-minute buckets, we end up with 4 readings an hour, 96 readings a day, and 672 readings a week.
- Table Storage will only return a maximum of 1,000 rows per query segment; after that it issues a continuation token and we'd need to make another call to get more data.
- There is a charge per operation!
Let's look at some logic to build these buckets. First let's grab some historic data.
public static async Task<bool> AggregateHistoricSensorData(CloudTable dataTable)
{
    // get the last full bucket, with a 3 minute buffer added
    var enddate = RoundDown(new DateTimeOffset(DateTime.Now.AddMinutes(-3)), TimeSpan.FromMinutes(15));

    string partition = "SensorData_Historic";
    string rowIdStart = "SensorData_Historic_";
    string rowIdEnd = "SensorData_Historic_" + enddate.ToUnixTimeSeconds().ToString();

    // ... more code shortly ...
First we call the RoundDown function, which simply returns a DateTimeOffset rounded down to the nearest 15 minutes. For example, if we pass in 6:14 it returns 6:00; 6:16 returns 6:15; and so on. This is used to build our row key filter.
public static DateTimeOffset RoundDown(this DateTimeOffset dt, TimeSpan d)
{
    // Truncate the ticks down to the nearest multiple of the interval.
    return new DateTime(((dt.Ticks + 1) / d.Ticks) * d.Ticks);
}
Now we can take the partition and row key filters and build a query.
    // ... continuing the AggregateHistoricSensorData function ...
    TableQuery<SensorData> query = new TableQuery<SensorData>().Where(
        TableQuery.CombineFilters(
            TableQuery.CombineFilters(
                TableQuery.GenerateFilterCondition(nameof(SensorData.PartitionKey), QueryComparisons.Equal, partition),
                TableOperators.And,
                TableQuery.GenerateFilterCondition(nameof(SensorData.RowKey), QueryComparisons.GreaterThanOrEqual, rowIdStart)
            ),
            TableOperators.And,
            TableQuery.GenerateFilterCondition(nameof(SensorData.RowKey), QueryComparisons.LessThan, rowIdEnd)
        )
    );

    List<SensorData> entities = await GetRange(dataTable, query);
That's a whole lot of helper methods to build a simple where clause. It basically works out to: WHERE PartitionKey = 'SensorData_Historic' AND RowKey >= 'SensorData_Historic_' AND RowKey < 'SensorData_Historic_1588654800'
Remember, all keys are strings! Storing the historic records with a Unix epoch suffix is what makes this comparison work. This query will grab all history before the timestamp of 2020-05-05 5 AM, not just the last 15 minutes. Later we'll clean up the processed records so they only get aggregated once, but we grab all the records in case our job hasn't run in a while and we need to process older buckets.
Let's take a quick peek at the GetRange helper. It checks whether we were sent a continuation token, which means we need to make another call to get the rest of our data. Table Storage will return "up to" 1,000 rows per segment, but it should be noted that you are not guaranteed to get 1,000 per batch; depending on your query and key strategy, you may get fewer than 1,000 and still get a token.
private static async Task<List<T>> GetRange<T>(CloudTable dataTable, TableQuery<T> query) where T : ITableEntity, new()
{
    List<T> entities = new List<T>();
    TableContinuationToken next = null;
    do
    {
        var results = await dataTable.ExecuteQuerySegmentedAsync(query, next);
        entities.AddRange(results.Results);
        next = results.ContinuationToken;
    } while (next != null);
    return entities;
}
Cool, so now we have all the historical data to make our bucket(s). Let's throw some LINQ at it to group the data and track the min/max/averages.
    var aggs = entities
        .GroupBy(i => new
        {
            DateRounded = RoundDown(FromTimeString(i.RowKey.Replace("SensorData_Historic_", "")), TimeSpan.FromMinutes(15))
        })
        .Select(g => new AggregateSensorData
        {
            Range = AggregatRange.QuaterHour,
            RangeStart = g.Key.DateRounded,
            PartitionKey = "SensorData_QuarterHour",
            RowKey = "SensorData_QuarterHour_" + g.Key.DateRounded.ToString("yyyy_MM_dd_HH_mm"),
            GroundTempAvg = Math.Round(g.Average(i => i.GroundTemp), 3),
            GroundTempMin = g.Min(i => i.GroundTemp),
            GroundTempMax = g.Max(i => i.GroundTemp),
            GroundTempMinTime = g.Last(w => w.GroundTemp == g.Min(i => i.GroundTemp)).Time,
            GroundTempMaxTime = g.Last(w => w.GroundTemp == g.Max(i => i.GroundTemp)).Time,
            IntakeTempAvg = Math.Round(g.Average(i => i.IntakeTemp), 3),
            IntakeTempMin = g.Min(i => i.IntakeTemp),
            IntakeTempMax = g.Max(i => i.IntakeTemp),
            IntakeTempMinTime = g.Last(w => w.IntakeTemp == g.Min(i => i.IntakeTemp)).Time,
            IntakeTempMaxTime = g.Last(w => w.IntakeTemp == g.Max(i => i.IntakeTemp)).Time,
            ReturnTempAvg = Math.Round(g.Average(i => i.ReturnTemp), 3),
            ReturnTempMin = g.Min(i => i.ReturnTemp),
            ReturnTempMax = g.Max(i => i.ReturnTemp),
            ReturnTempMinTime = g.Last(w => w.ReturnTemp == g.Min(i => i.ReturnTemp)).Time,
            ReturnTempMaxTime = g.Last(w => w.ReturnTemp == g.Max(i => i.ReturnTemp)).Time,
            PhAvg = Math.Round(g.Average(i => i.Ph), 3),
            PhMin = g.Min(i => i.Ph),
            PhMax = g.Max(i => i.Ph),
            PhMinTime = g.Last(w => w.Ph == g.Min(i => i.Ph)).Time,
            PhMaxTime = g.Last(w => w.Ph == g.Max(i => i.Ph)).Time,
            FilterPresureAvg = Math.Round(g.Average(i => i.FilterPresure), 3),
            FilterPresureMin = g.Min(i => i.FilterPresure),
            FilterPresureMax = g.Max(i => i.FilterPresure),
            FilterPresureMinTime = g.Last(w => w.FilterPresure == g.Min(i => i.FilterPresure)).Time,
            FilterPresureMaxTime = g.Last(w => w.FilterPresure == g.Max(i => i.FilterPresure)).Time,
            LevelAvg = Math.Round(g.Average(i => i.Level), 3),
            LevelMin = g.Min(i => i.Level),
            LevelMax = g.Max(i => i.Level),
            LevelMinTime = g.Last(w => w.Level == g.Min(i => i.Level)).Time,
            LevelMaxTime = g.Last(w => w.Level == g.Max(i => i.Level)).Time,
            PumpStatusOn = g.Count(i => i.PumpStatus == 1),
            PumpStatusOff = g.Count(i => i.PumpStatus == 0),
            RangeDataPoints = g.Count(),
        }).ToArray();
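One small helper used in the GroupBy above, FromTimeString, isn't shown; presumably it just turns the Unix-epoch-seconds suffix of a row key back into a DateTimeOffset, something like:

public static DateTimeOffset FromTimeString(string unixSeconds)
{
    // Row keys end in ToUnixTimeSeconds() output, so parse the seconds back out.
    return DateTimeOffset.FromUnixTimeSeconds(long.Parse(unixSeconds));
}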
Now we have an array of new entities with a partition key of "SensorData_QuarterHour" and row keys like "SensorData_QuarterHour_2020_05_05_04_45" (yyyy_MM_dd_HH_mm).
Table Storage has batch operations, but limits a batch to 100 operations. So we'll chop this array into batches of 100 records and save these new rows.
    int rowOffset = 0;
    while (rowOffset < aggs.Count())
    {
        // next batch
        var rows = aggs.Skip(rowOffset).Take(100).ToList();
        rowOffset += rows.Count;

        var batch = new TableBatchOperation();
        foreach (var row in rows)
        {
            batch.InsertOrReplace(row);
        }

        try
        {
            await dataTable.ExecuteBatchAsync(batch);
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error Saving Aggregated Sensor Data . " + ex);
        }
    }
Now that we've processed the raw historic data, we'll want to delete those rows so they don't get processed again.
One note about delete operations: they require an ETag to be sent! ETags are used for optimistic concurrency. Here we'll just pass *, meaning we don't care whether the record has changed since we pulled it.
    // delete old data
    rowOffset = 0;
    while (rowOffset < entities.Count())
    {
        // next batch
        var rows = entities.Skip(rowOffset).Take(100).ToList();
        rowOffset += rows.Count;

        var batch = new TableBatchOperation();
        foreach (var row in rows)
        {
            row.ETag = "*";
            batch.Delete(row);
        }

        try
        {
            await dataTable.ExecuteBatchAsync(batch);
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error Deleting Historic Sensor Data . " + ex);
        }
    }

    return true;
}
Time Triggers / CRON Job
We've built all the logic to parse the history and create the buckets; now we just need to call it every once in a while. We could expose another HTTP endpoint and have something call it, or we could make a TimerTrigger function!
[FunctionName("AggregateHistoricTrigger")]
public static async Task Run(
[TimerTrigger("0 */20 * * * *")]TimerInfo myTimer,
[Table("Data", Connection = "AzureWebJobsStorage")] CloudTable dataTable,
ILogger log)
{
log.LogInformation($"Aggregate Historic Sensor Data Timer trigger function executing at: {DateTimeOffset.Now}");
await TableStorageRepo.AggregateHistoricSensorData(dataTable);
}
I'm not going to go into detail about the timer expression used here; just know that "0 */20 * * * *" means execute every 20 minutes. You can see this helpful cheat sheet to figure out the CRON expression you might need.
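For quick reference, the expression is an NCRONTAB string with six fields; a few examples:

// {second} {minute} {hour} {day} {month} {day-of-week}
// "0 */20 * * * *"  -> every 20 minutes
// "0 */5 * * * *"   -> every 5 minutes
// "0 0 * * * *"     -> once an hour, at the top of the hour
// "0 30 5 * * *"    -> every day at 5:30 AM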
Now the data is saved in Azure Table Storage! In the next part of this series, we'll cover building a Vue PWA to display this data as information.