From ca0e362748754e94d635ea6b65baeb55bf86ebed Mon Sep 17 00:00:00 2001 From: jozefbakus Date: Mon, 19 May 2025 14:15:27 +0200 Subject: [PATCH 1/9] Use MetadataLogFileIndex to get latest version in AddDateVersionTransformer --- .../api/CompatibleSparkUtil.scala | 3 +- .../CompatibleSparkUtilProvider.scala | 5 ++-- .../compatibility/impl/SparkUtil.scala | 5 ++-- .../AddDateVersionTransformer.scala | 28 +++++++++++++------ 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala b/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala index 482558d4..d448cda2 100644 --- a/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala +++ b/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala @@ -17,9 +17,10 @@ package za.co.absa.hyperdrive.compatibility.api import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex +import org.apache.spark.sql.types.StructType trait CompatibleSparkUtil { - def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex + def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex def hasMetadata(spark: SparkSession, destination: String): Boolean def jsonStringToObject(jsonString: String): Object def objectToJsonString(obj: Object): Option[String] diff --git a/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala b/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala index 6ada93b9..ad3b346f 100644 --- a/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala +++ b/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala @@ -17,12 +17,13 @@ package za.co.absa.hyperdrive.compatibility.provider import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex +import org.apache.spark.sql.types.StructType import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil import za.co.absa.hyperdrive.compatibility.impl.SparkUtil object CompatibleSparkUtilProvider extends CompatibleSparkUtil { - def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex = - SparkUtil.createMetadataLogFileIndex(spark, destination) + def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex = + SparkUtil.createMetadataLogFileIndex(spark, destination, userSpecifiedSchema) def hasMetadata(spark: SparkSession, destination: String): Boolean = SparkUtil.hasMetadata(spark, destination) diff --git a/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala b/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala index 91fc1409..4c1f15ea 100644 --- a/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala +++ b/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala @@ -20,6 +20,7 @@ import org.apache.avro.util.internal.JacksonUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} +import org.apache.spark.sql.types.StructType import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil import java.io.ByteArrayOutputStream @@ -27,8 +28,8 @@ import java.io.ByteArrayOutputStream object SparkUtil extends CompatibleSparkUtil { private lazy val objectMapper = new ObjectMapper() - override def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex = - new MetadataLogFileIndex(spark, new Path(destination), Map.empty, None) + override def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex = + new MetadataLogFileIndex(spark, new Path(destination), Map.empty, userSpecifiedSchema) override def hasMetadata(spark: SparkSession, destination: String): Boolean = FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration, spark.sessionState.conf) diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala index 92fb79c9..2af2fc09 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala @@ -16,16 +16,21 @@ package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion import org.apache.commons.configuration2.Configuration +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex import org.slf4j.LoggerFactory import org.apache.spark.sql.functions.{lit, to_date} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SparkSession} import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory} import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter +import org.apache.spark.sql.types.{DateType, StructField} import java.time.LocalDate import java.time.format.DateTimeFormatter +import org.apache.spark.sql.types._ private[transformer] class AddDateVersionTransformer(val reportDate: String, val destination: String) extends StreamTransformer { @@ -44,22 +49,27 @@ private[transformer] class AddDateVersionTransformer(val reportDate: String, val if (noCommittedParquetFilesExist(spark)) { initialVersion } else { - import spark.implicits._ - val df = spark.read.parquet(destination) - val versions = df.select(df(ColumnVersion)) - .filter(df(ColumnDate) === lit(reportDate)) - .distinct() - .as[Int] - .collect().toList - + val versions = getVersions(spark, ColumnDate, ColumnVersion, reportDate) if (versions.nonEmpty) versions.max + 1 else initialVersion } } private def noCommittedParquetFilesExist(spark: SparkSession): Boolean = { - val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination) + val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, None) !CompatibleSparkUtilProvider.hasMetadata(spark, destination) || fileCatalog.allFiles().isEmpty } + + private def getVersions(spark: SparkSession, ColumnDate: String, ColumnVersion: String, reportDate: String): Seq[Int] = { + val fileCatalog: MetadataLogFileIndex = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, Some(StructType(Seq( + StructField(ColumnDate, StringType, nullable = true), + StructField(ColumnVersion, IntegerType, nullable = true) + )))) + + fileCatalog.partitionSpec().partitions.map { partition => + val row: InternalRow = partition.values + (row.get(0, DateType), row.getInt(1)) + }.filter { case (date, _) => date.toString == reportDate }.map { case (_, version) => version }.toList + } } object AddDateVersionTransformer extends StreamTransformerFactory with AddDateVersionTransformerAttributes { From 30b931a71a12aa85b12412b0bb85359c1410c3f1 Mon Sep 17 00:00:00 2001 From: jozefbakus Date: Mon, 19 May 2025 14:18:23 +0200 Subject: [PATCH 2/9] Fix --- .../transformer/dateversion/AddDateVersionTransformer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala index 2af2fc09..1bd0e421 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala @@ -67,8 +67,8 @@ private[transformer] class AddDateVersionTransformer(val reportDate: String, val fileCatalog.partitionSpec().partitions.map { partition => val row: InternalRow = partition.values - (row.get(0, DateType), row.getInt(1)) - }.filter { case (date, _) => date.toString == reportDate }.map { case (_, version) => version }.toList + (row.getString(0), row.getInt(1)) + }.filter { case (date, _) => date == reportDate }.map { case (_, version) => version }.toList } } From e0f18055eb761f6399f04423b64c62a77733530d Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Tue, 27 May 2025 10:16:41 +0200 Subject: [PATCH 3/9] Update checkout action --- .github/workflows/build.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2433e22b..6519995d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,7 +17,9 @@ jobs: spark: 3 name: Scala ${{ matrix.scala }}, Spark ${{ matrix.spark }} steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 + with: + persist-credentials: false - name: Set up JDK 1.8 uses: actions/setup-java@v1 with: From 9f1a89724b74a2cc48184d72ba95f148539ef326 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Tue, 27 May 2025 10:20:09 +0200 Subject: [PATCH 4/9] Update cache action --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6519995d..186414e1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -24,7 +24,7 @@ jobs: uses: actions/setup-java@v1 with: java-version: 1.8 - - uses: actions/cache@v2 + - uses: actions/cache@v4 with: path: ~/.m2/repository key: ${{ runner.os }}-${{ matrix.scala }}-${{ matrix.spark }}-${{ hashFiles('**/pom.xml') }} From 8f471ec1271abed0307cb2338e62299033c4f8c2 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Tue, 27 May 2025 10:23:19 +0200 Subject: [PATCH 5/9] Fix format --- .../dateversion/AddDateVersionTransformer.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala index 1bd0e421..36b468ff 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala @@ -65,10 +65,14 @@ private[transformer] class AddDateVersionTransformer(val reportDate: String, val StructField(ColumnVersion, IntegerType, nullable = true) )))) - fileCatalog.partitionSpec().partitions.map { partition => - val row: InternalRow = partition.values - (row.getString(0), row.getInt(1)) - }.filter { case (date, _) => date == reportDate }.map { case (_, version) => version }.toList + fileCatalog.partitionSpec().partitions + .map { partition => + val row: InternalRow = partition.values + (row.getString(0), row.getInt(1)) + } + .filter { case (date, _) => date == reportDate } + .map { case (_, version) => version } + .toList } } From 0168700e0e731c8fac21a2b985a3b517705a8349 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Tue, 27 May 2025 10:28:36 +0200 Subject: [PATCH 6/9] Some change --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 186414e1..3d471cfb 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,4 +1,4 @@ -name: Build +name: Build2 on: pull_request: From c54175373cd41193cf12e770fc690df823ee6067 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Tue, 27 May 2025 10:35:38 +0200 Subject: [PATCH 7/9] Use adopt because zulu fails with cloudflare --- .github/workflows/build.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3d471cfb..cd54bcd7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,4 +1,4 @@ -name: Build2 +name: Build on: pull_request: @@ -21,8 +21,9 @@ jobs: with: persist-credentials: false - name: Set up JDK 1.8 - uses: actions/setup-java@v1 + uses: actions/setup-java@v2 with: + distribution: 'adopt' java-version: 1.8 - uses: actions/cache@v4 with: From e65c3279802e20fd1dab16525392baafcf003a58 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Tue, 27 May 2025 10:36:39 +0200 Subject: [PATCH 8/9] some change --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index cd54bcd7..33121a13 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,4 +1,4 @@ -name: Build +name: Build3 on: pull_request: From 61a256177da448fe7426c5bcabb2fe29f73b5de4 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Tue, 27 May 2025 10:37:42 +0200 Subject: [PATCH 9/9] Fix java version --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 33121a13..61b883c7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -24,7 +24,7 @@ jobs: uses: actions/setup-java@v2 with: distribution: 'adopt' - java-version: 1.8 + java-version: 8 - uses: actions/cache@v4 with: path: ~/.m2/repository