o1566.showString error persists even with os.environ variables defined with PySpark in Jupyter Notebook


I am a beginner with PySpark, and since I do not have access to the Databricks platform, I am currently learning through a Jupyter Notebook. I have reviewed other forums and tried different approaches; however, the problem persists whenever I run the .show() method. Below is my code:

import findspark
findspark.init()

import os
import sys

# Point both the PySpark driver and the workers at the current Python interpreter
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AnotherTestDemo").getOrCreate()

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType, IntegerType, DateType

myschema = StructType([
    StructField("EmpID", IntegerType(), True),
    StructField("Details", StructType([
        StructField("FirstName", StringType(), True),
        StructField("LastName", StringType(), True),
        StructField("ADEmail", StringType(), True),
        StructField("DOB", StringType(), True),
        StructField("State", StringType(), True),
        StructField("GenderCode", StringType(), True),
        StructField("LocationCode", IntegerType(), True),
        StructField("RaceDesc", StringType(), True),
        StructField("MaritalDesc", StringType(), True),
    ]), True),
    StructField("JobProfile", StructType([
        StructField("Title", StringType(), True),
        StructField("BusinessUnit", StringType(), True),
        StructField("Supervisor", StringType(), True),
        StructField("EmployeeType", StringType(), True),
        StructField("DepartmentType", StringType(), True),
        StructField("Division", StringType(), True),
        StructField("JobFunctionDescription", StringType(), True),
    ]), True),
    StructField("Payroll", StructType([
        StructField("EmployeeClassificationType", StringType(), True),
        StructField("PayZone", StringType(), True),
    ]), True),
    StructField("Performance", StructType([
        StructField("Performance Score", StringType(), True),
        StructField("Current Employee Rating", IntegerType(), True),
    ]), True),
    StructField("History", StructType([
        StructField("StartDate", DateType(), True),
        StructField("EmployeeStatus", StringType(), True),
        StructField("ExitDetails", ArrayType(StructType([
            StructField("ExitDate", DateType(), True),
            StructField("TerminationType", StringType(), True),
            StructField("TerminationDescription", StringType(), True),
        ])), True),
    ]), True)
])
schemadefined_df = spark.read.json("employee_data.json", schema=myschema, multiLine=True)

# flattenDF is a helper (from the pysparketl package) that flattens the nested struct columns
from pysparketl.dataframes import flattenDF
with_defined_schema_df = flattenDF(schemadefined_df)

with_defined_schema_df.show()
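Since the ClassCastException below complains about an array being cast to an integer, I suspect one of the IntegerType fields in myschema may not match the JSON. A minimal check (a sketch using the same file and session) would be to let Spark infer the schema and compare it with mine:

# Sketch: let Spark infer the schema from the same file and print both,
# so any field inferred as an array but declared as IntegerType stands out.
inferred_df = spark.read.json("employee_data.json", multiLine=True)
inferred_df.printSchema()
schemadefined_df.printSchema()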

The specific error message is as follows:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-105-b38a28ad0f7b> in <module>
      1 #with_defined_schema_df.show()

C:\Spark\spark-3.4.1-bin-hadoop3\python\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
    897 
    898         if isinstance(truncate, bool) and truncate:
--> 899             print(self._jdf.showString(n, 20, vertical))
    900         else:
    901             try:

C:\Spark\spark-3.4.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1320 
   1321         answer = self.gateway_client.send_command(command)
-> 1322         return_value = get_return_value(
   1323             answer, self.gateway_client, self.target_id, self.name)
   1324 

C:\Spark\spark-3.4.1-bin-hadoop3\python\pyspark\errors\exceptions\captured.py in deco(*a, **kw)
    167     def deco(*a: Any, **kw: Any) -> Any:
    168         try:
--> 169             return f(*a, **kw)
    170         except Py4JJavaError as e:
    171             converted = convert_exception(e.java_exception)

C:\Spark\spark-3.4.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1573.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 87.0 failed 1 times, most recent failure: Lost task 0.0 in stage 87.0 (TID 60) (LAPTOP-KQHH2FNK executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to java.base/java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt(rows.scala:41)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt$(rows.scala:41)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.$anonfun$apply$1(FileFormat.scala:156)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:211)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
    at jdk.internal.reflect.GeneratedMethodAccessor61.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to java.base/java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt(rows.scala:41)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt$(rows.scala:41)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.$anonfun$apply$1(FileFormat.scala:156)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:211)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    ... 1 more

Further details: PySpark version 3.4.1, Python version 3.8.8. I have also restarted the Jupyter kernel, and the same error still occurs when running .show().
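For reference, a narrowing step would be to select only the integer-typed fields declared in myschema (a sketch; if this also raises the same ClassCastException, the array/integer mismatch is in one of these columns):

from pyspark.sql import functions as F

# Sketch: select just the IntegerType fields from the defined schema;
# backticks quote the field name that contains spaces.
schemadefined_df.select(
    F.col("EmpID"),
    F.col("Details.LocationCode"),
    F.col("Performance.`Current Employee Rating`"),
).show()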
