Everything worked well with Flink 1.13.1. We recently upgraded to Flink 1.14.2, and the following code now throws an exception:
<T> DataStream<Tuple3<String, String, T>> returnsInternal(SiddhiOperatorContext siddhiContext, String[] executionPlanIds) {
    if (createdDataStream == null) {
        DataStream<Tuple2<StreamRoute, Object>> mapped = this.dataStream.map(new MapFunction<Tuple2<StreamRoute, Object>, Tuple2<StreamRoute, Object>>() {
            @Override
            public Tuple2<StreamRoute, Object> map(Tuple2<StreamRoute, Object> value) throws Exception {
                if (executionPlanIds != null && executionPlanIds.length != 0) {
                    for (String executionPlanId : executionPlanIds) {
                        if (!executionPlanId.isEmpty()
                                && siddhiContext.getExecutionPlan(executionPlanId).IsUsedStream(value.f0.getInputStreamId())) {
                            value.f0.addExecutionPlanId(executionPlanId);
                        }
                    }
                }
                return value;
            }
        });
        createdDataStream = SiddhiStreamFactory.createDataStream(siddhiContext, mapped);
    }
    return createdDataStream;
}
The exception and call stack are as follows:
org.apache.flink.api.common.InvalidProgramException: The implementation of the BlockElement is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2139)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
    at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:577)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStreamBase.returnsInternal(ExecutionSiddhiStreamBase.java:135)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStreamBase.returnsInternal(ExecutionSiddhiStreamBase.java:123)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStream.returnAsRow(ExecutionSiddhiStream.java:180)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStream.returnAsRowWithQueryId(ExecutionSiddhiStream.java:165)
    at org.apache.flink.streaming.siddhi.SiddhiCEPITCase.testSimplePojoStreamAndReturnRowWithQueryId(SiddhiCEPITCase.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.io.NotSerializableException: org.apache.flink.configuration.description.TextElement
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
    ... 45 more
So why does this fail, what changed between 1.13.1 and 1.14.0, and how can we fix it?
Thank you, David Anderson. This appears to be a bug introduced by the latest Flink commit to the file flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java.
Diffing the file shows that TextElement is now used in ClosureCleanerLevel, which is a member of the Serializable ExecutionConfig.
TextElement in ClosureCleanerLevel
Flink Siddhi serializes ExecutionConfig, which it uses on every TaskManager to serialize Flink data to Siddhi types, so that should be the cause.
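To make the mechanism concrete, here is a minimal plain-Java sketch with no Flink dependency. The classes Description, Level and Config are hypothetical stand-ins for TextElement, ClosureCleanerLevel and ExecutionConfig, and findNonSerializable is a simplified imitation of a recursive check, not Flink's actual ClosureCleaner. It assumes, as the nested ClosureCleaner.clean frames in the stack trace suggest, that the check walks the fields and tries to serialize each reachable object on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class RecursiveSerializableCheck {

    // Hypothetical stand-in for TextElement: deliberately NOT Serializable.
    static final class Description {
        final String text;
        Description(String text) { this.text = text; }
    }

    // Hypothetical stand-in for ClosureCleanerLevel: an enum with a non-serializable field.
    enum Level {
        NONE(new Description("disable cleaning")),
        RECURSIVE(new Description("clean all fields recursively"));

        final Description description;
        Level(Description description) { this.description = description; }
    }

    // Hypothetical stand-in for ExecutionConfig.
    static final class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        final Level level = Level.RECURSIVE;
    }

    // Returns true if plain Java serialization of obj succeeds.
    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Convenience overload that starts the walk with an empty identity-based visited set.
    static Object findNonSerializable(Object root) {
        return findNonSerializable(root, Collections.newSetFromMap(new IdentityHashMap<>()));
    }

    // Walks non-static, non-transient fields depth-first and serializes every reachable
    // object individually; returns the first object that fails, or null if none does.
    static Object findNonSerializable(Object obj, Set<Object> visited) {
        if (obj == null || !visited.add(obj)) {
            return null;
        }
        Class<?> cls = obj.getClass();
        if (cls.getClassLoader() == null) {
            return null; // JDK classes such as String are not inspected in this sketch
        }
        for (Field f : cls.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.getType().isPrimitive()) {
                continue;
            }
            try {
                f.setAccessible(true);
                Object offender = findNonSerializable(f.get(obj), visited);
                if (offender != null) {
                    return offender;
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return serializes(obj) ? null : obj;
    }

    public static void main(String[] args) {
        Config config = new Config();
        System.out.println("direct serialization ok: " + serializes(config));
        Object offender = findNonSerializable(config);
        System.out.println("rejected by recursive check: "
                + (offender == null ? "nothing" : offender.getClass().getSimpleName()));
    }
}
```

Java serializes an enum constant by name only, so writing Config directly should succeed even though Level holds a Description. The field-by-field walk, however, reaches the Description object itself and fails there, which matches the shape of the reported exception.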
The simplest way to verify the problem is to run the following code in Flink 1.13.5 and 1.14.0. The exception is reproduced in 1.14.0, and the only diff between 1.13.5 and 1.14.0 is that latest commit.