0 votes
1 view
in AWS by (19.2k points)

I am trying to create a simple SQL query on S3 events using Spark. I am loading ~30GB of JSON files as following:

val d2 = spark.read.json("s3n://myData/2017/02/01/1234");

d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);

d2.registerTempTable("d2");

Then I am trying to write to file the result of my query:

val users_count = sql("select count(distinct data.user_id) from d2");

users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");

But Spark is throwing the following exception:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)

at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)

at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)

at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)

at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)

at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)

at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)

at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)

at org.apache.spark.scheduler.Task.run(Task.scala:85)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Note that the same query works for smaller amounts of data. What's the problem here?

1 Answer

0 votes
by (44.6k points)

No Spark shuffle block is larger than 2GB (Integer.MAX_VALUE bytes) therefore you need additional / smaller partitions.

You should adjust spark.default.parallelism and spark.sql.shuffle.partitions (default 200) such that the number of partitions can accommodate your data without reaching the 2GB limit (you could try aiming for 256MB / partition so for 200GB you get 800 partitions). Thousands of partitions are very common so don't be afraid to repartition to 1000 as suggested.

For your information, you'll check the number of partitions for an RDD with something like rdd.getNumPartitions (i.e. d2.rdd.getNumPartitions)

Related questions

Welcome to Intellipaat Community. Get your technical queries answered by top developers !


Categories

...