According to this

Catalyst applies logical optimizations such as predicate pushdown. The optimizer can push filter predicates down into the data source, enabling the physical execution to skip irrelevant data.

Spark supports push down of predicates to the data source. Is this feature also available / expected for JDBC?

(From inspecting the DB logs I can see it's not the default behavior right now: the full query is passed to the DB, even if the result is later limited by Spark filters.)


Running Spark 1.5 with PostgreSQL 9.4

code snippet:

from pyspark import SQLContext, SparkContext, Row, SparkConf
from data_access.data_access_db import REMOTE_CONNECTION

sc = SparkContext()
sqlContext = SQLContext(sc)

url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION)
sql = "dummy"

df = sqlContext.read.jdbc(url=url, table=sql)
df = df.limit(1)

SQL Trace:

< 2015-09-15 07:11:37.718 EDT >LOG:  execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:37.771 EDT >LOG:  execute <unnamed>: SELECT * FROM dummy WHERE 1=0
< 2015-09-15 07:11:37.830 EDT >LOG:  execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, n.nspname, a.attnotnull OR (t.typtype = 'd' AND t.typnotnull), pg_catalog.pg_get_expr(d.adbin, d.adrelid) LIKE '%nextval(%' FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (c.oid = a.attrelid) JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = a.attrelid AND d.adnum = a.attnum) JOIN (SELECT 15218474 AS oid , 1 AS attnum UNION ALL SELECT 15218474, 3) vals ON (c.oid = vals.oid AND a.attnum = vals.attnum)
< 2015-09-15 07:11:40.936 EDT >LOG:  execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:40.964 EDT >LOG:  execute <unnamed>: SELECT "id","name" FROM dummy

I would expect the last SELECT to include a LIMIT 1 clause, but it doesn't.

1 Answer


Spark DataFrames support predicate push-down with JDBC sources, but the term predicate is used in its strict SQL meaning: it covers only the WHERE clause. Moreover, it appears to be limited to logical conjunctions (no IN or OR, I am afraid) and simple predicates.

Everything else, such as limits, counts, ordering, groups and conditions, is processed on the Spark side. One caveat, already covered on SO, is that df.count() or sqlContext.sql("SELECT COUNT(*) FROM df") is translated to SELECT 1 FROM df and requires both substantial data transfer and processing in Spark.

Does that mean it is a lost cause? Not exactly. We can pass an arbitrary subquery as the table argument. It is less convenient than predicate pushdown, but otherwise it works pretty well:

n = ... # Number of rows to take

sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n))

df = sqlContext.read.jdbc(url=url, table=sql)
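
The same subquery trick can be wrapped in small helpers so that the LIMIT, and the COUNT mentioned earlier, are evaluated by the database rather than by Spark. This is only a sketch: the function names limited_table and counted_table, the alias tmp and the column name cnt are made up for illustration, and the table name is inserted as-is, so it must come from trusted code.

```python
def limited_table(table, n, alias="tmp"):
    """Build a JDBC `table` argument so the database applies the LIMIT."""
    # int() raises ValueError for non-numeric strings, so only a plain
    # integer can end up in the generated SQL.
    return "(SELECT * FROM {0} LIMIT {1}) AS {2}".format(table, int(n), alias)


def counted_table(table, alias="tmp"):
    """Build a JDBC `table` argument so the database performs the count."""
    # `cnt` is a hypothetical column name chosen for this sketch.
    return "(SELECT COUNT(*) AS cnt FROM {0}) AS {1}".format(table, alias)


print(limited_table("dummy", 1))
print(counted_table("dummy"))

# With the sqlContext and url from the question (not executed here):
# df = sqlContext.read.jdbc(url=url, table=limited_table("dummy", 1))
# n_rows = sqlContext.read.jdbc(url=url, table=counted_table("dummy")).first()[0]
```

With these, the final statement in the PostgreSQL log should select from the parenthesised subquery instead of the bare table, so only the limited rows (or the single count row) cross the wire.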
