Oct 10, 2017

The following example deliberately combines local variables, global variables, and function parameters to show how global variables work:

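def foo(x, y):
    global a        # a refers to the module-level variable
    a = 42          # rebinds the global a
    x, y = y, x     # swaps the local parameters only
    b = 33          # b is local to foo; the global b is untouched
    b = 17
    c = 100         # c is local and never used
    print(a, b, x, y)

a, b, x, y = 1, 15, 3, 4
foo(17, 4)
print(a, b, x, y)   # only a was changed by foo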

The output of the script above looks like this:

42 17 4 17
42 15 3 4 
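
To isolate the effect of the global statement, here is a minimal sketch that compares the same assignment with and without it. The function names without_global and with_global are made up for this illustration.

```python
a = 1

def without_global():
    # No global statement: this assignment creates a new local a.
    a = 42
    return a

def with_global():
    # The global statement makes the assignment rebind the module-level a.
    global a
    a = 42
    return a

without_global()
after_local_assignment = a    # the module-level a is unchanged here

with_global()
after_global_assignment = a   # the module-level a has been rebound

print(after_local_assignment, after_global_assignment)
```

Only the second call changes the module-level a, which is the same mechanism that makes a become 42 in the example above.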

What is the output of the following code?

def increment_counter():
    global counter
    counter += 1
    print(counter)

def get_number_of_elements(rdd):
    global counter
    counter = 0
    rdd.foreach(lambda x: increment_counter())
    return counter 

It is 0.

The reason is that the function passed to rdd.foreach does not run in the driver's Python process. Spark serializes the function, together with the global variables it refers to, and ships it to the executors. As a result, increment_counter() does increment a counter for every element of rdd, but it is a copy of counter that lives in the worker process. That copy is never sent back to the driver, so the counter read in get_number_of_elements(rdd) is never updated. In other words, global variables modified inside foreach in PySpark are not propagated back. Thus, the value returned from get_number_of_elements(rdd) is still 0 no matter how many elements rdd has.
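
The copy semantics can be imitated without Spark. The following is only an analogy in plain Python, not Spark's actual mechanism: pickle stands in for the serialization step, and the names driver_state and run_on_worker are hypothetical.

```python
import pickle

# State owned by the "driver" (a stand-in for the global counter).
driver_state = {"counter": 0}

def run_on_worker(serialized_state, elements):
    # The "worker" deserializes its own private copy of the state.
    worker_state = pickle.loads(serialized_state)
    for _ in elements:
        worker_state["counter"] += 1
    # The incremented copy exists only inside this function.
    return worker_state["counter"]

payload = pickle.dumps(driver_state)
worker_count = run_on_worker(payload, ["a", "b", "c"])

# The worker counted every element, but the driver's state did not change.
print(worker_count, driver_state["counter"])
```

The worker's copy reaches 3 while the driver's counter stays at 0, which mirrors what happens to counter in get_number_of_elements(rdd).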

You can verify this by running the following complete script.

from __future__ import print_function

from pyspark.sql import SparkSession

def increment_counter():
    global counter
    counter += 1
    print(counter)

def get_number_of_elements(rdd):
    global counter
    counter = 0
    rdd.foreach(lambda x: increment_counter())
    return counter

if __name__ == "__main__":
    # Initialize the Spark session.
    spark = SparkSession\
        .builder\
        .appName("test")\
        .getOrCreate()

    lines = spark.read.text('url.txt').rdd.map(lambda r: r[0])
    #print(lines.collect())
    print(get_number_of_elements(lines))

    spark.stop()

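Thus, instead of mutating a global variable inside foreach, we should aggregate with a Spark operation such as reduceByKey, whose result is returned to the driver.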
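
A minimal sketch of counting with reduceByKey could look like the following. It assumes rdd is a PySpark RDD and uses only the map, reduceByKey, and collect methods; the key name "count" is an arbitrary placeholder chosen for this illustration.

```python
def get_number_of_elements(rdd):
    """Count the elements of an RDD without touching any global state."""
    # Map every element to the same (key, 1) pair. The key "count" is an
    # arbitrary placeholder; any single constant key would work.
    pairs = rdd.map(lambda x: ("count", 1))
    # Let Spark sum the ones for that key across all partitions.
    totals = pairs.reduceByKey(lambda left, right: left + right)
    # collect() brings the (key, total) pairs back to the driver.
    result = dict(totals.collect())
    # An empty RDD produces no pairs, so fall back to 0.
    return result.get("count", 0)
```

In the script above, print(get_number_of_elements(lines)) would then print the number of lines in url.txt. For a plain element count, the built-in rdd.count() action should give the same number with less code.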



Copyright © 2016 - 2018, Long Wang. All rights reserved.