This question is a follow-up to State management not serializable.
I want to encapsulate state management logic.
The following represents where I am at right now:
class StateManager(
  stream: DStream[(String, String)],
  updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}
object StateManager {
  def apply(
    _dStream: DStream[(String, String)],
    _updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ) =
    new StateManager(dStream, updateState)
}
This works fine, but only allows DStream[(String,String)] to be handled, which is a first step towards generic state management, fit to welcome any DStream: from DStream[(Int,String)] to DStream[(String,myCustomClass)].
myState requires to be a value function in order to work (serialization).
But I face a problem as type parameters don't apply to function objects in scala.
user6910411 gave me a hint by using ClassTags with an enclosing method (Type-parameterize a DStream), but in turn it'd still be a method.
Would anyone have some intel on how to overcome those difficulties?
The context:
Spark 1.6
Spark Graph:
object Consumer_Orchestrator {
    def main(args: Array[String]) = {
        //setup configurations
        val streamingContext = StreamingEnvironment(/*configurations*/)
        val kafkaStream = streamingContext.stream()
        val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] = (key, value, state) => {/*some code*/}
        val initialState = emptyRDD
        val stateManager = StateManager(kafkaStream, updateState)
        val state: DStream[(String, String)] = stateManager.myState
        state.foreachRDD(_.foreach(println))
        myStreamingContext.start()
        myStreamingContext.awaitTermination()
    }
}
The StreamingEnvironment class to create the Streaming:
class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
    val sparkContext = spark.SparkContext.getOrCreate(sparkConf)
    lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))
    mStreamingContext.checkpoint(/*directory checkpoint*/)
    mStreamingContext.remember(Minutes(1))
    def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, myKafkaConf.mBrokers, myKafkaConf.mTopics)
    def stop() = sparkContext.stop()
}
object StreamingEnvironment {
    def apply(kafkaConf: KafkaConf) = {
    val sparkConf = new SparkConf
    new StreamingEnvironment(sparkConf, kafkaConf)
    }
}
				
                        
Here you are:
App.scala:StateManage.scala:build.sbt:Directory structure:
Example execution:
As you can see there is no magic here. If you introduce generic arguments you need
ClassTagsin the same context.