diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 2433e22b..61b883c7 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -1,4 +1,4 @@
-name: Build
+name: Build3
 
 on:
   pull_request:
@@ -17,12 +17,15 @@ jobs:
             spark: 3
     name: Scala ${{ matrix.scala }}, Spark ${{ matrix.spark }}
     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v4
+        with:
+          persist-credentials: false
       - name: Set up JDK 1.8
-        uses: actions/setup-java@v1
+        uses: actions/setup-java@v2
         with:
-          java-version: 1.8
-      - uses: actions/cache@v2
+          distribution: 'adopt'
+          java-version: 8
+      - uses: actions/cache@v4
         with:
           path: ~/.m2/repository
           key: ${{ runner.os }}-${{ matrix.scala }}-${{ matrix.spark }}-${{ hashFiles('**/pom.xml') }}
diff --git a/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala b/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala
index 482558d4..d448cda2 100644
--- a/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala
+++ b/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala
@@ -17,9 +17,10 @@ package za.co.absa.hyperdrive.compatibility.api
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
+import org.apache.spark.sql.types.StructType
 
 trait CompatibleSparkUtil {
-  def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex
+  def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex
   def hasMetadata(spark: SparkSession, destination: String): Boolean
   def jsonStringToObject(jsonString: String): Object
   def objectToJsonString(obj: Object): Option[String]
diff --git a/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala b/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala
index 6ada93b9..ad3b346f 100644
--- a/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala
+++ b/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala
@@ -17,12 +17,13 @@ package za.co.absa.hyperdrive.compatibility.provider
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
+import org.apache.spark.sql.types.StructType
 import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil
 import za.co.absa.hyperdrive.compatibility.impl.SparkUtil
 
 object CompatibleSparkUtilProvider extends CompatibleSparkUtil {
-  def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex =
-    SparkUtil.createMetadataLogFileIndex(spark, destination)
+  def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex =
+    SparkUtil.createMetadataLogFileIndex(spark, destination, userSpecifiedSchema)
 
   def hasMetadata(spark: SparkSession, destination: String): Boolean =
     SparkUtil.hasMetadata(spark, destination)
diff --git a/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala b/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala
index 91fc1409..4c1f15ea 100644
--- a/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala
+++ b/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala
@@ -20,6 +20,7 @@ import org.apache.avro.util.internal.JacksonUtils
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
+import org.apache.spark.sql.types.StructType
 import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil
 
 import java.io.ByteArrayOutputStream
@@ -27,8 +28,8 @@ import java.io.ByteArrayOutputStream
 object SparkUtil extends CompatibleSparkUtil {
   private lazy val objectMapper = new ObjectMapper()
 
-  override def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex =
-    new MetadataLogFileIndex(spark, new Path(destination), Map.empty, None)
+  override def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex =
+    new MetadataLogFileIndex(spark, new Path(destination), Map.empty, userSpecifiedSchema)
 
   override def hasMetadata(spark: SparkSession, destination: String): Boolean =
     FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration, spark.sessionState.conf)
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala
--- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala
@@ -16,8 +16,11 @@
 package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion
 
 import org.apache.commons.configuration2.Configuration
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
 import org.slf4j.LoggerFactory
 import org.apache.spark.sql.functions.{lit, to_date}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider
 import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
@@ -44,22 +47,44 @@ private[transformer] class AddDateVersionTransformer(val reportDate: String, val
     if (noCommittedParquetFilesExist(spark)) {
       initialVersion
     } else {
-      import spark.implicits._
-      val df = spark.read.parquet(destination)
-      val versions = df.select(df(ColumnVersion))
-        .filter(df(ColumnDate) === lit(reportDate))
-        .distinct()
-        .as[Int]
-        .collect().toList
-
+      val versions = getVersions(spark, ColumnDate, ColumnVersion, reportDate)
       if (versions.nonEmpty) versions.max + 1 else initialVersion
     }
   }
 
   private def noCommittedParquetFilesExist(spark: SparkSession): Boolean = {
-    val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination)
+    val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, None)
     !CompatibleSparkUtilProvider.hasMetadata(spark, destination) || fileCatalog.allFiles().isEmpty
   }
+
+  /**
+   * Collects the versions already present for `date`, using the partition values exposed by the
+   * metadata log file index of `destination`.
+   *
+   * @param spark         the active Spark session
+   * @param dateColumn    name of the date partition column
+   * @param versionColumn name of the version partition column
+   * @param date          report date whose versions are collected
+   * @return the versions found for `date`; empty if there are none
+   */
+  private def getVersions(spark: SparkSession, dateColumn: String, versionColumn: String, date: String): Seq[Int] = {
+    // Declare the date partition as a string and the version partition as an int.
+    val partitionSchema = StructType(Seq(
+      StructField(dateColumn, StringType, nullable = true),
+      StructField(versionColumn, IntegerType, nullable = true)
+    ))
+    val fileCatalog: MetadataLogFileIndex =
+      CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, Some(partitionSchema))
+
+    // NOTE(review): ordinals 0 and 1 assume the output is partitioned by date first, then version - confirm against the writer.
+    fileCatalog.partitionSpec().partitions
+      .map { partition =>
+        val row: InternalRow = partition.values
+        (row.getString(0), row.getInt(1))
+      }
+      .collect { case (partitionDate, version) if partitionDate == date => version }
+      .toList
+  }
 }
 
 object AddDateVersionTransformer extends StreamTransformerFactory with AddDateVersionTransformerAttributes {