I am getting this error when running an EMR notebook that passes some dates:
An error occurred: An error occurred while calling o236.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 87.0 failed 4 times, most recent failure: Lost task 5.3 in stage 87.0 (TID 4695) (ip-10-19-32-195.eu-west-2.compute.internal executor 5): org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading to Spark >= 3.0: Fail to parse '2024-4-01' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
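For what it's worth, the '2024-4-01' string in the error has a non-zero-padded month, which the strict Spark >= 3.0 parser rejects for a yyyy-MM-dd pattern. A minimal sketch (assuming the date parameters are built in plain Python before being handed to the job) that zero-pads them up front:

```python
from datetime import datetime

def normalize_date(s):
    """Re-emit a loosely formatted date (e.g. '2024-4-01') in strict
    zero-padded yyyy-MM-dd form: strptime tolerates missing leading
    zeros, while strftime always pads."""
    return datetime.strptime(s, "%Y-%m-%d").strftime("%Y-%m-%d")

print(normalize_date("2024-4-01"))  # -> 2024-04-01
```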
I have read that setting one of these solves the issue:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
or
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
That works when I run it in the notebook, but when I move the code into a PySpark script and run it as an EMR step, it keeps failing.
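For context, this is a hedged sketch of how the setting could travel with the step itself, passed per job via spark-submit --conf (the step name and script path here are hypothetical placeholders):

```python
def build_spark_step(script_s3_path):
    """Build an EMR step dict that runs a PySpark script through
    command-runner.jar, passing the legacy time parser policy with
    spark-submit --conf so the setting applies when run as a step."""
    return {
        "Name": "run-pyspark-script",  # hypothetical step name
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--conf", "spark.sql.legacy.timeParserPolicy=LEGACY",
                script_s3_path,  # hypothetical script location
            ],
        },
    }

# emr_client.add_job_flow_steps(JobFlowId=cluster_id,
#                               Steps=[build_spark_step("s3://my-bucket/job.py")])
```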
I have also tried passing it as part of the configuration when creating the cluster, in the optional_node_config block, like this:
from datetime import datetime

def create_emr_sedona_cluster(json_cluster_config, json_build_config, session,
                              use_spot_instances=False, step_concurrency_level=1):
    current_date = datetime.now()
    formatted_date = current_date.strftime("%Y%m%d")
    cluster_name = json_cluster_config[0]['clustername']
    build = json_build_config[0]['build']
    loguri = json_cluster_config[0]['loguri'] + "/" + build
    build_cluster_name = build + cluster_name + formatted_date
    emr_client = session.client('emr')
    print("json_config: ", json_cluster_config)
    instance_market = 'SPOT' if use_spot_instances else 'ON_DEMAND'
    bootstrap_path = json_cluster_config[0]['bootstrapfilelocation']
    bootstrap_actions = [
    ]
    ebs_configuration = { }
    optional_node_config = [
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.yarn.dist.jars": "/usr/lib/spark/jars/sedona-spark-shaded-3.3_2.12-1.7.1.jar,/usr/lib/spark/jars/geotools-wrapper-1.7.1-28.5.jar",
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                "spark.kryo.registrator": "org.apache.sedona.core.serde.SedonaKryoRegistrator",
                "spark.sql.extensions": "org.apache.sedona.viz.sql.SedonaVizExtensions,org.apache.sedona.sql.SedonaSqlExtensions",
                "spark.dynamicAllocation.enabled": "true",
                "spark.dynamicAllocation.minExecutors": "2",
                "spark.dynamicAllocation.maxExecutors": "10",
                "spark.executor.memory": "8g",
                "spark.executor.cores": "4",
                "spark.executor.memoryOverhead": "1024",
                "spark.driver.memory": "8g",
                "spark.sql.shuffle.partitions": "200",
                "spark.default.parallelism": "200",
                "spark.kryoserializer.buffer.max": "512m",
                "spark.sql.autoBroadcastJoinThreshold": "-1",
                "spark.network.timeout": "300s",
                "spark.executor.heartbeatInterval": "60s",
                "spark.sql.legacy.timeParserPolicy": "LEGACY"
            }
        }
    ]
    core_instance_count = json_cluster_config[0].get('coreinstancecount', 2)
    response = emr_client.run_job_flow(
    )
    # Return the cluster ID
    cluster_id = response['JobFlowId']
    print(f"EMR cluster started with ID: {cluster_id}")
    return cluster_id
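One thing worth double-checking (a hedged sketch; every argument other than Configurations below is a placeholder): a spark-defaults classification only takes effect if optional_node_config is actually passed to run_job_flow through its Configurations parameter:

```python
def build_run_job_flow_kwargs(name, log_uri, configurations):
    """Assemble the keyword arguments for emr_client.run_job_flow,
    making sure the spark-defaults classification is wired in via
    the Configurations parameter (release label is a placeholder)."""
    return {
        "Name": name,
        "LogUri": log_uri,
        "ReleaseLabel": "emr-6.x",          # placeholder release label
        "Configurations": configurations,   # <-- spark-defaults go here
    }

# response = emr_client.run_job_flow(**build_run_job_flow_kwargs(
#     build_cluster_name, loguri, optional_node_config))
```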
but it is still not picking the setting up. Is there any way to deal with this?
Thanks