My first MapReduce without Hadoop in pure Python
Ilya
Posted on July 20, 2019
Foreword
I'm working at a robotics company (Rozum Robotics). Most of the time I develop the algorithmic "kernel" of the robot, internal libraries, public APIs (example), or end-user applications (example).
This is my first post here; criticism is welcome :)
The story
Long story short: there was a need to parse and analyze a relatively huge amount of metrics. Of course, ASAP.
Motivation: the robot was deployed in production. The software versions were stable and tested, but sometimes the robot did not execute tasks as expected.
Why? We didn't know, and it was horrifying.
In the end, we figured out what was happening and successfully fixed it. It was a bug on the end-user application side. Happy ending.
But I'd like to show, by example, how Python can help in such situations.
Input
Just some zip files with metrics like this:
2019-07-18 09:10:20.072 [metrics-consumer-broker-0] INFO c.r.i.m.InLogMetricsStorageHandler - {
    "robotId": "D3E14103701D4FD7DE8B11DF7DB94723",
    "event": "DEVICE_HEARTBEAT",
    "timestamp": 1563441019902,
    "payload": {
        "device": 37,
        "state": "OPERATIONAL"
    }
}
2019-07-18 09:10:20.072 [metrics-consumer-broker-0] INFO c.r.i.m.InLogMetricsStorageHandler - {
    "robotId": "D3E14103701D4FD7DE8B11DF7DB94723",
    "event": "DEVICE_HEARTBEAT",
    "timestamp": 1563441019912,
    "payload": {
        "device": 34,
        "state": "OPERATIONAL"
    }
}
What needed to be done? A pipeline!
- Extract raw data
- Convert raw data into suitable data types
- Filtering
- Higher-order event restoration
- Regrouping
- Analysis
It needed to be done quickly, hot and dirty. I believed in Python for this task and was not disappointed. We needed to implement a simple MapReduce.
"MapReduce is a data processing job which splits the input data into independent chunks, which are then processed by the map function and then reduced by grouping similar sets of the data."
Quick search: python data pipeline framework
-> first link in Google
-> saw some frameworks I had heard about (Luigi, Airflow) -> too complicated
-> searched for something lightweight
-> Joblib -> the journey begins!
The code
Imports:
import glob
import zipfile
import re
import json
import time
import operator
import functools
import itertools
import joblib
A shortcut around joblib's "embarrassingly parallel" helper, to make it easy to write readable parallel code and debug it quickly:
def parallelize(func, arr):
    return joblib.Parallel(n_jobs=6)(joblib.delayed(func)(tgt) for tgt in arr)
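For example (a toy check of the helper, not from the original post):

def square(x):
    return x * x

# parallelize(square, [1, 2, 3, 4]) -> [1, 4, 9, 16]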
1) Extract raw data
def list_zip_targets():
    targets = glob.glob(RAW_DATA_GLOB)
    return sorted(targets)

def unzip_target(tgt):
    with zipfile.ZipFile(tgt, "r") as archive:
        archive.extractall("extracted")

def unzip_all():
    return parallelize(unzip_target, list_zip_targets())
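Note that RAW_DATA_GLOB is not defined in the snippets above; it is just a glob pattern pointing at the zip archives. Something like this would do (the actual path is my assumption):

RAW_DATA_GLOB = "metrics/*.zip"  # hypothetical location of the zipped metric logs

# unzip_all() then extracts every archive into the "extracted" directory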
2) Convert raw data into suitable data types:
But wait: as you can see, the log file is not valid JSON.
First we need to extract the JSON objects, so let's try a regex.
Each record starts with a date, so let's use that.
And let's start our parse_log function, which we will map over each of the log files:
def make_json(entry):
    try:
        return json.loads(re.findall(r"-\s(\{.+\}\})", entry)[0])
    except (IndexError, json.JSONDecodeError):
        # no JSON found in this entry, or it could not be parsed
        return None

def parse_log(tgt):
    with open(tgt, "r") as log:
        data = log.read()
    entries = data.replace("\n", "")
    entries = re.split(r"\d\d\d\d-\d\d-\d\d", entries)
    jsons = [make_json(e) for e in entries]
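To sanity-check the regex, here is a shortened entry (abridged by me) run through make_json:

entry = ' 09:10:20.072 [metrics-consumer-broker-0] INFO c.r.i.m.InLogMetricsStorageHandler - {"robotId": "X", "event": "DEVICE_HEARTBEAT", "payload": {"device": 37, "state": "OPERATIONAL"}}'
make_json(entry)
# -> {'robotId': 'X', 'event': 'DEVICE_HEARTBEAT', 'payload': {'device': 37, 'state': 'OPERATIONAL'}}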
3) Filtering
Some of the records may fail to parse (make_json returns None for those), so:
valid_jsons = [e for e in jsons if e is not None]
And I needed only the records that could contain the important information; in my case, that was info about the "run" and "build" events:
def extract_runs_and_builds(entries):
    return [
        e
        for e in entries
        if "state" in e["payload"]
        and (
            e["payload"]["state"].lower() == "build"
            or e["payload"]["state"].lower() == "run_position"
        )
    ]
4) Higher-order event restoration
Now we have the records we are interested in, but we need to pair each run with its build. If a "run" succeeds, a "build" starts; but not every run is successful.
def zip_run_with_build(runs_and_builds):
    rb = runs_and_builds
    result = []
    for i, o in enumerate(rb):
        try:
            if o["payload"]["state"].lower() == "run_position":
                if rb[i + 1]["payload"]["state"].lower() == "build":
                    result.append({"run": o, "build": rb[i + 1]})
        except IndexError:
            # the last "run" has no following event to pair with
            pass
    return result
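A toy illustration (made-up events, not real metrics) of how each run gets paired with the build that follows it:

events = [
    {"payload": {"state": "RUN_POSITION"}, "timestamp": 1},
    {"payload": {"state": "BUILD"}, "timestamp": 2},
    {"payload": {"state": "RUN_POSITION"}, "timestamp": 3},  # no build followed, so it is dropped
]
zip_run_with_build(events)
# -> [{"run": {... "timestamp": 1}, "build": {... "timestamp": 2}}]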
5) Regrouping
I regrouped the paired runs and builds and collected metrics for each unique run.
def foldl(op, acc, arr):
    # left fold, e.g. foldl(operator.add, 0, [1, 2, 3]) -> 6
    return functools.reduce(op, arr, acc)
def parse_log(tgt):
    # 1-2) extract raw entries and convert them to dicts
    with open(tgt, "r") as log:
        data = log.read()
    entries = data.replace("\n", "")
    entries = re.split(r"\d\d\d\d-\d\d-\d\d", entries)
    jsons = [make_json(e) for e in entries]
    # 3) filtering
    valid_jsons = [e for e in jsons if e is not None]
    runs_and_builds = extract_runs_and_builds(valid_jsons)
    # 4) higher-order event restoration
    zipped = zip_run_with_build(runs_and_builds)
    # 5) regrouping: collect metrics per unique run
    metrics = {extract_key(z): [] for z in zipped}
    for z in zipped:
        built_poses = z["build"]["payload"]["builtPoses"]
        times = [b["deltaTimeMs"] for b in built_poses]
        metrics[extract_key(z)].append(
            {
                "totalTime": foldl(operator.add, 0, times),
                "timestamp": z["build"]["timestamp"],
            }
        )
    return metrics
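One caveat: parse_log relies on an extract_key helper that is not shown in the post; its job is to produce a grouping key identifying a unique run. A placeholder, purely my assumption, could key on the run's payload:

def extract_key(z):
    # hypothetical stand-in: group metrics by the content of the "run" payload;
    # the original helper is not shown in the post
    return json.dumps(z["run"]["payload"], sort_keys=True)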
def parallel_parsing():
    return parallelize(parse_log, list_log_targets())
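list_log_targets is not shown in the post either; presumably it globs the log files produced by the unzip step. A sketch, assuming the logs land under the "extracted" directory:

def list_log_targets():
    # assumption: the extracted log files live under "extracted";
    # the actual pattern may differ in the real setup
    return sorted(glob.glob("extracted/**/*.log", recursive=True))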
So I mapped the parser function over the log files. The last thing that remained was the reduce step.
if __name__ == "__main__":
    start = time.time()
    # map: parse all logs in parallel, dropping empty results
    data = [d for d in parallel_parsing() if d]
    # reduce: merge per-log metrics into one dictionary
    result = {}
    for d in data:
        for k, v in d.items():
            if k in result:
                result[k] += v
            else:
                result[k] = v
    with open("result.txt", "w") as f:
        f.write(json.dumps(result))
    dt = time.time() - start
    print(f"Finished in {dt}s")
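For what it's worth, the merge loop above is the "reduce" of this MapReduce. It could equally be written with functools.reduce and a defaultdict (an alternative sketch, not what the original script did):

from collections import defaultdict

def merge(acc, partial):
    # concatenate the per-key metric lists from one parsed log into the accumulator
    for k, v in partial.items():
        acc[k].extend(v)
    return acc

result = dict(functools.reduce(merge, data, defaultdict(list)))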
6) Analysis
That is a separate story, not for the first post; maybe next time :)
Conclusion
When time matters, Python can become your saviour.
This script processes 3 GB of data on my machine in 22 ± 0.5 seconds.
The non-parallel version takes 2 minutes for the same amount of data.
In the end, I would like to mention another library that aims at manipulating and transforming indexable data (lists, arrays, ...) - SeqTools. Take a look; maybe it will save your day someday.
Have a nice day, wherever you are! Happy coding and problem solving!