I am trying to run a batch process using Apache Airflow. The extract and transform stages work fine, but the load stage fails with an error. Here is my code:

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy_utils import create_database, database_exists
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import time
from datetime import timedelta
import psycopg2

@task
def load(df):
    try:
        connection_details = BaseHook.get_connection("my_postgres_conn")
    except Exception as e:
        print(f"❌ Could not retrieve Airflow connection: {e}")
        raise

    uid = connection_details.login
    pwd = connection_details.password
    host = connection_details.host
    port = connection_details.port
    schema = connection_details.schema

    postgres_url = f"postgresql+psycopg2://{uid}:{pwd}@{host}:{port}/{schema}"

    # Create the database if it doesn't exist
    if not database_exists(postgres_url):
        create_database(postgres_url)

    engine = create_engine(postgres_url)

    try:
        # Use the engine object directly with to_sql
        # This syntax is fully compatible with SQLAlchemy 1.4.54
        df.to_sql("Bank_Cap", con=engine, if_exists="replace", index=False)
        print("✅ Data loaded successfully into PostgreSQL")
    except Exception as e:
        print(f"❌ Error loading data: {e}")
        raise

    # Save a CSV as backup
    df.to_csv("my_largest_bank.csv", index=False)

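    # LoggingMixin is imported above but `log` was never defined,
    # so obtain a logger from it before using log.info
    log = LoggingMixin().log

    print(f"Postgres URL: {postgres_url}")
    print(f"DataFrame to load:\n{df}")
    log.info(f"Postgres URL: {postgres_url}")
    log.info(f"DataFrame to load: {df}")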

I am getting the following error:

[2025-08-17, 02:06:00] WARNING - Using Connection.get_connection_from_secrets from airflow.models is deprecated.Please use from airflow.sdk import Connection instead: category="DeprecationWarning": filename="/home/eziuche/.local/share/virtualenvs/airflow-project-kVhV5bz8/lib/python3.11/site-packages/airflow/models/connection.py": lineno=471: source="py.warnings"
[2025-08-17, 02:06:00] INFO - Connection Retrieved 'my_postgres_conn': source="airflow.hooks.base"
[2025-08-17, 02:06:00] WARNING - pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.: category="UserWarning":
[2025-08-17, 02:06:00] INFO - ❌ Error loading data: 'Engine' object has no attribute 'cursor': chan="stdout": source="task"
[2025-08-17, 02:06:00] ERROR - Task failed with exception: source="task"
AttributeError: 'Engine' object has no attribute 'cursor'

I have already downgraded SQLAlchemy and pandas. I am currently using apache-airflow-core==3.0.4, SQLAlchemy==1.4.54, SQLAlchemy-JSONField==1.0.2, SQLAlchemy-Utils==0.41.2 and pandas==1.5.3. What am I getting wrong?
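
To rule out the task running against different packages than the ones pinned above, the versions the interpreter actually sees can be printed from inside the task. This is only a minimal sketch using the standard library; the helper name `installed_versions` is hypothetical and not part of Airflow, pandas or SQLAlchemy.

```python
from importlib import metadata


def installed_versions(packages):
    """Return {package name: installed version, or None if not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The package is not visible to the interpreter running this code
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Package names taken from the pins listed in the question
    for name, version in installed_versions(["pandas", "SQLAlchemy"]).items():
        print(f"{name}: {version}")
```

Calling `installed_versions([...])` at the top of `load` and printing the result would show in the task log whether the worker really imports pandas 1.5.3 and SQLAlchemy 1.4.54.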
