@@ -32,7 +32,7 @@ import za.co.absa.pramen.core.utils.SparkMaster.Databricks
import java.io.ByteArrayOutputStream
import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDate}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.reflect.runtime.universe._
import scala.util.{Failure, Success, Try}

@@ -155,45 +155,84 @@ object SparkUtils {
}

/**
* Compares 2 schemas.
* Compares two schemas represented as `StructType` and identifies the differences
* between them, such as newly added fields, deleted fields, or fields with changed types.
*
* @param schemaA the first schema to compare
* @param schemaB the second schema to compare
* @return a list of `FieldChange` that represents the differences between the two schemas
*/
def compareSchemas(schema1: StructType, schema2: StructType): List[FieldChange] = {
def compareSchemas(schemaA: StructType, schemaB: StructType): List[FieldChange] = {
val newFields = new ListBuffer[FieldChange]
val deletedFields = new ListBuffer[FieldChange]
val changedFields = new ListBuffer[FieldChange]

def dataTypeToString(dt: DataType, metadata: Metadata): String = {
val maxLength = getLengthFromMetadata(metadata).getOrElse(0)

dt match {
case _: StructType | _: ArrayType => dt.simpleString
case _: StringType if maxLength > 0 => s"varchar($maxLength)"
case _ => dt.typeName
case a: ArrayType if a.elementType.isInstanceOf[StructType] => "array<struct<...>>"
case a: ArrayType => s"array<${a.elementType.typeName}>"
case _: StructType => "struct<...>"
Comment on lines +174 to +176
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Apache Spark Scala API ArrayType nested typeName catalogString simpleString behavior documentation

💡 Result:

In Spark’s Scala API (org.apache.spark.sql.types), these methods behave as follows for ArrayType (including nested arrays):

  • typeName: String
    Returns the JSON type name for the data type. For ArrayType, this is just "array" (it does not include the element type, even when nested). [1][2]

  • simpleString: String
    Returns a readable type string and recursively uses the nested type’s simpleString. For an array, it renders like:
    array<elementTypeSimpleString>
    Example: ArrayType(ArrayType(IntegerType)) → array<array<int>>. [1][2]

  • catalogString: String
    Returns the string representation used when saving the type in external catalogs and recursively uses nested types’ catalogString. For an array, it renders like:
    array<elementTypeCatalogString>
    Example: ArrayType(StructType(...)) → array<struct<...>>. [1][2]

  • SQL docs alignment
    Spark SQL documentation describes ArrayType as ARRAY<element_type>, matching the angle-bracket form used by simpleString / catalogString. [3]

Sources: Spark DataType API docs (defines typeName, simpleString, catalogString) [1], Spark ScalaDoc for ArrayType [2], Spark SQL data types reference [3].


🏁 Script executed:

cd pramen/core/src/main/scala/za/co/absa/pramen/core/utils && sed -n '165,190p' SparkUtils.scala

Repository: AbsaOSS/pramen



Preserve nested array detail in rendered type strings.

Using a.elementType.typeName returns just "array" for nested arrays, collapsing type detail (e.g., array<array<int>> becomes array<array>). This reduces fidelity of new/deleted type notifications.

💡 Proposed fix
     def dataTypeToString(dt: DataType, metadata: Metadata): String = {
       val maxLength = getLengthFromMetadata(metadata).getOrElse(0)

       dt match {
-        case a: ArrayType if a.elementType.isInstanceOf[StructType] => "array<struct<...>>"
-        case a: ArrayType                                           => s"array<${a.elementType.typeName}>"
+        case a: ArrayType                                           => s"array<${dataTypeToString(a.elementType, metadata)}>"
         case _: StructType                                          => "struct<...>"
         case _: StringType if maxLength > 0                         => s"varchar($maxLength)"
         case _                                                      => dt.typeName
       }
     }

The recursive call handles both nested arrays and struct arrays correctly, consolidating the special case.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala`
around lines 174 - 176, The array handling in the pattern match (the cases
matching "case a: ArrayType if a.elementType.isInstanceOf[StructType]" and "case
a: ArrayType => s\"array<${a.elementType.typeName}>\"") loses nested array
detail by using a.elementType.typeName; update the second ArrayType branch to
render the element type recursively using the same renderer used for other
branches (i.e., call the surrounding type-to-string helper on a.elementType
instead of .typeName) so nested arrays and struct arrays produce full strings
like "array<array<int>>" or "array<struct<...>>".
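The review's point can be illustrated with a minimal, Spark-free sketch. The ADT below (`Dt`, `ArrayDt`, `StructDt`, and friends) is a hypothetical stand-in for Spark's `DataType` hierarchy, used only to show why a recursive renderer preserves nested detail where a flat `typeName` call does not:

```scala
// Hypothetical stand-ins for Spark's DataType hierarchy (illustration only).
sealed trait Dt
case object IntDt extends Dt
case object StringDt extends Dt
case class ArrayDt(element: Dt) extends Dt
case class StructDt(fields: List[(String, Dt)]) extends Dt

// Recursive renderer mirroring the proposed fix: the array branch calls
// itself on the element type, so nested arrays keep their full detail.
def render(dt: Dt): String = dt match {
  case ArrayDt(e)  => s"array<${render(e)}>"
  case StructDt(_) => "struct<...>"
  case IntDt       => "int"
  case StringDt    => "string"
}
```

With this shape, `ArrayDt(ArrayDt(IntDt))` renders as `array<array<int>>` and `ArrayDt(StructDt(...))` as `array<struct<...>>`, matching the behavior the review asks for while removing the separate struct-array special case.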

case _: StringType if maxLength > 0 => s"varchar($maxLength)"
case _ => dt.typeName
}
}

val fields1 = schema1.fields.map(f => (f.name, f)).toMap
val fields2 = schema2.fields.map(f => (f.name, f)).toMap

val newColumns: Array[FieldChange] = schema2.fields
.filter(f => !fields1.contains(f.name))
.map(f => FieldChange.NewField(f.name, dataTypeToString(f.dataType, f.metadata)))

val deletedColumns: Array[FieldChange] = schema1.fields
.filter(f => !fields2.contains(f.name))
.map(f => FieldChange.DeletedField(f.name, dataTypeToString(f.dataType, f.metadata)))
def processStruct(schema1: StructType, schema2: StructType, path: String = ""): Unit = {
val fields1 = schema1.fields.map(f => (f.name, f)).toMap
val fields2 = schema2.fields.map(f => (f.name, f)).toMap

val newColumns: Array[FieldChange] = schema2.fields
.filter(f => !fields1.contains(f.name))
.map(f => FieldChange.NewField(s"$path${f.name}", dataTypeToString(f.dataType, f.metadata)))
newFields ++= newColumns

val deletedColumns: Array[FieldChange] = schema1.fields
.filter(f => !fields2.contains(f.name))
.map(f => FieldChange.DeletedField(s"$path${f.name}", dataTypeToString(f.dataType, f.metadata)))
deletedFields ++= deletedColumns

schema1.fields
.filter(f => fields2.contains(f.name))
.foreach(f1 => {
val f2 = fields2(f1.name)

(f1.dataType, f2.dataType) match {
case (st1: StructType, st2: StructType) =>
processStruct(st1, st2, s"$path${f1.name}.")
case (ar1: ArrayType, ar2: ArrayType) =>
processArray(ar1, ar2, f1.metadata, f2.metadata, s"$path${f1.name}")
case _ =>
val dt1 = dataTypeToString(f1.dataType, f1.metadata)
val dt2 = dataTypeToString(f2.dataType, f2.metadata)

if (dt1 != dt2) {
changedFields += FieldChange.ChangedType(s"$path${f1.name}", dt1, dt2)
}
}
})
}

val changedType: Array[FieldChange] = schema1.fields
.filter(f => fields2.contains(f.name))
.flatMap(f1 => {
val dt1 = dataTypeToString(f1.dataType, f1.metadata)
val f2 = fields2(f1.name)
val dt2 = dataTypeToString(f2.dataType, f2.metadata)
def processArray(array1: ArrayType, array2: ArrayType, metadata1: Metadata, metadata2: Metadata, path: String = ""): Unit = {
(array1.elementType, array2.elementType) match {
case (st1: StructType, st2: StructType) =>
processStruct(st1, st2, s"$path[].")
case (ar1: ArrayType, ar2: ArrayType) =>
processArray(ar1, ar2, metadata1, metadata2, s"$path[]")
case _ =>
val dt1 = dataTypeToString(array1, metadata1)
val dt2 = dataTypeToString(array2, metadata2)

if (dt1 == dt2) {
Seq.empty[FieldChange]
} else {
Seq(FieldChange.ChangedType(f1.name, dt1, dt2))
}
})
if (dt1 != dt2) {
changedFields += FieldChange.ChangedType(path, dt1, dt2)
}
}
}

(newColumns ++ deletedColumns ++ changedType).toList
processStruct(schemaA, schemaB)
val allChanges = newFields ++ deletedFields ++ changedFields
allChanges.toList
}
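The path-building recursion above (dotted segments for nested structs, `[]` markers for arrays) can be sketched without a Spark dependency. The toy model below (`Ty`, `Struct`, `Arr`, `diffStruct`) is a hypothetical simplification of `compareSchemas` — it reports changes as strings rather than `FieldChange` values — but it follows the same traversal:

```scala
// Toy schema model standing in for Spark's StructType/ArrayType (hypothetical).
sealed trait Ty
case object IntTy extends Ty
case object StrTy extends Ty
case class Arr(elem: Ty) extends Ty
case class Struct(fields: List[(String, Ty)]) extends Ty

// Walk two struct schemas in parallel, extending the path with "name." for
// nested structs and "name[]." for arrays of structs, like processStruct/
// processArray do in compareSchemas.
def diffStruct(s1: Struct, s2: Struct, path: String = ""): List[String] = {
  val m1 = s1.fields.toMap
  val m2 = s2.fields.toMap
  val added   = s2.fields.collect { case (n, _) if !m1.contains(n) => s"new: $path$n" }
  val deleted = s1.fields.collect { case (n, _) if !m2.contains(n) => s"deleted: $path$n" }
  val changed = s1.fields.filter { case (n, _) => m2.contains(n) }.flatMap { case (n, t1) =>
    (t1, m2(n)) match {
      case (a: Struct, b: Struct)           => diffStruct(a, b, s"$path$n.")
      case (Arr(a: Struct), Arr(b: Struct)) => diffStruct(a, b, s"$path$n[].")
      case (a, b) if a != b                 => List(s"changed: $path$n")
      case _                                => Nil
    }
  }
  added ++ deleted ++ changed
}
```

For example, comparing a schema whose `address.city` changed type and gained `address.state` yields paths in the same `parent.child` / `parent[].child` notation the test suite asserts on (`address.city`, `phones[].number`, and so on).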

/**
@@ -287,6 +287,68 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
assert(diff.head.asInstanceOf[ChangedType].oldType == "varchar(10)")
assert(diff.head.asInstanceOf[ChangedType].newType == "varchar(15)")
}

"detect nested type changes" in {
val schema1 = StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("address", StructType(Seq(
StructField("street", StringType, nullable = true),
StructField("city", StringType, nullable = true)
))),
StructField("tags", ArrayType(StringType, containsNull = true), nullable = true),
StructField("phones", ArrayType(StructType(Seq(
StructField("type", StringType, nullable = true),
StructField("number", IntegerType, nullable = true)
)), containsNull = true), nullable = true),
StructField("error_info", StructType(Seq(
StructField("reason", StringType, nullable = true),
StructField("value", StringType, nullable = true)
)))
))

val schema2 = StructType(Seq(
StructField("Id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("address", StructType(Seq(
StructField("street", StringType, nullable = true),
StructField("city", LongType, nullable = true),
StructField("state", StringType, nullable = true)
))),
StructField("tags", ArrayType(IntegerType, containsNull = true), nullable = true),
StructField("phones", ArrayType(StructType(Seq(
StructField("type", StringType, nullable = true),
StructField("number", StringType, nullable = true),
StructField("country", StringType, nullable = true)
)), containsNull = true), nullable = true),
StructField("additional_properties", ArrayType(StructType(Seq(
StructField("key", StringType, nullable = true),
StructField("value", StringType, nullable = true)
)), containsNull = true), nullable = true)
))

val diff = compareSchemas(schema1, schema2)

assert(diff.length == 9)
assert(diff.count(_.isInstanceOf[ChangedType]) == 3)
assert(diff.count(_.isInstanceOf[NewField]) == 4)
assert(diff.count(_.isInstanceOf[DeletedField]) == 2)

val changedTypes = diff.collect { case ct: ChangedType => ct }
assert(changedTypes.exists(c => c.columnName == "address.city" && c.oldType == "string" && c.newType == "long"))
assert(changedTypes.exists(c => c.columnName == "tags" && c.oldType == "array<string>" && c.newType == "array<integer>"))
assert(changedTypes.exists(c => c.columnName == "phones[].number" && c.oldType == "integer" && c.newType == "string"))

val newFields = diff.collect { case nf: NewField => nf }
assert(newFields.exists(n => n.columnName == "Id" && n.dataType == "integer"))
assert(newFields.exists(n => n.columnName == "address.state" && n.dataType == "string"))
assert(newFields.exists(n => n.columnName == "phones[].country" && n.dataType == "string"))
assert(newFields.exists(n => n.columnName == "additional_properties" && n.dataType == "array<struct<...>>"))

val deletedFields = diff.collect { case df: DeletedField => df }
assert(deletedFields.exists(d => d.columnName == "id" && d.dataType == "integer"))
assert(deletedFields.exists(d => d.columnName == "error_info" && d.dataType == "struct<...>"))
}
}

"applyTransformations" should {