Programming with RDDs

 

A Resilient Distributed Dataset (RDD) in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

RDDs can be created in two ways: by loading an external dataset, or by distributing a collection of objects (e.g., a list or set) in the driver program.

Every Spark program and shell session will work as follows (a short Python sketch of these steps appears after the list):

  • Create some input RDDs from external data.
  • Transform them to define new RDDs using transformations like filter().
  • Ask Spark to persist() any intermediate RDDs that will need to be reused.
  • Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.
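
Putting these steps together, a minimal Python sketch of this workflow might look like the following (the file name, filter condition, and variable names are placeholders chosen for illustration):

inputRDD = sc.textFile("log.txt")                      # create an input RDD from external data
errorsRDD = inputRDD.filter(lambda x: "error" in x)    # transform it to define a new RDD
errorsRDD.persist()                                    # ask Spark to keep this intermediate RDD around for reuse
print errorsRDD.count()                                # actions trigger the actual parallel computation
print errorsRDD.first()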

 

Creating RDDs

The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext’s parallelize() method, as shown in the examples below:

parallelize() method in Python

lines = sc.parallelize(["pandas", "i like pandas"])

 

parallelize() method in Scala

val lines = sc.parallelize(List("pandas", "i like pandas"))

 

parallelize() method in Java

JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
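
The other way of creating RDDs mentioned above, loading an external dataset, is usually done with SparkContext’s textFile() method, as in this minimal sketch (the path is a placeholder):

textFile() method in Python

lines = sc.textFile("/path/to/README.md")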

 

RDD Operations

RDDs support two types of operations: transformations and actions.

 

Transformations 

Transformations are operations on RDDs that return a new RDD.

As an example, suppose that we have a log file, log.txt, with a number of messages, and we want to select only the error messages. We can use the filter() transformation. We’ll show this filter in all three of Spark’s language APIs.

filter() transformation in Python

inputRDD = sc.textFile("log.txt")

errorsRDD = inputRDD.filter(lambda x: "error" in x)

 

filter() transformation in Scala

val inputRDD = sc.textFile("log.txt")

val errorsRDD = inputRDD.filter(line => line.contains("error"))

 

filter() transformation in Java

JavaRDD<String> inputRDD = sc.textFile("log.txt");

JavaRDD<String> errorsRDD = inputRDD.filter(
    new Function<String, Boolean>() {
        public Boolean call(String x) { return x.contains("error"); }
    });

 

Note that the filter() operation does not mutate the existing input RDD. Instead, it returns a pointer to an entirely new RDD, so the input RDD can still be reused later in the program.

 

Actions 

Actions are operations that return a final value to the driver program or write data to an external storage system. Actions force the evaluation of the transformations required for the RDD they were called on, since they need to actually produce output. In the examples below, badLinesRDD is the filtered RDD of problem lines (for instance, the errorsRDD created above).

 

Python error count using actions

print "Input had " + badLinesRDD.count() + " concerning lines"

print "Here are 10 examples:"

for line in badLinesRDD.take(10):

print line

 

Scala error count using actions

println("Input had " + badLinesRDD.count() + " concerning lines")

println("Here are 10 examples:")

badLinesRDD.take(10).foreach(println)

 

Java error count using actions

System.out.println("Input had " + badLinesRDD.count() + " concerning lines")

System.out.println("Here are 10 examples:")

for (String line: badLinesRDD.take(10)) {

System.out.println(line);

}

 

In this example, we used take() to retrieve a small number of elements in the RDD at the driver program. We then iterate over them locally to print out information at the driver. RDDs also have a collect() function to retrieve the entire RDD. This can be useful if your program filters RDDs down to a very small size and you’d like to deal with it locally. Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

It is important to note that each time we call a new action, the entire RDD must be computed “from scratch.” To avoid this inefficiency, users can persist intermediate results.

 

Lazy Evaluation

Lazy evaluation means that when we call a transformation on an RDD (for instance, calling map()), the operation is not immediately performed. Instead, Spark internally records metadata to indicate that this operation has been requested. Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations. Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary. As with transformations, the operation (in this case, reading the data) can occur multiple times.
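
A small Python sketch of this behavior (the file path is a placeholder; nothing is actually read or filtered until the action on the last line runs):

lines = sc.textFile("log.txt")                   # no data is loaded yet; Spark only records the lineage
errors = lines.filter(lambda x: "error" in x)    # still no work is done; just another recorded transformation
errors.count()                                   # the action forces Spark to load the file and apply the filter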

 

Passing Functions to Spark

Python: In Python, we have three options for passing functions into Spark: lambda expressions, top-level functions, and locally defined functions. Be careful when passing a function that is a member of an object, or that references fields of an object, because Spark then has to serialize the whole containing object; instead, extract the fields you need into a local variable and pass that in.
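
As an illustration of that last point, the sketch below contrasts passing a function that references a field of an object with extracting the field into a local variable first (the class and field names are made up for this example):

Passing functions in Python

class WordFilter(object):
    def __init__(self, query):
        self.query = query

    def get_matches_bad(self, rdd):
        # references self.query, so Spark must serialize the whole WordFilter object
        return rdd.filter(lambda line: self.query in line)

    def get_matches_good(self, rdd):
        # copy the field into a local variable so only the string is shipped to the workers
        query = self.query
        return rdd.filter(lambda line: query in line)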

Scala: In Scala, we can pass in functions defined inline, references to methods, or static functions as we do for Scala’s other functional APIs.

Java: In Java, functions are specified as objects that implement one of Spark’s function interfaces from the org.apache.spark.api.java.function package.

 

Common Transformations and Actions

The two most common transformations you will likely be using are map() and filter(). The map() transformation takes in a function and applies it to each element in the RDD with the result of the function being the new value of each element in the resulting RDD. The filter() transformation takes in a function and returns an RDD that only has elements that pass the filter() function.

[Figure: mapped and filtered RDDs from an input RDD]

 

Basic example of map() that squares all of the numbers in an RDD

 

Python squaring the values in an RDD

nums = sc.parallelize([1, 2, 3, 4])

squared = nums.map(lambda x: x * x).collect()

for num in squared:
    print "%i " % (num)

 

Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap().

A simple usage of flatMap() is splitting up an input string into words, as shown in Example:

 

flatMap() in Python, splitting lines into words

lines = sc.parallelize(["hello world", "hi"])

words = lines.flatMap(lambda line: line.split(" "))

words.first() # returns "hello"

 

Pseudo set operations

 

[Figure: simple set operations]
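
The figure presumably illustrated RDD methods that behave like set operations. A minimal Python sketch of the common ones, distinct(), union(), intersection(), and subtract() (the sample data is made up, and collect() may return elements in any order):

rdd1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
rdd2 = sc.parallelize(["coffee", "monkey", "kitty"])

rdd1.distinct().collect()          # duplicates removed: coffee, panda, monkey, tea
rdd1.union(rdd2).collect()         # keeps duplicates, unlike a true set union
rdd1.intersection(rdd2).collect()  # coffee, monkey
rdd1.subtract(rdd2).collect()      # elements of rdd1 not in rdd2: panda, tea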

 

Cartesian product between two RDDs

 

[Figure: Cartesian product between two RDDs]
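
A minimal Python sketch of cartesian(), which returns every possible pair (a, b) with a from the first RDD and b from the second (the sample data is made up; note that the Cartesian product of large RDDs is very expensive):

users = sc.parallelize(["alice", "bob"])
items = sc.parallelize(["book", "pen"])

pairs = users.cartesian(items)
pairs.collect()
# [('alice', 'book'), ('alice', 'pen'), ('bob', 'book'), ('bob', 'pen')] (order may vary)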

 

Actions

The most common action on basic RDDs you will likely use is reduce(), which takes a function that operates on two elements of the type in your RDD and returns a new element of the same type. Similar to reduce() is fold(), which also takes a function with the same signature as needed for reduce(), but in addition takes a “zero value” to be used for the initial call on each partition.

Both fold() and reduce() require that the return type of our result be the same type as that of the elements in the RDD we are operating over. This works well for operations like sum, but sometimes we want to return a different type.
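
A minimal Python sketch of these actions, summing the numbers in an RDD with reduce() and fold(), and then using aggregate() for the case where the result type differs from the element type (the data is arbitrary):

nums = sc.parallelize([1, 2, 3, 4])

sum_reduce = nums.reduce(lambda x, y: x + y)    # 10
sum_fold = nums.fold(0, lambda x, y: x + y)     # 10; 0 is the zero value used on each partition

# aggregate() keeps a (sum, count) pair so we can compute an average
sum_count = nums.aggregate((0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
average = sum_count[0] / float(sum_count[1])    # 2.5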

"0 Responses on Programming with RDDs"

Training in Cities

Bangalore, Hyderabad, Chennai, Delhi, Kolkata, UK, London, Chicago, San Francisco, Dallas, Washington, New York, Orlando, Boston

100% Secure Payments. All major credit & debit cards accepted Or Pay by Paypal.

top

Sales Offer

  • To avail this offer, enroll before 10th December 2016.
  • This offer cannot be combined with any other offer.
  • This offer is valid on selected courses only.
  • Please use coupon codes mentioned below to avail the offer
offer-june

Sign Up or Login to view the Free Programming with RDDs.