I'm trying to implement a Hadoop Map/Reduce job that worked fine before in Spark. The Spark app definition is the following:
val data = spark.textFile(file, 2).cache()
val result = data
.map(//some pre-processing)
.map(docWeightPar => (docWeightPar(0),docWeightPar(1))))
.flatMap(line => MyFunctions.combine(line))
.reduceByKey( _ + _)
Where MyFunctions.combine is
def combine(tuples: Array[(String, String)]): IndexedSeq[(String,Double)] =
for (i <- 0 to tuples.length - 2;
j <- 1 to tuples.length - 1
) yield (toKey(tuples(i)._1,tuples(j)._1),tuples(i)._2.toDouble * tuples(j)._2.toDouble)
The combine function produces lots of map keys if the list used for input is big and this is where the exceptions is thrown.
In the Hadoop Map Reduce setting I didn't have problems because this is the point where the combine function yields was the point Hadoop wrote the map pairs to disk. Spark seems to keep all in memory until it explodes with a java.lang.OutOfMemoryError: GC overhead limit exceeded.
I am probably doing something really basic wrong but I couldn't find any pointers on how to come forward from this, I would like to know how I can avoid this.