Recently, while trying to debug an error during my on-call rotation, I ran into a situation where a foreachPartition was causing a "bigger than spark.driver.maxResultSize" serialization error. This error is caused by too much data being pulled back to the driver and is normally associated with a collect call. But the stack trace pointed to the foreachPartition in our code base, so at first I struggled to fix the issue. The documentation says foreachPartition takes a Unit function, so nothing should be returned to the driver. But when I dug into the Spark source code, I found the cause.

Spark Source Code

Diving into the Spark source code, I was eventually led to a SparkContext runJob function. While it is neither the final runJob overload nor the function that actually submits the job, it does clearly show how the results are accumulated.
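
The overload in question looks roughly like this (a paraphrased excerpt, not a verbatim copy, so the exact shape may differ slightly between Spark versions). Note the results array allocated with one slot per partition:

// Paraphrased excerpt of org.apache.spark.SparkContext#runJob
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)  // one result slot per partition
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results  // every task's result is collected on the driver
}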

But wait: if the function used in the foreachPartition call returns Unit, how does this cause a "bigger than spark.driver.maxResultSize" error? The answer is that Scala's Unit is still an object, and each task's Unit result still takes up memory on the driver. So, given enough partitions, you'll eventually accumulate enough result data to trip the limit.
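
To convince yourself that Unit isn't "free" once it's treated as an object, you can serialize its boxed form directly. The sketch below uses plain Java serialization rather than Spark's own task-result path, so the byte count isn't what Spark actually ships per task, but it illustrates the point:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.runtime.BoxedUnit

object UnitSize {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    // BoxedUnit.UNIT is what a Unit value becomes once it's handled as an object,
    // for example when a task result is handed to a serializer
    out.writeObject(BoxedUnit.UNIT)
    out.close()
    println(s"Serialized Unit: ${bytes.size()} bytes") // small, but not zero
  }
}

Multiply that small-but-not-zero size by hundreds or thousands of tasks and the total on the driver adds up.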

Reproducing the Issue

I was able to easily reproduce the issue with the code below (also available in this gist).

import org.apache.spark.sql.{Row, SparkSession}

object TestApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Test")
      .master("local[*]")
      .config("spark.driver.maxResultSize", "1b")
      .getOrCreate()

    import spark.implicits._

    val testDF = Range(0, 1000)
      .toDF("id")
      .repartition(999)

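    // 999 partitions means 999 task results; each returns Unit, yet the job still trips the 1-byte limit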
    testDF.foreachPartition { i: Iterator[Row] => println() }
  }
}

With spark.driver.maxResultSize set to 1b, even a simple count fails.

Of course, setting spark.driver.maxResultSize = 1b is ridiculous, but it makes the behavior easy to reproduce.
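
With that limit in place, even a trivial action on the same DataFrame trips the error, since its result still has to be serialized back to the driver:

    // even count(), which only needs to return a number, exceeds the 1-byte limit
    testDF.count()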

Conclusion

Even when using a DataFrame function that returns Unit, like foreachPartition or foreach, it is still possible to get a "bigger than spark.driver.maxResultSize" error. I'm not sure whether this is expected behavior or actually a bug, and after searching Jira, it doesn't seem like anyone has logged a ticket about it.