I'm doing data preprocessing on a CSV file of 1 million rows and hoping to shrink it down to about 600,000 rows. However, I keep running into trouble whenever I apply a function (a UDF) to a column of the DataFrame, because it always raises an EOF error.
import os
import numpy as np
import ast
import re
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, StructField, StructType, FloatType
DATA_DIR = "./data/chronic-disease-data"
path = os.path.join(DATA_DIR, "U.S._Chronic_Disease_Indicators__CDI___2023_Release.csv")
spark = SparkSession.builder.appName('test')\
    .config("spark.executor.memory", "6g")\
    .getOrCreate()

test_spark_df_00_10 = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(path)
cols_to_drop = ["Response",
                "ResponseID",
                "DataValueFootnoteSymbol",
                "DatavalueFootnote",
                "StratificationCategory2",
                "Stratification2",
                "StratificationCategory3",
                "Stratification3",
                "StratificationCategoryID1",
                "StratificationID1",
                "StratificationCategoryID2",
                "StratificationID2",
                "StratificationCategoryID3",
                "StratificationID3"]
test_spark_df_00_10 = test_spark_df_00_10.drop(*cols_to_drop)
test_spark_df_00_10.show()
It still shows rows fine after dropping those columns, as expected:
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
|YearStart|YearEnd|LocationAbbr| LocationDesc|DataSource| Topic| Question|DataValueUnit|DataValueType|DataValue|DataValueAlt|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1| Stratification1| GeoLocation|LocationID|TopicID|QuestionID|DataValueTypeID|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
| 2014| 2014| AR| Arkansas| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| 916| 916.0| NULL| NULL| Gender| Male|POINT (-92.274490...| 5| AST| AST3_1| NMBR|
| 2018| 2018| CO| Colorado| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| 2227| 2227.0| NULL| NULL| Overall| Overall|POINT (-106.13361...| 8| AST| AST3_1| NMBR|
| 2018| 2018| DC|District of Columbia| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| 708| 708.0| NULL| NULL| Overall| Overall|POINT (-77.036871...| 11| AST| AST3_1| NMBR|
| 2017| 2017| GA| Georgia| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| 3520| 3520.0| NULL| NULL| Gender| Female|POINT (-83.627580...| 13| AST| AST3_1| NMBR|
| 2010| 2010| MI| Michigan| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| 123| 123.0| NULL| NULL| Race/Ethnicity| Hispanic|POINT (-84.714390...| 26| AST| AST3_1| NMBR|
| 2015| 2015| MT| Montana| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| NULL| NULL| NULL| NULL| Race/Ethnicity| Hispanic|POINT (-109.42442...| 30| AST| AST3_1| NMBR|
| 2013| 2013| OR| Oregon| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| 760| 760.0| NULL| NULL| Gender| Male|POINT (-120.15503...| 41| AST| AST3_1| NMBR|
| 2013| 2013| PR| Puerto Rico| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| NULL| NULL| NULL| NULL| Overall| Overall|POINT (-66.590149...| 72| AST| AST3_1| NMBR|
| 2017| 2017| PR| Puerto Rico| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| NULL| NULL| NULL| NULL| Overall| Overall|POINT (-66.590149...| 72| AST| AST3_1| NMBR|
| 2010| 2010| WI| Wisconsin| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| 1967| 1967.0| NULL| NULL| Gender| Male|POINT (-89.816370...| 55| AST| AST3_1| NMBR|
| 2016| 2016| WI| Wisconsin| SEDD; SID|Asthma|Hospitalizations ...| NULL| Number| 110| 110.0| NULL| NULL| Race/Ethnicity| Hispanic|POINT (-89.816370...| 55| AST| AST3_1| NMBR|
| 2014| 2014| AL| Alabama| NVSS|Asthma|Asthma mortality ...| NULL| Number| 22| 22.0| NULL| NULL| Gender| Male|POINT (-86.631860...| 1| AST| AST4_1| NMBR|
| 2015| 2015| ID| Idaho| NVSS|Asthma|Asthma mortality ...| NULL| Number| 21| 21.0| NULL| NULL| Overall| Overall|POINT (-114.36373...| 16| AST| AST4_1| NMBR|
| 2016| 2016| ID| Idaho| NVSS|Asthma|Asthma mortality ...| NULL| Number| 21| 21.0| NULL| NULL| Overall| Overall|POINT (-114.36373...| 16| AST| AST4_1| NMBR|
| 2020| 2020| IL| Illinois| NVSS|Asthma|Asthma mortality ...| Number| Number| 89| 89.0| NULL| NULL| Gender| Male|POINT (-88.997710...| 17| AST| AST4_1| NMBR|
| 2012| 2012| KS| Kansas| NVSS|Asthma|Asthma mortality ...| NULL| Number| 24| 24.0| NULL| NULL| Race/Ethnicity|White, non-Hispanic|POINT (-98.200781...| 20| AST| AST4_1| NMBR|
| 2015| 2015| KS| Kansas| NVSS|Asthma|Asthma mortality ...| NULL| Number| 29| 29.0| NULL| NULL| Overall| Overall|POINT (-98.200781...| 20| AST| AST4_1| NMBR|
| 2018| 2018| KS| Kansas| NVSS|Asthma|Asthma mortality ...| Number| Number| 29| 29.0| NULL| NULL| Overall| Overall|POINT (-98.200781...| 20| AST| AST4_1| NMBR|
| 2017| 2017| LA| Louisiana| NVSS|Asthma|Asthma mortality ...| NULL| Number| 21| 21.0| NULL| NULL| Gender| Male|POINT (-92.445680...| 22| AST| AST4_1| NMBR|
| 2017| 2017| MA| Massachusetts| NVSS|Asthma|Asthma mortality ...| NULL| Number| 28| 28.0| NULL| NULL| Gender| Male|POINT (-72.082690...| 25| AST| AST4_1| NMBR|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
only showing top 20 rows
but as soon as I apply even a trivial UDF:
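# a trivial UDF that ignores its input and always returns the constant 1.0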
test_udf = f.udf(lambda x: 1.0, FloatType())
test_spark_df_00_10.withColumn("Test", test_udf(f.col("YearStart"))).show()
it fails. Here is the traceback to the error (this particular run used the GeoLocation column with .collect(), but I get the same error with .show() and with other columns):
----> 1 test_spark_df_00_10.withColumn("Test", test_udf(f.col("GeoLocation"))).select("Test").collect()
File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\pyspark\sql\dataframe.py:1263, in DataFrame.collect(self)
1243 """Returns all the records as a list of :class:`Row`.
1244
1245 .. versionadded:: 1.3.0
(...) 1260 [Row(age=14, name='Tom'), Row(age=23, name='Alice'), Row(age=16, name='Bob')]
1261 """
1262 with SCCallSiteSync(self._sc):
-> 1263 sock_info = self._jdf.collectToPython()
1264 return list(_load_from_socket(sock_info, BatchedSerializer(CPickleSerializer())))
File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\pyspark\errors\exceptions\captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
177 def deco(*a: Any, **kw: Any) -> Any:
178 try:
--> 179 return f(*a, **kw)
180 except Py4JJavaError as e:
181 converted = convert_exception(e.java_exception)
File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o552.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 68.0 failed 1 times, most recent failure: Lost task 3.0 in stage 68.0 (TID 309) (LAPTOP-3GL266K9.bbrouter executor driver): java.net.SocketException: Connection reset by peer
at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4149)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4146)
at jdk.internal.reflect.GeneratedMethodAccessor169.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.net.SocketException: Connection reset by peer
at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
A simple user-defined function like this keeps raising an EOF error, and I don't know what could be causing it. Is it the null values in the column? What am I missing here? All I want is to be able to do the necessary transformations on these columns using user-defined functions.
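For context, the kind of transformation I eventually want to run looks roughly like the sketch below. This is only an illustrative, null-safe example (parse_point, the regex, and the LonLat column name are placeholders, not my real preprocessing), reusing the imports from the script above; it shows the per-column UDF pattern I'm trying to get working:

# illustrative only: parse "POINT (lon lat)" strings from GeoLocation into [lon, lat],
# returning None for null or malformed values instead of raising
def parse_point(geo):
    if geo is None:
        return None
    m = re.match(r"POINT \((-?\d+(?:\.\d+)?) (-?\d+(?:\.\d+)?)\)", geo)
    if m is None:
        return None
    return [float(m.group(1)), float(m.group(2))]

parse_point_udf = f.udf(parse_point, ArrayType(FloatType()))

# keep only rows where the point parsed successfully, which is the kind of
# filtering I'd use to shrink the dataset down
parsed = test_spark_df_00_10.withColumn("LonLat", parse_point_udf(f.col("GeoLocation")))
parsed.filter(f.col("LonLat").isNotNull()).show()

Even this would go through the same Python UDF execution path, so I'd like to understand why the minimal constant-returning example above already fails.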