11

I have a data frame (df1). To show its schema I use:

from pyspark.sql.functions import *
df1.printSchema()

And I get the following result:

#root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)

Sometimes the schema changes (the column type or name):

df2.printSchema()


#root
# |-- name: array (nullable = true)
# |-- gender: integer (nullable = true)
# |-- age: long (nullable = true)

I would like to compare the two schemas (df1 and df2) and get only the differences in column types and names (a column can sometimes move to another position). The result should be a table (or data frame), something like this:

    column     df1       df2       diff
    name:      string    array     type
    gender:    N/A       integer   new column

(The age column is the same and didn't change. If a column is omitted, the diff should say 'omitted'.) How can I do this efficiently if each data frame has many columns?

It's missing the solution you have attempted... Commented Feb 7, 2018 at 21:22

7 Answers

13

Without any external library, we can find the schema difference using:

from pyspark.sql.session import SparkSession
from pyspark.sql import DataFrame

def schema_diff(spark: SparkSession, df_1: DataFrame, df_2: DataFrame):
    s1 = spark.createDataFrame(df_1.dtypes, ["d1_name", "d1_type"])
    s2 = spark.createDataFrame(df_2.dtypes, ["d2_name", "d2_type"])
    difference = (
        s1.join(s2, s1.d1_name == s2.d2_name, how="outer")
        .where(s1.d1_type.isNull() | s2.d2_type.isNull())
        .select(s1.d1_name, s1.d1_type, s2.d2_name, s2.d2_type)
        .fillna("")
    )
    return difference

  • fillna is optional; I prefer to view missing values as empty strings.
  • The where clause filters on the type columns; the intent is to also surface columns that exist in both dataframes but with different types.
  • The result also includes all columns that are in the second dataframe but not in the first.

Usage:

diff = schema_diff(spark, df_1, df_2)
diff.show(diff.count(), truncate=False)

4 Comments

This won't detect type differences as the join condition uses only column names. It should be (s1["d1_name"] == s2["d2_name"]) & (s1["d1_type"] == s2["d2_type"]) instead.
@blackbishop The intent of above function is to show the difference. The intent is not to filter out odd ones. So the output difference dataframe will have all the details (s1.d1_name, s1.d1_type, s2.d2_name, s2.d2_type) so the consumer of this function can do anything he wants.
Okay, in this case you should remove the where clause; otherwise it won't return columns with the same names but different types. Just try it with these dataframes: df1 = spark.createDataFrame([("a", (1, 2))], "a: string, b struct<x:int,y:int>"), df2 = spark.createDataFrame([("a", (1, 2))], "a: string, b struct<z:int,t:int>")
The correct clause to only highlight different columns should be: .where(s1.d1_type.isNull() | s2.d2_type.isNull() | (s1.d1_type != s2.d2_type)) Also, coalescing the column name would make the result clearer
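
The filter discussed in these comments (missing on either side, or present on both sides with different types) can be illustrated without a Spark join, because `DataFrame.dtypes` is a small driver-side list of `(column, type)` tuples. The following is only a sketch under that assumption: the function name `dtypes_diff` and the labels `type`, `new column`, and `omitted` are illustrative choices taken from the wording of the question, not part of any PySpark API.

```python
def dtypes_diff(dtypes_1, dtypes_2):
    """Compare two lists of (column, type) tuples, e.g. df1.dtypes and df2.dtypes.

    Returns (column, df1_type, df2_type, diff) rows for differing columns only.
    """
    types_1, types_2 = dict(dtypes_1), dict(dtypes_2)
    rows = []
    # Walk the union of column names: df1 order first, then df2-only columns.
    for name in list(types_1) + [c for c in types_2 if c not in types_1]:
        t1, t2 = types_1.get(name), types_2.get(name)
        if t1 is None:
            rows.append((name, "N/A", t2, "new column"))
        elif t2 is None:
            rows.append((name, t1, "N/A", "omitted"))
        elif t1 != t2:
            # Same name on both sides but a different type string.
            rows.append((name, t1, t2, "type"))
    return rows


# Hand-written stand-ins for df1.dtypes and df2.dtypes from the question.
example_rows = dtypes_diff(
    [("name", "string"), ("age", "bigint")],
    [("name", "array<string>"), ("gender", "int"), ("age", "bigint")],
)
```

Because column position is ignored, a column that merely moves is not reported. If a data frame is needed, the rows could be passed to `spark.createDataFrame(rows, ["column", "df1", "df2", "diff"])`.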
6

You can try creating two pandas dataframes with the metadata from both df1 and df2, like below:

import pandas as pd

pd_df1 = pd.DataFrame(df1.dtypes, columns=['column', 'data_type'])
pd_df2 = pd.DataFrame(df2.dtypes, columns=['column', 'data_type'])

and then join those two pandas dataframes with an 'outer' join.
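
The outer join suggested here might look like the sketch below. It assumes pandas is installed and takes the `dtypes` lists directly, so it can be tried without a Spark session; the helper name `pandas_schema_diff` and the output column names `df1` and `df2` are made up for illustration.

```python
import pandas as pd


def pandas_schema_diff(dtypes_1, dtypes_2):
    """Outer-join two (column, type) lists and keep only the differing rows."""
    pd_df1 = pd.DataFrame(dtypes_1, columns=["column", "df1"])
    pd_df2 = pd.DataFrame(dtypes_2, columns=["column", "df2"])
    merged = pd_df1.merge(pd_df2, on="column", how="outer")
    # A row differs if the column is missing on one side or the types disagree.
    differs = (
        merged["df1"].isna()
        | merged["df2"].isna()
        | (merged["df1"] != merged["df2"])
    )
    return merged[differs].reset_index(drop=True)


# Hand-written stand-ins for df1.dtypes and df2.dtypes.
diff_table = pandas_schema_diff(
    [("name", "string"), ("age", "bigint")],
    [("name", "array<string>"), ("gender", "int"), ("age", "bigint")],
)
```

With real data frames the call would be `pandas_schema_diff(df1.dtypes, df2.dtypes)`.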


2

A custom function that could be useful for someone.

def SchemaDiff(DF1, DF2):
  # Map column name -> type string for both dataframes
  DF1Schema = dict(DF1.dtypes)
  DF2Schema = dict(DF2.dtypes)

  # Columns present in DF1 but not in DF2
  DF1MinusDF2 = {c: t for c, t in DF1Schema.items() if c not in DF2Schema}

  # Columns present in DF2 but not in DF1
  DF2MinusDF1 = {c: t for c, t in DF2Schema.items() if c not in DF1Schema}

  # Columns present in both whose data type changed (value is the DF2 type)
  DF1DataTypesChanged = {
    c: DF2Schema[c]
    for c, t in DF1Schema.items()
    if c in DF2Schema and DF2Schema[c] != t
  }

  return DF1MinusDF2, DF2MinusDF1, DF1DataTypesChanged


1

I think one way to do this is to call df1.schema.jsonValue() (or json()) on the StructType. jsonValue() returns a dict that looks somewhat like this:

{'type': 'struct',
 'fields': [
    {'name': 'SK_CUSTOMER','type': 'long','nullable': True,'metadata': {}},  
    {'name': 'AS_OF_DATE', 'type': 'date', 'nullable': True, 'metadata': {}},...
]}

So this:

sorted([str(entry) for entry in df1.schema.jsonValue()["fields"]])

gives you list of:

["{'name': 'AS_OF_DATE', 'type': 'date', 'nullable': True, 'metadata': {}}",
...]

and then, based on that, it's just a comparison of strings (so you can wrap those lists in sets and subtract them):

schema_1 = sorted([str(entry) for entry in df1.schema.jsonValue()["fields"]])
schema_2 = sorted([str(entry) for entry in df2.schema.jsonValue()["fields"]])
diff = set(schema_1) - set(schema_2)
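
Note that a single set subtraction only reports fields that are in the first schema and not in the second. A two-directional variant could look like the sketch below; it works on the `"fields"` lists directly, the helper name `field_diff` is hypothetical, and `json.dumps(..., sort_keys=True)` is swapped in for `str()` so that key order inside a field dict cannot affect the comparison.

```python
import json


def field_diff(fields_1, fields_2):
    """Compare two "fields" lists shaped like StructType.jsonValue()["fields"].

    Returns the names of fields that differ, once per direction.
    """
    # Serialize each field dict to a canonical string so it can go into a set.
    set_1 = {json.dumps(f, sort_keys=True) for f in fields_1}
    set_2 = {json.dumps(f, sort_keys=True) for f in fields_2}
    only_in_1 = sorted(json.loads(s)["name"] for s in set_1 - set_2)
    only_in_2 = sorted(json.loads(s)["name"] for s in set_2 - set_1)
    return only_in_1, only_in_2


# Hand-written field lists standing in for the two schemas.
fields_a = [
    {"name": "name", "type": "string", "nullable": True, "metadata": {}},
    {"name": "age", "type": "long", "nullable": True, "metadata": {}},
]
fields_b = [
    {"name": "name", "type": "integer", "nullable": True, "metadata": {}},
    {"name": "gender", "type": "integer", "nullable": True, "metadata": {}},
    {"name": "age", "type": "long", "nullable": True, "metadata": {}},
]
only_a, only_b = field_diff(fields_a, fields_b)
```

A field whose type, nullability, or metadata changed shows up in both lists, while an added or dropped field shows up in only one.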


0

Extending the excellent example by sairam krish:

from typing import Optional

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, coalesce

def diff_action(
    current_name: Optional[str],
    current_type: Optional[str],
    future_name: Optional[str],
    future_type: Optional[str],
):
    if current_name is None and future_name is not None:
        return "add_column"
    if future_name is None and current_name is not None:
        return "drop_column"
    if current_name == future_name and current_type != future_type:
        return "alter_column"

    return None

diff_action_udf = udf(diff_action, StringType())

def schema_diff(spark: SparkSession, current: DataFrame, future: DataFrame):
    current_types = spark.createDataFrame(
        current.dtypes, ["current_name", "current_type"]
    )
    future_types = spark.createDataFrame(future.dtypes, ["future_name", "future_type"])

    difference = (
        current_types.join(
            future_types,
            current_types.current_name == future_types.future_name,
            how="outer",
        )
        .where(
            current_types.current_type.isNull()
            | future_types.future_type.isNull()
            | (current_types.current_type != future_types.future_type)
        )
        .select(
            current_types.current_name,
            current_types.current_type,
            future_types.future_name,
            future_types.future_type,
        )
        .withColumn("column", coalesce("current_name", "future_name"))
        .withColumn("type", coalesce("future_type", "current_type"))
        .withColumn(
            "action",
            diff_action_udf(
                "current_name", "current_type", "future_name", "future_type"
            ),
        )
        .drop("current_name", "current_type", "future_name", "future_type")
    )
    return difference

The corresponding unit tests are below:

import chispa

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("column", StringType()),
        StructField("type", StringType()),
        StructField("action", StringType()),
    ]
)


def test_schema_diff_equal(spark):
    # ARRANGE
    current = spark.createDataFrame([(1, "name", "address")], ["id", "name", "active"])
    future = spark.createDataFrame([(1, "name", "address")], ["id", "name", "active"])

    # ACT
    diff = schema_diff(spark, current, future)

    # ASSERT
    expect = spark.createDataFrame([], schema)

    chispa.assert_df_equality(
        diff,
        expect,
        ignore_row_order=True,
    )


def test_schema_diff_add_column(spark):
    # ARRANGE
    current = spark.createDataFrame([(1, "name")], ["id", "name"])
    future = spark.createDataFrame([(1, "name", "yes")], ["id", "name", "active"])

    # ACT
    diff = schema_diff(spark, current, future)

    # ASSERT
    expect = spark.createDataFrame([("active", "string", "add_column")], schema)
    chispa.assert_df_equality(
        diff,
        expect,
        ignore_row_order=True,
    )


def test_schema_diff_drop_column(spark):
    # ARRANGE
    current = spark.createDataFrame([(1, "name", "yes")], ["id", "name", "active"])
    future = spark.createDataFrame([(1, "name")], ["id", "name"])

    # ACT
    diff = schema_diff(spark, current, future)

    # ASSERT
    expect = spark.createDataFrame([("active", "string", "drop_column")], schema)
    chispa.assert_df_equality(
        diff,
        expect,
        ignore_row_order=True,
    )


def test_schema_diff_name_change(spark):
    # ARRANGE
    current = spark.createDataFrame([(1, "name", "yes")], ["id", "name", "active"])
    future = spark.createDataFrame([(1, "name", "yes")], ["id", "name", "isActive"])

    # ACT
    diff = schema_diff(spark, current, future)

    # ASSERT
    expect = spark.createDataFrame(
        [("active", "string", "drop_column"), ("isActive", "string", "add_column")],
        schema,
    )
    chispa.assert_df_equality(
        diff,
        expect,
        ignore_row_order=True,
    )


def test_schema_diff_type_change(spark):
    # ARRANGE
    current = spark.createDataFrame([(1, "name", "yes")], ["id", "name", "active"])
    future = spark.createDataFrame([(1, "name", True)], ["id", "name", "active"])

    # ACT
    diff = schema_diff(spark, current, future)

    # ASSERT
    expect = spark.createDataFrame([("active", "boolean", "alter_column")], schema)
    chispa.assert_df_equality(
        diff,
        expect,
        ignore_row_order=True,
    )



0

A compact drop-in to quickly compare complex schemas.

def display_schema_diff(df_or_schema1, df_or_schema2, keep_col_order=False):
    import contextlib, difflib, io, json
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, ArrayType, MapType

    def sort_schema(schema: StructType) -> StructType:
        if isinstance(schema, StructType):
            return StructType(
                [
                    StructField(f.name, sort_schema(f.dataType), f.nullable, f.metadata)
                    for f in sorted(schema, key=lambda x: x.name)
                ]
            )
        if isinstance(schema, ArrayType):
            return ArrayType(sort_schema(schema.elementType), schema.containsNull)
        if isinstance(schema, MapType):
            return MapType(
                sort_schema(schema.keyType),
                sort_schema(schema.valueType),
                schema.valueContainsNull,
            )

        return schema

    def prep_schema(df_or_schema) -> StructType:
        if not isinstance(df_or_schema, StructType):
            df_or_schema = df_or_schema.schema

        if not keep_col_order:
            df_or_schema = sort_schema(df_or_schema)

        return df_or_schema

    def schema_str(schema: StructType) -> str:
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):
            SparkSession.getActiveSession().createDataFrame([], schema).printSchema()

        return buf.getvalue()

    diff_html = difflib.HtmlDiff().make_file(
        schema_str(prep_schema(df_or_schema1)).splitlines(),
        schema_str(prep_schema(df_or_schema2)).splitlines(),
    )

    displayHTML(diff_html)

I'm using it in Databricks, but define your own displayHTML if you're working in a Jupyter notebook:

def displayHTML(html: str):
    from IPython.display import display, HTML

    display(HTML(html))
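
Outside a notebook, the same idea can be rendered as plain text. The sketch below assumes the two schema strings have already been captured (for example with the `schema_str` helper above) and uses `difflib.unified_diff` instead of `HtmlDiff`; the function name `text_schema_diff` is made up for illustration.

```python
import difflib


def text_schema_diff(schema_text_1, schema_text_2):
    """Return a unified text diff of two printSchema-style strings."""
    return "\n".join(
        difflib.unified_diff(
            schema_text_1.splitlines(),
            schema_text_2.splitlines(),
            fromfile="df1",
            tofile="df2",
            lineterm="",  # input lines carry no trailing newline
        )
    )


# Hand-written schema text matching the example in the question.
schema_1 = "root\n |-- name: string (nullable = true)\n |-- age: long (nullable = true)"
schema_2 = (
    "root\n |-- name: array (nullable = true)\n"
    " |-- gender: integer (nullable = true)\n |-- age: long (nullable = true)"
)
diff_text = text_schema_diff(schema_1, schema_2)
```

Lines prefixed with `-` exist only in the first schema and lines prefixed with `+` only in the second, so `print(diff_text)` is enough for a terminal or a log file.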



-3

you can simply use

df1.printSchema() == df2.printSchema()

1 Comment

Hi Pytrick, no, I don't think your solution will work. printSchema() prints the schema and returns None, so that expression compares None with None, not the content of the schemas. stackoverflow.com/questions/53162285/…
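
Why the comparison above is always true can be shown without Spark. `printSchema()` prints the tree and returns `None`, so the sketch below uses a hypothetical stand-in function with the same behaviour.

```python
def print_schema_stand_in(schema_text):
    # Hypothetical stand-in for DataFrame.printSchema(): prints, returns None.
    print(schema_text)


# Two clearly different "schemas" still compare equal, because the
# expression reduces to None == None.
always_true = print_schema_stand_in("name: string") == print_schema_stand_in("name: array")
```

To test whether two schemas are identical, comparing the `StructType` objects themselves, as in `df1.schema == df2.schema`, is the usual approach. That yields only a yes or no, not the list of differences the question asks for.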
