I'm quite new to Spark and I'm trying to implement an iterative clustering algorithm (expectation-maximization) in which each centroid is represented by a Markov model. So I need to do iterations and joins.

One problem I'm running into is that the time per iteration grows exponentially.
After some experimenting I found that an RDD that will be reused in the next iteration needs to be persisted. Otherwise, on every iteration Spark builds an execution plan that recomputes the RDD from the start, which increases the computation time.

import datetime

# sc is the SparkContext provided by the pyspark shell
init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))       
    init = init2.map(lambda n: n[0])
#     init.cache()

    print init.count()   
    print str(datetime.datetime.now() - start)


Results in:

0
10000000
0:00:04.283652
1
10000000
0:00:05.998830
2
10000000
0:00:08.771984
3
10000000
0:00:11.399581
4
10000000
0:00:14.206069
5
10000000
0:00:16.856993

Adding cache() helps: the iteration time becomes constant.

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))       
    init = init2.map(lambda n: n[0])
    init.cache()

    print init.count()   
    print str(datetime.datetime.now() - start)

Results in:

0
10000000
0:00:04.966835
1
10000000
0:00:04.609885
2
10000000
0:00:04.324358
3
10000000
0:00:04.248709
4
10000000
0:00:04.218724
5
10000000
0:00:04.223368


But when I do a join inside the iteration, the problem comes back. Here is some simple code demonstrating it. Even caching after every RDD transformation doesn't solve the problem:

init = sc.parallelize(xrange(10000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))
    init2.cache()

    init3 = init.map(lambda n: (n, n*2))
    init3.cache()

    init4 = init2.join(init3)
    init4.count()
    init4.cache()

    init = init4.map(lambda n: n[0])
    init.cache()

    print init.count()   
    print str(datetime.datetime.now() - start)


And here is the output. As you can see, the iteration time grows exponentially :(

0
10000
0:00:00.674115
1
10000
0:00:00.833377
2
10000
0:00:01.525314
3
10000
0:00:04.194715
4
10000
0:00:08.139040
5
10000
0:00:17.852815


I would really appreciate any help.

1 Answer


The problem here is that calling join without specifying the number of partitions can result in a growing number of partitions, apparently without bound. More partitions mean more tasks to schedule in every stage, so each iteration takes longer even though every RDD is cached. There are at least two ways to keep the number of partitions from growing when you call join repeatedly.
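As a rough illustration of that growth, here is a small model in plain Python (no Spark required). It rests on an assumption that is not stated in the question: that an uncapped PySpark join unions its two inputs and groups the result using the union's partition count, so the count doubles when an RDD is joined with a sibling derived from it. The function name partitions_after_joins is hypothetical. On a live RDD you can check the real numbers with getNumPartitions().

```python
def partitions_after_joins(initial, joins, cap=None):
    """Model the partition count after each join in the question's loop.

    Assumption: an uncapped join yields left + right partitions, and both
    sides have the same count here. A capped join (explicit numPartitions
    or spark.default.parallelism) yields exactly `cap` partitions.
    """
    counts = []
    current = initial
    for _ in range(joins):
        current = cap if cap is not None else current + current
        counts.append(current)
    return counts


print(partitions_after_joins(3, 6))         # [6, 12, 24, 48, 96, 192]
print(partitions_after_joins(3, 6, cap=3))  # [3, 3, 3, 3, 3, 3]
```

Under this model the partition count doubles with every iteration, which is in line with the later iteration times in the question roughly doubling as well.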

Method 1:

You can specify the number of partitions manually when you call join. For example:

rdd1.join(rdd2, numPartitions)

This ensures that the joined RDD has numPartitions partitions, so the number of partitions cannot keep growing from one iteration to the next.
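Applied to the loop from the question, Method 1 might look like the sketch below. The names run_iterations and NUM_PARTITIONS are hypothetical, and the value 3 is only an assumption that mirrors the three partitions used in the question. The function takes the RDD as an argument, so the SparkContext is created elsewhere.

```python
import time

NUM_PARTITIONS = 3  # assumed value; matches the 3 partitions in the question


def run_iterations(rdd, iterations=6, num_partitions=NUM_PARTITIONS):
    """Repeat the question's join loop with a fixed join partition count."""
    timings = []
    for _ in range(iterations):
        start = time.time()
        tripled = rdd.map(lambda n: (n, n * 3))
        doubled = rdd.map(lambda n: (n, n * 2))
        # Passing num_partitions keeps the join result at a fixed size.
        joined = tripled.join(doubled, num_partitions)
        rdd = joined.map(lambda kv: kv[0]).cache()
        rdd.count()  # force evaluation so the timing covers this iteration
        timings.append(time.time() - start)
    return rdd, timings
```

In a PySpark shell you could call it as run_iterations(sc.parallelize(range(10000), 3)) and then call getNumPartitions() on the returned RDD, which should stay at 3.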

Method 2:

When you create your SparkConf, set the default level of parallelism. With this value set, functions like join that are called without numPartitions use the default parallelism instead, which effectively caps the number of partitions and prevents it from growing. You can set this parameter as follows:

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.default.parallelism", numPartitions)
sc = SparkContext(conf=conf)
