0 votes
in Big Data Hadoop & Spark by (11.4k points)

One of the great benefits of the Parquet data storage format is that it's columnar. If I've got a 'wide' dataset with hundreds of columns, but my query only touches a few of those, then it's possible to read only the data that stores those few columns and skip the rest.
Presumably this feature works by reading a bit of metadata at the head of a Parquet file that indicates the locations on the filesystem for each column. The reader can then seek on disk to read in only the necessary columns.
Does anyone know whether Spark's default Parquet reader correctly implements this kind of selective seeking on S3?

1 Answer

0 votes
by (32.3k points)

When working with Spark, Apache Parquet generally gives the fastest read performance. Parquet arranges data in a columnar fashion, putting the values of each column close to each other to optimize query performance, minimize I/O, and facilitate compression. It detects repeated or similar values and encodes them compactly (for example with dictionary and run-length encoding), which saves storage and I/O. It also stores per-column metadata and statistics, which lets Spark push filters down to the reader so that data that cannot match is skipped.
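To make the column-skipping idea concrete, here is a minimal sketch of a toy columnar file. It is not the Parquet format: the JSON encoding and the function names `write_toy_columnar` and `read_columns` are made up for illustration. It does borrow one real detail, which is that Parquet keeps its file metadata in a footer at the end of the file, so a reader first seeks to the end, reads the footer, and then seeks to the column chunks it needs.

```python
import io
import json
import struct


def write_toy_columnar(buf, columns):
    """Write each column as one contiguous chunk, then a footer with chunk offsets."""
    footer = {}
    for name, values in columns.items():
        payload = json.dumps(values).encode("utf-8")
        footer[name] = {"offset": buf.tell(), "length": len(payload)}
        buf.write(payload)
    footer_bytes = json.dumps(footer).encode("utf-8")
    buf.write(footer_bytes)
    # Final 4 bytes: little-endian footer length, so a reader can locate the footer.
    buf.write(struct.pack("<I", len(footer_bytes)))


def read_columns(buf, wanted):
    """Read only the requested columns; return (data, number of bytes read)."""
    buf.seek(-4, io.SEEK_END)
    (footer_len,) = struct.unpack("<I", buf.read(4))
    buf.seek(-(4 + footer_len), io.SEEK_END)
    footer = json.loads(buf.read(footer_len))
    bytes_read = 4 + footer_len
    data = {}
    for name in wanted:
        chunk = footer[name]
        buf.seek(chunk["offset"])  # jump straight to this column's chunk
        data[name] = json.loads(buf.read(chunk["length"]))
        bytes_read += chunk["length"]
    return data, bytes_read


# A small "wide" table: three columns, of which the query needs only one.
buf = io.BytesIO()
write_toy_columnar(buf, {
    "id": list(range(1000)),
    "name": ["x"] * 1000,
    "score": [0.5] * 1000,
})
total_size = buf.tell()
data, bytes_read = read_columns(buf, ["id"])
print(f"read {bytes_read} of {total_size} bytes")
```

On a local disk each `seek()` is cheap. On S3 each one may turn into a separate HTTP request, which is why the behaviour of the connector matters so much.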

Spark 2.x comes with a vectorized Parquet reader that does decompression and decoding in column batches, providing roughly 10x faster read performance.


Let me break the question down into three parts:

  1. Does the Parquet code get the predicates from Spark? Yes.

  2. Does Parquet then attempt to selectively read only those columns, using the Hadoop FileSystem seek() + read() or readFully(position, buffer, length) calls? Yes.

  3. Does the S3 connector translate these file operations into efficient HTTP GET requests? On Amazon EMR: yes. On Apache Hadoop, you need Hadoop 2.8 on the classpath and must set the property spark.hadoop.fs.s3a.experimental.fadvise=random to trigger random access.
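As a rough sketch of how that property might be passed to a job, the snippet below builds the `--conf` arguments for `spark-submit`. The property name and value come from point 3; the helper name `to_spark_submit_args` and the script name `my_job.py` are hypothetical.

```python
# The property and value are taken from the answer; everything else is illustrative.
S3A_RANDOM_IO_CONF = {
    "spark.hadoop.fs.s3a.experimental.fadvise": "random",
}


def to_spark_submit_args(conf):
    """Turn a dict of Spark properties into spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args.extend(["--conf", f"{key}={value}"])
    return args


# my_job.py is a placeholder for your own application script.
command = ["spark-submit"] + to_spark_submit_args(S3A_RANDOM_IO_CONF) + ["my_job.py"]
print(" ".join(command))
```

The same key and value can also be set in code with `SparkSession.builder.config(key, value)` before the session is created.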

Hadoop 2.7 and earlier handle the aggressive seek() calls around the file badly, because they always initiate a GET from the current offset to the end of the file, get surprised by the next seek, have to abort that connection, open a new TCP/HTTPS 1.1 connection (slow, CPU heavy), and then do it all again, repeatedly. Random I/O mode has a cost of its own, though: it hurts bulk loading of things like .csv.gz, which are read sequentially from start to finish.
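The difference between the two access patterns can be illustrated with a deliberately simplified model. This is not the S3A implementation: it ignores any read-ahead or forward-skip optimizations a real connector may apply, and the function name `simulate` and the read offsets are invented for the example. It only counts how many bytes each policy asks S3 for and how many connections get abandoned with unread data.

```python
def simulate(reads, file_size, policy):
    """Count GETs, aborted connections, and bytes requested for a list of (offset, length) reads.

    policy "sequential": each GET asks for everything from the offset to end of file.
    policy "random":     each GET asks only for the bytes of that one read.
    """
    gets = aborted = bytes_requested = 0
    stream_pos = stream_end = None  # state of the currently open GET, if any
    for offset, length in reads:
        reusable = (
            stream_pos is not None
            and offset == stream_pos
            and offset < stream_end
        )
        if not reusable:
            # Abandoning a stream that still has unread data means aborting it.
            if stream_pos is not None and stream_pos < stream_end:
                aborted += 1
            gets += 1
            stream_end = file_size if policy == "sequential" else offset + length
            bytes_requested += stream_end - offset
            stream_pos = offset
        stream_pos += length
    return {"gets": gets, "aborted": aborted, "bytes_requested": bytes_requested}


# A Parquet-like pattern: read the footer first, then three scattered column chunks.
FILE_SIZE = 1_000_000
reads = [(999_000, 1_000), (100_000, 5_000), (400_000, 5_000), (700_000, 5_000)]

sequential = simulate(reads, FILE_SIZE, "sequential")
random_io = simulate(reads, FILE_SIZE, "random")
print("sequential:", sequential)
print("random:    ", random_io)
```

In this model both policies issue the same number of GETs, but the sequential one requests far more data than it uses and has to abort connections mid-stream, which matches the slowdown described above.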

Also, you don't get the speedup with Hadoop 2.7's hadoop-aws JAR. If you need it, you must update the hadoop*.jar files and their dependencies, or build Spark from scratch against Hadoop 2.8.

Note: Hadoop 2.8+ also has a nice little feature: if you call toString() on an S3A filesystem client in a log statement, it prints out all the filesystem IO stats, including how much data was discarded in seeks, how many TCP connections were aborted, etc. This helps you work out what's going on.

Do not try to drop the Hadoop 2.8+ hadoop-aws JAR on the classpath along with the rest of the hadoop-2.7 JAR set and expect to see any speedup. All you will see are stack traces. You need to update all the hadoop JARs and their transitive dependencies.
