I am trying to run a batch process using Apache Airflow. The Extract and Transform stages work fine, but the Load stage fails with an error. Here is my code:
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy_utils import create_database, database_exists
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import time
from datetime import timedelta
import psycopg2
@task
def load(df):
    # Logger used for the debug output at the end of the task
    log = LoggingMixin().log

    try:
        connection_details = BaseHook.get_connection("my_postgres_conn")
    except Exception as e:
        print(f"❌ Could not retrieve Airflow connection: {e}")
        raise

    uid = connection_details.login
    pwd = connection_details.password
    host = connection_details.host
    port = connection_details.port
    schema = connection_details.schema
    postgres_url = f"postgresql+psycopg2://{uid}:{pwd}@{host}:{port}/{schema}"

    # Create the database if it doesn't exist
    if not database_exists(postgres_url):
        create_database(postgres_url)

    engine = create_engine(postgres_url)

    try:
        # Use the engine object directly with to_sql
        # This syntax is fully compatible with SQLAlchemy 1.4.54
        df.to_sql("Bank_Cap", con=engine, if_exists="replace", index=False)
        print("✅ Data loaded successfully into PostgreSQL")
    except Exception as e:
        print(f"❌ Error loading data: {e}")
        raise

    # Save a CSV as backup
    df.to_csv("my_largest_bank.csv", index=False)

    # Debug output (note: the URL includes the password)
    print(f"Postgres URL: {postgres_url}")
    print(f"DataFrame to load:\n{df}")
    log.info(f"Postgres URL: {postgres_url}")
    log.info(f"DataFrame to load: {df}")
I am getting the following error:
[2025-08-17, 02:06:00] WARNING - Using Connection.get_connection_from_secrets from airflow.models is deprecated.Please use from airflow.sdk import Connection instead: category="DeprecationWarning": filename="/home/eziuche/.local/share/virtualenvs/airflow-project-kVhV5bz8/lib/python3.11/site-packages/airflow/models/connection.py": lineno=471: source="py.warnings"
[2025-08-17, 02:06:00] INFO - Connection Retrieved 'my_postgres_conn': source="airflow.hooks.base"
[2025-08-17, 02:06:00] WARNING - pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.: category="UserWarning":
[2025-08-17, 02:06:00] INFO - ❌ Error loading data: 'Engine' object has no attribute 'cursor': chan="stdout": source="task"
[2025-08-17, 02:06:00] ERROR - Task failed with exception: source="task"
AttributeError: 'Engine' object has no attribute 'cursor'
I have already downgraded SQLAlchemy and pandas. I am currently using apache-airflow-core==3.0.4, SQLAlchemy==1.4.54, SQLAlchemy-JSONField==1.0.2, SQLAlchemy-Utils==0.41.2 and pandas==1.5.3. What am I getting wrong?
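The pandas warning in the log suggests that `to_sql` did not recognize the `Engine` as a SQLAlchemy connectable and fell back to treating it as a raw DBAPI connection. One thing that may be worth ruling out is that the task process imports different package versions than the ones listed above. Below is a minimal sketch of such a check using the standard-library `importlib.metadata`. The helper name `installed_versions` is hypothetical, and the package list is only an assumption about what is relevant here.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {package name: installed version string, or None if not installed}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # The package is not installed in the interpreter running this code
            found[name] = None
    return found


if __name__ == "__main__":
    # Hypothetical package list; adjust to whatever the task depends on
    for name, ver in installed_versions(
        ["pandas", "SQLAlchemy", "apache-airflow-core"]
    ).items():
        print(f"{name}: {ver or 'not installed'}")
```

Calling the helper from inside the `load` task (rather than from a shell) reports what the Airflow worker itself sees, which can differ from the virtualenv used on the command line.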