Spark with Scala: Task not serializable due to sparkContext

Hi, first time posting, out of desperation ^^U. I'm trying to make this work. The idea: starting from a DataFrame with a column that holds a list of ids, I want to return a new DataFrame with a new column holding the list of measures recorded for those ids in past records. I'm getting a "Task not serializable" error, and I think it points to the SparkContext instance, as seen in the log:

- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@42ebd0a3)
    - field (class: Myclass$$anonfun$6, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class Myclass$$anonfun$6, <function1>)

I guess there is something inside the map function that can't be there. Since the error points to the SparkContext, I am now explicitly passing the SparkContext as a parameter to both myMethod and MyDaoMethod, and all my classes implement Serializable.

Any help welcomed. Thank you.

def myMethod(df: DataFrame, factory: myFactory, sc: SparkContext)
                       (implicit sqlContext: SQLContext)   : DataFrame = {

  import java.time.LocalDate
  import java.time.format.DateTimeFormatter
  import java.util

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  // returns the date n weeks before the given date (input expected in BASIC_ISO_DATE format, e.g. "20240115")
  val getDateNWeeksAgo: (String, Int) => String = (date: String, n: Int) =>
    LocalDate.parse(date, DateTimeFormatter.BASIC_ISO_DATE).minusWeeks(n).toString

  val myNewDF = df.rdd.map(r => {

    val name = r.getAs[String]("name")
    val ym: String = r.getAs[String]("ym")
    val dd: String = r.getAs[String]("dd")
    val ymd: String = r.getAs[String]("ymd")
    val mag = r.getAs[String]("mag")
    val listId = r.getAs[String]("list_id") // the list of ids as a String, e.g. "[1, 5, 24]"
    val listSplit = listId.substring(1, listId.length - 1).split(",") // strip the brackets and split -> Array("1", " 5", " 24")

    val listValues = new util.ArrayList[String]() // list to store the measures found for each id

    for (id <- 0 until listSplit.length) {  // loop through the array of ids
      var value = 0d
      val meas1wAgo = findValueById(myDao.MyDaoMethod(name, getDateNWeeksAgo(ymd, 1), mag)(sqlContext, sc), listSplit(id))
      /* more code regarding algorithm with more measures n weeks ago*/
      value = meas1wAgo.toDouble
      listValues.add(value.toString)
    }

    Row(name, ym, dd, mag, listId, listValues)
  })

  // Define the schema for the resulting DataFrame
  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("meas_ym", StringType, nullable = false),
    StructField("meas_dd", StringType, nullable = false),
    StructField("mag", StringType, nullable = false),
    StructField("list_id", StringType, nullable = false),
    StructField("listValues", DataTypes.createArrayType(DataTypes.StringType), nullable = false)
  ))

  // Create a DataFrame from the RDD[Row] with the specified schema
  val DFwithValues = sqlContext.createDataFrame(myNewDF, schema)

  DFwithValues
}

MyDaoMethod is defined outside this method; it correctly queries the DB and returns a DataFrame with the measures for the desired date, given a name, date and mag.

findValueById is also defined outside and correctly returns the measure as a String, given a DataFrame and the id of the measure.
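
Their signatures are roughly the following (simplified sketch, bodies omitted; the enclosing object names here are only so the sketch compiles, not the real layout):

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

object myDao {
  // Queries the DB and returns a DataFrame with the measures for the given
  // name, date and mag. Taking SparkContext here is what ends up being
  // captured by the map closure above.
  def MyDaoMethod(name: String, date: String, mag: String)
                 (sqlContext: SQLContext, sc: SparkContext): DataFrame = ???
}

object Lookups {
  // Looks up a single measure (as a String) by id in the DataFrame
  // returned by MyDaoMethod.
  def findValueById(measures: DataFrame, id: String): String = ???
}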

The stack trace I'm getting is the following:

diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:415)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:405)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2353)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:393)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:392)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.map(RDD.scala:392)
    at ... ( ... .scala:307)  /* user comment: this is the line with the .map */
    [...]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:675)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@42ebd0a3)
    - field (class: MyClass$$anonfun$6, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class Myclass$$anonfun$6, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:412)
    ... 25 more
1 Answer

Answer from Chris:

In addition to mazaneicha's point: if you are using a Dataset/DataFrame inside findValueById, use a join with your id DataFrame, joining on the id, and let Spark manage the lookup for you. I.e. build the measures DataFrame once, then join on it; don't build it inside another object that requires a SparkContext.

You cannot use a SparkContext inside Spark operations; it only exists on the driver node.
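
Roughly, that restructuring could look like this (a sketch only: loadMeasures and the id/value column names are placeholders for whatever your DAO returns, and df/sqlContext are the ones from your myMethod):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// 1. Build the measures DataFrame once, on the driver, instead of calling the
//    DAO inside the map. loadMeasures is a placeholder for what MyDaoMethod
//    does today; assume it yields columns "id" and "value".
val measuresDF: DataFrame = loadMeasures(sqlContext)

// 2. Turn the "[1, 5, 24]" string into one row per id.
val explodedDF = df.withColumn(
  "id",
  explode(split(regexp_replace(col("list_id"), "[\\[\\]\\s]", ""), ","))
)

// 3. Let Spark resolve the lookup as a join instead of calling findValueById
//    per row. (The "n weeks ago" date logic is omitted here; it would become
//    extra columns/conditions on the join.)
val joinedDF = explodedDF.join(measuresDF, Seq("id"), "left")

// 4. Collapse back to one row per original record, collecting the values.
val dfWithValues = joinedDF
  .groupBy("name", "ym", "dd", "mag", "list_id")
  .agg(collect_list(col("value").cast("string")).as("listValues"))

With this shape nothing derived from the SparkContext or SQLContext is referenced inside an executor-side closure, so there is nothing non-serializable left to capture.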