I'm training a Keras model to build a recommender system, running on Spark with Horovod's hvd.KerasEstimator().
Here is my estimator:
keras_estimator = hvd.KerasEstimator(
    num_proc=2,
    store=store,
    model=model,
    optimizer=optimizer,
    loss='mse',
    metrics=[tf.keras.metrics.RootMeanSquaredError()],
    feature_cols=['userID', 'itemID'],
    label_cols=['rating'],
    batch_size=512,
    epochs=5,
    verbose=1)
keras_model = keras_estimator.fit(train_df).setOutputCols(['rating_prob'])
The prediction step is just:
pred_df = keras_model.transform(test_df)
The model trains without any problem and I can see the loss for every epoch, but I'm struggling with the predictions!
The transform call doesn't print any error and seems to work, but pred_df is impossible to manipulate.
I tried pred_df.show() and pred_df.toPandas(), but both raise the same error:
"org.apache.spark.api.python.PythonException: 'ValueError: cannot convert Spark data Type <class 'pyspark.sql.types.DecimalType'> to native python type'"
I don't understand this, because my train_df and test_df have the same types!
I've tried changing the types with:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType

# reset data types to integer and float for tensorflow
train_df = train_df.withColumn("itemID", col("itemID").cast(IntegerType())) \
                   .withColumn("userID", col("userID").cast(IntegerType())) \
                   .withColumn("rating", col("rating").cast(FloatType()))
test_df = test_df.withColumn("itemID", col("itemID").cast(IntegerType())) \
                 .withColumn("userID", col("userID").cast(IntegerType())) \
                 .withColumn("rating", col("rating").cast(FloatType()))
But the error is still there.
Below is an example of my data:
Can you help me solve this issue, please?
Thanks in advance.
I just tried changing the column types again, but it doesn't change anything.