PySpark pandas_udf error with integer return values

I am currently learning about pandas_udf in PySpark. Here is my Spark DataFrame:

import pyspark
import pandas as pd
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType, IntegerType, ArrayType


spark = pyspark.sql.SparkSession.builder.appName('Tmp').master('local').getOrCreate()


df = spark.createDataFrame([(1, 'Europe', 'France'),
                            (2, 'Asia', 'China'),
                            (3, 'Africa', 'Egypt'),
                            (4, 'Asia', 'India')], ['id', 'Continent', 'Country'])

df.show(truncate=False)
+---+---------+-------+                                                         
|id |Continent|Country|
+---+---------+-------+
|1  |Europe   |France |
|2  |Asia     |China  |
|3  |Africa   |Egypt  |
|4  |Asia     |India  |
+---+---------+-------+

I created a function named dummy_func_string that returns a basic pandas Series of string values, and used it to build a pandas_udf that adds a new column to the DataFrame:

def dummy_func_string(x):
    return pd.Series(['1', '2', '3', '4'])

string_udf = pandas_udf(dummy_func_string, ArrayType(StringType()))

df.withColumn('UDF_String', string_udf(col('country'))).show()
+---+---------+-------+----------+                                              
| id|Continent|Country|UDF_String|
+---+---------+-------+----------+
|  1|   Europe| France|       [1]|
|  2|     Asia|  China|       [2]|
|  3|   Africa|  Egypt|       [3]|
|  4|     Asia|  India|       [4]|
+---+---------+-------+----------+
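
Interestingly, even though dummy_func_string returns plain strings, the declared return type ArrayType(StringType()) makes the UDF_String values come back as single-element arrays; it looks like each returned string is being treated as an iterable of its characters. For comparison, here is a minimal sketch (my assumption of the more natural declaration for scalar values, not something I have seen documented for this exact case) using a plain StringType() return type, which produces a flat string column:

# Same function, but declared with the scalar element type instead of an array
scalar_string_udf = pandas_udf(dummy_func_string, StringType())

df.withColumn('UDF_String_Scalar', scalar_string_udf(col('Country'))).show()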

The part above works fine, but when I create a pandas_udf from a function that returns a pandas Series of integer values instead of strings, I get an error:

def dummy_func_integer(x):
    return pd.Series([1, 2, 3, 4])

integer_udf = pandas_udf(dummy_func_integer, ArrayType(IntegerType()))

df.withColumn('UDF_Integer', integer_udf(col('country'))).show()

This is the error:

24/02/11 22:29:20 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 9)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/series.py", line 4904, in apply
    ).apply()
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/apply.py", line 1427, in apply
    return self.apply_standard()
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/apply.py", line 1507, in apply_standard
    mapped = obj._map_values(
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/base.py", line 921, in _map_values
    return algorithms.map_array(arr, mapper, na_action=na_action, convert=convert)
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/algorithms.py", line 1743, in map_array
    return lib.map_infer(values, mapper, convert=convert)
  File "lib.pyx", line 2972, in pandas._libs.lib.map_infer
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/types.py", line 814, in convert_array
    assert isinstance(value, Iterable)
AssertionError

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
24/02/11 22:29:20 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 9) (172.31.5.103 executor driver): org.apache.spark.api.python.PythonException
        (same Python traceback and Java stack trace as above)

24/02/11 22:29:20 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 959, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  (same Python worker traceback as above, ending in the AssertionError at pyspark/sql/pandas/types.py, line 814)
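
The part I keep coming back to is the assertion that fails: assert isinstance(value, Iterable). As a quick sanity check in plain Python (nothing Spark-specific is assumed here), a str is Iterable while an int is not:

from collections.abc import Iterable

# A str iterates over its characters, so it counts as Iterable;
# an int has no __iter__, so it does not
print(isinstance('1', Iterable))  # True
print(isinstance(1, Iterable))    # False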

Can someone please help me understand why the code works fine when the function returns string values, but fails when it returns integer values?
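
For reference, I would expect the scalar declaration to be the right analogue of the string case above; a minimal sketch of what I mean (assuming IntegerType() is an acceptable return type for a Series of small Python ints):

# Same function, but declared with the scalar element type instead of an array
integer_scalar_udf = pandas_udf(dummy_func_integer, IntegerType())

df.withColumn('UDF_Integer_Scalar', integer_scalar_udf(col('Country'))).show()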
