in Big Data Hadoop & Spark by (11.4k points)

I want to use an accumulator to gather some stats about the data I'm manipulating in a Spark job. Ideally, I would do that while the job computes the required transformations, but since Spark may re-compute tasks in certain cases, the accumulators would not reflect true metrics. Here is how the documentation describes this:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

This is confusing since most actions do not allow running custom code (where accumulators can be used); they mostly take the results of previous transformations (which are evaluated lazily). The documentation also shows this:

val acc = sc.accumulator(0)
data.map { x => acc += x; f(x) }
// Here, acc is still 0 because no action has caused the `map` to be computed.

But if we add data.count() at the end, would this be guaranteed to be correct (have no duplicates) or not?

1 Answer

by (32.3k points)

Accumulators are variables that are used for aggregating information across the executors.

They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark’s UI. This can be useful for understanding the progress of running stages (NOTE − this is not yet supported in Python).
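
As a side note, here is a minimal sketch of a named accumulator, assuming a spark-shell session where the SparkContext is available as sc (the name "My Accumulator" is just illustrative). Giving it a name makes it show up in the Spark UI for the stages that update it:

scala> val namedAcc = sc.accumulator(0, "My Accumulator")

scala> sc.parallelize(1 to 100).foreach(_ => namedAcc += 1)  // update inside an action

scala> namedAcc.value  // only the driver can read the value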

An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.

The code given below shows an accumulator being used to add up the elements of an array:

scala> val accum = sc.accumulator(0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

scala> accum.value
res2: Int = 10

Answering your question, "When are accumulators truly reliable?":

Accumulators are truly reliable only when they are updated inside an action.

In an action, even if a task is restarted, its update will be applied to the accumulator only once.

That is, for accumulator updates performed inside actions, Spark guarantees that each task's update will be applied to the accumulator exactly once.

Accumulator updates are sent back to the driver only when a task completes successfully. So your accumulator results are guaranteed to be correct when you are certain that each task's update has been applied exactly once and that each task did what you expected.
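
To make the difference concrete, here is a minimal sketch contrasting the two cases, assuming a spark-shell session with sc available (variable names are just illustrative):

// Updated inside a transformation (map): if the RDD is recomputed, e.g. because
// it is not cached and a second action runs, the updates are applied again.
val inMap = sc.accumulator(0)
val data = sc.parallelize(1 to 10)
val mapped = data.map { x => inMap += 1; x * 2 }
mapped.count()    // triggers the map; inMap.value is now 10
mapped.collect()  // recomputes the map, so inMap.value can grow to 20

// Updated inside an action (foreach): Spark guarantees each task's update
// is applied only once, even if a task is restarted.
val inForeach = sc.accumulator(0)
data.foreach(_ => inForeach += 1)
inForeach.value   // reliably 10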
