Function pipelines: Building functional programming into PostgreSQL using custom operators
davidkohn88
Posted on November 24, 2021
Table of contents
- Function pipelines: why are they useful?
- How we built function pipelines without forking PostgreSQL
- A custom data type: the timevector
- A custom operator: ->
- Custom functions: pipeline elements
  - timevector transforms
  - timevector finalizers
- Aggregate accessors and mutators
- Next steps
We are announcing function pipelines, a new capability that introduces functional programming concepts inside PostgreSQL (and SQL) using custom operators.
Function pipelines radically improve the developer ergonomics of analyzing data in PostgreSQL and SQL, by applying principles from functional programming and popular tools like Python’s Pandas and PromQL.
At Timescale our mission is to serve developers worldwide, and enable them to build exceptional data-driven products that measure everything that matters: e.g., software applications, industrial equipment, financial markets, blockchain activity, user actions, consumer behavior, machine learning models, climate change, and more.
We believe SQL is the best language for data analysis. We’ve championed the benefits of SQL for several years, even back when many were abandoning the language for custom domain-specific languages. And we were right - SQL has resurged and become the universal language for data analysis, and now many NoSQL databases are adding SQL interfaces to keep up.
But SQL is not perfect, and at times can get quite unwieldy. For example,
SELECT device_id,
sum(abs_delta) as volatility
FROM (
SELECT device_id,
abs(val - lag(val) OVER (PARTITION BY device_id ORDER BY ts))
as abs_delta
FROM measurements
WHERE ts >= now() - '1 day'::interval) calc_delta
GROUP BY device_id;
Pop quiz: What does this query do?
Even if you are a SQL expert, queries like this can be quite difficult to read - and even harder to express. Complex data analysis in SQL can be hard.
Function pipelines let you express that same query like this:
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> abs() -> sum()
as volatility
FROM measurements
WHERE ts >= now()-'1 day'::interval
GROUP BY device_id;
Now it is much clearer what this query is doing. It:
- Gets the last day’s data from the measurements table, grouped by device_id
- Sorts the data by the time column
- Calculates the delta (or change) between values
- Takes the absolute value of the delta
- And then takes the sum of the result of the previous steps
Function pipelines improve your own coding productivity, while also making your SQL code easier for others to comprehend and maintain.
Inspired by functional programming languages, function pipelines enable you to analyze data by composing multiple functions, leading to a simpler, cleaner way of expressing complex logic in PostgreSQL.
And the best part: we built function pipelines in a way that is fully PostgreSQL compliant - we did not change any SQL syntax - meaning that any tool that speaks PostgreSQL will be able to support data analysis using function pipelines.
How did we build this? By taking advantage of the incredible extensibility of PostgreSQL, in particular: custom types, custom operators, and custom functions.
In our previous example, you can see the key elements of function pipelines:
- Custom data types: in this case, the timevector, which is a set of (time, value) pairs.
- Custom operator: ->, used to compose and apply function pipeline elements to the data that comes in.
- And finally, custom functions, called pipeline elements. Pipeline elements can transform and analyze timevectors (or other data types) in a function pipeline. For this initial release, we’ve built 60 custom functions! (Full list here.)
We’ll go into more detail on function pipelines in the rest of this post, but if you just want to get started as soon as possible, the easiest way to try function pipelines is through a fully managed Timescale Cloud service. Try it for free (no credit card required) for 30 days.
Function pipelines are pre-loaded on each new database service on Timescale Cloud, available immediately - so after you’ve created a new service, you’re all set to use them!
If you prefer to manage your own database instances, you can install the timescaledb_toolkit extension into your existing PostgreSQL installation, completely for free.
We’ve been working on this capability for a long time, but in line with our belief of “move fast but don’t break things”, we’re initially releasing function pipelines as an experimental feature - and we would absolutely love to get your feedback. You can open an issue or join a discussion thread in GitHub (And, if you like what you see, GitHub ⭐ are always welcome and appreciated too!).
We’d also like to take this opportunity to give a huge shoutout to pgx, the Rust-based framework for building PostgreSQL extensions; it handles a lot of the heavy lifting for this project. We have over 600 custom types, operators, and functions in the timescaledb_toolkit extension at this point, and managing this without pgx (and the ease of use that comes from working with Rust) would be a real bear of a job.
Function pipelines: why are they useful?
In the Northern hemisphere (where most of Team Timescale sits, including your authors), it is starting to get cold at this time of the year.
Now imagine a restaurant in New York City whose owners care about their customers and their customers’ comfort. And you are working on an IoT product designed to help small businesses like these owners minimize their heating bill while maximizing their customers’ happiness. So you install two thermometers, one at the front measuring the temperature right by the door, and another at the back of the restaurant.
Now, as many of you may know (if you’ve ever had to sit by the door of a restaurant in the fall or winter), when someone enters, the temperature drops, and once the door is closed, the temperature warms back up. The temperature at the back of the restaurant will vary much less than at the front, right by the door. And both of them will drop slowly down to a lower setpoint during non-business hours and warm back up sometime before business hours, based on the setpoints on our thermostat. So overall we’ll end up with a graph that looks something like this:
As we can see, the temperature by the front door varies much more than at the back of the restaurant. Another way to say this is that the temperature by the front door is more volatile. Now, the owners of this restaurant want to measure this, because frequent temperature changes mean uncomfortable customers.
In order to measure volatility, we could first subtract the previous point from each point to calculate a delta. If we add these deltas up directly, large positive and negative deltas will cancel out. But we only care about the magnitude of the delta, not its sign, so what we really should do is take the absolute value of each delta and then sum those absolute values.
We now have a metric that might help us measure customer comfort, and also the efficacy of different weatherproofing methods (for example, adding one of those little vestibules that acts as a windbreak).
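Here is a quick check of that reasoning. It is a minimal Python sketch with made-up readings for the front-door thermometer, assumed to be already in time order. Summing the raw deltas telescopes to the last reading minus the first, so the drops and recoveries vanish. Summing the absolute deltas keeps them.

```python
def signed_change(values: list[float]) -> float:
    """Sum of raw deltas: positive and negative changes cancel out."""
    return sum(b - a for a, b in zip(values, values[1:]))


def volatility(values: list[float]) -> float:
    """Sum of absolute deltas between consecutive (time-ordered) readings."""
    return sum(abs(b - a) for a, b in zip(values, values[1:]))


# Hypothetical front-door readings: the door opens twice.
front_door = [20.0, 17.0, 20.0, 16.0, 20.0]
print(signed_change(front_door))  # 0.0: the drops and recoveries cancel
print(volatility(front_door))     # 14.0: 3 + 3 + 4 + 4
```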
To track this, we collect measurements from our thermometers and store them in a table:
CREATE TABLE measurements(
device_id BIGINT,
ts TIMESTAMPTZ,
val DOUBLE PRECISION
);
The device_id identifies the thermometer, ts the time of the reading, and val the temperature.
Using the data in our measurements table, let’s look at how we calculate volatility using function pipelines.
Note: Because all of the function pipeline features are still experimental, they exist in the toolkit_experimental schema. Before running any of the SQL code in this post, you will need to set your search_path to include the experimental schema, as we do in the example below. We won’t repeat this throughout the post so as not to distract.
set search_path to toolkit_experimental, public; --still experimental, so do this to make it easier to read
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> abs() -> sum()
as volatility
FROM measurements
WHERE ts >= now()-'1 day'::interval
GROUP BY device_id;
And now we have the same query that we used as our example in the introduction.
In this query, the function pipeline timevector(ts, val) -> sort() -> delta() -> abs() -> sum() succinctly expresses the following operations:
- Create timevectors (more detail on this later) out of the ts and val columns
- Sort each timevector by the time column
- Calculate the delta (or change) between each pair in the timevector by subtracting the previous val from the current
- Take the absolute value of the delta
- Take the sum of the result from the previous steps
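Read left to right, the pipeline is function composition: each step's output becomes the next step's input. The minimal Python sketch below models a timevector as a list of (time, value) tuples. The helper names mirror the pipeline elements but are our own stand-ins, not the toolkit's implementation. In this sketch delta() drops the first point, since it has no predecessor. That matches how lag() yields NULL for the first row in the plain-SQL version.

```python
from datetime import datetime, timedelta
from functools import reduce

Point = tuple[datetime, float]  # one (time, value) pair of a timevector


def sort(tv: list[Point]) -> list[Point]:
    """Order the points by time, ascending."""
    return sorted(tv, key=lambda p: p[0])


def delta(tv: list[Point]) -> list[Point]:
    """Current value minus previous value; the first point is dropped."""
    return [(t, v - prev) for (_, prev), (t, v) in zip(tv, tv[1:])]


def abs_(tv: list[Point]) -> list[Point]:
    """Absolute value of every value (underscore avoids shadowing abs)."""
    return [(t, abs(v)) for t, v in tv]


def sum_(tv: list[Point]) -> float:
    """Collapse the timevector to the sum of its values."""
    return sum(v for _, v in tv)


def pipeline(data, *steps):
    """Feed each step's output into the next one, left to right."""
    return reduce(lambda acc, step: step(acc), steps, data)


t0 = datetime(2021, 11, 24, 12, 0)
readings = [  # deliberately out of order, as rows may arrive
    (t0 + timedelta(minutes=2), 20.0),
    (t0, 20.0),
    (t0 + timedelta(minutes=1), 17.0),
]
print(pipeline(readings, sort, delta, abs_, sum_))  # 6.0
```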
The FROM, WHERE and GROUP BY clauses do the rest of the work, telling us:
- We’re getting data FROM the measurements table
- WHERE the ts, or timestamp column, contains values over the last day
- Showing one pipeline output per device_id (the GROUP BY column)
As we noted before, if you were to do this same calculation using SQL and PostgreSQL functionality, your query would look like this:
SELECT device_id,
sum(abs_delta) as volatility
FROM (
SELECT device_id,
abs(val - lag(val) OVER (PARTITION BY device_id ORDER BY ts))
as abs_delta
FROM measurements
WHERE ts >= now() - '1 day'::interval) calc_delta
GROUP BY device_id;
This does the same 5 steps as the above, but is much harder to understand, because we have to use a window function and aggregate the results - but also, because aggregates are performed before window functions, we need to actually execute the window function in a subquery.
As we can see, function pipelines make it significantly easier to comprehend the overall analysis of our data. There’s no need to completely understand what’s going on in these functions just yet, but for now it’s enough to understand that we’ve essentially implemented a small functional programming language inside of PostgreSQL. You can still use all of the normal, expressive SQL you’ve come to know and love. Function pipelines just add new tools to your SQL toolbox that make it easier to work with time-series data.
Some avid SQL users might find the syntax a bit foreign at first, but for many people who work in other programming languages, especially with tools like Python’s Pandas package, this type of successive operation on data sets will feel natural.
And again, this is still fully PostgreSQL compliant: We introduce no changes to the parser or anything that should break compatibility with PostgreSQL drivers.
How we built function pipelines without forking PostgreSQL
We built function pipelines (without modifying the parser or anything else that would require a fork of PostgreSQL) by taking advantage of three of the many ways that PostgreSQL enables extensibility: custom types, custom functions, and custom operators.
- Custom data types, starting with the timevector, which is a set of (time, value) pairs
- A custom operator, ->, which is used to compose and apply function pipeline elements to the data that comes in
- Custom functions, called pipeline elements, which can transform and analyze timevectors (or other data types) in a function pipeline (with 60 functions in this initial release)
We believe that new idioms like these are exactly what PostgreSQL was meant to enable. That’s why it has supported custom types, functions and operators from its earliest days. (It is also one of the many reasons we love PostgreSQL.)
A custom data type: the timevector
A timevector is a collection of (time, value) pairs. As of now, the times must be TIMESTAMPTZs and the values must be DOUBLE PRECISION numbers. (But this may change in the future as we continue to develop this data type. If you have ideas or input, please file feature requests on GitHub explaining what you’d like!)
You can think of the timevector as something like this:
One of the first questions you might ask is: how does a timevector relate to time-series data? (If you want to know more about time-series data, we have a great blog post on that.)
Let’s consider our example from above, where we were talking about a restaurant that was measuring temperatures, and we had a measurements table like so:
CREATE TABLE measurements(
device_id BIGINT,
ts TIMESTAMPTZ,
val DOUBLE PRECISION
);
In this example, we can think of a single time-series dataset as all historical and future time and temperature measurements from a device.
Given this definition, we can think of a timevector as a finite subset of a time-series dataset. The larger time-series dataset may extend back into the past and it may extend into the future, but the timevector is bounded.
In order to construct a timevector from the data gathered from a thermometer, we use a custom aggregate and pass in the columns we want to become our (time, value) pairs. We can use the WHERE clause to define the extent of the timevector (i.e., the limits of this subset), and the GROUP BY clause to provide identifying information about the time series that’s represented.
Building on our example, this is how we construct a timevector for each thermometer in our dataset:
SELECT device_id,
timevector(ts, val)
FROM measurements
WHERE ts >= now()-'1 day'::interval
GROUP BY device_id;
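Conceptually, the timevector aggregate collects the (time, value) pairs of each group. The rough Python analogue below assumes rows arrive as (device_id, ts, val) tuples and uses a fixed "now" so the result is repeatable. The function name build_timevectors is hypothetical. In this sketch the points keep their input order, which is why a pipeline that needs time order starts with sort().

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def build_timevectors(rows, now, window=timedelta(days=1)):
    """Group (device_id, ts, val) rows into per-device lists of (ts, val) pairs.

    Roughly mirrors:
        SELECT device_id, timevector(ts, val) FROM measurements
        WHERE ts >= now() - '1 day'::interval GROUP BY device_id;
    Points keep their input order; nothing here sorts them.
    """
    groups = defaultdict(list)
    for device_id, ts, val in rows:
        if ts >= now - window:                   # WHERE: bound the subset
            groups[device_id].append((ts, val))  # GROUP BY: one series per device
    return dict(groups)


now = datetime(2021, 11, 24, 12, 0, tzinfo=timezone.utc)  # fixed "now" for repeatability
rows = [
    (1, now - timedelta(hours=1), 20.5),
    (1, now - timedelta(days=2), 19.0),   # older than a day: excluded
    (2, now - timedelta(minutes=5), 21.0),
]
for device_id, tv in build_timevectors(rows, now).items():
    print(device_id, tv)
```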
But a timevector doesn't provide much value by itself. So now, let’s also consider some complex calculations that we can apply to the timevector, starting with a custom operator used to apply these functions.
A custom operator: ->
In function pipelines, the -> operator is used to apply and compose multiple functions in a format that is easy to write and read.
Fundamentally, -> means: “apply the operation on the right to the inputs on the left”, or, more simply, “do the next thing”.
We created a general-purpose operator for this because we think that too many operators meaning different things can get very confusing and difficult to read.
One thing that you’ll notice about the pipeline elements is that the arguments are in an unusual place in a statement like:
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> abs() -> sum() as volatility
FROM measurements
WHERE ts >= now()-'1 day'::interval
GROUP BY device_id;
It appears (from the semantics) that the timevector(ts, val) is an argument to sort(), the resulting timevector is an argument to delta(), and so on.
The thing is that sort() (and the others) are regular function calls; they can’t see anything outside of their parentheses and don’t know about anything to their left in the statement. So we need a way to get the timevector into the sort() (and the rest of the pipeline).
The way we solved this is by taking advantage of one of the same fundamental computing insights that functional programming languages use: code and data are really the same thing.
Each of our functions returns a special type that describes the function and its arguments. We call these types pipeline elements (more later).
The -> operator then performs one of two different types of actions depending on the types on its right and left sides. It can either:
- Apply a pipeline element to the left hand argument - perform the function described by the pipeline element on the incoming data type directly.
- Compose pipeline elements into a combined element that can be applied at some point in the future (this is an optimization that allows us to apply multiple elements in a “nested” manner so that we don’t perform multiple unnecessary passes).
The operator determines the action to perform based on its left and right arguments.
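Python has no -> operator, but the apply-versus-compose dispatch can be sketched with >> as a stand-in. When a list (standing in for a timevector) is on the left, the element is applied immediately. When another element is on the left, the two are composed into a new element that runs later. The Element class and its names are a hypothetical illustration of the idea, not the toolkit's Rust implementation.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable

Point = tuple[float, float]  # (time, value); plain floats keep the sketch short


@dataclass(frozen=True)
class Element:
    """A pipeline element: a description of work, not the work itself."""

    name: str
    fn: Callable[[list[Point]], object]

    def __rshift__(self, other: object) -> Element:
        # element >> element: COMPOSE into a new element that runs later.
        if not isinstance(other, Element):
            return NotImplemented
        return Element(f"{self.name} -> {other.name}",
                       lambda tv: other.fn(self.fn(tv)))

    def __rrshift__(self, data: list[Point]) -> object:
        # data >> element (lists have no >>): APPLY this element right now.
        return self.fn(data)


sort = Element("sort", sorted)
delta = Element("delta", lambda tv: [(t, v - p) for (_, p), (t, v) in zip(tv, tv[1:])])
abs_ = Element("abs", lambda tv: [(t, abs(v)) for t, v in tv])
sum_ = Element("sum", lambda tv: sum(v for _, v in tv))

data = [(2.0, 20.0), (0.0, 20.0), (1.0, 17.0)]
step_by_step = data >> sort >> delta >> abs_ >> sum_  # apply, apply, apply, apply
composed = sort >> delta >> abs_ >> sum_              # compose only; nothing runs yet
print(composed.name)                                  # sort -> delta -> abs -> sum
print(step_by_step, data >> composed)                 # 6.0 6.0
```

In this sketch, composition only nests the calls, so intermediate lists are still built. The point is that a composed pipeline becomes a single object, which an implementation is then free to execute more efficiently.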
Let’s look at our pipeline from before: timevector(ts, val) -> sort() -> delta() -> abs() -> sum(). As we noted earlier, this function pipeline performs the following steps:
- Create timevectors out of the ts and val columns
- Sort each timevector by the time column
- Calculate the delta (or change) between each pair in the timevector by subtracting the previous val from the current
- Take the absolute value of the delta
- Take the sum of the result from the previous steps
And logically, at each step, we can think of the timevector being materialized and passed to the next step in the pipeline.
However, while this will produce a correct result, it’s not the most efficient way to compute this. Instead, it would be more efficient to compute as much as possible in a single pass over the data.
In order to do this, we allow not only the apply operation, but also the compose operation. Once we’ve composed a pipeline into a logically equivalent higher-order pipeline with all of the elements, we can choose the most efficient way to execute it internally. (Importantly, even if we have to perform each step sequentially, we don’t need to materialize the timevector and pass it between each step in the pipeline, so it has significantly less overhead even without other optimizations.)
Custom functions: pipeline elements
Now let’s discuss the third, and final, key piece that makes up function pipelines: custom functions, or as we call them, pipeline elements.
We have implemented over 60 individual pipeline elements, which fall into 4 categories (with a few subcategories):
timevector transforms
These elements take in a timevector and produce a timevector. They are the easiest to compose, as they produce the same type.
Example pipeline:
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> map($$ ($value^3 + $value^2 + $value * 2) $$) -> lttb(100)
FROM measurements
GROUP BY device_id;
Organized by sub-category:
Unary mathematical
Simple mathematical functions applied to the value in each point in a timevector.
Element | Description |
---|---|
abs() | Computes the absolute value of each value |
cbrt() | Computes the cube root of each value |
ceil() | Computes the smallest integer greater than or equal to each value |
floor() | Computes the largest integer less than or equal to each value |
ln() | Computes the natural logarithm of each value |
log10() | Computes the base 10 logarithm of each value |
round() | Computes the closest integer to each value |
sign() | Computes +/-1 for each positive/negative value |
sqrt() | Computes the square root of each value |
trunc() | Computes only the integer portion of each value |
Binary mathematical
Simple mathematical functions with a scalar input applied to the value in each point in a timevector.
Element | Description |
---|---|
add(N) | Computes each value plus N |
div(N) | Computes each value divided by N |
logn(N) | Computes the logarithm base N of each value |
mod(N) | Computes the remainder when each value is divided by N |
mul(N) | Computes each value multiplied by N |
power(N) | Computes each value raised to the power N |
sub(N) | Computes each value minus N |
Compound transforms
Transforms involving multiple points inside of a timevector; see here for more information.
Element | Description |
---|---|
delta() | Subtracts the previous value from each value |
fill_to(interval, fill_method) | Fills gaps larger than interval with points at interval from the previous point, using fill_method |
lttb(resolution) | Downsamples a timevector using the largest triangle three buckets algorithm at resolution; requires sorted input |
sort() | Sorts the timevector by the time column, ascending |
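Largest Triangle Three Buckets keeps the first and last points and splits the rest into equal-count buckets. From each bucket it keeps the point that forms the largest triangle with the previously kept point and the average of the next bucket. Because the buckets are contiguous runs of points, the input must already be in time order, which is why lttb() requires sorted input. Below is a compact Python sketch of the standard algorithm. It is our own illustration, not the toolkit's code. Here the second argument is taken to be the number of output points, and the toolkit's edge-case handling may differ.

```python
def lttb(points: list[tuple[float, float]], n_out: int) -> list[tuple[float, float]]:
    """Largest-Triangle-Three-Buckets downsampling of time-sorted (x, y) points."""
    n = len(points)
    if n_out >= n or n_out < 3:
        return list(points)  # nothing to downsample, or too few buckets to try

    def edge(k: int) -> int:
        # Start index of interior bucket k (integer math avoids float rounding).
        return k * (n - 2) // (n_out - 2) + 1

    sampled = [points[0]]  # the first point is always kept
    a = 0                  # index of the previously selected point
    for i in range(n_out - 2):
        # The average of the NEXT bucket acts as the third triangle vertex.
        nxt = points[edge(i + 1):min(edge(i + 2), n)]
        avg_x = sum(p[0] for p in nxt) / len(nxt)
        avg_y = sum(p[1] for p in nxt) / len(nxt)

        # Keep the point in the CURRENT bucket with the largest triangle area.
        ax, ay = points[a]
        best, best_area = -1, -1.0
        for j in range(edge(i), edge(i + 1)):
            x, y = points[j]
            area = abs((ax - avg_x) * (y - ay) - (ax - x) * (avg_y - ay)) / 2
            if area > best_area:
                best, best_area = j, area
        sampled.append(points[best])
        a = best

    sampled.append(points[-1])  # the last point is always kept
    return sampled


pts = [(0, 0), (1, 10), (2, 0), (3, 0), (4, 0)]
print(lttb(pts, 3))  # [(0, 0), (1, 10), (4, 0)]: the spike survives
```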
Lambda elements
These elements use lambda expressions, which allow users to write small functions to be evaluated over each point in a timevector.
Lambda expressions can return a DOUBLE PRECISION value like $$ $value^2 + $value + 3 $$. They can return a BOOL like $$ $time > '2020-01-01't $$. They can also return a (time, value) pair like $$ ($time + '1 day'i, sin($value) * 4)$$. You can apply them using the elements below:
Element | Description |
---|---|
filter(lambda (bool)) | Removes points from the timevector where the lambda expression evaluates to false |
map(lambda (value)) | Applies the lambda expression to all the values in the timevector |
map(lambda (time, value)) | Applies the lambda expression to all the times and values in the timevector |
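As a rough analogy only, here are the three kinds of lambda elements written as Python comprehensions over a hypothetical two-point timevector. They reuse the example expressions above but do not use the toolkit's lambda language.

```python
import math
from datetime import datetime, timedelta

# A hypothetical two-point timevector of (time, value) pairs.
tv = [(datetime(2019, 12, 31), 2.0), (datetime(2020, 1, 2), 3.0)]

# filter(lambda (bool)): keep points where the predicate is true,
# cf. $$ $time > '2020-01-01't $$
kept = [(t, v) for t, v in tv if t > datetime(2020, 1, 1)]

# map(lambda (value)): rewrite each value and keep each time,
# cf. $$ $value^2 + $value + 3 $$
mapped = [(t, v**2 + v + 3) for t, v in tv]

# map(lambda (time, value)): rewrite both halves of each pair,
# cf. $$ ($time + '1 day'i, sin($value) * 4) $$
shifted = [(t + timedelta(days=1), math.sin(v) * 4) for t, v in tv]

print(kept)                    # only the 2020-01-02 point survives
print([v for _, v in mapped])  # [9.0, 15.0]
```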
timevector finalizers
These elements end the timevector portion of a pipeline. They can either help with output or produce an aggregate over the entire timevector. They are an optimization barrier to composition, as they (usually) produce types other than timevector.
Example pipelines:
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> unnest()
FROM measurements
GROUP BY device_id;
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> time_weight()
FROM measurements
GROUP BY device_id;
Finalizer pipeline elements organized by sub-category:
timevector output
These elements help with output and can produce a set of (time, value) pairs or a materialized timevector. Note: this is an area where we’d love further feedback. Are there particular data formats that would be especially useful for, say, graphing, that we can add? File an issue in our GitHub!
Element | Description |
---|---|
unnest() | Produces a set of (time, value) pairs. You can wrap it and expand it as a composite type to produce separate columns: (pipe -> unnest()).* |
materialize() | Materializes a timevector to pass to an application or other operation directly; blocks any optimizations that would materialize it lazily |
timevector aggregates
Aggregate all the points in a timevector to produce a single value as a result.
Element | Description |
---|---|
average() | Computes the average of the values in the timevector |
counter_agg() | Computes the counter_agg aggregate over the times and values in the timevector |
stats_agg() | Computes a range of statistical aggregates and returns a 1DStatsAgg over the values in the timevector |
sum() | Computes the sum of the values in the timevector |
num_vals() | Counts the points in the timevector |
Aggregate accessors and mutators
These function pipeline elements act like the accessors that I described in our previous post on aggregates. You can use them to get a value from the aggregate part of a function pipeline like so:
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> stats_agg() -> variance()
FROM measurements
GROUP BY device_id;
But these don’t just work on timevectors; they also work on a normally produced aggregate.
When used instead of normal function accessors and mutators, they can make the syntax clearer by getting rid of nested functions like:
SELECT approx_percentile(0.5, percentile_agg(val))
FROM measurements
Instead, we can use the arrow accessor to convey the same thing:
SELECT percentile_agg(val) -> approx_percentile(0.5)
FROM measurements
By aggregate family:
Counter aggregates
Counter aggregates deal with resetting counters (and were stabilized in our 1.3 release this week!). Counters are a common type of metric in the application performance monitoring and metrics world. All values have resets accounted for. These elements must have a CounterSummary to their left when used in a pipeline, coming from a counter_agg() aggregate or pipeline element.
Element | Description |
---|---|
counter_zero_time() | The time at which the counter value is predicted to have been zero based on the least squares fit of the points input to the CounterSummary (x intercept) |
corr() | The correlation coefficient of the least squares fit line of the adjusted counter value |
delta() | Computes the last - first value of the counter |
extrapolated_delta(method) | Computes the delta extrapolated using the provided method to the bounds of the range. Bounds must have been provided in the aggregate or a with_bounds call |
idelta_left() / idelta_right() | Computes the instantaneous difference between the second and first points (left) or last and next-to-last points (right) |
intercept() | The y-intercept of the least squares fit line of the adjusted counter value |
irate_left() / irate_right() | Computes the instantaneous rate of change between the second and first points (left) or last and next-to-last points (right) |
num_changes() | Number of times the counter changed values |
num_elements() | Number of items; any with the exact same time will have been counted only once |
num_resets() | Number of times the counter reset |
slope() | The slope of the least squares fit line of the adjusted counter value |
with_bounds(range) | Applies bounds using the range (a TSTZRANGE) to the CounterSummary if they weren’t provided in the aggregation step |
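Reset handling is what makes counter deltas meaningful. The sketch below assumes the common convention used by Prometheus-style rate calculations: any drop means the counter restarted from zero, so the value just before the drop is carried forward as an offset. The function names are hypothetical, and the toolkit's exact rules may differ.

```python
def adjust_for_resets(values: list[float]) -> list[float]:
    """Return counter readings with resets accounted for.

    Assumes a reset whenever a reading is lower than the one before it: the
    counter restarted from zero, so the last value seen before the reset is
    carried forward as an offset.
    """
    adjusted: list[float] = []
    offset = 0.0
    for i, value in enumerate(values):
        if i > 0 and value < values[i - 1]:
            offset += values[i - 1]
        adjusted.append(value + offset)
    return adjusted


def counter_delta(values: list[float]) -> float:
    """Last minus first of the reset-adjusted series (cf. the delta() accessor)."""
    adjusted = adjust_for_resets(values)
    return adjusted[-1] - adjusted[0]


def count_resets(values: list[float]) -> int:
    """How many times the raw counter went down (cf. num_resets())."""
    return sum(1 for prev, cur in zip(values, values[1:]) if cur < prev)


raw = [10.0, 20.0, 30.0, 5.0, 15.0]           # the counter restarts between 30 and 5
print(adjust_for_resets(raw))                 # [10.0, 20.0, 30.0, 35.0, 45.0]
print(counter_delta(raw), count_resets(raw))  # 35.0 1
print(raw[-1] - raw[0])                       # 5.0: the naive delta ignores the reset
```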
Percentile approximation
These aggregate accessors deal with percentile approximation. For now we’ve only implemented them for percentile_agg and uddsketch based aggregates. We have not yet implemented them for tdigest.
Element | Description |
---|---|
approx_percentile(p) | The approximate value at percentile p |
approx_percentile_rank(v) | The approximate percentile a value v would fall in |
error() | The maximum relative error guaranteed by the approximation |
mean() | The exact average of the input values |
num_vals() | The number of input values |
Statistical aggregates
These aggregate accessors add support for common statistical aggregates (and were stabilized in our 1.3 release this week!). These allow you to compute and rollup() common statistical aggregates like average and stddev, more advanced ones like skewness, and two-dimensional aggregates like slope and covariance. Because there are both 1D and 2D versions of these, the accessors can have multiple forms. For instance, average() calculates the average on a 1D aggregate, while average_y() and average_x() do so on each dimension of a 2D aggregate.
Element | Description |
---|---|
average() / average_y() / average_x() | The average of the values |
corr() | The correlation coefficient of the least squares fit line |
covariance(method) | The covariance of the values using either population or sample method |
determination_coeff() | The determination coefficient (aka R squared) of the values |
kurtosis(method) / kurtosis_y(method) / kurtosis_x(method) | The kurtosis (4th moment) of the values using either population or sample method |
intercept() | The intercept of the least squares fit line |
num_vals() | The number of (non-null) values seen |
sum() / sum_x() / sum_y() | The sum of the values seen |
skewness(method) / skewness_y(method) / skewness_x(method) | The skewness (3rd moment) of the values using either population or sample method |
slope() | The slope of the least squares fit line |
stddev(method) / stddev_y(method) / stddev_x(method) | The standard deviation of the values using either population or sample method |
variance(method) / variance_y(method) / variance_x(method) | The variance of the values using either population or sample method |
x_intercept() | The x intercept of the least squares fit line |
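The split between an aggregate and its accessors, and the ability to rollup() partial results, can be sketched with a summary that stores only a count, a mean, and a sum of squared deviations. This is a hypothetical Python model: the class, its fields, and the "sample" default are our assumptions, not the toolkit's internal representation. It uses the standard pairwise-combination formula for variance, so two partial summaries merge without revisiting the raw values.

```python
from __future__ import annotations

import math
from dataclasses import dataclass


@dataclass(frozen=True)
class StatsSummary:
    """Hypothetical 1D summary: count, mean, and M2 (sum of squared deviations)."""

    n: int = 0
    mean: float = 0.0
    m2: float = 0.0

    @staticmethod
    def of(values) -> StatsSummary:
        """The aggregate step: fold raw values into a summary."""
        summary = StatsSummary()
        for x in values:
            summary = summary.combine(StatsSummary(1, float(x), 0.0))
        return summary

    def combine(self, other: StatsSummary) -> StatsSummary:
        """Merge two summaries (pairwise variance formula); this is what rollup needs."""
        if self.n == 0:
            return other
        if other.n == 0:
            return self
        n = self.n + other.n
        d = other.mean - self.mean
        return StatsSummary(
            n,
            self.mean + d * other.n / n,
            self.m2 + other.m2 + d * d * self.n * other.n / n,
        )

    # Accessors: read one result out of the summary.
    def average(self) -> float:
        return self.mean

    def variance(self, method: str = "sample") -> float:
        return self.m2 / (self.n - 1 if method == "sample" else self.n)

    def stddev(self, method: str = "sample") -> float:
        return math.sqrt(self.variance(method))


whole = StatsSummary.of([2, 4, 4, 4, 5, 5, 7, 9])
# Rolling up two partial summaries (say, two hourly buckets) gives the same answer.
rolled = StatsSummary.of([2, 4, 4, 4]).combine(StatsSummary.of([5, 5, 7, 9]))
print(whole.average(), whole.stddev("population"))  # approximately 5 and 2
print(rolled.stddev("population"))
```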
Time weighted averages
The average() accessor may be called on the output of a time_weight() like so:
SELECT time_weight('Linear', ts, val) -> average() FROM measurements;
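To see what time_weight('Linear', ...) is averaging, here is a minimal Python sketch of a linearly interpolated time-weighted average. It assumes time-sorted points that span a non-zero amount of time. The function name is hypothetical, and the toolkit's edge-case handling may differ. Each interval contributes the trapezoid under the line joining its endpoints, so a value counts in proportion to how long it lasted, unlike a plain mean.

```python
from datetime import datetime, timedelta


def time_weighted_average_linear(points: list[tuple[datetime, float]]) -> float:
    """Linear-interpolation time-weighted average of time-sorted (time, value) points.

    Each interval contributes the area of the trapezoid under the straight line
    joining its two endpoints; the total area is divided by the elapsed time.
    """
    if len(points) < 2:
        raise ValueError("need at least two points")
    area = 0.0
    for (t0, v0), (t1, v1) in zip(points, points[1:]):
        seconds = (t1 - t0).total_seconds()
        area += (v0 + v1) / 2 * seconds
    elapsed = (points[-1][0] - points[0][0]).total_seconds()
    if elapsed == 0:
        raise ValueError("points must span a non-zero amount of time")
    return area / elapsed


start = datetime(2021, 11, 24, 18, 0)
readings = [
    (start, 10.0),
    (start + timedelta(minutes=10), 20.0),
    (start + timedelta(minutes=30), 20.0),
]
print(time_weighted_average_linear(readings))       # about 18.33
print(sum(v for _, v in readings) / len(readings))  # about 16.67: the plain mean
```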
Approximate count distinct (Hyperloglog)
This is an approximation for distinct counts that was stabilized in our 1.3 release! The distinct_count() accessor may be called on the output of a hyperloglog() like so:
SELECT hyperloglog(device_id) -> distinct_count() FROM measurements;
Next steps
We hope this post helped you understand how function pipelines leverage PostgreSQL extensibility to offer functional programming concepts in a way that is fully PostgreSQL compliant, and how function pipelines can improve the ergonomics of your code, making it easier to write, read, and maintain.
You can try function pipelines today with a fully-managed Timescale Cloud service (no credit card required, free for 30 days). Function pipelines are available now on every new database service on Timescale Cloud, so after you’ve created a new service, you’re all set to use them!
If you prefer to manage your own database instances, you can download and install the timescaledb_toolkit extension on GitHub for free, after which you’ll be able to use function pipelines.
We love building in public. You can view our upcoming roadmap on GitHub for a list of proposed features, as well as features we’re currently implementing and those that are available to use today. We also welcome feedback from the community (it helps us prioritize the features users really want). To contribute feedback, comment on an open issue or in a discussion thread in GitHub.