java.lang.NoClassDefFoundError: Could not initialize class kafka.utils.Json$


I'm trying to use Spark Streaming with Kafka in a Java application, and I run into problems when I submit the job with spark-submit (Spark 2.3.0, Scala 2.11 build). I've set up my dependencies in Maven as follows:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
    </dependencies>
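In case the transitive versions matter: spark-streaming-kafka-0-8_2.11:2.3.0 should pull in the Kafka 0.8 client itself. I believe the exact versions can be checked with the Maven dependency plugin, e.g.:

    mvn dependency:tree -Dincludes=org.apache.kafka

(-Dincludes filters the tree down to artifacts in the org.apache.kafka group.)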

I've also written a SparkKafkaMain class that reads from Kafka and does a word count. It runs without errors locally, but it fails when I submit it with spark-submit.

Here's the SparkKafkaMain class:

    package spark.kafka;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    import scala.Tuple2;

    public class SparkKafkaMain {
        private static final Pattern SPACE = Pattern.compile(" ");

        private SparkKafkaMain() {
        }

        public static void main(String[] args) throws Exception {
            if (args.length < 4) {
                System.err.println("Usage: SparkKafkaMain <zkQuorum> <group> <topics> <numThreads>");
                System.exit(1);
            }
            SparkConf sparkConf = new SparkConf().setAppName("SparkKafkaWordCount");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

            // One receiver thread per topic, as passed on the command line.
            int numThreads = Integer.parseInt(args[3]);
            Map<String, Integer> topicMap = new HashMap<>();
            String[] topics = args[2].split(",");
            for (String topic : topics) {
                topicMap.put(topic, numThreads);
            }

            // Receiver-based stream: connects to ZooKeeper (args[0]) with the
            // given consumer group (args[1]).
            JavaPairReceiverInputDStream<String, String> messages =
                    KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

            // Classic word count over the message values.
            JavaDStream<String> lines = messages.map(Tuple2::_2);
            JavaDStream<String> words =
                    lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
            JavaPairDStream<String, Integer> wordCounts =
                    words.mapToPair(s -> new Tuple2<>(s, 1))
                            .reduceByKey((i1, i2) -> i1 + i2);
            wordCounts.print();

            jssc.start();
            jssc.awaitTermination();
        }
    }

When I submit the application using the following command:

    spark-submit --class spark.kafka.SparkKafkaMain --master local[2] test.jar localhost:2181 test MOVIE 1 >> out

I get this error:

    java.lang.NoClassDefFoundError: Could not initialize class kafka.utils.Json$
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$registerConsumerInZK(ZookeeperConsumerConnector.scala:251)
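From what I understand, "Could not initialize class" means the JVM did find kafka.utils.Json$ but its static initializer threw, which makes me suspect a missing or mismatched transitive dependency rather than the class simply being absent. To at least confirm whether test.jar bundles the Kafka classes, I assume something like this would show it:

    jar tf test.jar | grep kafka/utils/Json

If that prints nothing, the Kafka client classes never made it into the jar.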

I've tried different combinations of dependencies, but nothing seems to work. I'm new to Kafka and Spark, so any help would be appreciated.
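Would shipping the Kafka integration at submit time be the right approach? For example, a sketch (untested) using spark-submit's --packages flag, which as I understand it resolves the artifact and its transitive dependencies from Maven Central onto the driver and executor classpaths:

    spark-submit --class spark.kafka.SparkKafkaMain --master local[2] \
        --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \
        test.jar localhost:2181 test MOVIE 1

Or is building an uber jar that bundles the Kafka dependencies the more usual route?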
