ParDo and DoFn Implementation in Apache Beam in Details
Apache Beam, Python

In a previous tutorial I covered some common transforms in Apache Beam: Map, Filter, and CombinePerKey. This tutorial is about the ParDo transform, which is essentially another way of doing a Map. The difference is that ParDo applies the transform to each element of the input PCollection and returns zero or more elements to the output PCollection, whereas Map outputs exactly one element for each input element. That gives ParDo a lot more flexibility to work with.
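The zero-or-more behavior can be illustrated without running a pipeline at all. A DoFn's process() method behaves like a generator, so here is a plain-Python sketch (the word-splitting logic is a hypothetical example, not part of the dataset code below):

```python
# Plain-Python sketch of the ParDo contract: one input element can
# produce zero, one, or many output elements, while Map always
# produces exactly one.
def process(element):
    # Hypothetical logic: emit each word of the line; an empty line
    # yields nothing at all.
    for word in element.split():
        yield word

outputs = [out for line in ["two words", "", "one"] for out in process(line)]
# "two words" fans out to two elements, "" to zero, "one" to one.
```

Map could never drop the empty line or fan one line out into two words; ParDo can do both.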

Another important aspect of the ParDo transform is that it requires user code in the form of a DoFn. Let’s look at some examples.

Please feel free to download this public dataset and follow along:

Sample Sales Data | Kaggle

I used a Google Colab notebook to work on this code, so installation is very easy. Here is the command to install it:

!pip install --quiet apache_beam

I created a directory named ‘data’ to hold the CSV file we will use and the outputs of our exercise today.

mkdir -p data

To start with, I will only do the simplest thing with the dataset: read it, make a list out of every row, and write those lists out to a text file.

Reading a text file in a Beam pipeline is simple and straightforward. Since we have a CSV file, we will define a CustomCoder() class that first encodes the objects into byte strings, then decodes the bytes back into objects, and lastly specifies whether the coder is guaranteed to encode values deterministically. Here is the documentation for coders.

from apache_beam.coders.coders import Coder

class CustomCoder(Coder):
  """A custom coder used for reading and writing strings as UTF-8."""

  def encode(self, value):
    return value.encode("utf-8", "replace")

  def decode(self, value):
    return value.decode("utf-8", "ignore")

  def is_deterministic(self):
    return True
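The coder’s logic is just UTF-8 round-tripping, so it can be sanity-checked outside Beam with the same two string methods it wraps:

```python
# Stand-alone check of the CustomCoder logic: encode to UTF-8 bytes,
# then decode back. For valid UTF-8 text the round trip is lossless;
# the "replace"/"ignore" error modes only matter for malformed input.
def encode(value):
    return value.encode("utf-8", "replace")

def decode(value):
    return value.decode("utf-8", "ignore")

row = "10121,34,81.35,5,2765.9"
assert decode(encode(row)) == row
assert isinstance(encode(row), bytes)
```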

There is also a SplitRow() class that simply uses Python’s .split() function.

class SplitRow(beam.DoFn):
  def process(self, element):
    return [element.split(',')]

Once the necessary classes are done, we define the pipeline, starting with a call to the Pipeline() function saved in a variable. Each transform is chained with the ‘|’ sign. The first transform here is a Read transform and the last is a Write transform; the ParDo transform comes in the middle. In the ParDo transform, the SplitRow class is passed as a DoFn, which applies the transform to each element of the PCollection. That means each row of the CSV file goes through the split() function we defined in the SplitRow() class.

Here is the full pipeline:

import apache_beam as beam

p = beam.Pipeline()
read_write = (
  p
  | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
  | beam.ParDo(SplitRow())
  | beam.io.WriteToText('data/output')
)
p.run()

Checking the output:

!head -n 5 data/output-00000-of-00001


['10107', '30', '95.7', '2', '2871', '2/24/2003 0:00', 'Shipped', '1', '2', '2003', 'Motorcycles', '95', 'S10_1678', 'Land of Toys Inc.', '2125557818', '897 Long Airport Avenue', '', 'NYC', 'NY', '10022', 'USA', 'NA', 'Yu', 'Kwai', 'Small']
['10121', '34', '81.35', '5', '2765.9', '5/7/2003 0:00', 'Shipped', '2', '5', '2003', 'Motorcycles', '95', 'S10_1678', 'Reims Collectables', '26.47.1555', "59 rue de l'Abbaye", '', 'Reims', '', '51100', 'France', 'EMEA', 'Henriot', 'Paul', 'Small']
['10134', '41', '94.74', '2', '3884.34', '7/1/2003 0:00', 'Shipped', '3', '7', '2003', 'Motorcycles', '95', 'S10_1678', 'Lyon Souveniers', '+33 1 46 62 7555', '27 rue du Colonel Pierre Avia', '', 'Paris', '', '75508', 'France', 'EMEA', 'Da Cunha', 'Daniel', 'Medium']
['10145', '45', '83.26', '6', '3746.7', '8/25/2003 0:00', 'Shipped', '3', '8', '2003', 'Motorcycles', '95', 'S10_1678', '', '6265557265', '78934 Hillside Dr.', '', 'Pasadena', 'CA', '90003', 'USA', 'NA', 'Young', 'Julie', 'Medium']
['10159', '49', '100', '14', '5205.27', '10/10/2003 0:00', 'Shipped', '4', '10', '2003', 'Motorcycles', '95', 'S10_1678', 'Corporate Gift Ideas Co.', '6505551386', '7734 Strong St.', '', 'San Francisco', 'CA', '', 'USA', 'NA', 'Brown', 'Julie', 'Medium']

The next example is a bit more complex: we want to show the number of motorcycles in each status.

For this one, we need a few more classes. This will start with the SplitRow() class we defined for the first example. The filterMotorcycles class below is defined to filter out all the other rows and keep only the rows with ‘Motorcycles’ in the PRODUCTLINE column, which is the 10th column of the data, considering that the count is zero-indexed.

class filterMotorcycles(beam.DoFn):
  def process(self, element):
    if element[10] == "Motorcycles":
      return [element]
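Note that process() here returns None for non-matching rows, and Beam treats a None return as zero output elements, which is exactly the zero-or-more behavior ParDo allows. The logic can be checked directly outside a pipeline (the placeholder rows below are made up for illustration):

```python
# The filter's process() logic, tested directly. A None return
# (non-matching row) becomes zero output elements in a ParDo.
def process(element):
    if element[10] == "Motorcycles":
        return [element]

# Hypothetical rows: ten filler columns, then the PRODUCTLINE value.
moto_row = ["x"] * 10 + ["Motorcycles"]
other_row = ["x"] * 10 + ["Classic Cars"]

assert process(moto_row) == [moto_row]   # kept
assert process(other_row) is None        # dropped (zero outputs)
```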

The next class is the PairStatus class, which joins the 10th column with the STATUS column (index 6), separating the values with a comma. An example output looks like ‘Motorcycles, Shipped’ as one string. We then make a tuple where this string is the key and the value is 1.

class PairStatus(beam.DoFn):
  def process(self, element):
    return [(element[10] + ', ' + element[6], 1)]

The last one is the Counting class. Each element here is a (key, values) tuple, where values is the iterable of 1’s produced by grouping. The class returns the key with the values summed up.

So, you can see, it will not return the same number of output elements as input elements.

class Counting(beam.DoFn):
  def process(self, element):
    (k, v) = element
    return [(k, sum(v))]
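What Counting does can be checked directly on a hand-made (key, values) pair of the kind GroupByKey emits:

```python
# The Counting class's process() logic, applied to one grouped element.
def process(element):
    (k, v) = element
    return [(k, sum(v))]

# A grouped element as GroupByKey would produce it (shortened here).
grouped = ('Motorcycles, Shipped', [1, 1, 1])
assert process(grouped) == [('Motorcycles, Shipped', 3)]
```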

Finally, the pipeline starts again with a call to beam.Pipeline(). After the Read and SplitRow() transforms from the previous example, the filterMotorcycles class filters out all the data except the motorcycles rows, and the PairStatus class joins the ‘STATUS’ column with ‘PRODUCTLINE’ and pairs a 1 with each string to make tuples like this:

[('Motorcycles, Shipped', 1),
 ('Motorcycles, Shipped', 1),
 ('Motorcycles, Disputed', 1),
 ('Motorcycles, Disputed', 1),
 ('Motorcycles, On Hold', 1)]

After that, we will use Apache Beam’s own GroupByKey() transform, which groups each key with its corresponding values, as below:

[('Motorcycles, Shipped', [1, 1, 1, …, 1, 1]),
 ('Motorcycles, Disputed', [1, 1, 1, …, 1, 1])]

And then the Counting class sums up all the 1’s to give the total number of motorcycles in each status.
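In plain Python terms, the GroupByKey-plus-Counting pair is just a tally of keys. collections.Counter computes the same thing, which is a handy way to sanity-check expected outputs on a small sample (the three pairs below are made up for illustration):

```python
from collections import Counter

# A small sample of the (key, 1) pairs PairStatus would emit.
pairs = [('Motorcycles, Shipped', 1),
         ('Motorcycles, Shipped', 1),
         ('Motorcycles, Disputed', 1)]

# Tallying the keys gives the same result as GroupByKey + Counting.
counts = Counter(k for k, _ in pairs)
assert counts['Motorcycles, Shipped'] == 2
assert counts['Motorcycles, Disputed'] == 1
```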

p2 = beam.Pipeline()
shipped_count = (
  p2
  | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
  | beam.ParDo(SplitRow())
  | beam.ParDo(filterMotorcycles())
  | beam.ParDo(PairStatus())
  | beam.GroupByKey()
  | beam.ParDo(Counting())
  | beam.io.WriteToText('data/output2')
)
p2.run()

Checking the output:

!head -n 5 data/output2-00000-of-00001


('Motorcycles, Shipped', 324)
('Motorcycles, Disputed', 6)
('Motorcycles, On Hold', 1)

It’s done. We got 324 motorcycles shipped, 6 disputed, and 1 on hold.

That’s all.


In this tutorial, I wanted to work through some examples to explain how ParDo works in Apache Beam. I hope the examples and explanations are clear enough.
