I am trying to parse the message from kafka topic though kafka message listener. I am able to see the message through producer and in the kafka topic. But in consumer flow, the transform message is unable to parse the json properly.
Consumer flow
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:bigquery="http://www.mulesoft.org/schema/mule/bigquery" xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/bigquery http://www.mulesoft.org/schema/mule/bigquery/current/mule-bigquery.xsd">
<flow name="account-consumer-npFlow" doc:id="0d996208-7cb6-42b9-93f2-77ead77d1883" >
<kafka:message-listener doc:name="Message listener" doc:id="c302fcb3-539c-42d9-8d21-347da8328ecd" config-ref="EDH_Consumer_np_configuration">
<repeatable-in-memory-stream />
</kafka:message-listener>
<ee:transform doc:name="Transform Message" doc:id="28ca3ce2-2c57-4c86-a845-f00d298a6590" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
out : payload.ChangeEventHeader.changeType
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<logger level="INFO" doc:name="Logger" doc:id="c4bc21b6-98d6-466c-a086-1fd5bf48343b" message="#[%dw 2.0 output application/json --- payload]" />
</flow>
</mule>
Error Log:
INFO 2024-02-06 10:24:43,087 [[MuleRuntime].uber.11: [sf-setotcemailtype-procapi-poc].account-producer-np.CPU_LITE @37e0e7a1] [processor: account-producer-np/processors/0/route/1/processors/0; event: 3bc1f370-c50c-11ee-942a-acde48001122] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: {
"ChangeEventHeader": {
"commitNumber": 1707236682278494209,
"commitUser": "0057f000005zHkEAAU",
"sequenceNumber": 1,
"entityName": "Account",
"changeType": "DELETE",
"changedFields": [
],
"changeOrigin": "",
"transactionKey": "0000295d-4636-8277-714a-0fbcba7f9c9b",
"commitTimestamp": 1707236682000,
"recordIds": [
"001O1000007mPzNIAU"
]
}
}
ERROR 2024-02-06 10:24:43,907 [[MuleRuntime].uber.11: [sf-setotcemailtype-procapi-poc].account-consumer-npFlow.CPU_INTENSIVE @3e250b1c] [processor: ; event: 3c5c37a1-c50c-11ee-942a-acde48001122] org.mule.runtime.core.privileged.exception.DefaultExceptionListener:
********************************************************************************
Message : "You called the function 'Value Selector' with these arguments:
1: Binary ("ewogICJpZCI6IFsKICAgICIwMDFPMTAwMDAwN21Qek5JQVUiCiAgXSwKICAiYWN0aXZlX19jIjog...)
2: Name ("ChangeEventHeader")
But it expects one of these combinations:
(Array, Name)
(Array, String)
(Date, Name)
(DateTime, Name)
(LocalDateTime, Name)
(LocalTime, Name)
(Object, Name)
(Object, String)
(Period, Name)
(Time, Name)
5| out : payload.ChangeEventHeader.changeType
^^^^^^^^^^^^^^^^^^^^^^^^^
Trace:
at anonymous::main (line: 5, column: 8)" evaluating expression: "%dw 2.0
output application/json
---
{
out : payload.ChangeEventHeader.changeType
}".
Element : account-consumer-npFlow/processors/0 @ sf-setotcemailtype-procapi-poc:account-consumer-np.xml:15 (Transform Message)
Element DSL : <ee:transform doc:name="Transform Message" doc:id="28ca3ce2-2c57-4c86-a845-f00d298a6590">
<ee:message>
<ee:set-payload><![CDATA[
%dw 2.0
output application/json
---
{
out : payload.ChangeEventHeader.changeType
}
]]></ee:set-payload>
</ee:message>
</ee:transform>
Error type : MULE:EXPRESSION
FlowStack : at account-consumer-npFlow(account-consumer-npFlow/processors/0 @ sf-setotcemailtype-procapi-poc:account-consumer-np.xml:15 (Transform Message))
(set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
I am able to parse the payload message just fine in dataWeave playground.
{
"ChangeEventHeader": {
"commitNumber": 1707236682278494209,
"commitUser": "0057f000005zHkEAAU",
"sequenceNumber": 1,
"entityName": "Account",
"changeType": "DELETE",
"changedFields": [
],
"changeOrigin": "",
"transactionKey": "0000295d-4636-8277-714a-0fbcba7f9c9b",
"commitTimestamp": 1707236682000,
"recordIds": [
"001O1000007mPzNIAU"
]
}
}
with script
%dw 2.0
output application/json
---
{
out : payload.ChangeEventHeader.changeType
}
returns as expected.
{
"out": "DELETE"
}
Not sure why transform message is unable to parse it.
The error is telling you that the DataWeave script is receiving a binary as an input, not a JSON value.
Since it is a binary it doesn't has keys. The expression
payload.ChangeEventHeaderfails because trying to use the single-value selector to get the value of key from a binary is not valid.You have to check why your Kafka message is a binary.