
I've started using Spark SQL and DataFrames in Spark 1.4.0. I want to define a custom partitioner on DataFrames in Scala, but I don't see how to do this.

One of the data tables I'm working with contains a list of transactions, by account, similar to the following example.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

At least initially, most of the calculations will occur between the transactions within an account, so I want the data partitioned so that all of the transactions for an account are in the same Spark partition.

But I'm not seeing a way to define this. The DataFrame class has a method called 'repartition(Int)', where you can specify the number of partitions to create. But I'm not seeing any method available to define a custom partitioner for a DataFrame, such as can be specified for an RDD.

The source data is stored in Parquet. I did see that when writing a DataFrame to Parquet, you can specify a column to partition by, so presumably I could tell Parquet to partition its data by the 'Account' column. But there could be millions of accounts, and if I'm understanding Parquet correctly, it would create a distinct directory for each Account, so that didn't sound like a reasonable solution.

Is there a way to get Spark to partition this DataFrame so that all data for an Account is in the same partition?

1 Answer


The DataFrame API has no direct equivalent of the RDD partitionBy method, so a custom Partitioner cannot be applied to a DataFrame directly.
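If a truly custom partitioner is needed, one possible workaround is to drop down to the underlying RDD, apply the partitioner there, and rebuild the DataFrame. The sketch below is only an illustration under some assumptions: AccountPartitioner, CustomPartitionerSketch and partitionByAccount are hypothetical names, the Account column is assumed to be an Int, and SparkSession assumes Spark 2.0 or later. Also note that Spark SQL does not keep track of an RDD-level partitioner, so later DataFrame operations such as groupBy may still shuffle the data again.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical partitioner: maps an Int account id to a partition by modulo.
class AccountPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    // floorMod keeps the result non-negative even for negative ids
    case account: Int => java.lang.Math.floorMod(account, numPartitions)
    case other        => throw new IllegalArgumentException(s"Unexpected key: $other")
  }
}

object CustomPartitionerSketch {
  // Assumes df has an Int column named "Account" (an assumption for this sketch).
  def partitionByAccount(spark: SparkSession, df: DataFrame, numPartitions: Int): DataFrame = {
    val accountIdx = df.schema.fieldIndex("Account")
    // Key each row by its account id so the pair-RDD partitionBy can be used
    val keyed = df.rdd.map(row => (row.getInt(accountIdx), row))
    val partitioned = keyed.partitionBy(new AccountPartitioner(numPartitions)).values
    // Rebuild a DataFrame with the original schema; the physical layout is
    // preserved, but Spark SQL is not aware of the partitioner.
    spark.createDataFrame(partitioned, df.schema)
  }
}
```

With this partitioner, every row of a given account ends up in the same partition, because the partition index depends only on the account id.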

I would suggest using Spark 2.3.0 or later.

In that release, SPARK-22614 exposes range partitioning:

val partitionedByRange = df.repartitionByRange(42, $"k")

// explain(true) prints the logical plans as well as the physical plan
partitionedByRange.explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
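For the use case in the question (all rows of one account in the same partition), hash partitioning on the column may be enough. The following is a minimal sketch, assuming Spark 2.0 or later for SparkSession; the object and method names (RepartitionByAccount, byAccount, maxPartitionsPerAccount) are made up for illustration, and the sample rows are taken from the question's table. It uses repartition with a column expression and then checks the layout with spark_partition_id.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, countDistinct, max, spark_partition_id}

object RepartitionByAccount {
  // Hash-partition on Account: rows with equal Account values share a partition.
  def byAccount(df: DataFrame, numPartitions: Int): DataFrame =
    df.repartition(numPartitions, col("Account"))

  // Largest number of distinct partitions that any single account is spread over.
  def maxPartitionsPerAccount(df: DataFrame): Long =
    df.withColumn("pid", spark_partition_id())
      .groupBy("Account")
      .agg(countDistinct("pid").as("n"))
      .agg(max("n"))
      .head
      .getLong(0)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("repartition-sketch")
      .getOrCreate()
    import spark.implicits._

    // A few rows from the example table in the question
    val df = Seq(
      (1001, "2014-04-01", "Purchase", 100.00),
      (1001, "2014-04-05", "Purchase", 70.00),
      (1002, "2014-04-01", "Purchase", 80.00),
      (1002, "2014-04-04", "Payment", -120.00),
      (1003, "2014-04-02", "Purchase", 210.00)
    ).toDF("Account", "Date", "Type", "Amount")

    val partitioned = byAccount(df, 4)
    // A value of 1 means each account lives in exactly one partition
    println(maxPartitionsPerAccount(partitioned))

    spark.stop()
  }
}
```

Several accounts can still share a partition; the guarantee is only that a single account is never split across partitions.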

SPARK-22389 exposes external format partitioning in the Data Source API v2.
