The foreach action in Spark is designed like a forced map (so the "map"-style work occurs on the executors). foreach is useful for a couple of operations in Spark. It is required when you want to guarantee that an accumulator's value is correct, since Spark only guarantees exactly-once application of accumulator updates made inside actions. It can also be used to move data to an external system, such as a database, though foreachPartition is typically used for that operation.
Now, talking about foreachPartition(): it is similar to foreach(), but instead of invoking the function for each element, it calls it once for each partition, and the function must accept an Iterator over the partition's elements. This is more efficient than foreach() because it reduces the number of function calls (just like mapPartitions()).
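The "fewer function calls" point can be seen without Spark at all. In this plain-Scala analogy (no Spark involved; the Seq-of-Seqs here merely stands in for an RDD's partitions), we count how often the user-supplied function is invoked in element-wise versus partition-wise processing:

```scala
// Model an RDD as a sequence of partitions and count function invocations.
val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6))

// foreach-style: the function is invoked once per element.
var elementCalls = 0
partitions.flatten.foreach { _ => elementCalls += 1 }

// foreachPartition-style: the function is invoked once per partition
// and receives (here, iterates over) the partition's elements itself.
var partitionCalls = 0
partitions.foreach { p => partitionCalls += 1; p.foreach(_ => ()) }

println(s"element-wise calls: $elementCalls, partition-wise calls: $partitionCalls")
// element-wise calls: 6, partition-wise calls: 3
```

The same elements are processed either way; only the number of times Spark hands control to your function changes.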
foreach() example:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
foreachPartition() example:
Usage of foreachPartition where one database connection is used per partition.
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.DataFrame

def insertToTable(sqlDatabaseConnectionString: String,
                  sqlTableName: String,
                  dataFrame: DataFrame,
                  numPartitions: Int): Unit = {
  // numPartitions = number of simultaneous DB connections you plan to allow
  val repartitioned = dataFrame.repartition(numPartitions)
  val tableHeader: String = repartitioned.columns.mkString(",")
  repartitioned.foreachPartition { partition =>
    // Note: one connection per partition (a better approach is a connection pool)
    val sqlExecutorConnection: Connection =
      DriverManager.getConnection(sqlDatabaseConnectionString)
    // A batch size of 1000 is used since some databases (e.g. Azure SQL)
    // cannot accept more than 1000 rows per INSERT statement
    partition.grouped(1000).foreach { group =>
      val insertString = new scala.collection.mutable.StringBuilder()
      group.foreach { record =>
        insertString.append("('" + record.mkString(",") + "'),")
      }
      sqlExecutorConnection.createStatement()
        .executeUpdate(s"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
          + insertString.stripSuffix(","))
    }
    sqlExecutorConnection.close() // close the connection so that connections won't be exhausted
  }
}
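The grouped-batching step inside that method can be exercised without a database or Spark. The sketch below (table and column names are made up for illustration) shows how Iterator-style grouping turns a stream of records into one INSERT statement per batch, using a batch size of 2 instead of 1000 so the output is easy to inspect:

```scala
// Records as they would arrive from a partition's iterator.
val records: Seq[Seq[String]] = Seq(Seq("1", "a"), Seq("2", "b"), Seq("3", "c"))

// Group into batches of 2 and build one INSERT ... VALUES statement per batch.
val statements = records.grouped(2).map { group =>
  val values = group.map(r => "('" + r.mkString(",") + "')").mkString(",")
  "INSERT INTO [myTable] (id,name) VALUES " + values
}.toList

statements.foreach(println)
// INSERT INTO [myTable] (id,name) VALUES ('1,a'),('2,b')
// INSERT INTO [myTable] (id,name) VALUES ('3,c')
```

Three records with a batch size of 2 yield two statements: a full batch and a remainder, exactly as `grouped` would split a real partition of 2500 rows into batches of 1000, 1000, and 500.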
Accumulator sample snippets through which you can test the performance:
test("Foreach - Spark") {
  val accum = sc.longAccumulator
  sc.parallelize(Seq(1, 2, 3)).foreach(x => accum.add(x))
  assert(accum.value == 6L)
}
test("Foreach partition - Spark") {
  val accum = sc.longAccumulator
  sc.parallelize(Seq(1, 2, 3)).foreachPartition(x => x.foreach(accum.add(_)))
  assert(accum.value == 6L)
}
From these examples it is easy to conclude that foreachPartition operates at partition granularity, which gives it an edge over foreach whenever there is per-call setup cost to amortize.
Note: One should use foreachPartition when accessing costly resources such as database connections or Kafka producers, which are then initialized once per partition rather than once per element (as with foreach).
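That note is the whole argument in one line, and it can be made concrete with a stand-in resource. In the sketch below, FakeConnection is a hypothetical stand-in for a real JDBC connection or Kafka producer; it simply counts how many times it is constructed under each style (again modelling an RDD's partitions as a Seq of Seqs, with no Spark involved):

```scala
// Hypothetical costly resource: counts how many times it is constructed.
var connectionsOpened = 0
class FakeConnection { connectionsOpened += 1; def close(): Unit = () }

val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6))

// foreach-style: one resource per element.
connectionsOpened = 0
partitions.flatten.foreach { _ =>
  val c = new FakeConnection
  c.close()
}
val perElement = connectionsOpened

// foreachPartition-style: one resource per partition, shared by its elements.
connectionsOpened = 0
partitions.foreach { p =>
  val c = new FakeConnection
  p.foreach(_ => ()) // all elements reuse c
  c.close()
}
val perPartition = connectionsOpened

println(s"per-element: $perElement, per-partition: $perPartition")
// per-element: 6, per-partition: 2
```

Six elements across two partitions means six initializations with the foreach style but only two with the foreachPartition style; with real connections or producers, that difference dominates the runtime.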