Following are the two scenarios covered in this answer: saving a DataFrame to a database table, and loading a table's contents into a Spark DataFrame.
Common Properties:
To connect to any database we need a common set of properties: the database driver, the DB URL, the username, and the password. Connecting from PySpark code requires the same set of properties.
db_properties:
driver: the class name of the JDBC driver used to connect to the specified URL
url: the JDBC URL of the database to connect to
The Spark documentation on JDBC connections explains all of these properties in detail. An example of the DB properties file would look like the one shown below:
[postgresql]
url = jdbc:postgresql://localhost:5432/<db_name>
database = <database_name>
schema = <schema>
username = <username>
password = <pwd>
driver = org.postgresql.Driver
Save DataFrame to DB Table:
The Spark class `pyspark.sql.DataFrameWriter` provides the interface for JDBC-specific operations. Its `jdbc` method takes the connection URL, the target table name, the save mode, and the connection properties, and saves the contents of the DataFrame to the specified external table.
For example, the sample code below saves the DataFrame: we read the properties from a configuration file, load the values into a Python dict, and pass that dict to the method.
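The snippets that follow assume an existing SparkSession with the PostgreSQL JDBC driver on the classpath and a DataFrame `df` to be written. A minimal sketch of that setup is shown here; the driver package version and the column names are illustrative assumptions, not part of the original answer:
from pyspark.sql import SparkSession

#Assumption: pull the PostgreSQL JDBC driver via spark.jars.packages;
#on a real cluster the driver jar may already be on the classpath.
spark = (SparkSession.builder
         .appName("jdbc-example")
         .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
         .getOrCreate())

#Hypothetical DataFrame that will be saved to testdb.employee below.
df = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob")],
    ["emp_id", "emp_name"])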
Common code to read the database properties from the configuration file:
#Create the database properties dict from the configuration file
import configparser

config = configparser.ConfigParser()
config.read("db_properties.ini")
db_prop = config['postgresql']

db_url = db_prop['url']
db_properties = {}
#Note: the JDBC properties dict expects the key 'user', not 'username'
db_properties['user'] = db_prop['username']
db_properties['password'] = db_prop['password']
db_properties['driver'] = db_prop['driver']

#Save the dataframe to the table.
df.write.jdbc(url=db_url, table='testdb.employee', mode='overwrite', properties=db_properties)
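Equivalently, the same write can be expressed with the generic option-based writer. This is a sketch reusing the values read above, not the wording of the original answer:
#Equivalent write using the option-based DataFrameWriter API
(df.write
   .format("jdbc")
   .option("url", db_url)
   .option("dbtable", "testdb.employee")
   .option("user", db_prop['username'])
   .option("password", db_prop['password'])
   .option("driver", db_prop['driver'])
   .mode("overwrite")
   .save())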
Load Table Contents to Spark DataFrame:
The Spark class `pyspark.sql.DataFrameReader` provides the interface for JDBC-specific operations. Its `jdbc` method takes the connection URL, the table name, and the connection properties, and loads the specified input table into a Spark DataFrame.
For example, the sample code below loads the contents of the table into a Spark DataFrame, reusing the properties read from the configuration file above:
df = spark.read.jdbc(url=db_url, table='testdb.employee', properties=db_properties)
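For larger tables, the same `jdbc` reader can split the load across partitions for a parallel read. The sketch below assumes a numeric `emp_id` column and an illustrative value range, neither of which comes from the original answer:
#Parallel read: Spark issues one query per partition over the emp_id range
df_parallel = spark.read.jdbc(
    url=db_url,
    table='testdb.employee',
    column='emp_id',        #assumed numeric partition column
    lowerBound=1,
    upperBound=10000,
    numPartitions=4,
    properties=db_properties)
df_parallel.show(5)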