Getting Started with Apache Beam: Hands-On Code Samples for Beginners

masudahiroto

Posted on January 1, 2024

In my recent exploration of Apache Beam during a day off, I dived into creating simple example programs, and I'm excited to share them with you. If you're a beginner venturing into Apache Beam, I hope these programs will prove valuable in easing your initiation. Let's make your Apache Beam journey smoother!

!pip install apache-beam
import apache_beam as beam

The Simplest Program: Displaying Data

This basic program reads a local variable and prints each row. It first defines the local variable data, which contains three elements: "Hello", "Apache", and "Beam". The pipeline is created with a with statement, so it runs when the block exits. beam.Create turns the local variable into a PCollection, Beam's fundamental abstraction for a collection of input or output data. beam.Map then applies the print function to each row, so the three elements are printed to stdout.

data = ['Hello', 'Apache', 'Beam']

with beam.Pipeline() as p:
    (p
        | "Create Data" >> beam.Create(data)
        | "Print" >> beam.Map(print)
    )
Output:
Hello
Apache
Beam

Add Columns

In this example, the local variable data is structured like a table of fruit prices: each row holds a fruit's name and its price. The goal is to compute each price including a 10% tax and append it as a new column.

class AddTaxColumn(beam.DoFn):
    def process(self, element):
        name, value = element
        new_value = value * 1.1
        yield (name, value, new_value)

data = [("apple", 100), ("banana", 200), ("orange", 150)]

with beam.Pipeline() as p:
    (p
        | 'Create Data' >> beam.Create(data)
        | 'AddColumn' >> beam.ParDo(AddTaxColumn())
        | 'Print' >> beam.Map(print)
    )
Output:
('apple', 100, 110.00000000000001)
('banana', 200, 220.00000000000003)
('orange', 150, 165.0)
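
The trailing digits in values such as 110.00000000000001 come from binary floating-point arithmetic, not from Beam. If exact two-decimal prices matter, one option is the standard library's decimal module. The following is a minimal sketch under that assumption; the names add_tax_column_exact and TAX_RATE are made up for illustration, and the function can be passed to beam.Map like any other callable.

```python
from decimal import Decimal, ROUND_HALF_UP

# Hypothetical constant for this sketch: a 10% tax expressed as a multiplier.
TAX_RATE = Decimal("1.10")


def add_tax_column_exact(element):
    """Return (name, price, price_with_tax) with the tax-inclusive price as an exact Decimal."""
    name, value = element
    # Decimal(value) is exact for integer prices; quantize fixes the result to two decimals.
    gross = (Decimal(value) * TAX_RATE).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    return (name, value, gross)


print(add_tax_column_exact(("apple", 100)))
```

The trade-off is that the new column now holds Decimal objects rather than floats, which matters if a later step expects plain numbers.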

Apache Beam offers two main ways to apply a function to each row: beam.Map and beam.ParDo. The key difference is that beam.Map produces exactly one output row per input row, whereas beam.ParDo can produce zero, one, or many output rows per input row.

beam.ParDo accepts either an ordinary Python function or an instance of a class inheriting from beam.DoFn. beam.Map accepts only a function, not a beam.DoFn instance. With beam.Map, the function returns the single output row for its input element. With beam.ParDo, the function yields its output rows, so yielding several times produces several output rows for one input row.

Here are two examples that perform the same transformation, one with beam.Map and one with beam.ParDo:

def add_tax_column_map(element):
    name, value = element
    new_value = value * 1.1
    return (name, value, new_value)

data = [("apple", 100), ("banana", 200), ("orange", 150)]

with beam.Pipeline() as p:
    (p
        | 'Create Data' >> beam.Create(data)
        | 'AddColumn' >> beam.Map(add_tax_column_map)
        | 'Print' >> beam.Map(print)
    )
Output:
('apple', 100, 110.00000000000001)
('banana', 200, 220.00000000000003)
('orange', 150, 165.0)
The beam.ParDo version yields the row instead of returning it:
def add_tax_column_pardo(element):
    name, value = element
    new_value = value * 1.1
    yield (name, value, new_value)

data = [("apple", 100), ("banana", 200), ("orange", 150)]

with beam.Pipeline() as p:
    (p
        | 'Create Data' >> beam.Create(data)
        | 'AddColumn' >> beam.ParDo(add_tax_column_pardo)
        | 'Print' >> beam.Map(print)
    )
Output:
('apple', 100, 110.00000000000001)
('banana', 200, 220.00000000000003)
('orange', 150, 165.0)
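
The examples so far emit exactly one row per input, so they do not show where beam.ParDo differs from beam.Map. The sketch below uses a generator that yields two rows for a valid price and nothing for a negative one. The function names and the "net"/"gross" labels are invented for illustration. Beam is imported inside run_pipeline only so that the generator itself can be tried without Beam installed.

```python
def expand_price_rows(element):
    """Yield a net row and a gross row per (name, price) input; yield nothing for bad prices."""
    name, value = element
    if value < 0:
        return  # no yield: this input contributes zero output rows
    yield (name, "net", value)
    yield (name, "gross", round(value * 1.1, 2))


def run_pipeline(rows):
    """Wire the generator into a pipeline with beam.ParDo and print every emitted row."""
    import apache_beam as beam  # imported here so expand_price_rows has no Beam dependency

    with beam.Pipeline() as p:
        (p
            | "Create Data" >> beam.Create(rows)
            | "Expand" >> beam.ParDo(expand_price_rows)
            | "Print" >> beam.Map(print)
        )
```

Calling run_pipeline([("apple", 100), ("broken", -1)]) should print two rows for apple and none for the negative price. The same generator would not fit beam.Map, which would emit the generator object itself as a single row.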

Parsing JSON Strings

Parsing JSON is straightforward with beam.Map (or beam.ParDo): mapping json.loads over a collection of JSON strings yields a collection of dict objects.

import json

data = [
    '{"name": "John", "age": 30, "city": "New York"}',
    '{"name": "Alice", "age": 25, "city": "San Francisco"}',
    '{"name": "Bob", "age": 35, "city": "Los Angeles"}'
]

with beam.Pipeline() as p:
    (p
        | "Create Data" >> beam.Create(data)
        | "Parse JSON" >> beam.Map(json.loads)
        | "Print" >> beam.Map(print)
     )
Output:
{'name': 'John', 'age': 30, 'city': 'New York'}
{'name': 'Alice', 'age': 25, 'city': 'San Francisco'}
{'name': 'Bob', 'age': 35, 'city': 'Los Angeles'}
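
With beam.Map(json.loads), a single malformed line raises an exception and fails the pipeline. One way to tolerate bad input, sketched here as an assumption rather than a recommendation from the original examples, is a generator that yields the parsed record or yields nothing. The name parse_json_or_skip is hypothetical.

```python
import json


def parse_json_or_skip(line):
    """Yield the parsed dict for a valid JSON line; yield nothing for a malformed one."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return  # skip this line instead of failing the whole pipeline
    yield record


lines = ['{"name": "John", "age": 30}', 'not valid json']
parsed = [record for line in lines for record in parse_json_or_skip(line)]
print(parsed)
```

Because the function yields zero or one rows per input, it belongs in beam.ParDo(parse_json_or_skip) or beam.FlatMap(parse_json_or_skip) rather than beam.Map. Silently dropping rows hides data problems, so a real pipeline would probably also count or log the skipped lines.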

Read or Write Data in CSV, JSON, or Parquet Format

So far, we've used simple input and output operations with local variables and the print function to display results on stdout. However, Apache Beam supports reading and writing various data formats, such as CSV, JSON, and Parquet. Let's explore examples of reading and writing files in these formats.

To read CSV and Parquet data, we can use beam.io.ReadFromCsv and beam.io.ReadFromParquet, respectively. For JSON data, we first read the file as text with beam.io.ReadFromText, which treats each line as a separate row, and then parse each line with beam.Map(json.loads).

# make sample csv, json, and parquet data.
import pandas as pd

data = {
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 22],
    'City': ['New York', 'San Francisco', 'Los Angeles']
}

pd.DataFrame(data).to_csv("input.csv", index=False)
pd.DataFrame(data).to_json("input.json", orient='records', lines=True)
pd.DataFrame(data).to_parquet("input.parquet")

Now we have data in the following formats:

input.csv

Name,Age,City
Alice,25,New York
Bob,30,San Francisco
Charlie,22,Los Angeles

input.json

{"Name":"Alice","Age":25,"City":"New York"}
{"Name":"Bob","Age":30,"City":"San Francisco"}
{"Name":"Charlie","Age":22,"City":"Los Angeles"}

We can read this data with the following code:

print("1. read csv")
with beam.Pipeline() as p:
    (p
        | 'Read CSV' >> beam.io.ReadFromCsv('input.csv')
        | 'Print' >> beam.Map(print)
    )

print("2. read json")
with beam.Pipeline() as p:
    (p
        | 'Read JSON' >> beam.io.ReadFromText('input.json')
        | "Parse JSON" >> beam.Map(json.loads)
        | 'Print' >> beam.Map(print)
    )

print("3. read parquet")
with beam.Pipeline() as p:
    (p
        | 'Read Parquet' >> beam.io.ReadFromParquet('input.parquet')
        | 'Print' >> beam.Map(print)
    )
Output:
1. read csv
BeamSchema_15bbd568_c8ba_4bea_aec0_a0120a8c4705(Name='Alice', Age=25, City='New York')
BeamSchema_15bbd568_c8ba_4bea_aec0_a0120a8c4705(Name='Bob', Age=30, City='San Francisco')
BeamSchema_15bbd568_c8ba_4bea_aec0_a0120a8c4705(Name='Charlie', Age=22, City='Los Angeles')
2. read json
{'Name': 'Alice', 'Age': 25, 'City': 'New York'}
{'Name': 'Bob', 'Age': 30, 'City': 'San Francisco'}
{'Name': 'Charlie', 'Age': 22, 'City': 'Los Angeles'}
3. read parquet
{'Name': 'Alice', 'Age': 25, 'City': 'New York'}
{'Name': 'Bob', 'Age': 30, 'City': 'San Francisco'}
{'Name': 'Charlie', 'Age': 22, 'City': 'Los Angeles'}

Writing is also straightforward using beam.io.WriteToCsv, beam.io.WriteToJson, and beam.io.WriteToParquet. However, there are some points to consider.

Firstly, you need to provide a schema for the data. One approach is to define a class inheriting from typing.NamedTuple and declare it as the output type. In this example, we use this approach to output CSV and JSON. For Parquet, however, beam.io.WriteToParquet takes a pyarrow schema through its schema argument.

Secondly, the output will be sharded. So, if you set the output filename as output.csv, the actual output filename will be output.csv-00000-of-00001. This holds true for both JSON and Parquet files, similar to the CSV case.

data = [
    {'Name': 'Alice', 'Age': 25, 'City': 'New York'},
    {'Name': 'Bob', 'Age': 30, 'City': 'San Francisco'},
    {'Name': 'Charlie', 'Age': 22, 'City': 'Los Angeles'}
]

import typing

class PersonalInformation(typing.NamedTuple):
    Name: str
    Age: int
    City: str

# Write CSV
with beam.Pipeline() as p:
  (p
      | 'Create Data' >> beam.Create(data).with_output_types(PersonalInformation)
      | 'Write CSV' >> beam.io.WriteToCsv('output.csv')
  )

# Write JSON
with beam.Pipeline() as p:
  (p
      | 'Create Data' >> beam.Create(data).with_output_types(PersonalInformation)
      | 'Write JSON' >> beam.io.WriteToJson('output.json')
  )

import pyarrow

schema = pyarrow.schema(
    [('Name', pyarrow.string()), ('Age', pyarrow.int64()), ('City', pyarrow.string())]
)

# Write Parquet
with beam.Pipeline() as p:
  (p
      | 'Create Data' >> beam.Create(data)
      | 'Write Parquet' >> beam.io.WriteToParquet("output.parquet", schema)
  )
# Verify that each output shard matches the input data.
assert pd.read_csv("output.csv-00000-of-00001").equals(pd.DataFrame(data))
assert pd.read_json("output.json-00000-of-00001", orient='records', lines=True).equals(pd.DataFrame(data))
assert pd.read_parquet("output.parquet-00000-of-00001").equals(pd.DataFrame(data))
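
The checks above hard-code the single shard name, which only works while the output fits in one shard. As a rough sketch, the helpers below build the expected names from the -SSSSS-of-NNNNN pattern seen in output.csv-00000-of-00001 and find whatever shards actually exist on disk. Both function names are hypothetical, and the glob pattern assumes the default shard naming.

```python
import glob


def shard_names(output_prefix, num_shards):
    """Expected shard file names for a prefix, following the -SSSSS-of-NNNNN pattern."""
    return [f"{output_prefix}-{i:05d}-of-{num_shards:05d}" for i in range(num_shards)]


def find_shards(output_prefix):
    """All shard files that exist on disk for a prefix, sorted by name."""
    # glob.escape protects prefixes that contain characters such as [ or *.
    return sorted(glob.glob(glob.escape(output_prefix) + "-*-of-*"))


print(shard_names("output.csv", 1))
```

To load a sharded CSV output in one go, the result of find_shards("output.csv") can be read file by file and concatenated, for example with pd.concat.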

Filtering Data Based on Conditions

Data filtering is a common task in data processing, and Apache Beam provides a convenient method to achieve this using the beam.Filter transform. To implement this, we simply need to supply a Python function that returns a boolean value based on the elements of each row.

data = [
    ('Alice', 25, 'New York'),
    ('Bob', 30, 'San Francisco'),
    ('Charlie', 22, 'Los Angeles')
]

with beam.Pipeline() as p:
    (p
        | 'Create Data' >> beam.Create(data)
        | 'Filter Age' >> beam.Filter(lambda x: x[1] > 23)
        | 'Print' >> beam.Map(print)
    )
Output:
('Alice', 25, 'New York')
('Bob', 30, 'San Francisco')
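
A lambda with x[1] works, but a named predicate is easier to read and test. The sketch below assumes that beam.Filter forwards extra keyword arguments to the predicate, which matches my understanding of the Python SDK; the names is_older_than and min_age are illustrative.

```python
def is_older_than(row, min_age):
    """Return True when the age field of a (name, age, city) row exceeds min_age."""
    _name, age, _city = row
    return age > min_age


def run_pipeline(rows, min_age=23):
    """Keep only the rows that pass the predicate and print them."""
    import apache_beam as beam  # imported here so the predicate has no Beam dependency

    with beam.Pipeline() as p:
        (p
            | "Create Data" >> beam.Create(rows)
            | "Filter Age" >> beam.Filter(is_older_than, min_age=min_age)
            | "Print" >> beam.Map(print)
        )
```

Keeping the threshold as a parameter means the same predicate can serve several pipelines with different cutoffs.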

Aggregation

For operations similar to GROUP BY in SQL, Apache Beam provides the beam.GroupByKey transform. Before applying it, convert each row into a (key, value) tuple, where the first element is the key. After beam.GroupByKey, each output row is a tuple of the key and the collection of values that share it.

data = [
    {"Name": 'Alice', "Price": 100},
    {"Name": 'Alice', "Price": 200},
    {"Name": 'Alice', "Price": 100},
    {"Name": 'Bob', "Price": 300},
    {"Name": 'Bob', "Price": 150},
    {"Name": 'Charlie', "Price": 220}
]

with beam.Pipeline() as p:
    (p
        | 'Create Data' >> beam.Create(data)
        | 'Transform' >> beam.Map(lambda x: (x["Name"], x["Price"]))
        | 'Group By Name' >> beam.GroupByKey()
        | 'Print' >> beam.Map(print)
    )
Output:
('Alice', [100, 200, 100])
('Bob', [300, 150])
('Charlie', [220])

For calculating the sum for each Name, you can use beam.Map (or beam.ParDo) to perform the summation.

with beam.Pipeline() as p:
    (p
        | 'Create Data' >> beam.Create(data)
        | 'Transform' >> beam.Map(lambda x: (x["Name"], x["Price"]))
        | 'Group By Name' >> beam.GroupByKey()
        | 'Sum' >> beam.Map(lambda x: (x[0], sum(x[1])))
        | 'Print' >> beam.Map(print)
    )
Output:
('Alice', 400)
('Bob', 450)
('Charlie', 220)
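
Grouping and then summing can also be written as a single step with beam.CombinePerKey, which takes the combining function directly. The following sketch is an alternative to the GroupByKey plus Map pair above, not a change to it; the helper names are made up for this example.

```python
def to_key_value(row):
    """Turn a {"Name": ..., "Price": ...} dict into the (key, value) tuple Beam groups on."""
    return (row["Name"], row["Price"])


def run_pipeline(rows):
    """Sum prices per name in one combining step and print the (name, total) rows."""
    import apache_beam as beam  # imported here so to_key_value has no Beam dependency

    with beam.Pipeline() as p:
        (p
            | "Create Data" >> beam.Create(rows)
            | "To Key Value" >> beam.Map(to_key_value)
            | "Sum Per Name" >> beam.CombinePerKey(sum)
            | "Print" >> beam.Map(print)
        )
```

A combiner lets the runner add up partial sums before bringing all values for a key together, which tends to matter more as the data grows.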

Joining Data

Data can be joined using beam.CoGroupByKey(), which groups each input by key and then combines the groups that share a key. To read multiple inputs, apply beam.Create to the pipeline p once per input, giving each step a unique label. The following code produces a somewhat nested output structure.

data_age = [
    ('Alice', 25),
    ('Bob', 30),
    ('Charlie', 22)
]

data_city = [
    ('Alice', 'New York'),
    ('Bob', 'San Francisco'),
    ('Charlie', 'Los Angeles')
]

with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Print' >> beam.Map(print)
    )
Output:
('Alice', ([25], ['New York']))
('Bob', ([30], ['San Francisco']))
('Charlie', ([22], ['Los Angeles']))

For a dictionary-structured output, you can pass a dictionary as an input to the beam.CoGroupByKey() operation.

with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        {"Age": age, "City": city}
        | 'Merge' >> beam.CoGroupByKey()
        | 'Print' >> beam.Map(print)
    )
Output:
('Alice', {'Age': [25], 'City': ['New York']})
('Bob', {'Age': [30], 'City': ['San Francisco']})
('Charlie', {'Age': [22], 'City': ['Los Angeles']})

Because values are grouped by key first, an input with several rows for the same key yields one output row for that key, with all of its values collected into lists.

data_age = [
    ('Alice', 25),
    ('Alice', 30),
    ('Charlie', 22)
]

data_city = [
    ('Alice', 'New York'),
    ('Alice', 'San Francisco'),
    ('Charlie', 'Los Angeles')
]

with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Print' >> beam.Map(print)
    )
Output:
('Alice', ([25, 30], ['New York', 'San Francisco']))
('Charlie', ([22], ['Los Angeles']))

The CoGroupByKey merge strategy is equivalent to a full outer join in SQL, allowing for keys present only in the right or left data.

data_age = [
    ('Alice', 25),
    ('Charlie', 22)
]

data_city = [
    ('Alice', 'New York'),
    ('Bob', 'San Francisco'),
]

with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Print' >> beam.Map(print)
    )
Output:
('Alice', ([25], ['New York']))
('Charlie', ([22], []))
('Bob', ([], ['San Francisco']))

Additionally, SQL-style left joins and inner joins can be emulated by applying beam.Filter to the output of beam.CoGroupByKey.

# left outer join
with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Left' >> beam.Filter(lambda x: len(x[1][0]) > 0)
        | 'Print' >> beam.Map(print)
    )
Output:
('Alice', ([25], ['New York']))
('Charlie', ([22], []))
# inner join
with beam.Pipeline() as p:
    age = p | 'Create Data Age' >> beam.Create(data_age)
    city = p | 'Create Data City' >> beam.Create(data_city)
    (
        (age, city)
        | 'Merge' >> beam.CoGroupByKey()
        | 'Inner' >> beam.Filter(lambda x: len(x[1][0]) > 0 and len(x[1][1]) > 0)
        | 'Print' >> beam.Map(print)
    )
Output:
('Alice', ([25], ['New York']))
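
The filtered rows still carry one list per input rather than flat joined rows as SQL would return. A possible next step, sketched here with hypothetical function names, is to expand each grouped row into one output row per combination of values. The functions assume the tuple form of the CoGroupByKey output, (key, (ages, cities)).

```python
from itertools import product


def inner_join_rows(element):
    """Yield one (key, age, city) row per age/city pair; keys missing a side yield nothing."""
    key, (ages, cities) = element
    for age, city in product(ages, cities):
        yield (key, age, city)


def left_join_rows(element):
    """Like inner_join_rows, but keep keys with no city by pairing them with None."""
    key, (ages, cities) = element
    # list(...) so the emptiness check also works if the grouped values are a lazy iterable.
    for age, city in product(ages, list(cities) or [None]):
        yield (key, age, city)


print(list(inner_join_rows(("Alice", ([25, 30], ["New York", "San Francisco"])))))
print(list(left_join_rows(("Charlie", ([22], [])))))
```

Either generator could follow the 'Merge' step via beam.FlatMap or beam.ParDo, in which case the separate beam.Filter step is no longer needed because unmatched keys produce no rows.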

Windowing Data

In Apache Beam, we can perform transformations, aggregations, and other operations while taking the timestamp of each row into account. This concept lets us use the same code and pipeline structure for both batch and streaming processing, though streaming is beyond the scope of this article. Each row can carry its own timestamp, so we can divide the rows into time windows and apply the same operations to every window. Here is an example:

The input data consists of logs from some program. This example first extracts the time from the log and attaches a timestamp. Then, it counts occurrences for each type of log: INFO, WARNING, and ERROR. Window information, including start and end times, can be obtained by accessing beam.DoFn.WindowParam, as shown in the example.

from datetime import datetime

data = [
    '2022-03-08 12:00:00 INFO Something ...',
    '2022-03-08 12:01:00 WARNING ...',
    '2022-03-08 12:02:00 ERROR ...',
    '2022-03-08 12:03:00 INFO ...',
    '2022-03-08 12:04:00 WARNING ...',
    '2022-03-08 12:05:00 ERROR ...',
    '2022-03-08 12:06:00 ERROR ...',
    '2022-03-08 12:08:00 WARNING ...',
    '2022-03-08 12:08:30 ERROR ...'
]

def extract_timestamp_and_log(line):
    parts = line.split()
    timestamp_str = parts[0] + ' ' + parts[1]
    # Parse as UTC so the result does not depend on the machine's local time zone.
    timestamp = datetime.strptime(timestamp_str + ' +0000', '%Y-%m-%d %H:%M:%S %z')
    unix_time = int(timestamp.timestamp())
    return unix_time, parts[2]


def add_start_end_info(row, window=beam.DoFn.WindowParam):
    # window.start and window.end are instances of the `apache_beam.utils.timestamp.Timestamp` class.
    start = datetime.utcfromtimestamp(window.start.seconds()).strftime("%Y-%m-%d %H:%M:%S")
    end = datetime.utcfromtimestamp(window.end.seconds()).strftime("%Y-%m-%d %H:%M:%S")

    return row + (start, end)


with beam.Pipeline() as p:
    (p
        | 'Create Data' >> beam.Create(data)
        | 'Extract Time' >> beam.Map(extract_timestamp_and_log)
        | 'Assign Timestamp' >> beam.Map(lambda x: beam.window.TimestampedValue(x[1], x[0]))
        | 'Fixed Windows' >> beam.WindowInto(beam.window.FixedWindows(300))  # 5 minutes window
        | 'Make Key and Count' >> beam.Map(lambda x: (x, 1))
        | 'Group By Type' >> beam.GroupByKey()
        | 'Count' >> beam.Map(lambda x: (x[0], sum(x[1])))
        | 'Add Window Start and End' >> beam.Map(add_start_end_info)
        | 'Print' >> beam.Map(print)
    )
Output:
('INFO', 2, '2022-03-08 12:00:00', '2022-03-08 12:05:00')
('WARNING', 2, '2022-03-08 12:00:00', '2022-03-08 12:05:00')
('WARNING', 1, '2022-03-08 12:05:00', '2022-03-08 12:10:00')
('ERROR', 1, '2022-03-08 12:00:00', '2022-03-08 12:05:00')
('ERROR', 3, '2022-03-08 12:05:00', '2022-03-08 12:10:00')
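
To see why the 12:05:00 log line falls into the second window, it helps to reproduce the window arithmetic by hand. The function below is a plain-Python sketch of how I understand fixed windows to be assigned, assuming sizes in seconds and an offset of 0. It is not Beam's own code, and the name fixed_window_bounds is made up.

```python
def fixed_window_bounds(unix_time, size=300, offset=0):
    """Return the (start, end) of the fixed window containing unix_time; end is exclusive."""
    start = unix_time - (unix_time - offset) % size
    return start, start + size


# Seconds relative to an aligned boundary such as 12:00:00.
print(fixed_window_bounds(120))  # 2 minutes in
print(fixed_window_bounds(300))  # exactly 5 minutes in
```

Each window covers its start but not its end, so a row stamped exactly at a boundary belongs to the window that begins there.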

Displaying a Pipeline DAG

To visualize the Directed Acyclic Graph (DAG) structure of a sample pipeline, we can use the apache_beam.runners.interactive.display.pipeline_graph.PipelineGraph.get_dot() method. The following code defines a pipeline, generates a .dot file illustrating the pipeline diagram, and converts it into a .png image. It's worth noting that we do not use the with statement to define the pipeline, as we are not executing the pipeline; our goal is solely to define its structure.

from apache_beam.runners.interactive.display.pipeline_graph import PipelineGraph

data = ['Hello', 'Apache', 'Beam']

p = beam.Pipeline()
(p
    | "Create Data" >> beam.Create(data)
    | "Print" >> beam.Map(print)
)

dag_dot = PipelineGraph(p).get_dot()

%store PipelineGraph(p).get_dot() >dag.dot

!apt -qqq install graphviz
!dot -Tpng dag.dot > dag.png
from IPython.display import Image
Image('dag.png')
Output:
Writing 'PipelineGraph(p).get_dot()' (str) to file 'dag.dot'.

Pipeline DAG

Further Studies

There are plenty of documents about Apache Beam, and the following two sites are good introductions:
