Flink Avro-Deserialized Records Defaulting to GenericType

I am trying to read data from a Kafka topic that has an Avro schema. I have generated a POJO from the Avro schema, which I am using to deserialize the records. At least, that is what I understood from reading Stream Processing with Apache Flink by Fabian Hueske: "POJOs, including classes generated by Apache Avro" can be used.

What I see in the output is that some fields default to generic records; usually it happens for date-time fields. Below is the log message:

13:50:30,736 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Class#name will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

I don't understand why it is falling back to GenericType when I would like the POJO to be used.

What am I doing wrong? Or have I misunderstood something?

I also read the documentation about serializers and TypeInformation, but I am still not able to make sense of why my approach is not working.

https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#performance-comparison
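One quick way to see what the TypeExtractor actually produces for the class (a minimal check; MyClass is the generated Avro class used in the code below):

    // Print the TypeInformation Flink extracts reflectively. A clean POJO
    // prints as PojoType<...>; any field Flink cannot analyze shows up as
    // GenericType<...>, which means Kryo is used for that field.
    TypeInformation<MyClass> extracted = TypeInformation.of(MyClass.class);
    System.out.println(extracted);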

Below is my code:

    KafkaSource<MyClass> kafkaSource = KafkaSource.<MyClass>builder()
            .setTopics(topic66)
            .setGroupId("flink-group")
            .setBootstrapServers("localhost:9092")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setDeserializer(new MyClassDeserializer(MyClass.class, SCHEMA_REGISTRY_URL))
            .build();

    DataStream<MyClass> dataStream = env
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "reading Data")
            .returns(TypeInformation.of(MyClass.class));

And the code for deserializing:

    import java.io.IOException;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class MyClassDeserializer implements KafkaRecordDeserializationSchema<MyClass> {

        private final TypeInformation<MyClass> typeInformation;
        private final DeserializationSchema<MyClass> deserializationSchemaValue;

        public MyClassDeserializer(final Class<MyClass> trackClass, final String schemaRegistryUrl) {
            this.typeInformation = TypeInformation.of(trackClass);
            this.deserializationSchemaValue =
                    ConfluentRegistryAvroDeserializationSchema.forSpecific(trackClass, schemaRegistryUrl);
        }

        @Override
        public void open(DeserializationSchema.InitializationContext context) throws Exception {
            // Delegate open() so the wrapped schema can run its initialization.
            deserializationSchemaValue.open(context);
        }

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<MyClass> collector) throws IOException {
            try {
                collector.collect(deserializationSchemaValue.deserialize(consumerRecord.value()));
            } catch (IOException e) {
                System.out.println("Error deserializing record: " + e.getMessage());
            }
        }

        @Override
        public TypeInformation<MyClass> getProducedType() {
            return typeInformation;
        }
    }

How can I deserialize using the POJO or Avro serializer?

1 Answer

Answered by kkrugler:

Even for what looks like a valid POJO, Flink will fall back to Kryo if any field in the POJO can't be serialized as a POJO. I believe this is the case for many of the date-time classes.
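As an illustration (a hypothetical POJO, not from the question above), a single field Flink can't analyze is enough to trigger this:

    // Hypothetical POJO: one unanalyzable field is enough for Flink to log
    // the "will be processed as GenericType" warning for that field and to
    // serialize it with Kryo, even though the surrounding class is valid.
    public class Event {
        public String id;                      // basic type: analyzed fine
        public org.joda.time.DateTime created; // no setters -> fails POJO rules -> GenericType/Kryo

        public Event() {}
    }
    // TypeInformation.of(Event.class) prints a PojoType whose "created"
    // field appears as a GenericType.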

You could force Flink to use the Avro serializer via env.getConfig().enableForceAvro(). Or you could implement a custom serializer for your type.
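A sketch of both options (assuming MyClass is the Avro-generated SpecificRecord; AvroTypeInfo comes from the flink-avro module):

    // Option 1 (global): force Flink to serialize POJOs with Avro
    // instead of falling back to Kryo.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableForceAvro();

    // Option 2 (per stream): bypass reflective extraction by handing Flink an
    // AvroTypeInfo for the generated class
    // (org.apache.flink.formats.avro.typeutils.AvroTypeInfo).
    DataStream<MyClass> dataStream = env
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "reading Data")
            .returns(new AvroTypeInfo<>(MyClass.class));

With option 2, the custom deserializer's getProducedType() could return the same AvroTypeInfo, so the source reports the Avro type from the start.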