
I'm trying to learn to use DataFrames and Datasets more, in addition to RDDs. For an RDD, I know I can do someRDD.reduceByKey((x,y) => x + y), but I don't see that function for Dataset, so I decided to write one.

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
  val result = mutable.HashMap.empty[(Long,Long),Int]
  val keys = mutable.HashSet.empty[(Long,Long)]
  y.keys.foreach(z => keys += z)
  x.keys.foreach(z => keys += z)
  for (elem <- keys) {
    val s1 = if(x.contains(elem)) x(elem) else 0
    val s2 = if(y.contains(elem)) y(elem) else 0
    result(elem) = s1 + s2
  }
  result
})


However, this returns everything to the driver. How would you write this so that it returns a Dataset? Maybe use mapPartitions and do the reduction there?

Note: this compiles but does not run, because there is no encoder for Map yet.

1 Answer


I assume your goal is to translate this idiom to Datasets:

rdd.map(x => (x.someKey, x.someField))
   .reduceByKey(_ + _)
// => returning an RDD of (KeyType, FieldType)

The closest equivalent I have found with the Dataset API looks like this:

ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]
// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can tell, the map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we do
//     not want to reduce over the original type, but over the FieldType.
// [2] Required since reduceGroups returns a Dataset[(K, V)],
//     not knowing that our V's are already key-value pairs.

It doesn't look very elegant, and according to a quick benchmark it also performs much worse, so maybe we are missing something here...
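One thing that may be worth trying is the untyped (DataFrame-style) aggregation, converted back to a typed Dataset at the end. The sketch below is only an illustration under assumptions: Record, UntypedAggregation and sumByKey are hypothetical names, the key is assumed to be a String and the field an Int, and a local SparkSession is created just for the demo. Whether it is actually faster for your data is something you would have to measure.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.sum

// Hypothetical record type; field names mirror the placeholders used above.
case class Record(someKey: String, someField: Int)

object UntypedAggregation {
  // Groups by the key column and sums the field column.
  // sum over an Int column yields a Long, hence Dataset[(String, Long)].
  def sumByKey(spark: SparkSession, ds: Dataset[Record]): Dataset[(String, Long)] = {
    import spark.implicits._
    ds.groupBy($"someKey")
      .agg(sum($"someField"))
      .as[(String, Long)] // tuple columns are matched by position
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("untyped-aggregation-sketch")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(Record("a", 1), Record("b", 2), Record("a", 3)).toDS()
    // Result contains the rows (a, 4) and (b, 2), in no guaranteed order.
    sumByKey(spark, ds).show()

    spark.stop()
  }
}
```

The result stays distributed as a Dataset, so nothing is pulled to the driver until you call an action such as show or collect.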

Note: An alternative might be to use groupByKey(_.someKey) as a first step. The main problem there is that groupByKey changes the type from a regular Dataset to a KeyValueGroupedDataset. The latter doesn't have a regular map function. Instead it offers mapGroups, which does not seem very convenient because it wraps the values in an Iterator and, according to its docstring, performs a shuffle.
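For the groupByKey(_.someKey) route, Spark versions that provide KeyValueGroupedDataset.mapValues (2.1 and later, as far as I know) let you project the value before reducing, which removes both the leading map and the trailing map(_._2). This is a minimal sketch, not a benchmarked solution; Record, TypedReduceByKey and sumByKey are hypothetical names, and the String key and Int field are assumptions.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type; field names mirror the placeholders used above.
case class Record(someKey: String, someField: Int)

object TypedReduceByKey {
  // Typed counterpart of rdd.map(...).reduceByKey(_ + _).
  def sumByKey(spark: SparkSession, ds: Dataset[Record]): Dataset[(String, Int)] = {
    import spark.implicits._
    ds.groupByKey(_.someKey)                      // KeyValueGroupedDataset[String, Record]
      .mapValues(_.someField)                     // KeyValueGroupedDataset[String, Int]
      .reduceGroups((a: Int, b: Int) => a + b)    // Dataset[(String, Int)]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("typed-reduce-by-key-sketch")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(Record("a", 1), Record("b", 2), Record("a", 3)).toDS()
    // Result contains the rows (a, 4) and (b, 2), in no guaranteed order.
    sumByKey(spark, ds).show()

    spark.stop()
  }
}
```

The lambda parameters are annotated explicitly because reduceGroups is overloaded (Scala function and Java ReduceFunction), and explicit types keep the call unambiguous across Scala versions.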
