
I want to share this particular Apache Spark with Python solution because documentation for it is quite poor.

I wanted to calculate the average value of K/V pairs (stored in a Pairwise RDD), by KEY. Here is what the sample data looks like:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]

The following code sequence is a less-than-optimal way to do it, but it does work. It is what I was doing before I found a better solution. It's not terrible, but as you'll see in the answer section, there is a more concise and efficient way.

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by its denominator (i.e. COUNT)
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
(u'2013-10-07', 23.39500642456595),
... snip ...
]


You can do the same using the rdd.aggregateByKey() method.

By KEY, simultaneously calculate the SUM (the numerator for the average that we want to compute), and COUNT (the denominator for the average that we want to compute):

>>> aTuple = (0,0) # Zero value for each KEY: (runningSum, runningCount).

>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
...                                    lambda a,b: (a[0] + b[0], a[1] + b[1]))

Where the following is true about the meaning of each a and b pair above (so you can visualize what's happening):

First lambda expression, for the within-partition reduction step:

a: is a TUPLE that holds: (runningSum, runningCount).

b: is a SCALAR that holds the next Value

Second lambda expression, for the cross-partition reduction step:

a: is a TUPLE that holds: (runningSum, runningCount).

b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).
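To make the roles of a and b concrete, here is a minimal sketch in plain Python (no Spark needed) that applies the same two functions by hand to the values of a single KEY. The partition contents and the names seq_op, comb_op, partition_1 and partition_2 are made up for illustration; how Spark actually splits the data depends on your RDD's partitioning.

```python
from functools import reduce

# Hypothetical values for ONE key, split across two made-up partitions.
partition_1 = [10.0, 20.0]
partition_2 = [30.0]

zero = (0, 0)  # (runningSum, runningCount), same role as aTuple

# Within-partition step: a is a (sum, count) tuple, b is the next scalar value.
seq_op = lambda a, b: (a[0] + b, a[1] + 1)

# Cross-partition step: a and b are both (sum, count) tuples.
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

# Fold each partition separately, starting from the zero value.
partial_1 = reduce(seq_op, partition_1, zero)  # (30.0, 2)
partial_2 = reduce(seq_op, partition_2, zero)  # (30.0, 1)

# Merge the per-partition results, then divide SUM by COUNT.
total = comb_op(partial_1, partial_2)          # (60.0, 3)
average = total[0] / total[1]                  # 20.0
print(partial_1, partial_2, total, average)
```

Note that the first function mixes a tuple with a scalar, while the second only ever sees tuples. That is why the two lambdas cannot be the same function.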

Finally, to calculate the average for each KEY, divide each SUM by its COUNT:

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()

>>> print(finalResult)

[(u'2013-09-09', 11.235365503035176),

(u'2013-09-01', 23.39500642456595),

(u'2013-09-03', 13.53240060820617),

(u'2013-09-05', 13.141148418977687),

... snip ...

]
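If you want to sanity-check the Spark result on a small sample, one option is a plain-Python reference computation on the driver. The sketch below uses the ten pairs shown by rdd1.take(10) in the question. The helper name average_by_key is hypothetical and not part of any Spark API.

```python
from collections import defaultdict

# The ten pairs from the rdd1.take(10) sample in the question.
sample = [
    (u'2013-10-09', 7.60117302052786),
    (u'2013-10-10', 9.322709163346612),
    (u'2013-10-10', 28.264462809917358),
    (u'2013-10-07', 9.664429530201343),
    (u'2013-10-07', 12.461538461538463),
    (u'2013-10-09', 20.76923076923077),
    (u'2013-10-08', 11.842105263157894),
    (u'2013-10-13', 32.32514177693762),
    (u'2013-10-13', 26.249999999999996),
    (u'2013-10-13', 10.693069306930692),
]

def average_by_key(pairs):
    """Hypothetical helper: per-key average via a running (sum, count) tuple."""
    totals = defaultdict(lambda: (0, 0))
    for key, value in pairs:
        running_sum, running_count = totals[key]
        totals[key] = (running_sum + value, running_count + 1)
    # Divide each SUM by its COUNT, as the mapValues step does.
    return {key: s / c for key, (s, c) in totals.items()}

averages = average_by_key(sample)
for key in sorted(averages):
    print(key, averages[key])
```

On a small RDD, comparing this dictionary against dict(finalResult) should show matching values, up to floating-point rounding caused by a different summation order.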