Azure Schema Registry integration issue


I'm developing a Spring Boot application that uses spring-kafka to implement a Kafka producer that sends messages to a Kafka cluster.

In the first iteration we deployed the Spring application in an AKS cluster. The application connects to a "Kafka cluster" based on Azure Event Hubs.

This part worked well. The integration succeeded, and we were able to send messages to a topic and consume them in JSON format.

In the next iteration we want to introduce Avro serializers and deserializers plus a schema registry (Azure Schema Registry). We made the changes in the code and tested them successfully with Testcontainers. However, when we run the application in the Azure environment, it fails.

You have an example of the application here: https://github.com/alvNa/spring-kafka-demo

Here is a chunk of the configuration:

spring:
  kafka:
    bootstrap-servers: <MY-NAMESPACE>.servicebus.windows.net:9093
    security:
      protocol: SASL_SSL
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      group-id: kafka-example-application
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<MY-NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<MY-KEYNAME>;SharedAccessKey=<MY-ACCESS-KEY>";
      schema.registry.url: https://<MY-NAMESPACE>.servicebus.windows.net
      schema.group: <MY-SCHEMA-GROUP>
      specific.avro.reader: true
      auto.register.schemas: false
      use.latest.version: true
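
For orientation, here is a rough sketch of how these settings might turn into an HTTP call. With `use.latest.version: true`, the Confluent serializer looks up the latest schema version over REST (the stack trace further down goes through `RestService.getLatestVersion`). The sketch assumes the Confluent REST layout `/subjects/{subject}/versions/latest` and the default subject naming `<topic>-value`. The helper name `latestVersionUrl` and the sample namespace and topic are made up for illustration.

```java
import java.net.URI;

public class RegistryUrlSketch {

  // Hypothetical helper: builds the "latest version" lookup URL in the
  // Confluent Schema Registry REST layout (/subjects/{subject}/versions/latest).
  static URI latestVersionUrl(String registryUrl, String topic) {
    // Assumption: default subject naming, so the value subject is "<topic>-value".
    String subject = topic + "-value";
    // Drop a trailing slash so the path is not doubled.
    String base = registryUrl.endsWith("/")
        ? registryUrl.substring(0, registryUrl.length() - 1)
        : registryUrl;
    return URI.create(base + "/subjects/" + subject + "/versions/latest");
  }

  public static void main(String[] args) {
    // Sample values only; replace with the real namespace and topic.
    URI url = latestVersionUrl("https://my-namespace.servicebus.windows.net", "my-topic");
    System.out.println(url);
  }
}
```

Whatever answers on `schema.registry.url` therefore has to understand this Confluent-style path for the lookup to succeed.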

I have a basic Avro schema for the example:

{
  "namespace": "com.atradius.examples",
  "type": "record",
  "name": "Message",
  "version": "1",
  "fields": [
    {
      "name": "number",
      "type": "int"
    },
    {
      "name": "description",
      "type": "string"
    }
  ]
}

With the Avro Maven/Gradle plugin, a Java POJO is generated:

package com.atradius.examples;

import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@org.apache.avro.specific.AvroGenerated
public class Message extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = -90117537915021226L;
 ...
}

Using this model, I create a Kafka producer with the Spring KafkaTemplate:

@Service
@RequiredArgsConstructor
public class KafkaAvroMessageService {

  @Value("${example-app.kafka.producer.topic}")
  private String topicName;

  private final KafkaTemplate<String, Message> kafkaTemplate;

  public void sendEvent(final String key, final String body) {
    final Message message = new Message(key, body);

    kafkaTemplate.send(topicName, key, message);
  }
}

When I run the code and try to send a message, I get this error:

2024-02-08T17:19:18.807+01:00 ERROR 25180 --- [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause

com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:750) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:674) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2790) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:908) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4912) ~[jackson-databind-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4818) ~[jackson-databind-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3817) ~[jackson-databind-2.15.2.jar:2.15.2]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:927) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:918) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:496) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:216) ~[kafka-schema-serializer-7.3.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:200) ~[kafka-schema-serializer-7.3.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:133) ~[kafka-avro-serializer-7.3.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61) ~[kafka-avro-serializer-7.3.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-7.3.1-ccs.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1002) ~[kafka-clients-7.3.1-ccs.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:949) ~[kafka-clients-7.3.1-ccs.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1016) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:783) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:754) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:544) ~[spring-kafka-3.0.9.jar:3.0.9]
    at com.atradius.example.kafka.service.KafkaAvroMessageService.sendEvent(KafkaAvroMessageService.java:31) ~[main/:na]
    at com.atradius.example.kafka.api.KafkaAvroExampleController.postNewEvent(KafkaAvroExampleController.java:38) ~[main/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:884) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1081) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:974) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1011) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590) ~[tomcat-embed-core-10.1.11.jar:6.0]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.11.jar:6.0]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:166) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:341) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:391) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:894) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1740) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

After debugging, I realised that the Azure Schema Registry is not returning what I expect. The REST call that retrieves the schema currently returns XML:

curl --location 'https://<MY-NAMESPACE>.servicebus.windows.net/subjects/<MY-TOPIC>/versions/latest' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json'

The response looks like this:

<feed xmlns="http://www.w3.org/2005/Atom">
    <title type="text">Publicly Listed Services</title>
    <subtitle type="text">This is the list of publicly-listed services currently available.</subtitle>
    <id>uuid:81b665e5-f924-4fdc-918e-346ccadd6fdb;id=90772</id>
    <updated>2024-02-08T16:20:47Z</updated>
    <generator>Service Bus 1.1</generator>
</feed>

I expected it to return a JSON schema instead, like this:

{
  "namespace": "com.atradius.examples",
  "type": "record",
  "name": "Message",
  "version": "1",
  "fields": [
    {
      "name": "number",
      "type": "int"
    },
    {
      "name": "description",
      "type": "string"
    }
  ]
}
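
To make the mismatch concrete, here is a minimal sketch of the kind of check that trips the JSON parser: it only looks at the first non-whitespace character of the response body. Character code 60 in the Jackson error is `<`, which is how the Atom feed starts. The class and method names are hypothetical, and this is not how the Confluent client is implemented. It is just a quick way to tell the two kinds of response apart while debugging.

```java
public class RegistryResponseCheck {

  enum BodyKind { JSON, XML, OTHER }

  // Classifies a response body by its first non-whitespace character.
  static BodyKind classify(String body) {
    String trimmed = body == null ? "" : body.strip();
    if (trimmed.isEmpty()) {
      return BodyKind.OTHER;
    }
    char first = trimmed.charAt(0);
    if (first == '{' || first == '[') {
      return BodyKind.JSON;
    }
    if (first == '<') {
      return BodyKind.XML;
    }
    return BodyKind.OTHER;
  }

  public static void main(String[] args) {
    // Shortened versions of the two bodies shown above.
    String atomFeed = "<feed xmlns=\"http://www.w3.org/2005/Atom\">"
        + "<title type=\"text\">Publicly Listed Services</title></feed>";
    String avroSchema = "{\"type\": \"record\", \"name\": \"Message\"}";

    System.out.println(classify(atomFeed));   // prints XML
    System.out.println(classify(avroSchema)); // prints JSON
    System.out.println((int) '<');            // prints 60, the code in the Jackson error
  }
}
```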

Does anyone have experience integrating with Azure Schema Registry? Do you know what could cause this behaviour?

Do I need some kind of configuration in the Azure Schema Registry?

Thanks.

Ref code: https://github.com/alvNa/spring-kafka-demo
