I have an RDD containing pairs of nodes and I need to assign unique ids to them. But I'm getting an NPE and I can't figure out how to solve it.
I basically put all nodes into a distinct list and assign unique ids to them. After that I merge the original pairs with this new assignment.
The code looks like this:
JavaPairRDD<Node, Node> pairs = // ... assigned previously

JavaPairRDD<Node, Long> index = pairs
        .flatMap(tuple -> Arrays.asList(tuple._1(), tuple._2()).iterator())
        .distinct()
        .zipWithUniqueId();

JavaPairRDD<Node, Node> indexedPairs = pairs
        .leftOuterJoin(index)
        .mapToPair(new MergeJoinResult())
        .mapToPair(Tuple2::swap)
        .leftOuterJoin(index)
        .mapToPair(new MergeJoinResult())
        .mapToPair(Tuple2::swap);
/*
* Given a tuple like (node1, (node2, node1Index))
* Creates a new tuple (node1, node2) where node1 is initialized with its index
*/
static class MergeJoinResult implements
        PairFunction<Tuple2<Node, Tuple2<Node, Optional<Long>>>, Node, Node>,
        Serializable {
    @Override
    public Tuple2<Node, Node> call(Tuple2<Node, Tuple2<Node, Optional<Long>>> row) throws Exception {
        return Tuple2.apply(new Node(row._1(), row._2()._2().get()), row._2()._1());
    }
}
The problem I'm having is that row._2()._2().get() sometimes sees an absent Optional, and Spark's Optional.get() throws the NPE. But this should be impossible, because I derive the index RDD from the pairs RDD itself, so a leftOuterJoin between them should always produce a match.
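While debugging, replacing the bare get() with an explicit fallback at least turns the crash into observable data. Spark's org.apache.spark.api.java.Optional behaves like java.util.Optional for this purpose, so here is a minimal sketch with the JDK class as a stand-in:

```java
import java.util.Optional;

public class OptionalGuardDemo {
    public static void main(String[] args) {
        // Stand-in for the join result whose right side unexpectedly came back absent.
        Optional<Long> index = Optional.empty();

        // A bare get() would throw here; an explicit fallback keeps the job alive
        // and makes the bad rows visible in the output instead of in a stack trace.
        long id = index.orElse(-1L);
        System.out.println(id); // prints -1
    }
}
```

In the real job the equivalent would be either switching leftOuterJoin to join (so unmatched rows are dropped rather than crashing) or filtering on isPresent() and logging the offending tuples before calling get().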
As a sanity check I added code to dump both RDDs to S3 (using toString()) to inspect the contents of pairs and index. The data is all there: every edge, and the proper index entry with its uniqueId.
Then I thought the problem might be with the equals implementation, so I delomboked the code and added print statements to verify whether the comparison between objects was returning false. In my log the comparison always returns true, so I have no idea why I'm getting the absent Optional there.
Another strange thing I noticed is that when I stringify the objects in the index and perform a group by, I find duplicates in it:
index
    .groupBy(t -> t._1().toString())
    .filter(t -> {
        int size = 0;
        for (Tuple2<Node, Long> value : t._2()) {
            size++;
            if (size >= 2) return true;
        }
        return false;
    });
The same happens if I perform a pairs.cogroup(index): I get multiple entries with the same key. I tried comparing these objects after grouping them by their string representation, but their equals and hashCode return the same results. I'm using the Lombok implementations for both.
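For what it's worth, duplicate keys surviving distinct() and groupBy are the classic signature of a hashCode that is not consistent with equals across copies of an object (for example, one that effectively falls back to identity after deserialization recreates a field). A minimal sketch with the JDK's HashSet, using a hypothetical BadNode class:

```java
import java.util.HashSet;
import java.util.Set;

public class DuplicateKeyDemo {
    // Hypothetical node whose hashCode is NOT derived from its value: it
    // falls back to identity (Object.hashCode), the way a misconfigured or
    // partially deserialized implementation might behave.
    static final class BadNode {
        final String id;
        BadNode(String id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof BadNode && ((BadNode) o).id.equals(id);
        }
        // hashCode deliberately left as the identity-based default.
    }

    public static void main(String[] args) {
        Set<BadNode> set = new HashSet<>();
        set.add(new BadNode("a"));
        set.add(new BadNode("a")); // equals() is true, but the buckets differ
        // Hash-based de-duplication keeps both copies:
        System.out.println(set.size()); // prints 2
    }
}
```

Spark's distinct(), groupBy, and joins are all hash-partitioned, so the same inconsistency that duplicates entries in a HashSet will scatter "equal" keys across partitions and make joins miss.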
I also tried serializing the RDD to JSON before this code and loading everything on my machine, but the NPE doesn't occur locally after I do this.
I'm a little lost here.
My next guess is that the problem lies in serialization (I'm using Kryo). Another option I'm going to try is setting a different partitioner for the RDD.
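A quick way to test the serialization theory is to round-trip a key object and assert that equals and hashCode survive. Kryo isn't assumed available here, so this sketch uses JDK serialization as a stand-in; in the real job you would round-trip a Node through the same Kryo serializer the Spark job is configured with and make the same two assertions. Key is a hypothetical stand-in for Node:

```java
import java.io.*;

public class RoundTripCheck {
    // Hypothetical value class standing in for Node.
    static final class Key implements Serializable {
        final String id;
        Key(String id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof Key && ((Key) o).id.equals(id);
        }
        @Override public int hashCode() { return id.hashCode(); }
    }

    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Key before = new Key("node-1");
        Key after = roundTrip(before);
        // A class is a safe join key only if both of these hold after a round trip:
        System.out.println(before.equals(after));                  // true
        System.out.println(before.hashCode() == after.hashCode()); // true
    }
}
```

If either assertion fails under Kryo (a common cause is an unregistered class or a field Kryo skips), that would explain both the duplicate keys and the missed joins.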
Any suggestions on what I could do here? I'm using Spark 3.3.1 in AWS Glue.
Edit: I changed the code to serialize all Node objects into JSON strings and used those as the join key, and that worked. This hints further at serialization issues; I don't think it's related to equals or hashCode, because those are implemented by Lombok.
If one joins LEFT_TABLE to RIGHT_TABLE using a LEFT OUTER JOIN and there is an entity in the LEFT_TABLE without a corresponding match in the RIGHT_TABLE, you will get NULLs, yes. In this sense, the OUTER specification in the JOIN is the issue. It is hard to tell from what is given; however, it seems that you would prefer an INNER JOIN here. See "What's the difference between INNER JOIN, LEFT JOIN, RIGHT JOIN and FULL JOIN?" for more information on the distinction.
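The outer-versus-inner distinction can be sketched with plain maps and lists (the names here are illustrative, not from the question's code):

```java
import java.util.*;

public class JoinDemo {
    public static void main(String[] args) {
        Map<String, Long> right = Map.of("a", 0L, "b", 1L); // the "index" side
        List<String> left = List.of("a", "b", "c");         // "c" has no match

        // LEFT OUTER JOIN: every left row survives; misses become null.
        List<String> outer = new ArrayList<>();
        for (String k : left) outer.add(k + "=" + right.get(k));

        // INNER JOIN: unmatched left rows are dropped instead.
        List<String> inner = new ArrayList<>();
        for (String k : left) if (right.containsKey(k)) inner.add(k + "=" + right.get(k));

        System.out.println(outer); // [a=0, b=1, c=null]
        System.out.println(inner); // [a=0, b=1]
    }
}
```

In Spark terms, that is the difference between leftOuterJoin (right side wrapped in Optional, possibly absent) and join (only matching keys emitted).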