+7 votes
in Big Data Hadoop & Spark by (1.5k points)

How can I convert an RDD to a dataframe?  I converted a data frame to rdd using .rdd. After processing it I want it back in data frame. How to convert it back to rdd

3 Answers

+15 votes
by (13.2k points)

There are multiple ways to create a DataFrame given rdd, you can take a look here. I’ll demonstrate the simple one.

It creates dataframe from rdd containing rows using given schema.

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

If you prefer doing it with DF Helper Function, take a look here.

0 votes
by (11.5k points)

This code works perfectly from Spark 2.x with Scala 2.11

Import necessary classes

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
Create SparkSession Object, Here it's spark

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext // Just used to create test RDDs

Method 1
Let's an RDD to make it DataFrame

val rdd = sc.parallelize(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))

Using SparkSession.createDataFrame(RDD obj) and specifying column names.

val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")

|    id|                vals|
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|

Method 2 (Actual answer to question)
Here, you are creating the input rdd  of type RDD[Row].

val rowsRdd: RDD[Row] = sc.parallelize(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)

create the schema

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

Now apply both rowsRdd and schema to createDataFrame()

val df = spark.createDataFrame(rowsRdd, schema)

|    id|val1|val2|
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|


0 votes
by (33.2k points)

To change an Array[Row] to DataFrame or Dataset, the following works:

Let say, the schema is the StructType for the row, then

val rows: Array[Row]=...

implicit val encoder = RowEncoder.apply(schema)

import spark.implicits._


You can refer to the following video for more information regarding the same:

Welcome to Intellipaat Community. Get your technical queries answered by top developers !