
How can you write to multiple outputs, dependent on the key, using Spark in a single job?

E.g.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

would ensure that cat prefix/1 outputs

a
b

and cat prefix/2 outputs

c

1 Answer

Check this example; it should solve your problem:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark._
import org.apache.spark.SparkContext._

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  // Drop the key from the written records: only the value ends up in the file.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Name the output file after the key itself, so each key gets its own file.
  // (key.toString is safer than key.asInstanceOf[String], since keys may not be Strings.)
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
}

object Split {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)

    sc.textFile("input/path")
      .map(line => (k, v)) // your own key/value extraction
      .partitionBy(new HashPartitioner(num)) // num = number of partitions you want
      .saveAsHadoopFile("output/path", classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])

    sc.stop()
  }
}

Actually, you don't need a customized partitioner for correctness: MultipleTextOutputFormat creates one file per key, so it is fine for multiple records with the same key to fall into the same partition. The new HashPartitioner(num) call, where "num" is the number of partitions you want, only controls how keys are spread across partitions. If you have a large number of distinct keys, set num to a large value so that each partition holds fewer keys and does not have to keep too many HDFS file handles open at once.

Important points:

- A hash partitioner ensures that all records with the same key go to the same partition.
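To see what the routing amounts to, here is a minimal local sketch (plain Scala, no Spark or HDFS needed) of the behavior RDDMultipleTextOutputFormat produces: each record goes into a "file" named prefix/key, and the key itself is dropped from the contents, matching the cat prefix/1 example in the question. The object and method names here are hypothetical, purely for illustration.

```scala
object MultipleOutputSketch {

  // Group records into per-key output paths, keeping only the values,
  // mimicking generateFileNameForKeyValue (path = prefix/key) and
  // generateActualKey (the key is not written to the file).
  def routeByKey[K, V](records: Seq[(K, V)], prefix: String): Map[String, Seq[V]] =
    records
      .groupBy { case (k, _) => s"$prefix/$k" }
      .map { case (path, kvs) => path -> kvs.map(_._2) }

  def main(args: Array[String]): Unit = {
    val data = Seq((1, "a"), (1, "b"), (2, "c"))
    val files = routeByKey(data, "prefix")
    // files("prefix/1") contains a and b; files("prefix/2") contains c
    files.foreach { case (path, contents) => println(s"$path -> $contents") }
  }
}
```

In the real job, the HashPartitioner decides which task writes which keys, but the file layout you end up with on HDFS follows this same key-to-path mapping.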
