Map, Filter, and CombinePerKey Transforms in Writing Apache Beam Pipelines with Examples

Apache Beam is gaining popularity as a unified programming model for efficient and portable big data processing pipelines. It can handle both batch and streaming data, and that is where the name comes from. Beam is a combination of the words Batch and Stream:

B (from Batch) + eam (from Stream) = Beam

Portability is another great feature. You write the pipeline once, and it can run on runners such as Spark, Flink, Apex, or Cloud Dataflow. You don’t need to change the logic or syntax for that.

In this article, we will focus on learning to write some ETL pipelines using examples. We will try some transform operations on a public dataset, and hopefully you will find these transform operations useful in your work as well.

Please feel free to download this public dataset and follow along:

Sample Sales Data | Kaggle

A Google Colab notebook is used for this exercise. So, installation is very easy. Just use this line of code:

!pip install --quiet apache_beam

After installation is done, I made a directory for this exercise named ‘data’:

mkdir -p data

Let’s dive into today’s topic: the transform operations. To start, we will work on the simplest possible pipeline, which just reads the CSV file and writes it to a text file.

This is not as simple as the Pandas read_csv() method. It requires a coder. A CustomCoder class is defined first. It encodes objects into byte strings, decodes bytes back into the corresponding objects, and specifies whether this coder is guaranteed to encode values deterministically. Please check the documentation here.
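To get a feel for what such a coder does, here is a minimal sketch in plain Python (no Beam required) of the two string operations a UTF-8 coder like this relies on. The sample values are assumptions for illustration: the second one imitates a Latin-1 encoded name, the kind of input that is not valid UTF-8.

```python
# Plain-Python sketch of the encode/decode steps a UTF-8 string coder performs.
# The sample values are illustrative assumptions, not read from the dataset file.

def encode(value: str) -> bytes:
    # str -> bytes; characters that cannot be encoded are replaced with "?"
    return value.encode("utf-8", "replace")

def decode(value: bytes) -> str:
    # bytes -> str; byte sequences that are not valid UTF-8 are silently dropped
    return value.decode("utf-8", "ignore")

# A clean string survives the round trip unchanged.
round_trip = decode(encode("Land of Toys Inc."))

# "Luleå" encoded as Latin-1 ends with the byte 0xE5, which is not valid UTF-8
# in this position, so decoding with "ignore" drops it.
lossy = decode("Luleå".encode("latin-1"))

print(round_trip)  # Land of Toys Inc.
print(lossy)       # Lule
```

Dropping bytes this way keeps a pipeline from failing on unexpected input, at the cost of losing characters. That may be why a few names in the outputs later in this article appear without their accented letters.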

If this is your first pipeline, please notice the pipeline syntax. After the CustomCoder class comes the simplest pipeline. We first created an empty pipeline named ‘p1’. Then we wrote the ‘sales’ pipeline, which first reads the CSV file from the data folder that we created earlier. In Apache Beam, each transform operation in the pipeline starts with the | sign. After reading the data from the CSV file, we just write it to a text file. At the end, we ran the pipeline with the run() method. This is the standard pipeline syntax in Apache Beam.

import apache_beam as beam
from apache_beam.coders.coders import Coder

class CustomCoder(Coder):
    """A custom coder used for reading and writing strings as UTF-8."""

    def encode(self, value):
        # str -> bytes; characters that cannot be encoded are replaced
        return value.encode("utf-8", "replace")

    def decode(self, value):
        # bytes -> str; bytes that are not valid UTF-8 are dropped
        return value.decode("utf-8", "ignore")

    def is_deterministic(self):
        return True

p1 = beam.Pipeline()

sales = (p1
         | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
         | beam.io.WriteToText('data/output')
         )
p1.run()

If you check your ‘data’ folder now, you will see an ‘output-00000-of-00001’ file there. Printing the first 5 rows from this file to check the data:

!head -n 5 data/output-00000-of-00001


10107,30,95.7,2,2871,2/24/2003 0:00,Shipped,1,2,2003,Motorcycles,95,S10_1678,Land of Toys Inc.,2125557818,897 Long Airport Avenue,,NYC,NY,10022,USA,NA,Yu,Kwai,Small
10121,34,81.35,5,2765.9,5/7/2003 0:00,Shipped,2,5,2003,Motorcycles,95,S10_1678,Reims Collectables,26.47.1555,59 rue de l'Abbaye,,Reims,,51100,France,EMEA,Henriot,Paul,Small
10134,41,94.74,2,3884.34,7/1/2003 0:00,Shipped,3,7,2003,Motorcycles,95,S10_1678,Lyon Souveniers,+33 1 46 62 7555,27 rue du Colonel Pierre Avia,,Paris,,75508,France,EMEA,Da Cunha,Daniel,Medium
10145,45,83.26,6,3746.7,8/25/2003 0:00,Shipped,3,8,2003,Motorcycles,95,S10_1678,,6265557265,78934 Hillside Dr.,,Pasadena,CA,90003,USA,NA,Young,Julie,Medium
10159,49,100,14,5205.27,10/10/2003 0:00,Shipped,4,10,2003,Motorcycles,95,S10_1678,Corporate Gift Ideas Co.,6505551386,7734 Strong St.,,San Francisco,CA,,USA,NA,Brown,Julie,Medium
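The | chaining used in the pipeline above works because Beam overloads Python’s | operator. The toy classes below are a rough sketch of that idea in plain Python. Data and Step are hypothetical names invented for this illustration and are not part of the Beam API, and the rows are shortened stand-ins for the real CSV lines.

```python
# Toy illustration of chaining steps with the | operator via __or__.
# Data and Step are hypothetical stand-ins, not Beam classes.

class Step:
    """Wraps a function that turns one list of elements into another."""
    def __init__(self, fn):
        self.fn = fn

class Data:
    """Holds a list of elements; `data | step` returns a new Data object."""
    def __init__(self, items):
        self.items = list(items)

    def __or__(self, step):
        # Python calls this method for the expression `data | step`
        return Data(step.fn(self.items))

lines = ["10107,30,Motorcycles", "10103,26,Classic Cars"]

result = (Data(lines)
          | Step(lambda items: [line.split(",") for line in items])
          | Step(lambda items: [row for row in items if row[2] == "Classic Cars"]))

print(result.items)  # [['10103', '26', 'Classic Cars']]
```

Unlike this toy, a real Beam pipeline only builds up a description of the steps while it is being chained. Nothing is processed until run() is called.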


Let’s see how to use a Map transform in the above pipeline. This is the most common transform operation. The transformation you specify in the Map will apply to every single element in the PCollection.

For example, I would like to add a split method to create lists out of every element in the PCollection. Here we will use a lambda for the Map transform. If you are not familiar with lambdas, observe how one is used in the code here. After lambda we wrote ‘row’, but any other variable name is fine too. Whatever function or method we apply to ‘row’ will be applied to every element in the PCollection.

p2 = beam.Pipeline()
sales = (p2
         | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
         | beam.Map(lambda row: row.split(','))
         | beam.io.WriteToText('data/output2')
         )
p2.run()

Look, it is the exact same syntax. I just put one extra line of code between the read and write operations. Again, printing the first 5 rows of the output to check:

!head -n 5 data/output2-00000-of-00001


['10107', '30', '95.7', '2', '2871', '2/24/2003 0:00', 'Shipped', '1', '2', '2003', 'Motorcycles', '95', 'S10_1678', 'Land of Toys Inc.', '2125557818', '897 Long Airport Avenue', '', 'NYC', 'NY', '10022', 'USA', 'NA', 'Yu', 'Kwai', 'Small']
['10121', '34', '81.35', '5', '2765.9', '5/7/2003 0:00', 'Shipped', '2', '5', '2003', 'Motorcycles', '95', 'S10_1678', 'Reims Collectables', '26.47.1555', "59 rue de l'Abbaye", '', 'Reims', '', '51100', 'France', 'EMEA', 'Henriot', 'Paul', 'Small']
['10134', '41', '94.74', '2', '3884.34', '7/1/2003 0:00', 'Shipped', '3', '7', '2003', 'Motorcycles', '95', 'S10_1678', 'Lyon Souveniers', '+33 1 46 62 7555', '27 rue du Colonel Pierre Avia', '', 'Paris', '', '75508', 'France', 'EMEA', 'Da Cunha', 'Daniel', 'Medium']
['10145', '45', '83.26', '6', '3746.7', '8/25/2003 0:00', 'Shipped', '3', '8', '2003', 'Motorcycles', '95', 'S10_1678', '', '6265557265', '78934 Hillside Dr.', '', 'Pasadena', 'CA', '90003', 'USA', 'NA', 'Young', 'Julie', 'Medium']
['10159', '49', '100', '14', '5205.27', '10/10/2003 0:00', 'Shipped', '4', '10', '2003', 'Motorcycles', '95', 'S10_1678', 'Corporate Gift Ideas Co.', '6505551386', '7734 Strong St.', '', 'San Francisco', 'CA', '', 'USA', 'NA', 'Brown', 'Julie', 'Medium']

Look, each element has become a list.
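Conceptually, Map calls the given function once per element and emits one result for each. A rough plain-Python equivalent of the split step is sketched below, using shortened stand-ins for two rows (the shortening is for illustration only).

```python
# Plain-Python sketch of what the Map step does: one output per input element.
# The rows are shortened stand-ins for the CSV lines (illustrative only).
lines = [
    "10107,30,95.7,2,2871",
    "10121,34,81.35,5,2765.9",
]

def split_row(row):
    # Same logic as the lambda in the pipeline: one string in, one list out
    return row.split(",")

# map() applies the function to every element, much like the Map transform
rows = list(map(split_row, lines))

print(rows[0])    # ['10107', '30', '95.7', '2', '2871']
print(len(rows))  # 2
```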


Next, I will add a Filter transform to the above code block. A lambda can be used here as well. We will filter the data and keep only the rows for ‘Classic Cars’ from the product line. The 11th column of the dataset is the product line. As you know, Python is zero-indexed, so column numbering also starts from zero and the product line is at index 10.

p3 = beam.Pipeline()
sales = (p3
         | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
         | beam.Map(lambda row: row.split(','))
         | beam.Filter(lambda row: row[10] == "Classic Cars")
         | beam.io.WriteToText('data/output3')
         )
p3.run()

As before, printing the first 5 rows for checking:

!head -n 5 data/output3-00000-of-00001


['10103', '26', '100', '11', '5404.62', '1/29/2003 0:00', 'Shipped', '1', '1', '2003', 'Classic Cars', '214', 'S10_1949', 'Baane Mini Imports', '07-98 9555', 'Erling Skakkes gate 78', '', 'Stavern', '', '4110', 'Norway', 'EMEA', 'Bergulfsen', 'Jonas', 'Medium']
['10112', '29', '100', '1', '7209.11', '3/24/2003 0:00', 'Shipped', '1', '3', '2003', 'Classic Cars', '214', 'S10_1949', '"Volvo Model Replicas', ' Co"', '0921-12 3555', 'Berguvsvgen 8', '', 'Lule', '', 'S-958 22', 'Sweden', 'EMEA', 'Berglund', 'Christina', 'Large']
['10126', '38', '100', '11', '7329.06', '5/28/2003 0:00', 'Shipped', '2', '5', '2003', 'Classic Cars', '214', 'S10_1949', '"Corrida Auto Replicas', ' Ltd"', '(91) 555 22 82', '"C/ Araquil', ' 67"', '', 'Madrid', '', '28023', 'Spain', 'EMEA', 'Sommer', 'Martn', 'Large']
['10140', '37', '100', '11', '7374.1', '7/24/2003 0:00', 'Shipped', '3', '7', '2003', 'Classic Cars', '214', 'S10_1949', 'Technics Stores Inc.', '6505556809', '9408 Furth Circle', '', 'Burlingame', 'CA', '94217', 'USA', 'NA', 'Hirano', 'Juri', 'Large']
['10150', '45', '100', '8', '10993.5', '9/19/2003 0:00', 'Shipped', '3', '9', '2003', 'Classic Cars', '214', 'S10_1949', '"Dragon Souveniers', ' Ltd."', '+65 221 7555', '"Bronz Sok.', ' Bronz Apt. 3/6 Tesvikiye"', '', 'Singapore', '', '79903', 'Singapore', 'Japan', 'Natividad', 'Eric', 'Large']

Look at the 11th element of each list in the output above. It’s ‘Classic Cars’.
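Notice that some rows in the output above are split in the middle of a quoted customer name, for example '"Volvo Model Replicas' followed by ' Co"'. A plain split(',') does not understand CSV quoting, so every column after such a field shifts to the right. One possible way around this, sketched below, is to parse each line with Python’s csv module. parse_csv_line is a hypothetical helper name, and the sample line is a shortened stand-in for a real row.

```python
import csv

def parse_csv_line(line):
    """Parse one CSV line into a list of fields, honoring double quotes."""
    # csv.reader accepts any iterable of lines, so we pass a single-item list
    return next(csv.reader([line]))

# Shortened stand-in for a row whose customer name contains a comma
line = '10112,29,Classic Cars,"Volvo Model Replicas, Co",Sweden'

naive = line.split(",")
parsed = parse_csv_line(line)

print(len(naive))   # 6
print(len(parsed))  # 5
print(parsed[3])    # Volvo Model Replicas, Co
```

Because Map accepts any callable, a helper like this could stand in for the lambda, for example beam.Map(parse_csv_line). The outputs shown in this article were produced with split(','), so they would differ for the affected rows.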

Answering Some Questions

How much quantity was ordered for each type of automobile?

To find this out, we will first create tuples where the first element, the key, comes from the 11th column of the dataset (the product line) and the second element, the value, comes from the second column, ‘QUANTITY ORDERED’. In the next step, we will use the CombinePerKey transform, which combines the values for each key with an aggregate function.

This will be clearer once you see the code. Here is the first step:

p3a = beam.Pipeline()
sales = (p3a
         | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
         | beam.Map(lambda row: row.split(','))
         # | beam.Filter(lambda row: row[10] == "Classic Cars")
         | beam.Map(lambda row: (row[10], int(row[1])))
         | beam.io.WriteToText('data/output3a')
         )
p3a.run()

!head -n 10 data/output3a-00000-of-00001

As you can see, we used the Map transform twice here. The first one splits each row into a list as before. The second one takes only two values from each row: the product line, which is the 11th column (index 10), and the quantity, which is the second column (index 1).

Here is the output:

('Motorcycles', 30)
('Motorcycles', 34)
('Motorcycles', 41)
('Motorcycles', 45)
('Motorcycles', 49)
('Motorcycles', 36)
('Motorcycles', 29)
('Motorcycles', 48)
('Motorcycles', 22)
('Motorcycles', 41)

I printed only the first 10 rows of the output. As you can see, we have the quantity ordered for each row of the data. The next and final step to answer the question above is to combine all the values for each product line. Apache Beam provides the CombinePerKey transform for that. As the name suggests, it combines the values for each key with an aggregate function. In this case we need ‘sum’.

p4 = beam.Pipeline()
sales = (p4
         | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
         | beam.Map(lambda row: row.split(','))
         # | beam.Filter(lambda row: row[10] == "Classic Cars")
         | beam.Map(lambda row: (row[10], int(row[1])))
         | beam.CombinePerKey(sum)
         | beam.io.WriteToText('data/output4')
         )
p4.run()

!head -n 10 data/output4-00000-of-00001


('Motorcycles', 11663)
('Classic Cars', 33992)
('Trucks and Buses', 10777)
('Vintage Cars', 21069)
('Planes', 10727)
('Ships', 8127)
('Trains', 2712)

So, we have the total quantity ordered for each product.
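To make the grouping logic concrete, here is a plain-Python sketch of what CombinePerKey(sum) computes: the values are grouped by key and each group is reduced with sum. combine_per_key is a hypothetical helper written for this illustration. The pairs are a handful of the quantities from the first rows shown earlier, so the totals are for this small sample only.

```python
from collections import defaultdict

# A few (product line, quantity) pairs taken from the first rows shown earlier
pairs = [
    ("Motorcycles", 30),
    ("Motorcycles", 34),
    ("Classic Cars", 26),
    ("Motorcycles", 41),
    ("Classic Cars", 29),
]

def combine_per_key(pairs, combine_fn):
    """Group values by key, then reduce each group with combine_fn."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: combine_fn(values) for key, values in groups.items()}

totals = combine_per_key(pairs, sum)
print(totals)  # {'Motorcycles': 105, 'Classic Cars': 55}
```

In a real pipeline the runner may combine partial results from different workers, so the combining function should give the same answer regardless of how the values are grouped or ordered, as sum does.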

From which states were more than 2000 orders placed?

This is an interesting question where we need every transform we have used earlier, plus another Filter transform. We need to calculate the total quantity ordered for each state the way we calculated the total quantity ordered for each product in the previous example. Then only the totals that are more than 2000 should be kept. Note that the code below uses index 17 as the key. In the sample rows shown earlier that position holds the city (for example ‘NYC’), with the state right after it at index 18, so the results are keyed by city.

In all the previous examples, lambda functions were used in the Map and Filter transforms. Here we will see how to define a function and use it in a Map or Filter transform. The quantity_filter() function defined here returns the items whose count is more than 2000.

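def quantity_filter(row):
    # Returns the row when its count is above 2000, otherwise None (implicitly)
    name, count = row
    if count > 2000:
        return row

p7 = beam.Pipeline()
sales = (p7
         | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
         | beam.Map(lambda row: row.split(','))
         | beam.Map(lambda row: (row[17], int(row[1])))
         | beam.CombinePerKey(sum)
         | beam.Map(quantity_filter)
         | beam.io.WriteToText('data/output7')
         )
p7.run()

!head -n 10 data/output7-00000-of-00001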


('NYC', 5294)
('San Francisco', 2139)
('', 33574)

This is the output. Where the quantity is not bigger than 2000, the function returned ‘None’. I don’t like to see all these ‘None’ values, so I will add another Filter transform to filter them out.

p8 = beam.Pipeline()
sales = (p8
         | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
         | beam.Map(lambda row: row.split(','))
         | beam.Map(lambda row: (row[17], int(row[1])))
         | beam.CombinePerKey(sum)
         | beam.Map(quantity_filter)
         | beam.Filter(lambda row: row is not None)
         | beam.io.WriteToText('data/output8')
         )
p8.run()

!head -n 10 data/output8-00000-of-00001


('NYC', 5294)
('San Francisco', 2139)
('', 33574)
('New Bedford', 2043)
('San Rafael', 6366)

So, we have a total of 5 values returned where the order count is bigger than 2000.
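As a side note, the Map-then-Filter pair above could likely be collapsed into one step. Filter keeps an element when the function returns a truthy value, so a function that returns True or False can be given to Filter directly and no ‘None’ values are produced. The plain-Python sketch below compares both approaches. is_large_order is a hypothetical name, and the two totals below 2000 are made-up values for illustration.

```python
# Totals keyed by name; the first three appear in the output above,
# the last two are made-up values below the threshold (illustration only).
totals = [
    ("NYC", 5294),
    ("San Francisco", 2139),
    ("New Bedford", 2043),
    ("Example Town", 1500),
    ("Sample City", 300),
]

def quantity_filter(row):
    # Returns the row, or None implicitly when the count is 2000 or less
    name, count = row
    if count > 2000:
        return row

def is_large_order(row):
    # Boolean predicate: usable in a filter step on its own
    name, count = row
    return count > 2000

# Approach 1: map every element, then drop the None values
mapped = [quantity_filter(row) for row in totals]
two_steps = [row for row in mapped if row is not None]

# Approach 2: filter directly with the predicate
one_step = [row for row in totals if is_large_order(row)]

print(mapped.count(None))     # 2
print(one_step == two_steps)  # True
```

In the pipeline, this would correspond to replacing the two steps with a single beam.Filter(is_large_order).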


In this tutorial I wanted to show how to use the Map, Filter, and CombinePerKey transforms in Apache Beam for writing ETL pipelines. Hopefully they are clear enough to use in your own projects. I will explain how to use ParDo in my next article.
