My first Map Reduce without Hadoop in pure python

grozail

Ilya

Posted on July 20, 2019

Foreword

I work at a robotics company (Rozum Robotics). Most of the time I develop the algorithmic "kernel" of the robot, internal libraries, public APIs (example), or end-user applications (example).

And this is my first post here, criticism is welcome :)

The story

Long story short: there was a need to parse and analyze a relatively huge amount of metrics. Of course, ASAP.

Motivation: the robot was deployed in production. The software versions were stable and tested, but sometimes the robot did not execute tasks as expected.
Why? We didn't know, and it was horrifying.

In the end, we figured out what was happening and successfully fixed it. It was a bug on the end-user application side. Happy ending.

But I'd like to show, by example, how Python can help in such situations.

Input



Just some zip files with metrics like this:

2019-07-18 09:10:20.072 [metrics-consumer-broker-0] INFO  c.r.i.m.InLogMetricsStorageHandler - {
  "robotId": "D3E14103701D4FD7DE8B11DF7DB94723",
  "event": "DEVICE_HEARTBEAT",
  "timestamp": 1563441019902,
  "payload": {
    "device": 37,
    "state": "OPERATIONAL"
  }
}
2019-07-18 09:10:20.072 [metrics-consumer-broker-0] INFO  c.r.i.m.InLogMetricsStorageHandler - {
  "robotId": "D3E14103701D4FD7DE8B11DF7DB94723",
  "event": "DEVICE_HEARTBEAT",
  "timestamp": 1563441019912,
  "payload": {
    "device": 34,
    "state": "OPERATIONAL"
  }
}

What needed to be done? A pipeline!

  1. Extract raw data
  2. Convert raw data into suitable data types
  3. Filtering
  4. Higher-order event restoration
  5. Regrouping
  6. Analysis

It needed to be done quick and dirty. I believed in Python for this task and was not disappointed. We needed to implement a simple MapReduce.

"MapReduce is a data processing job which splits the input data into independent chunks, which are then processed by the map function and then reduced by grouping similar sets of the data."

Quick search: python data pipeline framework ->
First link in Google -> searched for Python -> saw some frameworks I had heard about (Luigi, Airflow) -> too complicated -> searched for something lightweight -> Joblib -> the journey begins!

The code

Imports:

import glob
import zipfile
import re
import json
import time
import operator
import functools
import itertools
import joblib

A shortcut around joblib's "embarrassingly parallel" helper, to make it easy to write readable parallel code and debug it quickly:

def parallelize(func, arr):
    return joblib.Parallel(n_jobs=6)(joblib.delayed(func)(tgt) for tgt in arr)
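As a quick sanity check (my toy example), the helper returns results in the same order as the input, just like a plain map:

def square(x):
    return x * x

print(parallelize(square, [1, 2, 3, 4]))  # [1, 4, 9, 16]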

1) Extract raw data

# RAW_DATA_GLOB is the glob pattern for the metric archives, e.g. "raw/*.zip"
def list_zip_targets():
    targets = glob.glob(RAW_DATA_GLOB)
    return sorted(targets)

def unzip_target(tgt):
    with zipfile.ZipFile(tgt, "r") as archive:
        archive.extractall("extracted")

def unzip_all():
    return parallelize(unzip_target, list_zip_targets())

Intermediate output: the extracted log files land in the extracted/ directory.
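
The later steps iterate over these files with list_log_targets(), which the post doesn't show. A minimal sketch, assuming the extracted metrics are plain .log files:

def list_log_targets():
    # hypothetical pattern: pick up every log file produced by unzip_all()
    return sorted(glob.glob("extracted/**/*.log", recursive=True))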

2) Convert raw data into suitable data types:

But wait: as you can see, the log file is not valid JSON.
First we need to extract the JSON objects, so let's try a regex.
Each record starts with a date, so let's use that to split the log.
And let's start our parse_log function, which we will map over each of the logs:

def make_json(entry):
    try:
        # grab the JSON object that follows the " - " logger separator
        return json.loads(re.findall(r"-\s(\{.+\}\})", entry)[0])
    except (IndexError, json.JSONDecodeError):
        return None

def parse_log(tgt):
    with open(tgt, "r") as log:
        data = log.read()
        entries = data.replace("\n", "")
        # every record starts with a "YYYY-MM-DD" date, so split on that
        entries = re.split(r"\d\d\d\d-\d\d-\d\d", entries)
        jsons = [make_json(e) for e in entries]
        # ...the function is extended step by step below

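A quick check of make_json against one record from the input above (the argument is what re.split leaves after stripping the leading date):

sample = (
    ' 09:10:20.072 [metrics-consumer-broker-0] INFO  '
    'c.r.i.m.InLogMetricsStorageHandler - {"robotId": "D3E14103701D4FD7DE8B11DF7DB94723", '
    '"event": "DEVICE_HEARTBEAT", "timestamp": 1563441019902, '
    '"payload": {"device": 37, "state": "OPERATIONAL"}}'
)
print(make_json(sample)["event"])  # DEVICE_HEARTBEAT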

3) Filtering

Some of the records may be empty, so:

        valid_jsons = [e for e in jsons if e is not None]

And I needed to get the records that could contain important information. In my case this was info about "run" and "build" events:

def extract_runs_and_builds(entries):
    return [
        e
        for e in entries
        if "state" in e["payload"]
        and (
            e["payload"]["state"].lower() == "build"
            or e["payload"]["state"].lower() == "run_position"
        )
    ]
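For example (toy entries, not real metrics), only records whose payload state is "build" or "run_position" survive the filter:

entries = [
    {"payload": {"state": "RUN_POSITION"}},
    {"payload": {"state": "BUILD"}},
    {"payload": {"device": 37, "state": "OPERATIONAL"}},
]
print(len(extract_runs_and_builds(entries)))  # 2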

4) Higher-order event restoration

Now we had the records we were interested in. But we needed to combine each run with its build: if a "run" is successful, a "build" starts, but not every run is successful.

def zip_run_with_build(runs_and_builds):
    rb = runs_and_builds
    result = []
    for i, o in enumerate(rb):
        try:
            if o["payload"]["state"].lower() == "run_position":
                if rb[i + 1]["payload"]["state"].lower() == "build":
                    result.append({"run": o, "build": rb[i + 1]})
        except IndexError:
            # the last run may have no following build
            pass
    return result
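A toy check of the pairing: a run immediately followed by a build is paired, while a trailing run with no build after it is dropped.

rb = [
    {"payload": {"state": "RUN_POSITION"}, "timestamp": 1},
    {"payload": {"state": "BUILD"}, "timestamp": 2},
    {"payload": {"state": "RUN_POSITION"}, "timestamp": 3},  # no build follows
]
print(len(zip_run_with_build(rb)))  # 1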

5) Regrouping

I regrouped the run/build pairs and collected the metrics for each unique run.


def foldl(op, acc, arr):
    # left fold: functools.reduce with an explicit initial accumulator
    return functools.reduce(op, arr, acc)

def parse_log(tgt):
    with open(tgt, "r") as log:
        data = log.read()
        entries = data.replace("\n", "")
        entries = re.split(r"\d\d\d\d-\d\d-\d\d", entries)
        jsons = [make_json(e) for e in entries]
        valid_jsons = [e for e in jsons if e is not None]
        runs_and_builds = extract_runs_and_builds(valid_jsons)
        zipped = zip_run_with_build(runs_and_builds)
        metrics = {extract_key(z): [] for z in zipped}
        for z in zipped:
            built_poses = z["build"]["payload"]["builtPoses"]
            times = [b["deltaTimeMs"] for b in built_poses]
            metrics[extract_key(z)].append(
                {
                    "totalTime": foldl(operator.add, 0, times),
                    "timestamp": z["build"]["timestamp"],
                }
            )
        return metrics

def parallel_parsing():
    return parallelize(parse_log, list_log_targets())
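parse_log also calls extract_key(), which the post doesn't show; it derives the grouping key for a run/build pair. A purely hypothetical sketch (the real field name depends on the metrics schema):

def extract_key(z):
    # hypothetical: group by whatever identifies the motion in the run event's payload
    return str(z["run"]["payload"].get("motionId", "unknown"))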

So I mapped the parser function over the log files. The last thing that remained: reduce.

if __name__ == "__main__":
    start = time.time()
    data = [d for d in parallel_parsing() if d]
    # reduce
    result = {}
    for d in data:
        for k, v in d.items():
            if k in result:
                result[k] += v
            else:
                result[k] = v
    with open("result.txt", "w") as f:
        f.write(json.dumps(result))
    dt = time.time() - start
    print(f"Finished in {dt}s")
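As a side note, the merge loop can also be written with collections.defaultdict, which drops the key-existence check; an equivalent sketch:

import collections

result = collections.defaultdict(list)
for d in data:
    for k, v in d.items():
        result[k] += v  # v is a list of per-run metrics, so this extends the group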

6) Analysis

That's a separate story, not for the first post. Maybe next time :)

Conclusion

When time matters, Python can become your saviour.
This script processes 3 GB of data on my machine in 22 ± 0.5 seconds.
The non-parallel version takes 2 minutes for the same amount of data.

Finally, I would like to mention another library that aims to manipulate and transform indexable data (lists, arrays, ...): SeqTools. Take a look; maybe it will save your day someday.

Have a nice day, wherever you are! Happy coding and problem solving!
