+5 votes
1 view
in Big Data Hadoop & Spark by (11.5k points)
I would like to know if the foreachPartitions will results in better performance, due to an higher level of parallelism, compared to the foreach method considering the case in which I'm flowing through an  RDD in order to perform some sums into an accumulator variable.

3 Answers

+3 votes
by (31.4k points)

The foreach action in Spark is designed like a forced map (so the "map" action occurs on the executors). Foreach is useful for a couple of operations in Spark. They are required to be used when you want to guarantee an accumulator's value to be correct. In addition, they can be used when you want to move data to an external system, like a database, though typically a foreachPartition is used for that operation

Now, talking about foreachpartition(), it is similar to foreach() , but instead of invoking function for each element, it calls it for each partition. The function should be able to accept an iterator. This is more efficient than foreach() because it reduces the number of function calls (just like mapPartitions() ).

Foreach() example:

scala> val accum = sc.longAccumulator("My Accumulator")

accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

...

 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value

res2: Long = 10

Foreachpartition() example:

Usage of foreachpartition where one database connection is used.

 def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

    //numPartitions = number of simultaneous DB connections you can planning to give

datframe.repartition(numofpartitionsyouwant)

    val tableHeader: String = dataFrame.columns.mkString(",")

    dataFrame.foreachPartition { partition =>

      // Note : Each partition one connection (more better way is to use connection pools)

      val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)

      //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql

      partition.grouped(1000).foreach {

        group =>

          val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()

          group.foreach {

            record => insertString.append("('" + record.mkString(",") + "'),")

          }

          sqlExecutorConnection.createStatement()

            .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "

              + insertString.stripSuffix(","))

      }

      sqlExecutorConnection.close() // close the connection so that connections wont exhaust.

    }

  }

Accumulator sample snippet through which you can test the performence:

     test("Foreach - Spark") {

        import spark.implicits._

        var accum = sc.longAccumulator

        sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))

        assert(accum.value == 6L)

      }

      test("Foreach partition - Spark") {

        import spark.implicits._

        var accum = sc.longAccumulator

        sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))

        assert(accum.value == 6L)

      } 

Now, it will be easy for you to conclude that foreachpartition operates specifically on partitions so it is very obvious that it has an edge over foreach.

Note: One should use foreachPartition when accessing costly resources such as database connections or kafka producer etc., which would initialize one per partition rather than one per element(foreach).

by (17k points)
Thank you for the clear explanation.
by (33.2k points)
Thanks for this well-detailed answer!
by (40.4k points)
Well explained.
by (107k points)
Nice explanation
+3 votes
by (44.6k points)

foreach auto run the loop on many nodes.

However, sometimes you want to do some operations on each node. For example, make a connection to the database. You can not just make a connection and pass it into the foreach function: the connection is only made on one node.

So with foreachPartition, you'll be able to create a connection to the database on every node before running the loop.

by (19.8k points)
It worked for me, got the answer!
by (16k points)
This solved my problem.
Thanks!
by (29.8k points)
Nice solution ! works for me .. thankyou!!!
+2 votes
by (95.7k points)

There is not that much of a difference between foreach and foreachPartitions. Under the covers, all that foreach is doing is calling the iterator's foreach using the provided function. foreachPartition presently gives you the chance to do something outside of the looping of the iterator, usually something expensive like spinning up a database connection or something along those lines. So, if you don't have anything that could be done once for each node's iterator and reused throughout, then I would suggest using foreach for improved clarity and reduced complexity.

by (31.8k points)
Correct!
This should have sorted your question. @Arav
by (29.1k points)
well explained!
by (47.2k points)
With foreachPartition, you can make a connection to database on each node before running the loop.
Welcome to Intellipaat Community. Get your technical queries answered by top developers !


Categories

...