
PySpark – unionByName()

In Spark or PySpark, let's see how to merge/union two DataFrames with a different number of columns (different schema). In Spark 3.1 and later, you can easily achieve this using the unionByName() transformation by passing allowMissingColumns with the value true. In older versions, this property is not available.

#PySpark 3.1 and above
merged_df = df1.unionByName(df2, allowMissingColumns=True)

The difference between unionByName() and union() is that unionByName() resolves columns by name, not by position. In other words, unionByName() merges two DataFrames by matching column names instead of column positions.
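Here is a minimal sketch of that difference, assuming a SparkSession named spark is already available (one is created in the PySpark section below). Two DataFrames share the same columns in a different order; union() pairs them by position, while unionByName() pairs them by name.

#Two DataFrames with the same columns in a different order
dfA = spark.createDataFrame([("James", "Sales")], ["name", "dept"])
dfB = spark.createDataFrame([("Marketing", "Jeff")], ["dept", "name"])

#union() matches columns by position, so dept values land under name
dfA.union(dfB).show()
#+---------+-----+
#|     name| dept|
#+---------+-----+
#|    James|Sales|
#|Marketing| Jeff|
#+---------+-----+

#unionByName() matches columns by name, so the rows line up correctly
dfA.unionByName(dfB).show()
#+-----+---------+
#| name|     dept|
#+-----+---------+
#|James|    Sales|
#| Jeff|Marketing|
#+-----+---------+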

If you are using a version older than Spark 3.1, use the approach below to merge DataFrames with different column names.

Spark Merge Two DataFrames with Different Columns

In this section, I will cover a Spark with Scala example of how to merge two different DataFrames. First, let's create DataFrames with a different number of columns: DataFrame df1 is missing the columns state and salary, and df2 is missing the column age.

//Create DataFrame df1 with columns name,dept & age
val data = Seq(("James","Sales",34), ("Michael","Sales",56),
               ("Robert","Sales",30), ("Maria","Finance",24) )
import spark.implicits._
val df1 = data.toDF("name","dept","age")
df1.printSchema()
//Output
//root
// |-- name: string (nullable = true)
// |-- dept: string (nullable = true)
// |-- age: long (nullable = true)

Second DataFrame

//Create DataFrame df2 with columns name,dept,state & salary
val data2=Seq(("James","Sales","NY",9000),("Maria","Finance","CA",9000),
              ("Jen","Finance","NY",7900),("Jeff","Marketing","CA",8000))
val df2 = data2.toDF("name","dept","state","salary")
df2.printSchema()
//Output
//root
// |-- name: string (nullable = true)
// |-- dept: string (nullable = true)
// |-- state: string (nullable = true)
// |-- salary: long (nullable = true)

Now create new DataFrames from the existing ones after adding the missing columns. The newly added columns contain null values; we add them as constant columns using the lit() function.

//Collect the union of both schemas, preserving column order
val merged_cols = (df1.columns ++ df2.columns).distinct
import org.apache.spark.sql.functions.{col,lit}
//Keep a column as-is if the DataFrame has it, otherwise add it as a null literal
def getNewColumns(columns: Set[String], mergedCols: Seq[String]) = {
    mergedCols.map {
      case x if columns.contains(x) => col(x)
      case x => lit(null).as(x)
    }
}
val new_df1 = df1.select(getNewColumns(df1.columns.toSet, merged_cols):_*)
val new_df2 = df2.select(getNewColumns(df2.columns.toSet, merged_cols):_*)

Finally, merge the two DataFrames by using column names.

//Finally union the two DataFrames df1 & df2 by name
val merged_df=new_df1.unionByName(new_df2)
merged_df.show()
//Output
//+-------+---------+----+-----+------+
//|   name|     dept| age|state|salary|
//+-------+---------+----+-----+------+
//|  James|    Sales|  34| null|  null|
//|Michael|    Sales|  56| null|  null|
//| Robert|    Sales|  30| null|  null|
//|  Maria|  Finance|  24| null|  null|
//|  James|    Sales|null|   NY|  9000|
//|  Maria|  Finance|null|   CA|  9000|
//|    Jen|  Finance|null|   NY|  7900|
//|   Jeff|Marketing|null|   CA|  8000|
//+-------+---------+----+-----+------+

PySpark Merge Two DataFrames with Different Columns

In PySpark, to merge two DataFrames with different columns, we will use a similar approach to the one explained above, with the unionByName() transformation. First, let's create DataFrames with a different number of columns.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mytechmint').getOrCreate()

#Create DataFrame df1 with columns name,dept & age
data = [("James","Sales",34), ("Michael","Sales",56), \
    ("Robert","Sales",30), ("Maria","Finance",24) ]
columns= ["name","dept","age"]
df1 = spark.createDataFrame(data = data, schema = columns)
df1.printSchema()

#Create DataFrame df2 with columns name,dept,state & salary
data2=[("James","Sales","NY",9000),("Maria","Finance","CA",9000), \
    ("Jen","Finance","NY",7900),("Jeff","Marketing","CA",8000)]
columns2= ["name","dept","state","salary"]
df2 = spark.createDataFrame(data = data2, schema = columns2)
df2.printSchema()

Now add the missing columns 'state' and 'salary' to df1 and 'age' to df2 with null values.

#Add missing columns 'state' & 'salary' to df1
from pyspark.sql.functions import lit
for column in [column for column in df2.columns if column not in df1.columns]:
    df1 = df1.withColumn(column, lit(None))

#Add missing column 'age' to df2
for column in [column for column in df1.columns if column not in df2.columns]:
    df2 = df2.withColumn(column, lit(None))
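
At this point both DataFrames have the same five columns, but in a different order, since withColumn() appends each new column at the end. This is exactly why the final merge has to match columns by name.

#Column order now differs between the two DataFrames
print(df1.columns)   #['name', 'dept', 'age', 'state', 'salary']
print(df2.columns)   #['name', 'dept', 'state', 'salary', 'age']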

Now merge/union the DataFrames using unionByName(), which, as noted above, resolves columns by name rather than by position.

#Finally union the two DataFrames df1 & df2 by name
merged_df=df1.unionByName(df2)
merged_df.show()
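
Since the data is the same as in the Scala example, the merged result should match the output shown above.

#Output
#+-------+---------+----+-----+------+
#|   name|     dept| age|state|salary|
#+-------+---------+----+-----+------+
#|  James|    Sales|  34| null|  null|
#|Michael|    Sales|  56| null|  null|
#| Robert|    Sales|  30| null|  null|
#|  Maria|  Finance|  24| null|  null|
#|  James|    Sales|null|   NY|  9000|
#|  Maria|  Finance|null|   CA|  9000|
#|    Jen|  Finance|null|   NY|  7900|
#|   Jeff|Marketing|null|   CA|  8000|
#+-------+---------+----+-----+------+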

Conclusion

In this article, you have learned, with PySpark examples, how merging two DataFrames with different columns can be done by adding the missing columns to each DataFrame and finally unioning them using unionByName().
