
I am getting this error when running an EMR job from a notebook, passing some dates:

    An error occurred: An error occurred while calling o236.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 87.0 failed 4 times, most recent failure: Lost task 5.3 in stage 87.0 (TID 4695) (ip-10-19-32-195.eu-west-2.compute.internal executor 5): org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading to Spark >= 3.0: Fail to parse '2024-4-01' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
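For context, the error comes from a date-parsing call roughly like the one below. This is only a simplified illustration (the column name, format string and sample value are placeholders, not my exact code):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # Single-digit month: the new Spark 3 parser rejects "2024-4-01" with the
    # pattern "yyyy-MM-dd", while the legacy parser would have accepted it,
    # hence the SparkUpgradeException when the result is materialized.
    df = spark.createDataFrame([("2024-4-01",)], ["event_date"])
    df = df.withColumn("parsed", F.to_date(F.col("event_date"), "yyyy-MM-dd"))
    df.show()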

I have read that setting one of these solves the issue:

    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

or

    spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

and that works when running it in the notebook, but when I put the same setting in the PySpark script and run it as an EMR step, it keeps failing.
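For reference, the script that runs as a step applies the setting roughly like this (simplified; the app name and the rest of the job are placeholders, not my exact code):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("date-parsing-step").getOrCreate()

    # Same setting that works in the notebook, applied at the top of the step script
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

    # ... date-parsing logic as in the notebook ...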

I have also tried to pass it as part of the configuration when creating the cluster, in the optional_node_config block, like this:

    from datetime import datetime  # needed for datetime.now() below


    def create_emr_sedona_cluster(json_cluster_config, json_build_config, session, use_spot_instances=False,
                                  step_concurrency_level=1):
        current_date = datetime.now()
        formatted_date = current_date.strftime("%Y%m%d")
        cluster_name = json_cluster_config[0]['clustername']
        build = json_build_config[0]['build']
        loguri = json_cluster_config[0]['loguri'] + "/" + build
        build_cluster_name = build + cluster_name + formatted_date
        emr_client = session.client('emr')

        print("json_config: ", json_cluster_config)

        instance_market = 'SPOT' if use_spot_instances else 'ON_DEMAND'
        bootstrap_path = json_cluster_config[0]['bootstrapfilelocation']

        bootstrap_actions = []

        ebs_configuration = {}

        optional_node_config = [
            {
                "Classification": "spark-defaults",
                "Properties": {
                    "spark.yarn.dist.jars": "/usr/lib/spark/jars/sedona-spark-shaded-3.3_2.12-1.7.1.jar,/usr/lib/spark/jars/geotools-wrapper-1.7.1-28.5.jar",
                    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                    "spark.kryo.registrator": "org.apache.sedona.core.serde.SedonaKryoRegistrator",
                    "spark.sql.extensions": "org.apache.sedona.viz.sql.SedonaVizExtensions,org.apache.sedona.sql.SedonaSqlExtensions",
                    "spark.dynamicAllocation.enabled": "true",
                    "spark.dynamicAllocation.minExecutors": "2",
                    "spark.dynamicAllocation.maxExecutors": "10",
                    "spark.executor.memory": "8g",
                    "spark.executor.cores": "4",
                    "spark.executor.memoryOverhead": "1024",
                    "spark.driver.memory": "8g",
                    "spark.sql.shuffle.partitions": "200",
                    "spark.default.parallelism": "200",
                    "spark.kryoserializer.buffer.max": "512m",
                    "spark.sql.autoBroadcastJoinThreshold": "-1",
                    "spark.network.timeout": "300s",
                    "spark.executor.heartbeatInterval": "60s",
                    "spark.sql.legacy.timeParserPolicy": "LEGACY"
                }
            }
        ]

        core_instance_count = json_cluster_config[0].get('coreinstancecount', 2)

        response = emr_client.run_job_flow(
            # (run_job_flow arguments not shown in this snippet)
        )

        # ✅ Return the Cluster ID
        cluster_id = response['JobFlowId']
        print(f"EMR cluster started with ID: {cluster_id}")
        return cluster_id

but the setting is still not being applied when the step runs. Is there any way to deal with this?
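In case it matters, the step itself is submitted roughly like this (a simplified sketch; the cluster id, script path and date argument are placeholders, not the real values):

    import boto3

    emr_client = boto3.client("emr")

    emr_client.add_job_flow_steps(
        JobFlowId="j-XXXXXXXXXXXXX",
        Steps=[
            {
                "Name": "run-date-job",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "--deploy-mode", "cluster",
                        "s3://my-bucket/scripts/my_job.py",
                        "2024-4-01",
                    ],
                },
            }
        ],
    )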

thanks
