+4 votes
1 view
in Big Data Hadoop & Spark by (11.5k points)

I am loading a CSV file into a DataFrame, which works, but I need to skip the first three lines of the file.

I tried the header option set to true, but that skips only the first line.

val df = spark.sqlContext.read
    .schema(Myschema)
    .option("header",true)
    .option("delimiter", "|")
    .csv(path)
I thought of declaring a three-line header, but I couldn't find a way to do that.

Alternative thought: skip those 3 lines from the DataFrame after loading it.

4 Answers

+3 votes
by (31.4k points)

Try building the DataFrame yourself: read the file as text, drop the first three lines, and then apply your schema. This is the safer approach.

import org.apache.spark.sql.Row

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Read the CSV as plain text lines
val fi = sc.textFile("csvfilelocation")

// Remove the first 3 lines (they sit at the start of partition 0)
val da = fi.mapPartitionsWithIndex { (id_x, iter) => if (id_x == 0) iter.drop(3) else iter }

// Create the row RDD by splitting each line on the "|" delimiter and picking the required fields
val rowRdd = da.map(_.split("\\|", -1)).map(x => Row(x(0), x(1)))

// Create the DataFrame from rowRdd and your schema (a StructType of string fields here)
val df = sqlContext.createDataFrame(rowRdd, schema)
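
If you are on Spark 2.2 or later, the same idea can be sketched with SparkSession, so that the CSV reader (and the schema and delimiter from the question) still does the parsing. This is only a sketch: readCsvSkippingLines is a hypothetical helper name, and it assumes a single input file whose first n lines all fall in partition 0.

```scala
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType

// Hypothetical helper (not part of Spark): read a "|"-delimited file, ignoring its first n lines.
// Assumes a single input file whose first n lines all fall in partition 0.
def readCsvSkippingLines(spark: SparkSession, path: String, schema: StructType, n: Int): DataFrame = {
  // Read raw lines and drop the first n of them from the first partition only
  val lines = spark.sparkContext.textFile(path)
    .mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(n) else iter }
  // DataFrameReader.csv also accepts a Dataset[String], so the usual reader options still apply
  spark.read
    .schema(schema)
    .option("delimiter", "|")
    .csv(spark.createDataset(lines)(Encoders.STRING))
}
```

With the names from the question, the call would look like readCsvSkippingLines(spark, path, Myschema, 3). If you read several files at once, each file starts in a different partition, so this sketch would not skip their leading lines.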


 

Another approach:

You can add an index, using monotonically_increasing_id.

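import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._  // enables the 'Index column syntax

df.withColumn("Index", monotonically_increasing_id())
  .filter('Index > 2)
  .drop("Index")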

But this is not a safe approach: it only works if the first 3 rows are in the first partition, and the code may break with future versions of Spark.


+4 votes
by (90.8k points)

A generic way to handle your problem is to index the DataFrame and keep only the rows whose index is greater than 2.

Straightforward approach:

As suggested in another answer, you may try adding an index with monotonically_increasing_id.

df.withColumn("Index", monotonically_increasing_id())
  .filter('Index > 2)
  .drop("Index")

Yet that only works if the first 3 rows are in the first partition. Moreover, as mentioned in the comments, this is the case today, but the code may break completely with future versions of Spark, and that would be very hard to debug. Indeed, the contract in the API is just "The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive". It is therefore not very safe to assume that the IDs will always start from zero. There might even be other cases in the current version in which that does not work (I'm not sure though).

To illustrate my first concern, have a look at this:

scala> spark.range(4).withColumn("Index",monotonically_increasing_id()).show()
+---+----------+
| id|     Index|
+---+----------+
|  0|         0|
|  1|         1|
|  2|8589934592|
|  3|8589934593|
+---+----------+

Here, filtering on Index > 2 would remove only two rows instead of three, because the third row already has a huge index.
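
To see where 8589934592 comes from: the Spark API docs describe the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. Here is a small plain-Scala sketch of that layout (no Spark needed). sketchId is a hypothetical helper for illustration, not a Spark function, and the layout is an implementation detail that could change.

```scala
// Sketch of the ID layout described in the Spark API docs for monotonically_increasing_id:
// partition index in the upper 31 bits, per-partition record number in the lower 33 bits.
// `sketchId` is a hypothetical helper for illustration only.
def sketchId(partitionIndex: Int, recordNumber: Long): Long =
  (partitionIndex.toLong << 33) + recordNumber

// spark.range(4) split over two partitions: rows 0 and 1 in partition 0, rows 2 and 3 in partition 1
val ids = Seq(sketchId(0, 0), sketchId(0, 1), sketchId(1, 0), sketchId(1, 1))

// Keeping only IDs greater than 2 retains both rows of partition 1,
// so just two rows are dropped instead of the intended three
val kept = ids.filter(_ > 2)
```

So the first row of the second partition gets 1 << 33 = 8589934592, which matches the output shown above.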

by (63.3k points)
It worked for me!
by (33.2k points)
Well explained answer.
Thanks!
by (14.5k points)
Thank you for this great explanation.
by (31k points)
I had the same problem. This solution worked for me!
Thanks
by (29.8k points)
great solution this works!!!
+2 votes
by (44.6k points)

zipWithIndex from the RDD API can be used to get consecutive indices.

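import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField}
import spark.implicits._  // enables the 'index column syntax

def zipWithIndex(df : DataFrame, name : String) : DataFrame = {
  // Pair every row with its position, then append the position as an extra field
  val rdd = df.rdd.zipWithIndex
    .map{ case (row, i) => Row.fromSeq(row.toSeq :+ i) }
  // Extend the schema with a non-nullable long column for the index
  val newSchema = df.schema
    .add(StructField(name, LongType, false))
  df.sparkSession.createDataFrame(rdd, newSchema)
}

zipWithIndex(df, "index").where('index > 2).drop("index")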

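This is safer than the other methods, because RDD.zipWithIndex numbers the rows consecutively from zero, ordered first by partition index and then by position within each partition. You can check it like this: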

scala> zipWithIndex(spark.range(4).toDF("id"), "index").show()
+---+-----+
| id|index|
+---+-----+
|  0|   0|
|  1|   1|
|  2|   2|
|  3|   3|
+---+-----+
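
If you only need to drop the leading rows and never look at the index itself, a shorter variant of the same idea is possible. This is a rough sketch: skipFirstRows is a hypothetical helper name, and it assumes the DataFrame's row order still matches the file order (for example, a CSV that was just read and not yet shuffled).

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper (not part of Spark): drop the first n rows by position.
// Assumes the DataFrame's row order still reflects the order of lines in the file.
def skipFirstRows(df: DataFrame, n: Long): DataFrame = {
  val kept = df.rdd.zipWithIndex()
    .filter { case (_, idx) => idx >= n } // indices start at 0, so idx >= n skips exactly n rows
    .map { case (row, _) => row }         // discard the index, so the schema stays unchanged
  df.sparkSession.createDataFrame(kept, df.schema)
}
```

For the question above that would be skipFirstRows(df, 3). Since the index is never added as a column, the original schema can be reused as is.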

by (40.3k points)
Thanks for explaining it well.
by (28.1k points)
well explained!
by (47.2k points)
Thank You kodee, it worked for me
0 votes
by (107k points)

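The following PySpark code (Python, not Scala) takes a different route: rather than skipping three rows by position, it drops unwanted rows by content, keeping only the lines that parse into the number of fields the schema expects and that are not the header row:

import csv
from pyspark.sql.types import StructType, StructField, StringType

customSchema = StructType([
    StructField("Col1", StringType(), True),
    StructField("Col2", StringType(), True)])

# Parse each partition with the csv module (after stripping NUL characters),
# then keep only two-field rows that are not the header row
df = (sc.textFile("file.csv")
    .mapPartitions(lambda partition: csv.reader([line.replace('\0', '') for line in partition], delimiter=',', quotechar='"'))
    .filter(lambda line: len(line) == 2 and line[0] != 'Col1')
    .toDF(customSchema))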
