I am trying to use Scala XML library in Flink to parse an XML and I am not able to make it work. Please note that I need to use both the serialized and unserialized (string) version on my code in the same processing function.
I tried already different solutions, they always work in IntelliJ but not when I run them on a Flink cluster. They always return different java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser; I tried multiple things but I still get error similar to this one.
This is an example of what my Flink Job looks like:
object StreamingJob {
import org.apache.flink.streaming.api.scala._
val l = List(
"""<ciao>ciao</ciao>""",
)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// set up kafka section excluded
env.setParallelism(10)
val stream = env.fromCollection(l)
stream
.uid("process")
.map(new Processor)
.print
env.execute("Flink-TEST")
}
}
This is an example of my processing function:
import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader
class Processor extends MapFunction[String, String] {
override def map(translatedMessage: String): String = {
val xml = Processor.xmlLoader.loadString(translatedMessage)
xml.toString
}
}
object Processor {
val factory: SAXParserFactory = SAXParserFactory.newInstance
val SAXParser: SAXParser = factory.newSAXParser
val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
}
and finally this is my pom.xml, using the maven-shade plugin to make the jar I pass to flink:
<!-- other sections of the pom are excluded -->
<properties>
<flink.version>1.7.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>
</properties>
<!-- other sections of the pom are excluded -->
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api-scala_2.12</artifactId>
<version>11.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-xml_2.12</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<!-- other sections of the pom are excluded -->
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.mycompany.myproj.artifactId.default.StreamingJob</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- other sections of the pom are excluded -->
I believe the issue is somehow related to the implementation that is going to be used for the SAXParser that Flink is using at runtime. I also tried using the @transient annotation to prevent persisting the fields from Flink but without success.
However I am quite confusion on what exactly is happening, anybody knows how to prevent the error and what went wrong?
After a while I figure out what was wrong with it.
Scala XML docs says:
Which in Maven translates into:
Well it appears that this dependency is not needed as even though Flink 1.7.2 appears to use Scala 2.12.8 it still retains Scala XML inside his distribution (hence in the classpath), I believe this can cause problems into which class is actually loaded and if correctly, however that might not be the solution to the linkage error.
The solution to the linkage error is actually by using Flink's own
RichMapFunction[InputT, OutputT]:As the JavaDoc says:
Unfortunately the usage of
varis preferred in this case as the initialisation of values/variables needs to be handled by Flink preventing the linkage error at runtime.Some notes:
DataStream[T]and not onDataSet[T].