So you have a bunch of things to do. Why not build a pipeline?
Klemen Slavič
Posted on November 18, 2018
When developing software, it's a good idea to write code that reads well. And, like any good storyteller, you want to leave out details that aren't important. You also want to leave breadcrumbs for the reader to get at the details when they need to.
Sit back, grab a hot drink, and let's get straight into it.
The elements of a good story
What do stories, procedures, processes, functions and algorithms have in common?
They all have a start, a middle and an end.
When we describe a procedure, we start with the prerequisites and materials it needs, the inputs of the procedure. We then describe the steps needed to carry it out. When all is said and done, the description also includes the expected result, the output.
If you're thinking that that sounds remarkably like a function call, you're absolutely correct. But if that deduction escapes you, don't worry, this article is a process by which you'll become familiar with the concept. 😁
Defining inputs
Let's put our cosplay suit on. Your role in this story will be that of an analyst tasked with delivering reports on selected subreddits. You will be given a list of subreddits and asked to generate several types of reports based on each subreddit's front page.
Your task will be to generate a few reports for every given subreddit front page:
- the median of the word count for each post
- the median of the number of comments for each post
- the ratio of posts with images attached vs. all posts
As for the URL, take your pick, but in this example we'll be using /r/dataisbeautiful:

https://www.reddit.com/r/dataisbeautiful/
When you're done having a look, try out the JSON URL (the same address with .json appended) so you'll get a feeling for how the data is structured:

https://www.reddit.com/r/dataisbeautiful.json
Defining steps
So first things first – we need to break the problem down into well-defined steps. The more granular, the easier they will be to understand, debug and reuse. The rule of the game is do one thing and do it well.
Let's take the first report and write down the steps. The more granular, the better.
- generate URL
- fetch JSON data
- extract posts
- extract post text and title for each post
- generate a word count for each text
- calculate median value for all texts
Ideally, you'd have tests for each of these steps. For brevity, I'm omitting them in this article, but this would definitely not fly if I were reviewing your code!
Step 1: generate URL
This one is simple: take a Reddit URL, remove the trailing slash (if any) and append the .json string.
const getRedditJSONUrl = url => url.replace(/\/?$/, '.json');
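For example, applied to our chosen subreddit URL:

getRedditJSONUrl('https://www.reddit.com/r/dataisbeautiful/');
// => 'https://www.reddit.com/r/dataisbeautiful.json'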
Step 2: fetch JSON data
A simple call with fetch and converting the response to JSON does the trick.
const fetchData = url => fetch(url).then(response => response.json());
Step 3: extract posts
We know that every page contains the data.children property, which holds the array of posts we're interested in.
const extractPosts = redditPage => redditPage.data.children;
Step 4: extract post text and title for each post
The title in each post can be found in the data.title attribute, and the text in data.selftext. We'll concatenate them using a newline, \n.
const extractPostTextAndTitle = post => post.data.title + '\n' + post.data.selftext;
Step 5: generate word count for each text
This one is a bit tricky. There's no quick way to reliably count the number of words, so we're going to use a more sophisticated utility function from NPM, @iarna/word-count.
Note that we are still creating a function that wraps the library function. This is to isolate ourselves from the library in case we need to change out the implementation, or if the function call changes from refactoring on our side of the code.
const _wordCount = require('@iarna/word-count');
const countWords = text => _wordCount(text);
Step 6: calculate the median
To calculate the median of a set of numbers, we order them from smallest to largest. The median is the value that splits the ordered set into two equal halves. For sets with an odd count of values, it will be the middle value. For sets with an even count, it will be the midpoint between the two values in the center.
Here's the median value of an odd and an even set of numbers:
[1 1 2 3 5 8 13] ~ size = 7
       ^ median = 3

[1 1 2 3 5 8 13 21] ~ size = 8
        ^ median = (3+5)/2
Here's the implementation:
const numberValueSorter = (a, b) => a - b;

const calculateMedian = list => {
  // an empty list has no median
  if (list.length == 0) return undefined;

  // sort the values
  const sorted = Array.from(list).sort(numberValueSorter);

  if (sorted.length % 2 == 0) {
    // we're dealing with an even-sized set, so take the midpoint
    // of the middle two values
    const a = sorted.length / 2 - 1;
    const b = a + 1;
    return (sorted[a] + sorted[b]) / 2;
  } else {
    // pick the middle value
    const i = Math.floor(sorted.length / 2);
    return sorted[i];
  }
}
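A quick check against the two example sets above:

calculateMedian([1, 1, 2, 3, 5, 8, 13]);     // => 3
calculateMedian([1, 1, 2, 3, 5, 8, 13, 21]); // => 4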
Connecting the steps
Now that we have the steps in place, let's just write out the code in classic, imperative style so we get a better understanding of what the process looks like.
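Something along these lines should do the trick (a rough sketch using the step functions defined above; the variable names are mine):

const getMedianWordCountReport = async subredditUrl => {
  const jsonUrl = getRedditJSONUrl(subredditUrl);    // generate the JSON URL
  const pageData = await fetchData(jsonUrl);         // fetch and parse the page data
  const posts = extractPosts(pageData);              // pull out the list of posts
  const texts = posts.map(extractPostTextAndTitle);  // get the text and title of each post
  const wordCounts = texts.map(countWords);          // count the words in each text
  return calculateMedian(wordCounts);                // calculate the median of the counts
};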
As far as storytelling goes, the flow seems all over the place. Instead of simply listing the steps, we call each step in turn, saving the intermediate result and handing the result to the next step.
There are also a couple of gotchas in that story; some steps require awaiting results, some require wrapping calls with map to process each item.
What if we could just connect these steps into something that would pass these results down the chain? he asks with a twinkle in his eye.
Enter the pipeline
Here's where we need to introduce a new concept – the pipeline function. Let's start by analysing our original process that takes a subreddit URL and generates a median word count for the page:
const getMedianWordCountReport = async subredditUrl => {
  /* something something spaceship */
  return 'voilá!';
};
We said that our process is defined by the six steps described above. Let's assume pipeline exists and write the fantasy code that lets us create the process function from a sequence of steps:
const getMedianWordCountReport = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(extractPostTextAndTitle),
  map(countWords),
  calculateMedian
);

const URL = 'https://www.reddit.com/r/dataisbeautiful/';

// it's an async function, so we need to wait for it to resolve
getMedianWordCountReport(URL)
  .then(median =>
    console.log('Median word count for ' + URL, median)
  )
  .catch(error => console.error(error));
Ah, but what about that map() function there? It's just the Array::map function changed so that it is curried with the mapping function before accepting the array:
const map = mapper => array => array.map(mapper);
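For instance, with a throwaway mapper:

const double = x => x * 2;
map(double)([1, 2, 3]); // => [2, 4, 6]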
So far, so good. We now know what the function should do, we just need to define it. Let's start by defining its signature:
const pipeline = (...steps) => {  // take a list of steps,
  return async input => {         // return an async function that takes an input,
    return input;                 // and eventually returns a result
  };
};
We've created a function that takes an arbitrary number of functions (steps) and returns an async function, the process function.
For every step, the function should take the last intermediate result, feed it to the next step, and save that intermediate result.
If there are no more steps, return the last intermediate result.
Ready? Go!
const pipeline = (...steps) => {    // take a list of steps defining the process
  return async input => {           // and return an async function that takes input;
    let result = input;             // the first intermediate result is the input;
    for (const step of steps)       // iterate over each step;
      result = await step(result);  // run the step on the result and update it;
    return result;                  // return the last result!
  };
};
You might be thinking, "no, that can't be it. Is that really all of it?"
Yep. Try it yourself:
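For example, here's a tiny smoke test with a couple of made-up steps (no network required):

const addOne = async x => x + 1;  // async steps work...
const double = x => x * 2;        // ...and so do plain synchronous ones

const compute = pipeline(addOne, double);

compute(20).then(result => console.log(result)); // => 42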
Streamlining the pipeline
We have a few bends in the pipeline that we'd like to straighten out. There is a point where the result changes from a single value to a list of values (extractPosts) and back again (calculateMedian). It would be nicer if we could group together functions that have to deal with individual items.
In order to do that, let's create a composition function that will take a number of steps meant to process a single value and string them together to operate on a list of values:
const map = (...mappers) =>                  // take an array of mappers,
  array =>                                   // and return a function that takes an array;
    array.map(                               // map each item of the array
      item => mappers.reduce(                // through a function that passes each item
        (result, mapper) => mapper(result),  // and runs them through the chain of mappers,
        item                                 // starting from the item itself
      )
    );
Now, there is a caveat to this function: the mapper functions passed into this map function must be synchronous. For completeness, let's assume that each mapper might be an async function and should be treated accordingly.
const map = (...mappers) =>
  async array => {                     // we now have to return an async function
    const results = [];
    for (const value of array) {       // for each value of the array,
      let result = value;              // set the first intermediate result to the first value;
      for (const mapper of mappers)    // take each mapper;
        result = await mapper(result); // and pass the intermediate result to the next;
      results.push(result);            // and push the result onto the results array;
    }
    return results;                    // return the final array
  };
Now that we've solved that edge case, we can reformulate our process function by grouping the two single-item functions into a single step:
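With the two per-post steps folded into a single map, the process might now read like this:

const getMedianWordCountReport = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(
    extractPostTextAndTitle,
    countWords
  ),
  calculateMedian
);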
And it still works!
Forking pipelines
So now we have a pipeline
function that we can use to declaratively construct a single function that describes our process. But so far, we've only covered one of the three original goals we were tasked in our cosplay scenario.
Oh noes!
Let's write up all of the processes to take stock of what we still have to do.
const getMedianWordCount = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(
    extractPostTextAndTitle,
    countWords
  ),
  calculateMedian
);

const getMedianCommentCount = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(countComments),
  calculateMedian
);

const getImagePresentRatio = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(hasImageAttached),
  calculateRatio
);
OK, so we need to write up a couple of steps so that we have all the functions available to assemble the processes. Let's add them now:
const countComments = post => post.data.num_comments;

const hasImageAttached = post => post.data.post_hint == 'image';

const calculateRatio = array => {
  if (array.length == 0) return undefined;
  return array.filter(value => !!value).length / array.length;
};
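A quick check of the ratio helper:

calculateRatio([true, false, true, true]); // => 0.75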
With that done, let's see if this all runs:
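Here's one way to kick off all three reports against the same page (a sketch; note that each process fetches the page on its own):

const URL = 'https://www.reddit.com/r/dataisbeautiful/';

Promise.all([
  getMedianWordCount(URL),
  getMedianCommentCount(URL),
  getImagePresentRatio(URL)
]).then(([medianWordCount, medianCommentCount, imagePresentRatio]) =>
  console.log({ medianWordCount, medianCommentCount, imagePresentRatio })
).catch(error => console.error(error));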
Great, we now know that we can build processes with these building blocks. There is a slight problem, though. Each process has to do much of the same work, and it seems wasteful to have each process fetch the same data and go through the same motions every time.
Let's create a fork function to handle that problem.
Ideally, we'd like to split the pipeline into specific pipelines for each process, then join them together to get the end result. Let's write some fantasy code to make the goal a bit clearer:
const getMedianWordCount = pipeline(
  map(
    extractPostTextAndTitle,
    countWords
  ),
  calculateMedian
);

const getMedianCommentCount = pipeline(
  map(countComments),
  calculateMedian
);

const getImagePresentRatio = pipeline(
  map(hasImageAttached),
  calculateRatio
);

// this is a convenience function that associates names to the results returned
const joinResults = ([
  medianWordCount,
  medianCommentCount,
  imagePresentRatio
]) => ({
  medianWordCount,
  medianCommentCount,
  imagePresentRatio
});

// the process function, now with forking!
const getSubredditMetrics = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  fork(
    getMedianWordCount,
    getMedianCommentCount,
    getImagePresentRatio
  ),
  joinResults
);
According to the above requirements, the fork function takes a series of pipelines.

At this point, I would advise you to go ahead and try to write your own implementation of fork, given the above constraints. Your implementation might be very similar to the extended map.
Here's my take on the fork function:
const fork = (...pipelines) =>       // a function that takes a list of pipelines,
  async value =>                     // returns an async function that takes a value;
    await Promise.all(               // it returns the results of promises...
      pipelines.map(                 // ...mapped over pipelines...
        pipeline => pipeline(value)  // ...that are passed the value.
      )
    );
If it looks confusing, don't worry. It takes a lot to unpack what the function does.
The trick is to remember that Promise.all() takes an array of promises and returns a promise that resolves when all of the values have resolved. The result is the array of promise results in the same order. If any of the values isn't a promise, it just treats it as an immediately resolved promise with that result.
The final result
So, will the fork work and save us the extra overhead? Let's see.
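A sketch of the test run:

const URL = 'https://www.reddit.com/r/dataisbeautiful/';

getSubredditMetrics(URL)
  .then(metrics => console.log(metrics))
  .catch(error => console.error(error));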
One last magic trick
Still with me? OK, remember when we started our cosplay that we wanted to generate these reports for a list of URLs, not just one? Can we create a sort of process of processes that would take an array of URLs and return an array of reports?
Maybe.
Let's break down the problem. We have an array of URLs. We know we can pass each URL through the pipeline and get back a promise that resolves to the report. If we map the array of URLs with the pipeline, then we get back an array of promises.
And we already know how to resolve an array of promises!
const distribute = pipeline =>  // distribute takes a pipeline,
  values =>                     // and returns a function that takes a list of values;
    Promise.all(                // it returns a promise of all the values...
      values.map(pipeline)      // ...passed through the pipeline
    );
Yup, I think that does it! Let's try it out by passing an array of URLs to see how it does:
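A sketch of that test (the subreddits here are just examples, take your pick):

const getAllMetrics = distribute(getSubredditMetrics);

getAllMetrics([
  'https://www.reddit.com/r/dataisbeautiful/',
  'https://www.reddit.com/r/programming/'
]).then(reports => console.log(reports))
  .catch(error => console.error(error));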
...and they lived happily ever after.
Congratulations on making it this far! You've successfully gone through the process of designing & developing an entire system of asynchronous coordination mechanisms from scratch, which is no easy feat.
Just to wrap things up, let's extract the general utility functions that we've used to build up our process functions and make them available as modules:
export const pipeline = (...steps) =>
  async input => {
    let result = input;
    for (const step of steps)
      result = await step(result);
    return result;
  };

export const map = (...mappers) =>
  async array => {
    const results = [];
    for (const value of array) {
      let result = value;
      for (const mapper of mappers)
        result = await mapper(result);
      results.push(result);
    }
    return results;
  };

export const fork = (...pipelines) =>
  async value =>
    await Promise.all(
      pipelines.map(pipeline => pipeline(value))
    );

export const distribute = pipeline =>
  values =>
    Promise.all(
      values.map(pipeline)
    );
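Assuming we save those into a module (say, pipelines.js), using them elsewhere is a single import away:

import { pipeline, map, fork, distribute } from './pipelines.js';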
Using just these four functions, we've managed to build out a complete suite of generic primitives that can process a finite amount of work in under 350 characters of minified code. 😉
You can get out of that cosplay costume now.