Explore Courses Blog Tutorials Interview Questions
0 votes
in Big Data Hadoop & Spark by (11.4k points)

I want to create a data processing pipeline in AWS to eventually use the processed data for Machine Learning.

I have a Scala script that takes raw data from S3, processes it and writes it to HDFS or even S3 with Spark-CSV. I think I can use multiple files as input if I want to use AWS Machine Learning tool for training a prediction model. But if I want to use something else, I presume it is best if I receive a single CSV output file.

Currently, as I do not want to use repartition(1) nor coalesce(1) for performance purposes, I have used hadoop fs -getmerge for manual testing, but as it just merges the contents of the job output files, I am running into a small problem. I need a single row of headers in the data file for training the prediction model.

f I use .option("header","true") for the spark-csv, then it writes the headers to every output file and after merging I have as many lines of headers in the data as there were output files. But if the header option is false, then it does not add any headers.

Now I found an option to merge the files inside the Scala script with Hadoop API FileUtil.copyMerge. I tried this in spark-shell with the code below.

import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

But this solution still just concatenates the files on top of each other and does not handle headers. How can I get an output file with only one row of headers?

1 Answer

0 votes
by (32.3k points)

Do the following things:

1.Create a new DataFrame(headerDF) containing header names.

2.Union it with the DataFrame(dataDF) containing the data.

3.Output the union-ed DataFrame to disk with option("header", "false").

4.merge partition files(part-0000**0.csv) using hadoop FileUtil

In this way, all partitions will be having no header except for a single partition's content that has a row of header names from the headerDF. When all partitions are merged together, there is a single header in the top of the file. Sample code are the following

  //dataFrame is the data to save on disk

  //cast types of all columns to String

  val dataDF = => dataFrame.col(c).cast("string")): _*)

  //create a new data frame containing only header names

  import scala.collection.JavaConverters._

  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  //merge header names with data

  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  //use hadoop FileUtil to merge all partition csv files into a single file

  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)

  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)

Browse Categories