From 80fbbaee61446b0245dd7b07fde49a594bf01ff3 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 25 Feb 2026 10:03:24 +0100 Subject: [PATCH 1/4] #710 Make the schema change renderer nested schema friendly. --- .../absa/pramen/core/utils/SparkUtils.scala | 97 +++++++++++++------ .../core/tests/utils/SparkUtilsSuite.scala | 60 ++++++++++++ 2 files changed, 129 insertions(+), 28 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index fb180378..c583c0bc 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -32,7 +32,7 @@ import za.co.absa.pramen.core.utils.SparkMaster.Databricks import java.io.ByteArrayOutputStream import java.time.format.DateTimeFormatter import java.time.{Instant, LocalDate} -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.reflect.runtime.universe._ import scala.util.{Failure, Success, Try} @@ -157,43 +157,84 @@ object SparkUtils { /** * Compares 2 schemas. 
*/ - def compareSchemas(schema1: StructType, schema2: StructType): List[FieldChange] = { + def compareSchemas(schemaA: StructType, schemaB: StructType): List[FieldChange] = { + val newFields = new ListBuffer[FieldChange] + val deletedFields = new ListBuffer[FieldChange] + val changedFields = new ListBuffer[FieldChange] + def dataTypeToString(dt: DataType, metadata: Metadata): String = { val maxLength = getLengthFromMetadata(metadata).getOrElse(0) dt match { - case _: StructType | _: ArrayType => dt.simpleString - case _: StringType if maxLength > 0 => s"varchar($maxLength)" - case _ => dt.typeName + case a: ArrayType if a.elementType.isInstanceOf[StructType] => "array>" + case a: ArrayType => s"array<${a.elementType.typeName}>" + case _: StructType => "struct<...>" + case _: StringType if maxLength > 0 => s"varchar($maxLength)" + case _ => dt.typeName } } - val fields1 = schema1.fields.map(f => (f.name, f)).toMap - val fields2 = schema2.fields.map(f => (f.name, f)).toMap - - val newColumns: Array[FieldChange] = schema2.fields - .filter(f => !fields1.contains(f.name)) - .map(f => FieldChange.NewField(f.name, dataTypeToString(f.dataType, f.metadata))) - - val deletedColumns: Array[FieldChange] = schema1.fields - .filter(f => !fields2.contains(f.name)) - .map(f => FieldChange.DeletedField(f.name, dataTypeToString(f.dataType, f.metadata))) + def processStruct(schema1: StructType, schema2: StructType, path: String = ""): Unit = { + val fields1 = schema1.fields.map(f => (f.name.toLowerCase, f)).toMap + val fields2 = schema2.fields.map(f => (f.name.toLowerCase, f)).toMap + + val newColumns: Array[FieldChange] = schema2.fields + .filter(f => !fields1.contains(f.name)) + .map(f => FieldChange.NewField(s"$path${f.name}", dataTypeToString(f.dataType, f.metadata))) + + val deletedColumns: Array[FieldChange] = schema1.fields + .filter(f => !fields2.contains(f.name)) + .map(f => FieldChange.DeletedField(s"$path${f.name}", dataTypeToString(f.dataType, f.metadata))) + + val 
changedType: Array[FieldChange] = schema1.fields + .filter(f => fields2.contains(f.name)) + .flatMap(f1 => { + val f2 = fields2(f1.name) + + (f1.dataType, f2.dataType) match { + case (st1: StructType, st2: StructType) => + processStruct(st1, st2, s"$path${f1.name}.") + Seq.empty + case (ar1: ArrayType, ar2: ArrayType) => + processArray(ar1, ar2, f1.metadata, f2.metadata, s"$path${f1.name}") + Seq.empty + case _ => + val dt1 = dataTypeToString(f1.dataType, f1.metadata) + val dt2 = dataTypeToString(f2.dataType, f2.metadata) + + if (dt1 == dt2) { + Seq.empty[FieldChange] + } else { + Seq(FieldChange.ChangedType(s"$path${f1.name}", dt1, dt2)) + } + } + }) + newFields ++= newColumns + deletedFields ++= deletedColumns + changedFields ++= changedType + } - val changedType: Array[FieldChange] = schema1.fields - .filter(f => fields2.contains(f.name)) - .flatMap(f1 => { - val dt1 = dataTypeToString(f1.dataType, f1.metadata) - val f2 = fields2(f1.name) - val dt2 = dataTypeToString(f2.dataType, f2.metadata) + def processArray(array1: ArrayType, array2: ArrayType, metadata1: Metadata, metadata2: Metadata, path: String = ""): Unit = { + (array1.elementType, array2.elementType) match { + case (st1: StructType, st2: StructType) => + processStruct(st1, st2, s"$path[].") + Seq.empty + case (ar1: ArrayType, ar2: ArrayType) => + processArray(ar1, ar2, metadata1, metadata2, s"$path[]") + Seq.empty + case _ => + val dt1 = dataTypeToString(array1, metadata1) + val dt2 = dataTypeToString(array2, metadata2) - if (dt1 == dt2) { - Seq.empty[FieldChange] - } else { - Seq(FieldChange.ChangedType(f1.name, dt1, dt2)) - } - }) + if (dt1 != dt2) { + changedFields += FieldChange.ChangedType(path, dt1, dt2) + } + } + } - (newColumns ++ deletedColumns ++ changedType).toList + processStruct(schemaA, schemaB) + val allChanges = newFields ++ deletedFields ++ changedFields + allChanges.toList } /** diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala index 46d49104..9a211e0b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala @@ -287,6 +287,66 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture assert(diff.head.asInstanceOf[ChangedType].oldType == "varchar(10)") assert(diff.head.asInstanceOf[ChangedType].newType == "varchar(15)") } + + "detect nested type changes" in { + val schema1 = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("name", StringType, nullable = true), + StructField("address", StructType(Seq( + StructField("street", StringType, nullable = true), + StructField("city", StringType, nullable = true) + ))), + StructField("tags", ArrayType(StringType, containsNull = true), nullable = true), + StructField("phones", ArrayType(StructType(Seq( + StructField("type", StringType, nullable = true), + StructField("number", IntegerType, nullable = true) + )), containsNull = true), nullable = true), + StructField("error_info", StructType(Seq( + StructField("reason", StringType, nullable = true), + StructField("value", StringType, nullable = true) + ))), + )) + + val schema2 = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("name", StringType, nullable = true), + StructField("address", StructType(Seq( + StructField("street", StringType, nullable = true), + StructField("city", LongType, nullable = true), + StructField("state", StringType, nullable = true) + ))), + StructField("tags", ArrayType(IntegerType, containsNull = true), nullable = true), + StructField("phones", ArrayType(StructType(Seq( + StructField("type", StringType, nullable = true), + StructField("number", StringType, nullable = true), + StructField("country", StringType, nullable = true) + )), containsNull = true), nullable = true), + 
StructField("additional_properties", ArrayType(StructType(Seq( + StructField("key", StringType, nullable = true), + StructField("value", StringType, nullable = true) + )), containsNull = true), nullable = true) + )) + + val diff = compareSchemas(schema1, schema2) + + assert(diff.length == 7) + assert(diff.count(_.isInstanceOf[ChangedType]) == 3) + assert(diff.count(_.isInstanceOf[NewField]) == 3) + assert(diff.count(_.isInstanceOf[DeletedField]) == 1) + + val changedTypes = diff.collect { case ct: ChangedType => ct } + assert(changedTypes.exists(ct => ct.columnName == "address.city" && ct.oldType == "string" && ct.newType == "long")) + assert(changedTypes.exists(ct => ct.columnName == "tags" && ct.oldType == "array" && ct.newType == "array")) + assert(changedTypes.exists(ct => ct.columnName == "phones[].number" && ct.oldType == "integer" && ct.newType == "string")) + + val newFields = diff.collect { case nf: NewField => nf } + assert(newFields.exists(nf => nf.columnName == "address.state" && nf.dataType == "string")) + assert(newFields.exists(nf => nf.columnName == "phones[].country" && nf.dataType == "string")) + assert(newFields.exists(nf => nf.columnName == "additional_properties" && nf.dataType == "array>")) + + val deletedFields = diff.collect { case df: DeletedField => df } + assert(deletedFields.exists(df => df.columnName == "error_info" && df.dataType == "struct<...>")) + } } "applyTransformations" should { From 790a100893248bdf0403000050d7f47083ec2063 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 25 Feb 2026 10:20:08 +0100 Subject: [PATCH 2/4] #710 Make the schema change comparison case-sensitive for notification purposes. We want to know when the schema changes even if the change is only in the casing of field names.
--- .../scala/za/co/absa/pramen/core/utils/SparkUtils.scala | 6 ++---- .../co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index c583c0bc..433dcfc6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -175,8 +175,8 @@ object SparkUtils { } def processStruct(schema1: StructType, schema2: StructType, path: String = ""): Unit = { - val fields1 = schema1.fields.map(f => (f.name.toLowerCase, f)).toMap - val fields2 = schema2.fields.map(f => (f.name.toLowerCase, f)).toMap + val fields1 = schema1.fields.map(f => (f.name, f)).toMap + val fields2 = schema2.fields.map(f => (f.name, f)).toMap val newColumns: Array[FieldChange] = schema2.fields .filter(f => !fields1.contains(f.name)) @@ -218,10 +218,8 @@ object SparkUtils { (array1.elementType, array2.elementType) match { case (st1: StructType, st2: StructType) => processStruct(st1, st2, s"$path[].") - Seq.empty case (ar1: ArrayType, ar2: ArrayType) => processArray(ar1, ar2, metadata1, metadata2, s"$path[]") - Seq.empty case _ => val dt1 = dataTypeToString(array1, metadata1) val dt2 = dataTypeToString(array2, metadata2) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala index 9a211e0b..fbdf6eac 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala @@ -304,7 +304,7 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture StructField("error_info", StructType(Seq( StructField("reason", StringType, nullable = 
true), StructField("value", StringType, nullable = true) - ))), + ))) )) val schema2 = StructType(Seq( From 484fea4a6faeeabcd53f0e48c9248e3e81c13ce0 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 25 Feb 2026 11:21:37 +0100 Subject: [PATCH 3/4] #710 Make schema comparison case-sensitive for notification purposes. --- .../core/tests/utils/SparkUtilsSuite.scala | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala index fbdf6eac..9f28a67a 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala @@ -308,7 +308,7 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture )) val schema2 = StructType(Seq( - StructField("id", IntegerType, nullable = false), + StructField("Id", IntegerType, nullable = false), StructField("name", StringType, nullable = true), StructField("address", StructType(Seq( StructField("street", StringType, nullable = true), @@ -329,23 +329,25 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture val diff = compareSchemas(schema1, schema2) - assert(diff.length == 7) + assert(diff.length == 9) assert(diff.count(_.isInstanceOf[ChangedType]) == 3) - assert(diff.count(_.isInstanceOf[NewField]) == 3) - assert(diff.count(_.isInstanceOf[DeletedField]) == 1) + assert(diff.count(_.isInstanceOf[NewField]) == 4) + assert(diff.count(_.isInstanceOf[DeletedField]) == 2) val changedTypes = diff.collect { case ct: ChangedType => ct } - assert(changedTypes.exists(ct => ct.columnName == "address.city" && ct.oldType == "string" && ct.newType == "long")) - assert(changedTypes.exists(ct => ct.columnName == "tags" && ct.oldType == "array" && ct.newType == "array")) - 
assert(changedTypes.exists(ct => ct.columnName == "phones[].number" && ct.oldType == "integer" && ct.newType == "string")) + assert(changedTypes.exists(c => c.columnName == "address.city" && c.oldType == "string" && c.newType == "long")) + assert(changedTypes.exists(c => c.columnName == "tags" && c.oldType == "array" && c.newType == "array")) + assert(changedTypes.exists(c => c.columnName == "phones[].number" && c.oldType == "integer" && c.newType == "string")) val newFields = diff.collect { case nf: NewField => nf } - assert(newFields.exists(nf => nf.columnName == "address.state" && nf.dataType == "string")) - assert(newFields.exists(nf => nf.columnName == "phones[].country" && nf.dataType == "string")) - assert(newFields.exists(nf => nf.columnName == "additional_properties" && nf.dataType == "array>")) + assert(newFields.exists(n => n.columnName == "Id" && n.dataType == "integer")) + assert(newFields.exists(n => n.columnName == "address.state" && n.dataType == "string")) + assert(newFields.exists(n => n.columnName == "phones[].country" && n.dataType == "string")) + assert(newFields.exists(n => n.columnName == "additional_properties" && n.dataType == "array>")) val deletedFields = diff.collect { case df: DeletedField => df } - assert(deletedFields.exists(df => df.columnName == "error_info" && df.dataType == "struct<...>")) + assert(deletedFields.exists(d => d.columnName == "id" && d.dataType == "integer")) + assert(deletedFields.exists(d => d.columnName == "error_info" && d.dataType == "struct<...>")) } } From eac31883b0b2f8707cc50eaae56c9d68fb20e293 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 25 Feb 2026 11:46:52 +0100 Subject: [PATCH 4/4] #710 Tidy up the code for Spark schema comparison. 
--- .../absa/pramen/core/utils/SparkUtils.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index 433dcfc6..99f3a45d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -155,7 +155,12 @@ object SparkUtils { } /** - * Compares 2 schemas. + * Compares two schemas represented as `StructType` and identifies the differences + * between them, such as newly added fields, deleted fields, or fields with changed types. + * + * @param schemaA the first schema to compare + * @param schemaB the second schema to compare + * @return a list of `FieldChange` that represents the differences between the two schemas */ def compareSchemas(schemaA: StructType, schemaB: StructType): List[FieldChange] = { val newFields = new ListBuffer[FieldChange] @@ -181,37 +186,32 @@ object SparkUtils { val newColumns: Array[FieldChange] = schema2.fields .filter(f => !fields1.contains(f.name)) .map(f => FieldChange.NewField(s"$path${f.name}", dataTypeToString(f.dataType, f.metadata))) + newFields ++= newColumns val deletedColumns: Array[FieldChange] = schema1.fields .filter(f => !fields2.contains(f.name)) .map(f => FieldChange.DeletedField(s"$path${f.name}", dataTypeToString(f.dataType, f.metadata))) + deletedFields ++= deletedColumns - val changedType: Array[FieldChange] = schema1.fields + schema1.fields .filter(f => fields2.contains(f.name)) - .flatMap(f1 => { + .foreach(f1 => { val f2 = fields2(f1.name) (f1.dataType, f2.dataType) match { case (st1: StructType, st2: StructType) => processStruct(st1, st2, s"$path${f1.name}.") - Seq.empty case (ar1: ArrayType, ar2: ArrayType) => processArray(ar1, ar2, f1.metadata, f2.metadata, s"$path${f1.name}") - Seq.empty case _ => val dt1 = 
dataTypeToString(f1.dataType, f1.metadata) val dt2 = dataTypeToString(f2.dataType, f2.metadata) - if (dt1 == dt2) { - Seq.empty[FieldChange] - } else { - Seq(FieldChange.ChangedType(s"$path${f1.name}", dt1, dt2)) + if (dt1 != dt2) { + changedFields += FieldChange.ChangedType(s"$path${f1.name}", dt1, dt2) } } }) - newFields ++= newColumns - deletedFields ++= deletedColumns - changedFields ++= changedType } def processArray(array1: ArrayType, array2: ArrayType, metadata1: Metadata, metadata2: Metadata, path: String = ""): Unit = {