Kafka Connect - Schema Registry - Unknown magic byte error

I'm quite new to Kafka. After successfully running some HTTP connectors without schemas ("value.converter.schemas.enable": "false"), I'm now struggling to use schemas.

I've registered the JSON Schema ("schemaType": "JSON") for my messages in Schema Registry and then modified my connector configuration so that it uses it:

"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "https://kafka-schemaregistry:8081/",

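For reference: `JsonSchemaConverter` does not take a schema or subject name from the connector config. It reads the schema ID embedded in each message and fetches that schema from the registry. The subject is chosen on the producer side, and with the default `TopicNameStrategy` the value schema for a topic such as `mytopic` (the topic in the log) would live under the subject `mytopic-value`, not under a free-form subject like `visitors`. Below is a minimal sketch, assuming that default strategy, that builds (but does not send) the Schema Registry request `POST /subjects/<subject>/versions` with only the Python standard library. The helper names `value_subject` and `build_register_request` are hypothetical.

```python
import json
import urllib.parse
import urllib.request


def value_subject(topic: str) -> str:
    # Default TopicNameStrategy: the value schema of topic T is registered under "T-value".
    return f"{topic}-value"


def build_register_request(registry_url: str, topic: str, json_schema: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /subjects/<subject>/versions request."""
    subject = urllib.parse.quote(value_subject(topic), safe="")
    body = {
        "schemaType": "JSON",
        # The registry expects the schema document itself as a JSON-encoded string.
        "schema": json.dumps(json_schema),
    }
    return urllib.request.Request(
        url=f"{registry_url.rstrip('/')}/subjects/{subject}/versions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


if __name__ == "__main__":
    schema = {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Visitor", "type": "object"}
    req = build_register_request("https://kafka-schemaregistry:8081/", "mytopic", schema)
    print(req.get_method(), req.full_url)
    # To actually register it: urllib.request.urlopen(req)
```

Note that registering a schema this way only stores it. Messages already in the topic are not rewritten, and the converter will only find a schema ID in messages produced by a Schema Registry-aware serializer.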
The problem is that I get the (in)famous "Unknown magic byte!" error:

ERROR Error encountered in task test-withschema-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.json.JsonSchemaConverter', where consumed record is {topic='mytopic', partition=0, offset=34, timestamp=1708945734273, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
    org.apache.kafka.connect.errors.DataException: Converting byte to Kafka Connect data failed due to serialization error of topic visitor:
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:236)
    Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
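For context on the error itself: Confluent's Schema Registry serializers prefix every message with a magic byte `0x00` followed by a 4-byte big-endian schema ID, and `JsonSchemaConverter` expects that prefix before the JSON body. A value written by a plain JSON producer starts with `{` (0x7b) instead, which is exactly what triggers "Unknown magic byte!". The `id -1` in the log is consistent with no schema ID ever having been read. The sketch below is a diagnostic aid, not Confluent's code; `frame` and `describe` are hypothetical helpers that mimic the framing and check a raw record value for it.

```python
import json
import struct

MAGIC_BYTE = 0x00  # first byte of every Schema Registry-framed message


def frame(schema_id: int, payload: dict) -> bytes:
    """Mimic the JSON Schema serializer framing: magic byte, 4-byte big-endian schema ID, JSON body."""
    return struct.pack(">BI", MAGIC_BYTE, schema_id) + json.dumps(payload).encode("utf-8")


def describe(raw: bytes) -> str:
    """Report whether a raw record value carries the magic byte and a schema ID."""
    if not raw:
        return "empty value"
    if raw[0] != MAGIC_BYTE:
        return f"first byte is 0x{raw[0]:02x}, not 0x00: not Schema Registry framed"
    if len(raw) < 5:
        return "magic byte present but value too short for a schema id"
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return f"magic byte present, schema id {schema_id}"


if __name__ == "__main__":
    print(describe(b'{"VISIT_ID": "42"}'))          # plain JSON: first byte is 0x7b ("{")
    print(describe(frame(2, {"VISIT_ID": "42"})))  # magic byte present, schema id 2
```

If the raw values in the topic look like the first case, the fix is on the producing side (a Schema Registry-aware JSON Schema serializer), not in the sink connector's configuration.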

I've read plenty of posts (including the great "How to Fix Unknown Magic Byte Errors in Apache Kafka"), but I'm still struggling to fix it.

I have a couple of questions:

1. I manipulate the JSON from my Kafka topic with some transformations so that it matches what I expect at the other end of the connector. Should the schema validate the *resulting* JSON (after the transformations) or the *initial* JSON (before the transformations)?
2. I currently have only one schema in my Schema Registry. Do I need to tell my connector which schema it should use? If so, how do I do that?
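On the ordering question: in a sink connector, Kafka Connect runs the value converter on the raw bytes of the consumed record and only afterwards applies the transformations (the log above fails in the `VALUE_CONVERTER` stage, before any transformation runs). For a source connector the order is reversed: transformations first, then the converter serializes. The toy model below illustrates the sink-side order; it is not Connect's actual code, and `run_sink_task`, `visitId` and `rename_visit_id` are hypothetical.

```python
import json
from typing import Callable, Dict, List

Record = Dict[str, str]


def run_sink_task(raw_value: bytes,
                  converter: Callable[[bytes], Record],
                  transforms: List[Callable[[Record], Record]]) -> Record:
    """Toy model of the sink-side order: converter on raw topic bytes, then transformations in order."""
    record = converter(raw_value)  # schema-based deserialization happens here, before any transformation
    for smt in transforms:         # transformations only ever see already-converted records
        record = smt(record)
    return record                  # this is what the sink connector (e.g. HTTP) receives


seen_by_converter: List[List[str]] = []


def json_converter(raw: bytes) -> Record:
    data = json.loads(raw)
    seen_by_converter.append(sorted(data))  # remember which field names the converter saw
    return data


def rename_visit_id(record: Record) -> Record:
    # Hypothetical transformation: rename "visitId" to "VISIT_ID".
    out = dict(record)
    out["VISIT_ID"] = out.pop("visitId")
    return out


result = run_sink_task(b'{"visitId": "42"}', json_converter, [rename_visit_id])
print(seen_by_converter)  # [['visitId']]: the converter saw the pre-transformation field name
print(result)             # {'VISIT_ID': '42'}
```

Under this ordering, the schema the converter resolves has to match the JSON as it is stored in the topic, i.e. before the transformations.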

Thanks for any help in troubleshooting this issue.

Here is the schema as I registered it in Schema Registry:

curl https://kafka-schemaregistry:8081/schemas/ | jq

[
  {
    "subject": "visitors",
    "version": 1,
    "id": 2,
    "schemaType": "JSON",
    "schema": "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Visitor\",\"type\":\"object\",\"properties\":{\"BADGE_PRINT_ENTITY\":{\"type\":\"string\"},\"VISITOR_REGISTRATION_STATUS\":{\"type\":\"string\"},\"VISITOR_PERSON_ID\":{\"type\":\"string\"},\"VISIT_BUILDING\":{\"type\":\"string\"},\"VISITOR_DOC_EXPIRY_DATE\":{\"type\":\"string\"},\"VISITOR_EMAIL\":{\"type\":\"string\"},\"VISIT_SITE\":{\"type\":\"string\"},\"HOST_FIRST_NAME\":{\"type\":\"string\"},\"VISIT_NAME\":{\"type\":\"string\"},\"VISITOR_DOC_NUMBER\":{\"type\":\"string\"},\"VISITOR_APPROVAL_STATUS\":{\"type\":\"string\"},\"VISITOR_DOCUMENT_TYPE\":{\"type\":\"string\"},\"VISITOR_LAST_NAME\":{\"type\":\"string\"},\"VISITOR_NATIONALITY\":{\"type\":\"string\"},\"HOST_ID\":{\"type\":\"string\"},\"VISITOR_DATE_OF_BIRTH\":{\"type\":\"string\"},\"PERSONAL_NUMBER\":{\"type\":\"string\"},\"VISIT_TYPE\":{\"type\":\"string\"},\"BADGE_NUMBER\":{\"type\":\"string\"},\"VISITOR_BADGE_PRINT_STATUS\":{\"type\":\"string\"},\"VISITOR_SOCIAL_NAME\":{\"type\":\"string\"},\"HOST_LAST_NAME\":{\"type\":\"string\"},\"VISITOR_ORGANISATION\":{\"type\":\"string\"},\"VISIT_ID\":{\"type\":\"string\"},\"VIST_END_DATE\":{\"type\":\"string\"},\"VISITOR_FIRST_NAME\":{\"type\":\"string\"},\"HOST_GROUP\":{\"type\":\"string\"},\"BADGE_PRINT_DATE_AND_TIME\":{\"type\":\"string\"},\"HOST_EMAIL\":{\"type\":\"string\"},\"VISIT_ROOM\":{\"type\":\"string\"},\"VISIT_START_DATE\":{\"type\":\"string\"}},\"required\":[\"BADGE_PRINT_ENTITY\",\"VISITOR_REGISTRATION_STATUS\",\"VISITOR_PERSON_ID\",\"VISIT_BUILDING\",\"VISITOR_DOC_EXPIRY_DATE\",\"VISITOR_EMAIL\",\"VISIT_SITE\",\"HOST_FIRST_NAME\",\"VISIT_NAME\",\"VISITOR_DOC_NUMBER\",\"VISITOR_APPROVAL_STATUS\",\"VISITOR_DOCUMENT_TYPE\",\"VISITOR_LAST_NAME\",\"VISITOR_NATIONALITY\",\"HOST_ID\",\"VISITOR_DATE_OF_BIRTH\",\"PERSONAL_NUMBER\",\"VISIT_TYPE\",\"BADGE_NUMBER\",\"VISITOR_BADGE_PRINT_STATUS\",\"VISITOR_SOCIAL_NAME\",\"HOST_LAST_NAME\",\"VISITOR_ORGANISATION\",\"VISIT_ID\",\"VIST_END_DATE\",\"VISITOR_FIRST_NAME\",\"HOST_GROUP\",\"BADGE_PRINT_DATE_AND_TIME\",\"HOST_EMAIL\",\"VISIT_ROOM\",\"VISIT_START_DATE\"]}"
  }
]
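The registry returns the schema as a JSON document embedded in a string, so it has to be decoded twice before it can be inspected. Below is a minimal, standard-library-only sketch for checking whether a sample message (for example the message before versus after the transformations) has all the fields the schema marks as required. It only checks `required`, not types or other keywords; a full validator such as the `jsonschema` package would be needed for that. The helper names and the shortened response are hypothetical.

```python
import json
from typing import Dict, List


def required_fields(schemas_response: str, subject: str) -> List[str]:
    """Return the 'required' list of the schema registered under `subject` in a GET /schemas response."""
    for entry in json.loads(schemas_response):
        if entry["subject"] == subject:
            # The schema is itself a JSON document embedded as a string, so decode it a second time.
            schema = json.loads(entry["schema"])
            return schema.get("required", [])
    raise KeyError(f"subject {subject!r} not found")


def missing_fields(message: Dict[str, object], required: List[str]) -> List[str]:
    """Fields the schema marks as required but the message lacks (no type checking)."""
    return [name for name in required if name not in message]


if __name__ == "__main__":
    # Shortened, hypothetical response with only two required fields.
    response = json.dumps([{
        "subject": "visitors", "version": 1, "id": 2, "schemaType": "JSON",
        "schema": json.dumps({"type": "object", "required": ["VISIT_ID", "HOST_ID"]}),
    }])
    required = required_fields(response, "visitors")
    print(missing_fields({"VISIT_ID": "42"}, required))  # ['HOST_ID']
```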

Any help would be appreciated :)
