Loading and Saving Your Data
Spark makes it very simple to load and save data in a large number of file formats. Formats range from unstructured, like text, to semistructured, like JSON, to structured, like SequenceFiles. The input formats that Spark wraps all transparently handle compressed formats based on the file extension.
Text Files: Text files are very simple to load from and save to with Spark. When we load a single text file as an RDD, each input line becomes an element in the RDD. We can also load multiple whole text files at the same time into a pair RDD, with the key being the file's name and the value being the contents of each file.
Loading text files
Loading a single text file is as simple as calling the textFile() function on our SparkContext with the path to the file:
input = sc.textFile("file:///home/holden/repos/spark/README.md")
Saving text files
The method saveAsTextFile() takes a path and will output the contents of the RDD to that path. The path is treated as a directory, and Spark will output multiple files underneath that directory. This allows Spark to write the output from multiple nodes.
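The directory-of-part-files behavior can be sketched in plain Python; the helper name, file layout, and records below are illustrative, and in Spark itself the whole operation is simply rdd.saveAsTextFile(path):

```python
import os
import tempfile

def save_as_text_file(records, path, num_partitions=2):
    # Illustrative stand-in for RDD.saveAsTextFile(): the path becomes a
    # directory, with one part-NNNNN file written per partition.
    os.makedirs(path)
    for i in range(num_partitions):
        partition = records[i::num_partitions]
        with open(os.path.join(path, "part-%05d" % i), "w") as f:
            f.writelines(line + "\n" for line in partition)

out = os.path.join(tempfile.mkdtemp(), "output")
save_as_text_file(["line1", "line2", "line3"], out)
# The directory `out` now holds part-00000 and part-00001.
```

Writing one file per partition is what lets each node flush its own output in parallel rather than funneling all data through a single writer.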
JSON
JSON is a popular semistructured data format. The simplest way to load JSON data is by loading the data as a text file and then mapping over the values with a JSON parser.
Loading the data as a text file and then parsing the JSON data is an approach that we can use in all of the supported languages. This works assuming that you have one JSON record per row; if you have multiline JSON files, you will instead have to load whole files and then parse each one.
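A minimal sketch of the record-per-line approach in Python, using the standard json module; the helper name and sample record are our own, and the map over an RDD loaded with sc.textFile() is shown as a comment:

```python
import json

def parse_json(line):
    # Each input line is assumed to hold one complete JSON record.
    # A production job would wrap this in try/except to skip or log
    # malformed records instead of failing the whole task.
    return json.loads(line)

record = parse_json('{"name": "holden", "lovesPandas": true}')
# In Spark, the same function is applied across the RDD:
# data = sc.textFile(inputFile).map(parse_json)
```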
Writing out JSON files is much simpler than loading them, because we don't have to worry about incorrectly formatted data and we know the type of the data that we are writing out. We can use the same libraries we used to convert our RDD of strings into parsed JSON data and instead take our RDD of structured data and convert it into an RDD of strings, which we can then write out using Spark's text file API.
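The reverse direction is a sketch like the following; json.dumps turns each structured record back into a single-line string, and the Spark calls shown as comments would write the result out:

```python
import json

def to_json(record):
    # Convert one structured record back into a single-line JSON string.
    return json.dumps(record)

line = to_json({"name": "holden", "lovesPandas": True})
# In Spark: data.map(to_json).saveAsTextFile(outputFile)
```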
Comma-Separated Values and Tab-Separated Values
Comma-separated value (CSV) files are supposed to contain a fixed number of fields per line, and the fields are separated by a comma (or a tab in the case of tab-separated value, or TSV, files).
Loading CSV/TSV data is similar to loading JSON data in that we can first load it as text and then process it. The lack of standardization of format leads to different versions of the same library sometimes handling input in different ways.
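For example, with Python's standard csv module we can parse each line individually, assuming no record spans multiple lines; the helper name and sample data are our own:

```python
import csv
import io

def load_record(line):
    # Parse a single CSV line into a list of fields; the csv module
    # handles quoting and embedded commas for us.
    reader = csv.reader(io.StringIO(line))
    return next(reader)

fields = load_record('panda,"happy,fuzzy",3')
# In Spark: data = sc.textFile(inputFile).map(load_record)
```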
As with JSON, there are many different CSV libraries, but we will use only one for each language.
As with JSON data, writing out CSV/TSV data is quite simple and we can benefit from reusing the output encoding object. Since in CSV we don’t output the field name with each record, to have a consistent output we need to create a mapping. One of the easy ways to do this is to just write a function that converts the fields to given positions in an array.
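One hedged sketch of that mapping in Python: a function that places each field at a fixed position, which could then be mapped over the RDD before saving (the field order and names below are illustrative):

```python
import csv
import io

FIELD_ORDER = ["name", "favoriteAnimal"]  # fixed output positions

def to_csv_line(record):
    # Write one record's fields in a consistent order, without headers.
    output = io.StringIO()
    csv.writer(output).writerow([record[f] for f in FIELD_ORDER])
    return output.getvalue().rstrip("\r\n")

line = to_csv_line({"favoriteAnimal": "panda", "name": "holden"})
# In Spark: records.map(to_csv_line).saveAsTextFile(outputFile)
```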
SequenceFiles
SequenceFiles are a popular Hadoop format composed of flat files with key/value pairs. SequenceFiles have sync markers that allow Spark to seek to a point in the file and then resynchronize with the record boundaries.
Spark has a specialized API for reading in SequenceFiles. On the SparkContext we can call sequenceFile(path, keyClass, valueClass, minPartitions).
Because SequenceFiles consist of key/value pairs, we need a PairRDD with types that our SequenceFile can write out. Implicit conversions between Scala types and Hadoop Writables exist for many native types, so if you are writing out a native type you can just save your PairRDD by calling saveAsSequenceFile(path), and it will write out the data for you.
If there isn’t an automatic conversion from our key and value to Writable, or we want to use variable-length types (e.g., VIntWritable), we can just map over the data and convert it before saving.
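A small sketch of that conversion step in Python; the helper name and pair types are illustrative, and the converted pair RDD would then be written with saveAsSequenceFile(), shown as a comment:

```python
def to_writable_friendly(pair):
    # Coerce the key and value into native types (str, int) that have
    # automatic Writable conversions before saving. The input pair is
    # assumed to be (key, numeric-string).
    key, value = pair
    return (str(key), int(value))

converted = to_writable_friendly(("panda", "3"))
# In Spark: pairs.map(to_writable_friendly).saveAsSequenceFile(outputFile)
```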
Object Files
Object files are a deceptively simple wrapper around SequenceFiles that allows us to save our RDDs containing just values.
Saving an object file is as simple as calling saveAsObjectFile on an RDD. Reading an object file back is also quite simple: the function objectFile() on the SparkContext takes in a path and returns an RDD.
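The round trip can be sketched with plain pickle; the path and records below are made up. In PySpark the comparable calls are rdd.saveAsPickleFile(path) and sc.pickleFile(path), while saveAsObjectFile()/objectFile() are the Scala/Java pair:

```python
import os
import pickle
import tempfile

# Illustrative round trip: serialize arbitrary objects, read them back
# unchanged, which is the guarantee object files provide.
path = os.path.join(tempfile.mkdtemp(), "objects.pkl")
records = [{"name": "holden", "pandas": 2}, {"name": "sparky", "pandas": 5}]
with open(path, "wb") as f:
    pickle.dump(records, f)
with open(path, "rb") as f:
    restored = pickle.load(f)
```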
Hadoop Input and Output Formats
- Loading with other Hadoop input formats: To read in a file using the new Hadoop API we need to tell Spark a few things. The newAPIHadoopFile() function takes a path and three classes. The first class is the “format” class, which is the class representing our input format. A similar function, hadoopFile(), exists for working with Hadoop input formats implemented with the older API. The next class is the class for our key, and the final class is the class of our value. If we need to specify additional Hadoop configuration properties, we can also pass in a conf object.
File Compression
Frequently when working with Big Data, we find ourselves needing to use compressed data to save storage space and network overhead. With most Hadoop output formats, we can specify a compression codec that will compress the data.
Filesystems
Spark supports a large number of filesystems for reading and writing to, which we can use with any of the file formats we want.
- Local/“Regular” FS: While Spark supports loading files from the local filesystem, it requires that the files are available at the same path on all nodes in your cluster.
- Amazon S3: Amazon S3 is an increasingly popular option for storing large amounts of data. S3 is especially fast when your compute nodes are located inside of Amazon EC2, but can easily have much worse performance if you have to go over the public Internet.
- HDFS: The Hadoop Distributed File System (HDFS) is a popular distributed filesystem with which Spark works well. HDFS is designed to work on commodity hardware and be resilient to node failure while providing high data throughput.
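The three filesystems differ only in the URI scheme of the path we pass in; the host, bucket, and file names below are all made up:

```python
# Illustrative input paths for each filesystem (all names hypothetical):
example_paths = {
    "local": "file:///home/holden/happypandas.gz",
    "s3": "s3n://my-bucket/my-files/*.txt",
    "hdfs": "hdfs://master:port/path",
}
# Any of these can be passed straight to sc.textFile(...).
```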
Structured Data with Spark SQL
Spark SQL is a component added in Spark 1.0 that is quickly becoming Spark’s preferred way to work with structured and semistructured data. By structured data, we mean data that has a schema—that is, a consistent set of fields across data records.
- Apache Hive: One common structured data source on Hadoop is Apache Hive. Hive can store tables in a variety of formats, from plain text to column-oriented formats, inside HDFS or other storage systems. Spark SQL can load any table supported by Hive.
- JSON: If you have JSON data with a consistent schema across records, Spark SQL can infer the schema and load this data as rows as well, making it very simple to pull out the fields you need. To load JSON data, first create a HiveContext as when using Hive. Then use the HiveContext.jsonFile method to get an RDD of Row objects for the whole file.
Databases
Spark can access several popular databases using either their Hadoop connectors or custom Spark connectors.
Java Database Connectivity: Spark can load data from any relational database that supports Java Database Connectivity (JDBC), including MySQL, Postgres, and other systems. To access this data, we construct an org.apache.spark.rdd.JdbcRDD and provide it with our SparkContext and the other parameters.
Cassandra: Spark’s Cassandra support has improved greatly with the introduction of the open source Spark Cassandra connector from DataStax. Since the connector is not currently part of Spark, you will need to add some further dependencies to your build file. Cassandra doesn’t yet use Spark SQL, but it returns RDDs of CassandraRow objects, which have some of the same methods as Spark SQL’s Row object.
HBase: Spark can access HBase through its Hadoop input format, implemented in the org.apache.hadoop.hbase.mapreduce.TableInputFormat class. This input format returns key/value pairs where the key is of type org.apache.hadoop.hbase.io.ImmutableBytesWritable and the value is of type org.apache.hadoop.hbase.client.Result. The Result class includes various methods for getting values based on their column family, as described in its API documentation.
Elasticsearch: Spark can both read and write data from Elasticsearch using Elasticsearch-Hadoop. Elasticsearch is a new open source, Lucene-based search system. The Elasticsearch connector is a bit different from the other connectors we have examined, since it ignores the path information we provide and instead depends on setting up configuration on our SparkContext.