So you have a bunch of things to do. Why not build a pipeline?

krofdrakula

Klemen Slavič

Posted on November 18, 2018

When developing software, it's a good idea to write code that reads well. And, like any good storyteller, you want to leave out details that aren't important. You also want to leave breadcrumbs for the reader to get at the details when they need to.

Sit back, grab a hot drink, and let's get straight into it.

The elements of a good story

What do stories, procedures, processes, functions and algorithms have in common?

They all have a start, a middle and an end.

When we describe procedures, we start by describing the prerequisites and materials that we need to execute, the inputs of the procedure. We describe the steps needed to execute the procedure. When all is said and done, the description also includes the expected result, the output.

If you're thinking that that sounds remarkably like a function call, you're absolutely correct. But if that deduction escapes you, don't worry, this article is a process by which you'll become familiar with the concept. 😁

Defining inputs

Let's put our cosplay suit on. Your role in this story will be that of an analyst who is tasked with delivering reports on selected subreddits. You will be given a list of subreddits and asked to generate several types of reports based on each subreddit's front page.

Your task will be to generate a few reports for every given subreddit front page:

  1. the median of the word count for each post
  2. the median of the number of comments for each post
  3. the ratio of posts with images attached vs. all posts

As for the URL, take your pick, but in this example, we'll be using /r/dataisbeautiful:

https://www.reddit.com/r/dataisbeautiful/

When you're done having a look, try out the JSON URL so you'll get a feeling for how the data is structured:

const fetch = require('node-fetch');

const url = 'https://www.reddit.com/r/dataisbeautiful.json';

fetch(url)
  .then(response => response.json())
  .then(json => console.log(json));

Defining steps

So first things first – we need to break the problem down into well-defined steps. The more granular, the easier they will be to understand, debug and reuse. The rule of the game is do one thing and do it well.

Let's take the first report and write down the steps:

  1. generate URL
  2. fetch JSON data
  3. extract posts
  4. extract post text and title for each post
  5. generate a word count for each text
  6. calculate median value for all texts

Ideally, you'd have tests for each of these steps. For brevity, I'm omitting the tests in this article, but this would definitely not fly if I were reviewing your code in a code review!

Step 1: generate URL

This one is simple: take a Reddit URL, remove the trailing slash (if any) and append the .json string.

const getRedditJSONUrl = url => url.replace(/\/?$/, '.json');
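Here's a quick check of both cases, as a sketch. It assumes the URL carries no query string or fragment; the regular expression only looks at the very end of the string, so something like ?sort=new would end up with .json tacked on after it.

```javascript
const getRedditJSONUrl = url => url.replace(/\/?$/, '.json');

// with a trailing slash: the slash is replaced by the suffix
console.log(getRedditJSONUrl('https://www.reddit.com/r/dataisbeautiful/'));
// → https://www.reddit.com/r/dataisbeautiful.json

// without a trailing slash: the suffix is simply appended
console.log(getRedditJSONUrl('https://www.reddit.com/r/dataisbeautiful'));
// → https://www.reddit.com/r/dataisbeautiful.json
```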

Step 2: fetch JSON data

A simple call with fetch and converting the response to JSON does the trick.

const fetchData = url => fetch(url).then(response => response.json());
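The one-liner above happily tries to parse whatever comes back, including error pages. If you want the step to fail loudly instead, a variant along these lines might do. This is only a sketch: makeFetchData and fakeFetch are names made up for this example, and the fetch implementation is passed in as a parameter so the step can be tried out without touching the network.

```javascript
// Hypothetical variant: takes the fetch implementation as an argument.
const makeFetchData = fetchImpl => url =>
  fetchImpl(url).then(response => {
    // in the Fetch API, `ok` is true for 2xx status codes
    if (!response.ok) {
      throw new Error('Request for ' + url + ' failed with status ' + response.status);
    }
    return response.json();
  });

// a stand-in for fetch that always answers with the given status and body
const fakeFetch = (status, body) => async () => ({
  ok: status >= 200 && status < 300,
  status,
  json: async () => body
});

makeFetchData(fakeFetch(200, { data: { children: [] } }))('https://example.com/page.json')
  .then(json => console.log(json.data.children.length)); // logs 0

makeFetchData(fakeFetch(503, {}))('https://example.com/page.json')
  .catch(error => console.log(error.message)); // logs the failure message
```

In the setup used in this article, makeFetchData(fetch) would take the place of fetchData.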

Step 3: extract posts

We know that every page contains the data.children property which holds the array of posts we're interested in.

const extractPosts = redditPage => redditPage.data.children;

Step 4: extract post text and title for each post

The title in each post can be found in the data.title attribute, and the text in data.selftext. We'll concatenate them using a newline, \n.

const extractPostTextAndTitle = post => post.data.title + '\n' + post.data.selftext;
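To see steps 3 and 4 working together without hitting Reddit, you can feed them a small hand-written object. The samplePage below is an assumption made for illustration: it only mimics the data.children, data.title and data.selftext properties mentioned above, while a real page carries many more fields.

```javascript
const extractPosts = redditPage => redditPage.data.children;
const extractPostTextAndTitle = post => post.data.title + '\n' + post.data.selftext;

// hand-written stand-in for a listing response
const samplePage = {
  data: {
    children: [
      { data: { title: 'A text post', selftext: 'With a body.' } },
      { data: { title: 'A link post', selftext: '' } }
    ]
  }
};

const texts = extractPosts(samplePage).map(extractPostTextAndTitle);
console.log(texts);
// → [ 'A text post\nWith a body.', 'A link post\n' ]
```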

Step 5: generate word count for each text

This one is a bit tricky. There's no quick way to reliably count the number of words, so we're going to use a more sophisticated utility function, the @iarna/word-count package from npm.

Note that we are still creating a function that wraps the library function. This is to isolate ourselves from the library in case we need to change out the implementation, or if the function call changes from refactoring on our side of the code.

const _wordCount = require('@iarna/word-count');

const countWords = text => _wordCount(text);
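To make the point about isolation concrete, here's a sketch of what swapping the implementation could look like. naiveWordCount is a stand-in invented for this example; it only splits on whitespace, so expect it to disagree with the library on anything but plain space-separated text.

```javascript
// stand-in counter: splits on runs of whitespace, nothing smarter
const naiveWordCount = text => {
  const trimmed = text.trim();
  return trimmed === '' ? 0 : trimmed.split(/\s+/).length;
};

// the rest of the code only ever sees countWords
const countWords = text => naiveWordCount(text);

console.log(countWords('Median word count\nfor each post')); // → 6
console.log(countWords('   '));                              // → 0
```

Because every other step calls countWords, nothing else has to change when the counter behind it does.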

Step 6: calculate the median

To calculate the median of a set of numbers, we order them from smallest to largest. The median is the value that splits the ordered set into two equal halves. For sets with an odd count of values, it will be the middle value. For evenly counted sets, it will be the midpoint between the two values in the center.

Here's the median value of an odd and an even set of numbers:

[1 1 2 3 5 8 13] ~ size = 7
       ^ median = 3

[1 1 2 3 5 8 13 21] ~ size = 8
        ^ median = (3+5)/2

Here's the implementation:

const numberValueSorter = (a, b) => a - b;

const calculateMedian = list => {
  // an empty list has no median
  if (list.length == 0) return undefined;

  // sort a copy of the values so the caller's list is left untouched
  const sorted = Array.from(list).sort(numberValueSorter);

  if (sorted.length % 2 == 0) {
    // we're dealing with an even-sized set, so take the midpoint
    // of the middle two values
    const a = sorted.length / 2 - 1;
    const b = a + 1;
    return (sorted[a] + sorted[b]) / 2;
  } else {
    // pick the middle value
    const i = Math.floor(sorted.length / 2);
    return sorted[i];
  }
};
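It's worth checking a median function against input that isn't already sorted, since that's what real word counts look like. The sketch below is a compact variant written just for this check (not the exact code from above); the detail that matters is that both branches read from the sorted copy rather than from the original list.

```javascript
const numberValueSorter = (a, b) => a - b;

const calculateMedian = list => {
  if (list.length === 0) return undefined;
  // sort a copy so the input keeps its order
  const sorted = Array.from(list).sort(numberValueSorter);
  const middle = Math.floor(sorted.length / 2);
  return sorted.length % 2 === 0
    ? (sorted[middle - 1] + sorted[middle]) / 2
    : sorted[middle];
};

// the two sets from the example above, shuffled
const shuffledOdd = [13, 1, 8, 2, 1, 5, 3];
const shuffledEven = [21, 1, 13, 1, 8, 2, 5, 3];

console.log(calculateMedian(shuffledOdd));  // → 3
console.log(calculateMedian(shuffledEven)); // → 4
console.log(calculateMedian([]));           // → undefined
console.log(shuffledOdd[0]);                // → 13, the input is not reordered
```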

Connecting the steps

Now that we have the steps in place, let's just write out the code in classic, imperative style so we get a better understanding of what the process looks like.

const fetch = require('node-fetch');
const _wordCount = require('@iarna/word-count');

const getRedditJSONUrl = url => url.replace(/\/?$/, '.json');
const fetchData = url => fetch(url).then(response => response.json());
const extractPosts = redditPage => redditPage.data.children;
const extractPostTextAndTitle = post => post.data.title + '\n' + post.data.selftext;
const countWords = text => _wordCount(text);

const numberValueSorter = (a, b) => a - b;

const calculateMedian = list => {
  if (list.length == 0) return undefined;
  const sorted = Array.from(list).sort(numberValueSorter);
  if (sorted.length % 2 == 0) {
    const a = sorted.length / 2 - 1;
    const b = a + 1;
    return (sorted[a] + sorted[b]) / 2;
  } else {
    const i = Math.floor(sorted.length / 2);
    return sorted[i];
  }
};

const URL = 'https://www.reddit.com/r/dataisbeautiful/';

// because some of the steps require resolving Promises, we'll
// use an async function so we can await the result
(async () => {
  // step 1
  const jsonURL = getRedditJSONUrl(URL);

  // step 2 – needs awaiting
  const pageData = await fetchData(jsonURL);

  // step 3
  const posts = extractPosts(pageData);

  // step 4 – we need to map over the elements of the array
  const texts = posts.map(extractPostTextAndTitle);

  // step 5 - same here
  const wordCounts = texts.map(countWords);

  // step 6
  const median = calculateMedian(wordCounts);

  console.log('Median word count for ' + URL, median);
})();

As far as storytelling goes, the flow seems all over the place. Instead of simply listing the steps, we call each step in turn, saving the intermediate result and handing the result to the next step.

There are also a couple of gotchas in that story: some steps require awaiting results, and some require wrapping calls with map to process each item.

What if we could just connect these steps into something that would pass these results down the chain? he asks with a twinkle in his eye.

Enter the pipeline

Here's where we need to introduce a new concept – the pipeline function. Let's start by analysing our original process that takes a subreddit URL and generates a median word count for the page:

const getMedianWordCountReport = async subredditUrl => {
  /* something something spaceship */
  return 'voilá!';
};

We said that our process is defined by the six steps described above. Let's assume pipeline exists and write the fantasy code that lets us create the process function from a sequence of steps:

const getMedianWordCountReport = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(extractPostTextAndTitle),
  map(countWords),
  calculateMedian
);

const URL = 'https://www.reddit.com/r/dataisbeautiful/';

// it's an async function, so we need to wait for it to resolve
getMedianWordCountReport(URL)
  .then(median =>
    console.log('Median word count for ' + URL, median)
  )
  .catch(error => console.error(error));

Ah, but what about that map() function there? It's just Array.prototype.map, curried so that it takes the mapping function first and returns a function that accepts the array:

const map = mapper => array => array.map(mapper);
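Here's a short sketch of the curried map in use. It also shows one thing to watch out for: Array.prototype.map hands the mapper the index and the array as well as the item, so functions with optional extra parameters can misbehave. mapUnary is a hypothetical stricter variant added for comparison, not something the rest of the article relies on.

```javascript
const map = mapper => array => array.map(mapper);

const double = map(x => x * 2);
console.log(double([1, 2, 3])); // → [ 2, 4, 6 ]

// parseInt treats the index it receives as the radix:
console.log(map(parseInt)(['10', '10', '10'])); // → [ 10, NaN, 2 ]

// hypothetical stricter variant that forwards only the item
const mapUnary = mapper => array => array.map(item => mapper(item));
console.log(mapUnary(parseInt)(['10', '10', '10'])); // → [ 10, 10, 10 ]
```

The steps in this article all take a single argument, so the plain version is fine for our purposes.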

So far, so good. We now know what pipeline should do; we just need to define it. Let's start by defining its signature:

const pipeline = (...steps) => {  // take a list of steps,
  return async input => {         // return an async function that takes an input,
    return input;                 // and eventually returns a result
  };
};

We've created a function that takes an arbitrary number of functions (steps) and returns an async function, the process function.

For every step, the function should take the last intermediate result, feed it to the next step, and save that intermediate result.

If there are no more steps, return the last intermediate result.

Ready? Go!

const pipeline = (...steps) => {    // take a list of steps defining the process
  return async input => {           // and return an async function that takes input;
    let result = input;             // the first intermediate result is the input;
    for (const step of steps)       // iterate over each step;
      result = await step(result);  // run the step on the result and update it;
    return result;                  // return the last result!
  };
};
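Before wiring in the real steps, it can help to try the pipeline on something tiny. In this sketch, addOne, doubleLater and label are throwaway steps invented for the example; they show that synchronous and asynchronous steps can be mixed freely, and that a throwing step rejects the promise returned by the whole pipeline.

```javascript
const pipeline = (...steps) => async input => {
  let result = input;
  for (const step of steps) result = await step(result);
  return result;
};

// hypothetical steps: two synchronous, one that resolves a little later
const addOne = x => x + 1;
const doubleLater = x => new Promise(resolve => setTimeout(() => resolve(x * 2), 10));
const label = x => 'result: ' + x;

const run = pipeline(addOne, doubleLater, label);
run(3).then(result => console.log(result)); // logs "result: 8"

// a step that throws rejects the promise for the whole pipeline
const failing = pipeline(addOne, () => { throw new Error('step failed'); }, label);
failing(3).catch(error => console.log(error.message)); // logs "step failed"
```

Awaiting a value that isn't a promise simply yields that value, which is why the synchronous steps need no special treatment.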

You might be thinking, "no, that can't be it. Is that really all of it?"

Yep. Try it yourself:

const fetch = require('node-fetch');
const _wordCount = require('@iarna/word-count');

const getRedditJSONUrl = url => url.replace(/\/?$/, '.json');
const fetchData = url => fetch(url).then(response => response.json());
const extractPosts = redditPage => redditPage.data.children;
const extractPostTextAndTitle = post => post.data.title + '\n' + post.data.selftext;
const countWords = text => _wordCount(text);

const numberValueSorter = (a, b) => a - b;

const calculateMedian = list => {
  if (list.length == 0) return undefined;
  const sorted = Array.from(list).sort(numberValueSorter);
  if (sorted.length % 2 == 0) {
    const a = sorted.length / 2 - 1;
    const b = a + 1;
    return (sorted[a] + sorted[b]) / 2;
  } else {
    const i = Math.floor(sorted.length / 2);
    return sorted[i];
  }
};

const map = mapper => array => array.map(mapper);

const pipeline = (...steps) => {
  return async input => {
    let result = input;
    for (const step of steps)
      result = await step(result);
    return result;
  };
};

const getMedianWordCount = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(extractPostTextAndTitle),
  map(countWords),
  calculateMedian
);

const URL = 'https://www.reddit.com/r/dataisbeautiful/';

getMedianWordCount(URL)
  .then(median => console.log('Median word count', median));

Streamlining the pipeline

We have a few bends in the pipeline that we'd like to straighten out. There is a point where the result changes from a single value to a list of values (extractPosts) and back again (calculateMedian). It would be nicer if we could group together functions that have to deal with individual items.

In order to do that, let's create a composition function that will take a number of steps meant to process a single value and string them together to operate on a list of values:

const map = (...mappers) =>                 // take an array of mappers,
  array =>                                  // and return a function that takes an array;
    array.map(                              // map each item of the array
      item => mappers.reduce(               // through a function that passes each item
        (result, mapper) => mapper(result), // through the chain of mappers,
        item                                // starting with the item itself
      )
    );

Now, there is a caveat to this function: the mapper functions passed into this map function must be synchronous. For completeness, let's assume that each mapper might be an async function and should be treated accordingly.

const map = (...mappers) =>
  async array => {                      // we now have to return an async function
    const results = [];
    for (const value of array) {        // for each value of the array,
      let result = value;               // set the first intermediate result to the current value;
      for (const mapper of mappers)     // take each mapper;
        result = await mapper(result);  // and pass the intermediate result to the next;
      results.push(result);             // and push the result onto the results array;
    }
    return results;                     // return the final array
  };
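Here's a small sketch of the async-aware map in action, with one synchronous and one asynchronous mapper. The mapper names are made up for the example.

```javascript
const map = (...mappers) => async array => {
  const results = [];
  for (const value of array) {
    let result = value;
    for (const mapper of mappers) result = await mapper(result);
    results.push(result);
  }
  return results;
};

// hypothetical mappers: one synchronous, one asynchronous
const addOne = x => x + 1;
const timesTenLater = x => new Promise(resolve => setTimeout(() => resolve(x * 10), 5));

map(addOne, timesTenLater)([1, 2, 3]).then(results => console.log(results));
// logs [ 20, 30, 40 ]
```

Note that this version works through the items one at a time: the second item doesn't start until the first has gone through every mapper. That keeps the code easy to follow, at the cost of some waiting when the mappers are slow.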

Now that we've solved that edge case, we can reformulate our process function by grouping the two single-item functions into a single step:

const fetch = require('node-fetch');
const _wordCount = require('@iarna/word-count');

const getRedditJSONUrl = url => url.replace(/\/?$/, '.json');
const fetchData = url => fetch(url).then(response => response.json());
const extractPosts = redditPage => redditPage.data.children;
const extractPostTextAndTitle = post => post.data.title + '\n' + post.data.selftext;
const countWords = text => _wordCount(text);

const numberValueSorter = (a, b) => a - b;

const calculateMedian = list => {
  if (list.length == 0) return undefined;
  const sorted = Array.from(list).sort(numberValueSorter);
  if (sorted.length % 2 == 0) {
    const a = sorted.length / 2 - 1;
    const b = a + 1;
    return (sorted[a] + sorted[b]) / 2;
  } else {
    const i = Math.floor(sorted.length / 2);
    return sorted[i];
  }
};

const pipeline = (...steps) => {
  return async input => {
    let result = input;
    for (const step of steps)
      result = await step(result);
    return result;
  };
};

const map = (...mappers) =>
  async array => {
    const results = [];
    for (const value of array) {
      let result = value;
      for (const mapper of mappers)
        result = await mapper(result);
      results.push(result);
    }
    return results;
  };

const getMedianWordCount = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(
    extractPostTextAndTitle,
    countWords
  ),
  calculateMedian
);

const URL = 'https://www.reddit.com/r/dataisbeautiful/';

getMedianWordCount(URL)
  .then(median => console.log('Median word count', median));

And it still works!

Forking pipelines

So now we have a pipeline function that we can use to declaratively construct a single function that describes our process. But so far, we've only covered one of the three original goals we were tasked with in our cosplay scenario.

Oh noes!

Let's write up all of the processes to take stock of what we still have to do.

const getMedianWordCount = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(
    extractPostTextAndTitle,
    countWords
  ),
  calculateMedian
);

const getMedianCommentCount = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(countComments),
  calculateMedian
);

const getImagePresentRatio = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(hasImageAttached),
  calculateRatio
);

OK, so we need to write up a couple of steps so that we have all the functions available to assemble the processes. Let's add them now:

const countComments = post => post.data.num_comments;

const hasImageAttached = post => post.data.post_hint == 'image';

const calculateRatio = array => {
  if (array.length == 0) return undefined;
  return array.filter(value => !!value).length / array.length;
};
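To get a feel for these three helpers, here they are run against a handful of hand-written posts. The sample data is an assumption made for illustration; in particular, it assumes that some posts may carry no post_hint at all, in which case the comparison simply yields false.

```javascript
const countComments = post => post.data.num_comments;
const hasImageAttached = post => post.data.post_hint == 'image';
const calculateRatio = array => {
  if (array.length == 0) return undefined;
  return array.filter(value => !!value).length / array.length;
};

// hand-written sample posts
const posts = [
  { data: { num_comments: 12, post_hint: 'image' } },
  { data: { num_comments: 0, post_hint: 'link' } },
  { data: { num_comments: 47 } }, // no post_hint at all
  { data: { num_comments: 3, post_hint: 'image' } }
];

console.log(posts.map(countComments));                    // → [ 12, 0, 47, 3 ]
console.log(calculateRatio(posts.map(hasImageAttached))); // → 0.5
```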

With that done, let's see if this all runs:

const fetch = require('node-fetch');
const _wordCount = require('@iarna/word-count');

const getRedditJSONUrl = url => url.replace(/\/?$/, '.json');
const fetchData = url => fetch(url).then(response => response.json());
const extractPosts = redditPage => redditPage.data.children;
const extractPostTextAndTitle = post => post.data.title + '\n' + post.data.selftext;
const countWords = text => _wordCount(text);

const numberValueSorter = (a, b) => a - b;

const calculateMedian = list => {
  if (list.length == 0) return undefined;
  const sorted = Array.from(list).sort(numberValueSorter);
  if (sorted.length % 2 == 0) {
    const a = sorted.length / 2 - 1;
    const b = a + 1;
    return (sorted[a] + sorted[b]) / 2;
  } else {
    const i = Math.floor(sorted.length / 2);
    return sorted[i];
  }
};

const pipeline = (...steps) => {
  return async input => {
    let result = input;
    for (const step of steps)
      result = await step(result);
    return result;
  };
};

const map = (...mappers) =>
  async array => {
    const results = [];
    for (const value of array) {
      let result = value;
      for (const mapper of mappers)
        result = await mapper(result);
      results.push(result);
    }
    return results;
  };

const countComments = post => post.data.num_comments;

const hasImageAttached = post => post.data.post_hint == 'image';

const calculateRatio = array => {
  if (array.length == 0) return undefined;
  return array.filter(value => !!value).length / array.length;
};

const getMedianWordCount = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(
    extractPostTextAndTitle,
    countWords
  ),
  calculateMedian
);

const getMedianCommentCount = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(countComments),
  calculateMedian
);

const getImagePresentRatio = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  map(hasImageAttached),
  calculateRatio
);

const URL = 'https://www.reddit.com/r/dataisbeautiful/';

// now we need to call all three processes and report the final count
Promise.all([
  getMedianWordCount(URL),
  getMedianCommentCount(URL),
  getImagePresentRatio(URL)
]).then(([medianWordCount, medianCommentCount, imagePresentRatio]) => {
  console.log(
    'Results for ' + URL,
    { medianWordCount, medianCommentCount, imagePresentRatio }
  );
});

Great, we now know that we can build processes with these building blocks. There is a slight problem, though. Each process has to do many of the same things, and it seems wasteful to have each process fetch the same data and go through the same motions every time.

Let's create a fork function to handle that problem.

Ideally, we'd like to split the pipeline into specific pipelines for each process, then join them together to get the end result. Let's write some fantasy code to make the goal a bit clearer:

const getMedianWordCount = pipeline(
  map(
    extractPostTextAndTitle,
    countWords
  ),
  calculateMedian
);

const getMedianCommentCount = pipeline(
  map(countComments),
  calculateMedian
);

const getImagePresentRatio = pipeline(
  map(hasImageAttached),
  calculateRatio
);

// this is a convenience function that associates names to the results returned
const joinResults = ([
  medianWordCount,
  medianCommentCount,
  imagePresentRatio
]) => ({
  medianWordCount,
  medianCommentCount,
  imagePresentRatio
});

// the process function, now with forking!
const getSubredditMetrics = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  fork(
    getMedianWordCount,
    getMedianCommentCount,
    getImagePresentRatio
  ),
  joinResults
);

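According to the above requirements, the fork function takes a series of pipelines, feeds the same value to each of them, and returns their results as an array in the same order, ready for joinResults to pick apart.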

At this point, I would advise you to go ahead and try to write your own implementation of fork, given the above constraints. Your implementation might be very similar to the extended map.

Here's my take on the fork function:

const fork = (...pipelines) =>       // a function that takes a list of pipelines,
  async value =>                     // returns an async function that takes a value;
    await Promise.all(               // it returns the results of promises...
      pipelines.map(                 // ...mapped over pipelines...
        pipeline => pipeline(value)  // ...that are passed the value.
      )
    );

If it looks confusing, don't worry. It takes a lot to unpack what the function does.

The trick is to remember that Promise.all() takes an array of promises and returns a promise that resolves when all of them have resolved (and rejects as soon as any one of them rejects). The result is the array of promise results in the same order. If any of the values isn't a promise, it is treated as an immediately resolved promise with that value.
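A small sketch may help here. The three branches below are invented for the example: the first is asynchronous and deliberately finishes last, while the other two are plain synchronous functions. The results still come back in the order the branches were listed.

```javascript
const fork = (...pipelines) => async value =>
  await Promise.all(pipelines.map(pipeline => pipeline(value)));

// hypothetical branches: slowSum resolves after a delay,
// largest and count return their values right away
const slowSum = list =>
  new Promise(resolve =>
    setTimeout(() => resolve(list.reduce((a, b) => a + b, 0)), 20)
  );
const largest = list => Math.max(...list);
const count = list => list.length;

fork(slowSum, largest, count)([3, 1, 4, 1, 5])
  .then(results => console.log(results));
// logs [ 14, 5, 5 ]
```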

The final result

So, will the fork work and save us the extra overhead? Let's see.

const fetch = require('node-fetch');
const _wordCount = require('@iarna/word-count');

const getRedditJSONUrl = url => url.replace(/\/?$/, '.json');
const fetchData = url => fetch(url).then(response => response.json());
const extractPosts = redditPage => redditPage.data.children;
const extractPostTextAndTitle = post => post.data.title + '\n' + post.data.selftext;
const countWords = text => _wordCount(text);

const numberValueSorter = (a, b) => a - b;

const calculateMedian = list => {
  if (list.length == 0) return undefined;
  const sorted = Array.from(list).sort(numberValueSorter);
  if (sorted.length % 2 == 0) {
    const a = sorted.length / 2 - 1;
    const b = a + 1;
    return (sorted[a] + sorted[b]) / 2;
  } else {
    const i = Math.floor(sorted.length / 2);
    return sorted[i];
  }
};

const pipeline = (...steps) => {
  return async input => {
    let result = input;
    for (const step of steps)
      result = await step(result);
    return result;
  };
};

const map = (...mappers) =>
  async array => {
    const results = [];
    for (const value of array) {
      let result = value;
      for (const mapper of mappers)
        result = await mapper(result);
      results.push(result);
    }
    return results;
  };

const countComments = post => post.data.num_comments;

const hasImageAttached = post => post.data.post_hint == 'image';

const calculateRatio = array => {
  if (array.length == 0) return undefined;
  return array.filter(value => !!value).length / array.length;
};

const fork = (...pipelines) =>
  async value => await Promise.all(pipelines.map(pipeline => pipeline(value)));

const getMedianWordCount = pipeline(
  map(
    extractPostTextAndTitle,
    countWords
  ),
  calculateMedian
);

const getMedianCommentCount = pipeline(
  map(countComments),
  calculateMedian
);

const getImagePresentRatio = pipeline(
  map(hasImageAttached),
  calculateRatio
);

// this is a convenience function that associates names to the results returned
const joinResults = ([
  medianWordCount,
  medianCommentCount,
  imagePresentRatio
]) => ({
  medianWordCount,
  medianCommentCount,
  imagePresentRatio
});

const getSubredditMetrics = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  fork(
    getMedianWordCount,
    getMedianCommentCount,
    getImagePresentRatio
  ),
  joinResults
);

const URL = 'https://www.reddit.com/r/dataisbeautiful/';

getSubredditMetrics(URL)
  .then(results => console.log('Report for ' + URL, results));

One last magic trick

Still with me? OK, remember when we started our cosplay that we wanted to generate these reports for a list of URLs, not just one? Can we create a sort of process of processes that would take an array of URLs and return an array of reports?

Maybe.

Let's break down the problem. We have an array of URLs. We know we can pass each URL through the pipeline and get back a promise that resolves to the report. If we map the array of URLs with the pipeline, then we get back an array of promises.

And we already know how to resolve an array of promises!

const distribute = pipeline =>  // distribute takes a pipeline,
  values =>                     // and returns a function that takes a list of values;
    Promise.all(                // it returns a promise of all the values...
      values.map(pipeline)      // ...passed through each pipeline
    );
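One thing to keep in mind: because distribute is built on Promise.all, a single failing URL rejects the whole batch. The sketch below shows that behaviour with a stand-in pipeline (fakeReport, invented for the example), followed by a variant built on Promise.allSettled. That variant is not part of this article's toolkit; it's just one way you might keep the successful reports when some inputs fail.

```javascript
const distribute = pipeline => values => Promise.all(values.map(pipeline));

// hypothetical stand-in for a report pipeline: rejects for one input
const fakeReport = async name => {
  if (name === 'broken') throw new Error('no data for ' + name);
  return { name, posts: name.length };
};

distribute(fakeReport)(['alpha', 'beta'])
  .then(reports => console.log(reports));
// logs [ { name: 'alpha', posts: 5 }, { name: 'beta', posts: 4 } ]

// one failing input rejects the whole batch
distribute(fakeReport)(['alpha', 'broken'])
  .catch(error => console.log(error.message));
// logs "no data for broken"

// variant that reports each outcome separately instead of failing fast
const distributeSettled = pipeline => values =>
  Promise.allSettled(values.map(value => pipeline(value)));

distributeSettled(fakeReport)(['alpha', 'broken'])
  .then(outcomes => console.log(outcomes.map(outcome => outcome.status)));
// logs [ 'fulfilled', 'rejected' ]
```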

Yup, I think that does it! Let's try it out by passing an array of URLs to see how it does:

const fetch = require('node-fetch');
const _wordCount = require('@iarna/word-count');

const getRedditJSONUrl = url => url.replace(/\/?$/, '.json');
const fetchData = url => fetch(url).then(response => response.json());
const extractPosts = redditPage => redditPage.data.children;
const extractPostTextAndTitle = post => post.data.title + '\n' + post.data.selftext;
const countWords = text => _wordCount(text);

const numberValueSorter = (a, b) => a - b;

const calculateMedian = list => {
  if (list.length == 0) return undefined;
  const sorted = Array.from(list).sort(numberValueSorter);
  if (sorted.length % 2 == 0) {
    const a = sorted.length / 2 - 1;
    const b = a + 1;
    return (sorted[a] + sorted[b]) / 2;
  } else {
    const i = Math.floor(sorted.length / 2);
    return sorted[i];
  }
};

const pipeline = (...steps) => {
  return async input => {
    let result = input;
    for (const step of steps)
      result = await step(result);
    return result;
  };
};

const map = (...mappers) =>
  async array => {
    const results = [];
    for (const value of array) {
      let result = value;
      for (const mapper of mappers)
        result = await mapper(result);
      results.push(result);
    }
    return results;
  };

const countComments = post => post.data.num_comments;

const hasImageAttached = post => post.data.post_hint == 'image';

const calculateRatio = array => {
  if (array.length == 0) return undefined;
  return array.filter(value => !!value).length / array.length;
};

const fork = (...pipelines) =>
  async value => await Promise.all(pipelines.map(pipeline => pipeline(value)));

const getMedianWordCount = pipeline(
  map(
    extractPostTextAndTitle,
    countWords
  ),
  calculateMedian
);

const getMedianCommentCount = pipeline(
  map(countComments),
  calculateMedian
);

const getImagePresentRatio = pipeline(
  map(hasImageAttached),
  calculateRatio
);

// this is a convenience function that associates names to the results returned
const joinResults = ([
  medianWordCount,
  medianCommentCount,
  imagePresentRatio
]) => ({
  medianWordCount,
  medianCommentCount,
  imagePresentRatio
});

const getSubredditMetrics = pipeline(
  getRedditJSONUrl,
  fetchData,
  extractPosts,
  fork(
    getMedianWordCount,
    getMedianCommentCount,
    getImagePresentRatio
  ),
  joinResults
);

const distribute = pipeline =>
  values => Promise.all(values.map(pipeline));

const URLs = [
  'https://www.reddit.com/r/dataisbeautiful/',
  'https://www.reddit.com/r/proceduralgeneration/'
];

const getAllReports = distribute(getSubredditMetrics);

getAllReports(URLs)
  .then(results => {
    const reports = results.map((report, idx) => ({
      url: URLs[idx],
      report
    }));
    console.log(reports);
  });

...and they lived happily ever after.

Congratulations on making it this far! You've successfully gone through the process of designing & developing an entire system of asynchronous coordination mechanisms from scratch, which is no easy feat.

Just to wrap things up, let's extract the general utility functions that we've used to build up our process functions and make them available as modules:

export const pipeline = (...steps) =>
  async input => {
    let result = input;
    for (const step of steps)
      result = await step(result);
    return result;
  };

export const map = (...mappers) =>
  async array => {
    const results = [];
    for (const value of array) {
      let result = value;
      for (const mapper of mappers)
        result = await mapper(result);
      results.push(result);
    }
    return results;
  };

export const fork = (...pipelines) =>
  async value =>
    await Promise.all(
      pipelines.map(pipeline => pipeline(value))
    );

export const distribute = pipeline =>
  values =>
    Promise.all(
      values.map(pipeline)
    );

Using just these four functions, we've managed to build out a complete suite of generic primitives that can process a finite amount of work in under 350 characters of minified code. 😉

You can get out of that cosplay costume now.
