20 Very Commonly Used Functions of PySpark RDD

Apache Spark is very popular in big data analytics because it is a distributed processing system, and PySpark is the Python interface for Apache Spark. When you have a huge dataset, terabytes in size, regular Python code will be really slow, but a PySpark job can be much faster: it divides the dataset into smaller portions, distributes them across separate processors, performs the operations on each processor separately, and then puts the results back together to give you the total output.

This is a high-level overview of how PySpark works faster. This article will focus on some very commonly used functions in PySpark.

If you are a beginner, you can practice in a Google Colab notebook. You just have to install PySpark with this simple line:

pip install pyspark

It will take just a few minutes to install, and the notebook will be ready for PySpark code.

To start, you need to create a SparkContext, which is the main entry point for Spark functionality. It represents the connection to a Spark cluster. Here I am creating a SparkContext:

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

I will start with the most basic functions and move towards more analytics-friendly functions.

sc.parallelize()

Here I am creating a very simple RDD object from this SparkContext using the parallelize method. The parallelize method creates a parallelized collection that allows the data to be distributed.

rdd_small = sc.parallelize([3, 1, 12, 6, 8, 10, 14, 19])

You cannot print an RDD object like a regular list or array in a notebook.

.collect()

If you simply type rdd_small and run it in the notebook, the output will look like this:

rdd_small

Output:

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274

So, it is a ParallelCollectionRDD, because the data lives in the distributed system. You have to collect the elements back together to be able to use them as a list.

rdd_small.collect()

Output:

[3, 1, 12, 6, 8, 10, 14, 19]

Collecting the whole RDD object does not always make sense when the dataset is very large. You may want to take only the first element, or the first few elements, to examine the structure, type, or quality of the data.

Here I am making a bigger RDD object:

rdd_set = sc.parallelize([[2, 12, 5, 19, 21],
[10, 19, 5, 21, 8],
[34, 21, 14, 8, 10],
[110, 89, 90, 134, 24],
[23, 119, 234, 34, 56]])

.first()

Getting only the first element of the RDD object:

rdd_set.first()

Output:

[2, 12, 5, 19, 21]

.take()

Here I am taking the first three elements:

rdd_set.take(3)

Output:

[[2, 12, 5, 19, 21], [10, 19, 5, 21, 8], [34, 21, 14, 8, 10]]

We get the first three elements as output.

.textFile()

At this point, I want to introduce a text file to demonstrate several different functions.

I copied some text from the Wikipedia page of the USA and made a text file using a simple notepad. The file is saved as usa.txt. You can download this text file from this link:

Big-Data-Anlytics-Pyspark/usa.txt at main · rashida048/Big-Data-Anlytics-Pyspark

Here is how to make an RDD using the text file:

lines = sc.textFile("usa.txt")

Let’s use the .take() function again to see the first 2 elements of the file:

lines.take(2)

Output:

["The United States of America (U.S.A. or USA), commonly known as the United States (U.S. or US) or America, is a country primarily located in North America. It consists of 50 states, a federal district, five major unincorporated territories, 326 Indian reservations, and nine minor outlying islands.[h] At nearly 3.8 million square miles (9.8 million square kilometers), it is the world's third- or fourth-largest country by geographic area.[c] The United States shares land borders with Canada to the north and Mexico to the south as well as maritime borders with the Bahamas, Cuba, Russia, and other countries.[i] With a population of more than 331 million people,[j] it is the third most populous country in the world. The national capital is Washington, D.C., and the most populous city and financial center is New York City.",  '']

.flatMap()

It is a common practice to separate the text content for analysis. Here is the use of the flatMap function to split the text data by space and make one big list of strings:

words = lines.flatMap(lambda x: x.split(' '))
words.take(10)

Output:

['The',  'United',  'States',  'of',  'America',  '(U.S.A.',  'or',  'USA),',  'commonly',  'known']

The first 10 elements look like this now.
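
For comparison, if you used .map() instead of .flatMap() here, each line would stay as its own list and you would get a list of lists rather than one flat list of words. A quick sketch, assuming the same lines RDD:

lines.map(lambda x: x.split(' ')).take(1)

The first element here is the entire first line as a list of words, whereas with .flatMap() the first element was just the single word 'The'.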

.map()

.map() is useful if you want to apply some transformation to each element of an RDD or use a condition. In this case, each element means each word. Here I will make each word lower case and convert each word into a tuple by pairing it with a 1.

wordsAsTuples = words.map(lambda x: (x.lower(), 1))
wordsAsTuples.take(4)

Output:

[('the', 1), ('united', 1), ('states', 1), ('of', 1)]

Here is a little explanation of what happens. The ‘x’ in the lambda expression represents each element of an RDD. Whatever you do to ‘x’ applies to every element in the RDD.

Here, we transformed ‘x’ into (x, 1). So, each word comes out as (word, 1). Look at the output carefully.
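
The same idea works on the numeric RDD from earlier. Here is a minimal sketch, using the rdd_small created above, that doubles each element:

rdd_small.map(lambda x: x * 2).collect()

Output:

[6, 2, 24, 12, 16, 20, 28, 38]

A condition works the same way: rdd_small.map(lambda x: x if x % 2 == 0 else 0).collect() keeps the even numbers and replaces the odd ones with 0.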

.reduceByKey()

This function is useful when you have key-value pairs and you want to add up all the values of the same key. For example, in wordsAsTuples above we have key-value pairs where the keys are the words and the values are the 1s. Usually, the first element of the tuple is considered the key and the second one the value.

If we use reduceByKey on wordsAsTuples, it will add up the 1s we attached to the same key (that is, the same word). If we have 4 occurrences of ‘the’, it will add four 1s and make it (‘the’, 4).

counts = wordsAsTuples.reduceByKey(lambda x, y: x+y)
counts.take(3)

Output:

[('united', 14), ('of', 20), ('america', 1)]

So, in our text data, ‘united’ appeared 14 times, ‘of’ appeared 20 times, and ‘america’ appeared only once.
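
If the word counts feel abstract, the same operation on a tiny hand-made RDD (a hypothetical example, not from the text file) is easier to verify:

pairs = sc.parallelize([('a', 2), ('b', 3), ('a', 5), ('b', 1)])
pairs.reduceByKey(lambda x, y: x + y).collect()

Output:

[('a', 7), ('b', 4)]

The order of the pairs in the output may vary, but the values of each key are added up.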

.top()

Returns the specified number of top elements. I will explain more after this example:

counts.top(20, lambda x: x[1])

Output:

[('the', 55),  
('and', 24),
('of', 20),
('united', 14),
('is', 13),
('in', 13),
('a', 13),
('states', 12),
('it', 9),
('to', 7),
('as', 6),
("world's", 6),
('by', 6),
('world', 5),
('with', 5),
('american', 5),
('war', 5),
('or', 4),
('north', 4),
('its', 4)]

What happens here? In this command, we are saying that we want the top 20 elements. Then x[1] is specified as the key in the lambda expression. In a tuple like (‘the’, 55), ‘the’ is x[0] and 55 is x[1]. Specifying x[1] in the lambda means we want the top 20 elements based on x[1] of each element. So, it returns the top 20 words based on their occurrences in the text file.

If you use x[0] in the lambda instead, it ranks the elements by their keys; since .top() picks the largest elements and x[0] is a string, you get the 20 words that come last in alphabetical order. Please feel free to try.
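
A quick sketch of that (no output is shown since the exact words depend on the text file):

counts.top(5, lambda x: x[0])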

.filter()

In the top 20 words above, most of the words are not very significant. Words like ‘to’, ‘the’, ‘with’, and ‘in’ do not provide any insight into the text. It is a common practice to omit these insignificant words (often called stop words) when dealing with text data, though it is not always the right choice.

If we can exclude some of those insignificant words we may see some more meaningful words in the top 20 list.

Here is the list of words that I want to exclude from the text before taking the top 20 words:

stop = ['', 'the', 'and', 'of', 'is', 'in', 'a', 'it', 'to', 'as', 'by', 'with', 'or', 'its', 'from', 'at']

Now, we will filter out those words:

words_short = counts.filter(lambda x: x[0] not in stop)

This new RDD words_short does not have those words we listed in ‘stop’.

Here are the top 20 words now:

words_short.top(20, lambda x: x[1])

Output:

[('united', 14),
('states', 12),
("world's", 6),
('world', 5),
('american', 5),
('war', 5),
('north', 4),
('country', 3),
('population', 3),
('new', 3),
('established', 3),
('war,', 3),
('million', 3),
('military', 3),
('international', 3),
('largest', 3),
('america,', 2),
('states,', 2),
('square', 2),
('other', 2)]

We do not have any of those words in the ‘stop’ list.

.sortByKey()

We can sort the whole RDD using the .sortByKey() function. As the name says, it sorts the RDD by the keys. In the ‘counts’ RDD, the keys are character strings, so it will sort in alphabetical order.

counts.sortByKey().take(10)

Output:

[('', 3),  
('(1775–1783),', 1),
('(9.8', 1),
('(u.s.', 1),
('(u.s.a.', 1),
('12,000', 1),
('16th', 1),
('1848,', 1),
('18th', 1),
('1969', 1)]

As you can see, the empty string came first and then the numeric strings, because digits come before letters in this ordering.

By default, the sort gives you the results in ascending order. But if you pass False to the sortByKey function, it will sort in descending order. Here we are sorting in descending order and taking the first 10 elements:

counts.sortByKey(False).take(10)

Output:

[('york', 1),  
('years', 1),
('world.', 1),
('world,', 1),
("world's", 6),
('world', 5),
('with', 5),
('which', 1),
('when', 1),
('west.', 1)]

It is also possible to apply a function to the keys before sorting with .sortByKey. Here is an RDD:

r1 = sc.parallelize([('a', 1), ('B', 2), ('c', 3), ('D', 4), ('e', 5)])

In the RDD r1, some keys are in lower case and some are in upper case. If we sort this by keys, the upper-case letters will come first by default and then the lower-case letters.

r1.sortByKey().collect()

Output:

[('B', 2), ('D', 4), ('a', 1), ('c', 3), ('e', 5)]

But if we want the sort to ignore case, we can pass a key function to .sortByKey. Here is an example:

r1.sortByKey(True, keyfunc=lambda k: k.upper()).collect()

Output:

[('a', 1), ('B', 2), ('c', 3), ('D', 4), ('e', 5)]

In the lambda expression above, we are asking the function to treat all the keys as upper case while sorting. It only treats the keys as upper case for the comparison; it does not return them as upper case.
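
The ascending flag and the keyfunc can also be combined. A small sketch using the same r1, sorting in descending order while ignoring case:

r1.sortByKey(False, keyfunc=lambda k: k.upper()).collect()

Output:

[('e', 5), ('D', 4), ('c', 3), ('B', 2), ('a', 1)]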

.groupByKey()

The groupByKey() function groups all the values that share the same key; you can then aggregate each group however you like. As a reminder, the first element in the tuple is the key and the second element is the value by default. Let’s see an example before further discussion:

numbers_only = wordsAsTuples.groupByKey().map(lambda x: sum(x[1]))
numbers_only.take(10)

Output:

[14, 20, 1, 1, 1, 6, 2, 13, 3, 1]

In this case, the keys are the words. Suppose ‘the’ is a key; when we use groupByKey(), it groups all the values of the key ‘the’ and aggregates them as specified. Here I used sum() as the aggregate function, so it sums up all the values. We got the occurrences of each word, but this time only the numbers of occurrences as a list, without the words themselves.
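
If you want to keep the words alongside their counts, you can return the key as well. A sketch; the result should match what .reduceByKey() gave earlier:

wordsAsTuples.groupByKey().map(lambda x: (x[0], sum(x[1]))).take(3)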

.reduce()

It is used to reduce the RDD elements to a single value. Reducing the numbers_only RDD we got from the last example:

total_words = numbers_only.reduce(lambda x, y: x+y)
total_words

Output:

575

We got 575. That means there are a total of 575 words in the text file.
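
As a sanity check, the same total should come straight from counting the elements of the words RDD, since each word contributed exactly one 1:

words.count()

Output:

575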

.mapValues()

It can be used to apply some transformation to the values of key-value pairs. It returns the keys unchanged along with the transformed values. Here is an example where the keys are strings and the values are integers. I will divide the values by 2:

rdd_1 = sc.parallelize([("a", 3), ("n", 10), ("s", 5), ("l", 12)])
rdd_1.mapValues(lambda x: x/2).collect()

Output:

[('a', 1.5), ('n', 5.0), ('s', 2.5), ('l', 6.0)]

Here ‘x’ in the lambda expression represents the values. So, whatever you do to the ‘x’ applies to all the values in the RDD.

One more example will be helpful to understand it better. In this example, a different RDD is used where keys are the strings and values are the lists of integers. We will use an aggregate function on the lists.

rdd_map = sc.parallelize([("a", [1, 2, 3, 4]), ("b", [10, 2, 8, 1])])
rdd_map.mapValues(lambda x: sum(x)).collect()

Output:

[('a', 10), ('b', 21)]

Look at the output here. Each value is the sum of the integers in the original list.

.countByValue()

Returns the number of occurrences of each element of the RDD in a dictionary format.

sc.parallelize([1, 2, 1, 3, 2, 4, 1, 4, 4]).countByValue()

Output:

defaultdict(int, {1: 3, 2: 2, 3: 1, 4: 3})

The output shows a dictionary where keys are the distinct elements of the RDD and values are the number of occurrences of those distinct values.
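
This also gives a shortcut for the word counting we did earlier. A sketch using the words RDD (note that, unlike before, the words here are not lower-cased first):

word_counts = words.countByValue()

word_counts is now a dictionary mapping each word to the number of times it appears in the file.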

.getNumPartitions()

An RDD object is not stored as one piece; it is divided into a number of partitions that are distributed across the cluster. We do not make these divisions ourselves; Spark does it by default.

Spark can run one task concurrently for each partition. As the name suggests, the .getNumPartitions() function tells you how many partitions there are.

data = sc.parallelize([("p", 5), ("q", 0), ("r", 10), ("q", 3)])
data.getNumPartitions()

Output:

2

We have 2 partitions in the ‘data’ object.

You can see how they are divided using a .glom() function:

data.glom().collect()

Output:

[[('p', 5), ('q', 0)], [('r', 10), ('q', 3)]]

It shows two lists of elements because there are two partitions.
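
You can also control the number of partitions yourself, either by passing a second argument to parallelize or by calling .repartition() on an existing RDD. A small sketch:

sc.parallelize(range(8), 4).getNumPartitions()

Output:

4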

Union

You can combine two RDDs using union. For example, here I am making two RDDs ‘rd1’ and ‘rd2’. Then I use the union to join them together to create ‘rd3’.

rd1 = sc.parallelize([2, 4, 7, 9])
rd2 = sc.parallelize([1, 4, 5, 8, 9])
rd3 = rd1.union(rd2)
rd3.collect()

Output:

[2, 4, 7, 9, 1, 4, 5, 8, 9]

The newly formed RDD ‘rd3’ includes all the elements of ‘rd1’ and ‘rd2’.

.distinct()

It returns the distinct elements of an RDD.

rd4 = sc.parallelize([1, 4, 2, 1, 5, 4])
rd4.distinct().collect()

Output:

[4, 2, 1, 5]

We have only the distinct elements of ‘rd4’.
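
Combining the last two functions, a set-style union that drops the duplicates from the earlier example could look like this (a sketch using rd1 and rd2 from the union section; the order of the output may vary):

rd1.union(rd2).distinct().collect()

This keeps each of 1, 2, 4, 5, 7, 8, and 9 only once.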

.zip()

When we zip two RDDs, it creates tuples pairing the elements of the two RDDs. Note that the two RDDs need to have the same number of elements for zip to work. An example will demonstrate it clearly:

rd11 = sc.parallelize(["a", "b", "c", "d", "e"])
rdda = sc.parallelize([1, 2, 3, 4, 5])
rda_11 = rdda.zip(rd11)
rda_11.collect()

Output:

[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]

In the zip operation, we mentioned ‘rdda’ first. So, in the output, the elements of ‘rdda’ come first.

Joins

The last thing to cover in this article is joins. The name already tells you that they join two RDDs. Let’s make another RDD like ‘rda_11’, and then we will join them.

rddb = sc.parallelize([1, 3, 4, 6])
rd22 = sc.parallelize(["apple", "ball", "cal", "dog"])
rdb_22 = rddb.zip(rd22)
rdb_22.collect()

Output:

[(1, 'apple'), (3, 'ball'), (4, 'cal'), (6, 'dog')]

We have ‘rdb_22’ now. Let’s join ‘rda_11’ from the previous example and ‘rdb_22’ together:

rda_11.join(rdb_22).collect()

Output:

[(4, ('d', 'cal')), (1, ('a', 'apple')), (3, ('c', 'ball'))]

By default, the join operation joins two RDDs on their keys. As a reminder again, the first element of each tuple is considered the key.

In the basic join operation, only the elements whose keys appear in both RDDs are joined together.

There are other kinds of joins. Here is an example of the left outer join:

rda_11.leftOuterJoin(rdb_22).collect()

Output:

[(4, ('d', 'cal')),  
(1, ('a', 'apple')),
(5, ('e', None)),
(2, ('b', None)),
(3, ('c', 'ball'))]

As this is a left outer join, the RDD mentioned on the left, in this case ‘rda_11’, brings all of its elements, though the order of the elements may not be the same as in ‘rda_11’. The RDD on the right side contributes only the elements whose keys also appear in the left RDD; keys that are missing on the right show up with None.

There is also a right outer join that does exactly the opposite:

rda_11.rightOuterJoin(rdb_22).collect()

Output:

[(4, ('d', 'cal')),  
(1, ('a', 'apple')),
(6, (None, 'dog')),
(3, ('c', 'ball'))]

Finally, there is a full outer join that returns each element from both the RDDs.

rda_11.fullOuterJoin(rdb_22).collect()

Output:

[(4, ('d', 'cal')),
(1, ('a', 'apple')),
(5, ('e', None)),
(2, ('b', None)),
(6, (None, 'dog')),
(3, ('c', 'ball'))]

As you can see, this has all the keys from both the RDDs.

Conclusion

I wanted to list the most commonly used, simple RDD operations that can handle a lot of tasks. There are many more RDD operations; I will probably cover some of them later. Hopefully, this was helpful.

Please feel free to follow me on Twitter, the Facebook page, and check out my YouTube channel.
