I have a pyspark dataframe consisting of one column, called json, where each row is a unicode string of json. I'd like to parse each row and return a new dataframe where each row is the parsed json.

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'

# Row is needed to build the single-column dataframe
from pyspark.sql import Row

df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

I've tried mapping over each row with json.loads:

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()


But this raises a TypeError: expected string or buffer

I suspect that part of the problem is that the schema information is lost when converting from a dataframe to an rdd, so I've also tried supplying the schema manually:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()


But I get the same TypeError.

1 Answer

Converting a dataframe of JSON strings to a structured dataframe is actually quite simple in Spark if you first convert the dataframe to an RDD of strings (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets).

For example:

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)
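
As for the TypeError in the question: each element of df.rdd is a Row, not a string, so json.loads(x) is handed a tuple-like object it cannot parse. The sketch below illustrates this without Spark. The namedtuple called Row is only a stand-in for pyspark.sql.Row (which is also a tuple subclass), an assumption made so the snippet runs on its own.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row, used only so this
# sketch runs without a Spark session. The real Row is also a
# tuple subclass with named fields.
Row = namedtuple("Row", ["json"])

row = Row(json='{"header": {"id": 12345, "foo": "bar"}}')

# Passing the whole row reproduces the failure from the question:
# json.loads accepts a string (or bytes), not a row object.
try:
    json.loads(row)
    failed = False
except TypeError:
    failed = True

# Extracting the string field first is what `lambda r: r.json`
# does in the answer above.
parsed = json.loads(row.json)
```

In the original attempt, changing the lambda to json.loads(x.json) would avoid the TypeError, although reading the strings with read.json is simpler because Spark then infers the nested schema for you.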


 

Also, since Spark 2.1 you can use from_json, which preserves the other non-JSON columns of the dataframe:

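from pyspark.sql.functions import from_json, col

# Infer the schema from the JSON strings (spark is the SparkSession)
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema

# withColumn returns a new dataframe, so keep the result
df = df.withColumn('json', from_json(col('json'), json_schema))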

Here Spark derives the schema from the JSON string column itself. In the resulting dataframe the json column is no longer a StringType but the correctly decoded JSON structure, i.e. a nested StructType, and all the other columns of df are preserved as-is.

You can access the json content as follows:

df.select(col('json.header').alias('header'))

...