I'm preprocessing a CSV file of 1 million rows, hoping to shrink it down to about 600,000 rows. However, I keep running into trouble whenever I apply a user-defined function (UDF) to a column of the DataFrame: it always raises an EOF error.

import os
import numpy as np
import ast
import re

import pyspark.sql.functions as f

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, StructField, StructType, FloatType

DATA_DIR = "./data/chronic-disease-data"

path = os.path.join(DATA_DIR, "U.S._Chronic_Disease_Indicators__CDI___2023_Release.csv")

spark = SparkSession.builder.appName('test')\
    .config("spark.executor.memory", "6g")\
    .getOrCreate()

test_spark_df_00_10 = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(path)

cols_to_drop = ["Response",
    "ResponseID",
    "DataValueFootnoteSymbol",
    "DatavalueFootnote",

    "StratificationCategory2",
    "Stratification2",
    "StratificationCategory3",
    "Stratification3",

    "StratificationCategoryID1",
    "StratificationID1",
    "StratificationCategoryID2",
    "StratificationID2",
    "StratificationCategoryID3",
    "StratificationID3"]
test_spark_df_00_10 = test_spark_df_00_10.drop(*cols_to_drop)
test_spark_df_00_10.show()

It shows the rows fine even after dropping the columns, which is expected:

+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
|YearStart|YearEnd|LocationAbbr|        LocationDesc|DataSource| Topic|            Question|DataValueUnit|DataValueType|DataValue|DataValueAlt|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|    Stratification1|         GeoLocation|LocationID|TopicID|QuestionID|DataValueTypeID|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
|     2014|   2014|          AR|            Arkansas| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      916|       916.0|              NULL|               NULL|                 Gender|               Male|POINT (-92.274490...|         5|    AST|    AST3_1|           NMBR|
|     2018|   2018|          CO|            Colorado| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     2227|      2227.0|              NULL|               NULL|                Overall|            Overall|POINT (-106.13361...|         8|    AST|    AST3_1|           NMBR|
|     2018|   2018|          DC|District of Columbia| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      708|       708.0|              NULL|               NULL|                Overall|            Overall|POINT (-77.036871...|        11|    AST|    AST3_1|           NMBR|
|     2017|   2017|          GA|             Georgia| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     3520|      3520.0|              NULL|               NULL|                 Gender|             Female|POINT (-83.627580...|        13|    AST|    AST3_1|           NMBR|
|     2010|   2010|          MI|            Michigan| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      123|       123.0|              NULL|               NULL|         Race/Ethnicity|           Hispanic|POINT (-84.714390...|        26|    AST|    AST3_1|           NMBR|
|     2015|   2015|          MT|             Montana| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     NULL|        NULL|              NULL|               NULL|         Race/Ethnicity|           Hispanic|POINT (-109.42442...|        30|    AST|    AST3_1|           NMBR|
|     2013|   2013|          OR|              Oregon| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      760|       760.0|              NULL|               NULL|                 Gender|               Male|POINT (-120.15503...|        41|    AST|    AST3_1|           NMBR|
|     2013|   2013|          PR|         Puerto Rico| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     NULL|        NULL|              NULL|               NULL|                Overall|            Overall|POINT (-66.590149...|        72|    AST|    AST3_1|           NMBR|
|     2017|   2017|          PR|         Puerto Rico| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     NULL|        NULL|              NULL|               NULL|                Overall|            Overall|POINT (-66.590149...|        72|    AST|    AST3_1|           NMBR|
|     2010|   2010|          WI|           Wisconsin| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     1967|      1967.0|              NULL|               NULL|                 Gender|               Male|POINT (-89.816370...|        55|    AST|    AST3_1|           NMBR|
|     2016|   2016|          WI|           Wisconsin| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      110|       110.0|              NULL|               NULL|         Race/Ethnicity|           Hispanic|POINT (-89.816370...|        55|    AST|    AST3_1|           NMBR|
|     2014|   2014|          AL|             Alabama|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       22|        22.0|              NULL|               NULL|                 Gender|               Male|POINT (-86.631860...|         1|    AST|    AST4_1|           NMBR|
|     2015|   2015|          ID|               Idaho|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       21|        21.0|              NULL|               NULL|                Overall|            Overall|POINT (-114.36373...|        16|    AST|    AST4_1|           NMBR|
|     2016|   2016|          ID|               Idaho|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       21|        21.0|              NULL|               NULL|                Overall|            Overall|POINT (-114.36373...|        16|    AST|    AST4_1|           NMBR|
|     2020|   2020|          IL|            Illinois|      NVSS|Asthma|Asthma mortality ...|       Number|       Number|       89|        89.0|              NULL|               NULL|                 Gender|               Male|POINT (-88.997710...|        17|    AST|    AST4_1|           NMBR|
|     2012|   2012|          KS|              Kansas|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       24|        24.0|              NULL|               NULL|         Race/Ethnicity|White, non-Hispanic|POINT (-98.200781...|        20|    AST|    AST4_1|           NMBR|
|     2015|   2015|          KS|              Kansas|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       29|        29.0|              NULL|               NULL|                Overall|            Overall|POINT (-98.200781...|        20|    AST|    AST4_1|           NMBR|
|     2018|   2018|          KS|              Kansas|      NVSS|Asthma|Asthma mortality ...|       Number|       Number|       29|        29.0|              NULL|               NULL|                Overall|            Overall|POINT (-98.200781...|        20|    AST|    AST4_1|           NMBR|
|     2017|   2017|          LA|           Louisiana|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       21|        21.0|              NULL|               NULL|                 Gender|               Male|POINT (-92.445680...|        22|    AST|    AST4_1|           NMBR|
|     2017|   2017|          MA|       Massachusetts|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       28|        28.0|              NULL|               NULL|                 Gender|               Male|POINT (-72.082690...|        25|    AST|    AST4_1|           NMBR|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
only showing top 20 rows

But when I get to this simple UDF:

test_udf = f.udf(lambda x: 1.0, FloatType())
test_spark_df_00_10.withColumn("Test", test_udf(f.col("YearStart"))).show()

Here is the full traceback of the error:

----> 1 test_spark_df_00_10.withColumn("Test", test_udf(f.col("GeoLocation"))).select("Test").collect()

File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\pyspark\sql\dataframe.py:1263, in DataFrame.collect(self)
   1243 """Returns all the records as a list of :class:`Row`.
   1244 
   1245 .. versionadded:: 1.3.0
   (...)   1260 [Row(age=14, name='Tom'), Row(age=23, name='Alice'), Row(age=16, name='Bob')]
   1261 """
   1262 with SCCallSiteSync(self._sc):
-> 1263     sock_info = self._jdf.collectToPython()
   1264 return list(_load_from_socket(sock_info, BatchedSerializer(CPickleSerializer())))

File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\pyspark\errors\exceptions\captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o552.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 68.0 failed 1 times, most recent failure: Lost task 3.0 in stage 68.0 (TID 309) (LAPTOP-3GL266K9.bbrouter executor driver): java.net.SocketException: Connection reset by peer
    at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
    at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
    at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
    at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
    at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
    at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
    at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
    at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
    at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
    at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4149)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4146)
    at jdk.internal.reflect.GeneratedMethodAccessor169.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.net.SocketException: Connection reset by peer
    at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
    at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
    at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
    at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
    at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
    at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
    at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
    at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
    at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

A simple user-defined function like this always raises an EOF error. I don't know what could be causing this. Is it the null values in the column? What am I missing here? All I want is to be able to perform the necessary transformations using these user-defined functions.
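For what it's worth, here is how I'm pinning the worker interpreter before creating the SparkSession, on the guess that a driver/worker Python mismatch is what's killing the workers (this is just something I'm experimenting with, not a confirmed fix):

```python
import os
import sys

# Make Spark launch its Python workers with the same interpreter that runs
# the driver; a driver/worker Python version mismatch is a known way to get
# workers that die mid-task ("Connection reset by peer" / EOFError).
# sys.executable is simply whatever interpreter is running this script.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
```

These need to be set before `SparkSession.builder...getOrCreate()` is called. As a separate sanity check, a constant column can also be added without touching a Python worker at all, e.g. `test_spark_df_00_10.withColumn("Test", f.lit(1.0))`, since `lit` stays entirely on the JVM.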

  • What is your Spark and Java version? Can you provide the full error traceback?
    – Jonathan
    Commented Apr 22 at 10:17
  • I've edited it just now – Commented Apr 22 at 10:22
  • Does a simple PySpark job work? It seems the problem is related to a misconfiguration. What are your Spark and Python versions? Commented Apr 22 at 19:16
  • Spark is 3.5.5 and the globally installed Python is 3.12.6. Commented Apr 23 at 3:14
