I am using Flink with a custom Kryo serializer for my POJO class, but I am getting this exception:
Caused by: java.lang.NullPointerException
    at MyTreeSerializer.read(MyTreeSerializer.java:36)
    at MyTreeSerializer.read(MyTreeSerializer.java:11)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:414)
    ... 16 more
Here are the details:
- Kryo 2.24.0
- My Pojo Class
```
public class MyTree extends TreeMap<String, Object> {
    private String id;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
}
```
- Serializer For Pojo

```
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
public class MyTreeSerializer extends Serializer<MyTree> {
    public MyTreeSerializer() {
    }
    @Override
    public void write(Kryo kryo, Output output, MyTree object) {
        output.writeString(object.getId());
        kryo.writeObject(output, object, new MapSerializer());
    }
    @Override
    public MyTree read(Kryo kryo, Input input, Class<MyTree> type) {
        String id = input.readString();
        System.out.println("Serialized Id " + id);
        MyTree myTree = kryo.readObject(input, type, new MapSerializer());
        System.out.println("Serialized Object " + myTree);
        myTree.setId(id);
        return myTree;
    }
}
```
- Flink Streaming Main Program

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class MultiSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Setting Serializer
        env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);
        DataStreamSource<String> data = env.fromElements("1", "2");
        DataStream<MyTree> returns = data.map(new MapFunction<String, MyTree>() {
            @Override
            public MyTree map(String s) throws Exception {
                MyTree myTree = new MyTree();
                myTree.setId(s);
                myTree.put("name", "sohi");
                return myTree;
            }
        }).returns(MyTree.class);
        returns.addSink(new SinkFunction<MyTree>() {
            @Override
            public void invoke(MyTree myTree) throws Exception {
                System.out.println("==> " + myTree.toString());
            }
        });
        env.execute();
    }
}
```
With the code above, only the id is serialized, not the map part of MyTree.
But if I replace
env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);
with
env.getConfig().addDefaultKryoSerializer(MyTree.class, MapSerializer.class);
then the map is serialized but the id is not.
I just need help understanding why it does not work when using MyTreeSerializer.class.
Thanks in advance.
                        
The following line in `MyTreeSerializer` results in a `null`: `MyTree myTree = kryo.readObject(input, type, new MapSerializer());`

That's also why `myTree.setId(id)` results in a `NullPointerException`.

When you use `MapSerializer`, it works fine (except for the deserialization of `id`, of course) because `MyTree` extends `TreeMap`, which implements `Map`.

In your implementation of `MyTreeSerializer`, you are trying to deserialize a member of class `MyTree` from a `MyTree` object. It's as if `MyTreeSerializer` expects an object like the one in the sample code below:

```
public class MyTree extends TreeMap<String, Object> {
    private String id;
    private MyTree myTree;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public MyTree getMyTree() {
        return myTree;
    }
    public void setMyTree(MyTree myTree) {
        this.myTree = myTree;
    }
}
```

I think you will need to look at `MapSerializer` and extend it, or use it as a base for your own implementation, in order to serialize and deserialize `MyTree` objects.
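
To make the idea concrete, here is a minimal, library-free sketch of the steps such a serializer has to perform: write the `id`, then the map entries, and on read create the instance first and fill it, rather than deserializing the same object again from inside its own serializer. It deliberately uses plain `java.io` streams instead of the Kryo API, so `MyTreeRoundTrip`, `write` and `read` are hypothetical names for illustration only. It also assumes a non-null `id` and stores values as strings; in a real Kryo serializer the same steps would live in its `write` and `read` methods.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

public class MyTreeRoundTrip {

    // Same shape as the MyTree class from the question.
    static class MyTree extends TreeMap<String, Object> {
        private String id;
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
    }

    // Write the extra field first, then the map part entry by entry.
    static byte[] write(MyTree tree) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(tree.getId());          // assumption: id is not null
            out.writeInt(tree.size());
            for (Map.Entry<String, Object> entry : tree.entrySet()) {
                out.writeUTF(entry.getKey());
                // assumption: values are stored as their string form
                out.writeUTF(String.valueOf(entry.getValue()));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Create the instance up front, then restore id and entries in write order.
    static MyTree read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            MyTree tree = new MyTree();
            tree.setId(in.readUTF());
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                tree.put(key, in.readUTF());
            }
            return tree;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MyTree original = new MyTree();
        original.setId("1");
        original.put("name", "sohi");

        MyTree copy = read(write(original));
        System.out.println(copy.getId() + " " + copy); // prints "1 {name=sohi}"
    }
}
```

Both the `id` and the map entries survive the round trip here because the reader never asks for a second `MyTree` from the stream; it builds one object and populates it.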