diff --git a/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala b/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala index 482558d4..d448cda2 100644 --- a/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala +++ b/compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala @@ -17,9 +17,10 @@ package za.co.absa.hyperdrive.compatibility.api import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex +import org.apache.spark.sql.types.StructType trait CompatibleSparkUtil { - def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex + def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex def hasMetadata(spark: SparkSession, destination: String): Boolean def jsonStringToObject(jsonString: String): Object def objectToJsonString(obj: Object): Option[String] diff --git a/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala b/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala index 6ada93b9..ad3b346f 100644 --- a/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala +++ b/compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala @@ -17,12 +17,13 @@ package za.co.absa.hyperdrive.compatibility.provider import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex +import org.apache.spark.sql.types.StructType import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil import za.co.absa.hyperdrive.compatibility.impl.SparkUtil object 
CompatibleSparkUtilProvider extends CompatibleSparkUtil { - def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex = - SparkUtil.createMetadataLogFileIndex(spark, destination) + def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex = + SparkUtil.createMetadataLogFileIndex(spark, destination, userSpecifiedSchema) def hasMetadata(spark: SparkSession, destination: String): Boolean = SparkUtil.hasMetadata(spark, destination) diff --git a/compatibility-provider/src/test/scala/za/co/absa/hyperdrive/compatibility/provider/TestCompatibleSparkUtilProvider.scala b/compatibility-provider/src/test/scala/za/co/absa/hyperdrive/compatibility/provider/TestCompatibleSparkUtilProvider.scala new file mode 100644 index 00000000..e9e393dd --- /dev/null +++ b/compatibility-provider/src/test/scala/za/co/absa/hyperdrive/compatibility/provider/TestCompatibleSparkUtilProvider.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.compatibility.provider + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.commons.io.{TempDirectory, TempFile} +import za.co.absa.spark.commons.test.SparkTestBase + +class TestCompatibleSparkUtilProvider extends AnyFlatSpec with Matchers with SparkTestBase { + + "createMetadataLogFileIndex" should "return a metadata log file index" in { + val result = CompatibleSparkUtilProvider.createMetadataLogFileIndex( + spark, + TempDirectory("createMetadataLogFileIndex").path.toAbsolutePath.toString, + None + ) + + result shouldBe a[org.apache.spark.sql.execution.streaming.MetadataLogFileIndex] + } + + "hasMetadata" should "return false if no metadata exists" in { + val result = CompatibleSparkUtilProvider.hasMetadata( + spark, + TempFile("hasMetadata").path.toAbsolutePath.toString + ) + + result shouldBe false + } + + "jsonStringToObject" should "return an object from a JSON string" in { + val jsonString = """{"key": "value"}""" + + val result = CompatibleSparkUtilProvider.jsonStringToObject(jsonString) + + import scala.collection.JavaConverters._ + result.asInstanceOf[java.util.Map[String, String]].asScala should contain("key" -> "value") + } + + "objectToJsonString" should "return a JSON string from an object" in { + import scala.collection.JavaConverters._ + val obj = Map("key" -> "value").asJava + + val result = CompatibleSparkUtilProvider.objectToJsonString(obj) + + result shouldBe defined + result.get shouldBe """{"key":"value"}""" + } +} diff --git a/compatibility_spark-2/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala b/compatibility_spark-2/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala index 42b6f3ed..15cfad6b 100644 --- a/compatibility_spark-2/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala +++ b/compatibility_spark-2/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala @@ -19,6
+19,7 @@ import org.apache.avro.util.internal.JacksonUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} +import org.apache.spark.sql.types.StructType import org.codehaus.jackson.map.ObjectMapper import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil @@ -27,8 +28,8 @@ import java.io.ByteArrayOutputStream object SparkUtil extends CompatibleSparkUtil { private lazy val objectMapper = new ObjectMapper() - override def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex = - new MetadataLogFileIndex(spark, new Path(destination), None) + override def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex = + new MetadataLogFileIndex(spark, new Path(destination), userSpecifiedSchema) override def hasMetadata(spark: SparkSession, destination: String): Boolean = FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration) diff --git a/compatibility_spark-2/src/test/scala/za/co/absa/hyperdrive/compatibility/impl/TestSparkUtil.scala b/compatibility_spark-2/src/test/scala/za/co/absa/hyperdrive/compatibility/impl/TestSparkUtil.scala new file mode 100644 index 00000000..f0c1c702 --- /dev/null +++ b/compatibility_spark-2/src/test/scala/za/co/absa/hyperdrive/compatibility/impl/TestSparkUtil.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.compatibility.impl + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.spark.commons.test.SparkTestBase + +import za.co.absa.commons.io.{TempDirectory, TempFile} + +class TestSparkUtil extends AnyFlatSpec with Matchers with SparkTestBase { + + "createMetadataLogFileIndex" should "return a metadata log file index" in { + val result = SparkUtil.createMetadataLogFileIndex( + spark, + TempDirectory("createMetadataLogFileIndex").path.toAbsolutePath.toString, + None + ) + + result shouldBe a[org.apache.spark.sql.execution.streaming.MetadataLogFileIndex] + } + + "hasMetadata" should "return false if no metadata exists" in { + val result = SparkUtil.hasMetadata( + spark, + TempFile("hasMetadata").path.toAbsolutePath.toString + ) + + result shouldBe false + } + + "jsonStringToObject" should "return an object from a JSON string" in { + val jsonString = """{"key": "value"}""" + + val result = SparkUtil.jsonStringToObject(jsonString) + + import scala.collection.JavaConverters._ + result.asInstanceOf[java.util.Map[String, String]].asScala should contain("key" -> "value") + } + + "objectToJsonString" should "return a JSON string from an object" in { + import scala.collection.JavaConverters._ + val obj = Map("key" -> "value").asJava + + val result = SparkUtil.objectToJsonString(obj) + + result shouldBe defined + result.get shouldBe """{"key":"value"}""" + } +} diff --git a/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala b/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala index 91fc1409..4c1f15ea 100644 --- a/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala +++
b/compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala @@ -20,6 +20,7 @@ import org.apache.avro.util.internal.JacksonUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} +import org.apache.spark.sql.types.StructType import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil import java.io.ByteArrayOutputStream @@ -27,8 +28,8 @@ import java.io.ByteArrayOutputStream object SparkUtil extends CompatibleSparkUtil { private lazy val objectMapper = new ObjectMapper() - override def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex = - new MetadataLogFileIndex(spark, new Path(destination), Map.empty, None) + override def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex = + new MetadataLogFileIndex(spark, new Path(destination), Map.empty, userSpecifiedSchema) override def hasMetadata(spark: SparkSession, destination: String): Boolean = FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration, spark.sessionState.conf) diff --git a/compatibility_spark-3/src/test/scala/za/co/absa/hyperdrive/compatibility/impl/TestSparkUtil.scala b/compatibility_spark-3/src/test/scala/za/co/absa/hyperdrive/compatibility/impl/TestSparkUtil.scala new file mode 100644 index 00000000..f0c1c702 --- /dev/null +++ b/compatibility_spark-3/src/test/scala/za/co/absa/hyperdrive/compatibility/impl/TestSparkUtil.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.compatibility.impl + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.spark.commons.test.SparkTestBase + +import za.co.absa.commons.io.{TempDirectory, TempFile} + +class TestSparkUtil extends AnyFlatSpec with Matchers with SparkTestBase { + + "createMetadataLogFileIndex" should "return a metadata log file index" in { + val result = SparkUtil.createMetadataLogFileIndex( + spark, + TempDirectory("createMetadataLogFileIndex").path.toAbsolutePath.toString, + None + ) + + result shouldBe a[org.apache.spark.sql.execution.streaming.MetadataLogFileIndex] + } + + "hasMetadata" should "return false if no metadata exists" in { + val result = SparkUtil.hasMetadata( + spark, + TempFile("hasMetadata").path.toAbsolutePath.toString + ) + + result shouldBe false + } + + "jsonStringToObject" should "return an object from a JSON string" in { + val jsonString = """{"key": "value"}""" + + val result = SparkUtil.jsonStringToObject(jsonString) + + import scala.collection.JavaConverters._ + result.asInstanceOf[java.util.Map[String, String]].asScala should contain("key" -> "value") + } + + "objectToJsonString" should "return a JSON string from an object" in { + import scala.collection.JavaConverters._ + val obj = Map("key" -> "value").asJava + + val result = SparkUtil.objectToJsonString(obj) + + result shouldBe defined + result.get shouldBe """{"key":"value"}""" + } +} diff --git
a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala index 92fb79c9..7de55d28 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala @@ -16,16 +16,21 @@ package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion import org.apache.commons.configuration2.Configuration +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex import org.slf4j.LoggerFactory import org.apache.spark.sql.functions.{lit, to_date} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SparkSession} import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory} import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter +import org.apache.spark.sql.types.{DateType, StructField} import java.time.LocalDate import java.time.format.DateTimeFormatter +import org.apache.spark.sql.types._ private[transformer] class AddDateVersionTransformer(val reportDate: String, val destination: String) extends StreamTransformer { @@ -44,22 +49,30 @@ private[transformer] class AddDateVersionTransformer(val reportDate: String, val if (noCommittedParquetFilesExist(spark)) { initialVersion } else { - import spark.implicits._ - val df = spark.read.parquet(destination) - val versions = df.select(df(ColumnVersion)) - .filter(df(ColumnDate) === 
lit(reportDate)) - .distinct() - .as[Int] - .collect().toList - + val versions = getVersions(spark, ColumnDate, ColumnVersion, reportDate) if (versions.nonEmpty) versions.max + 1 else initialVersion } } private def noCommittedParquetFilesExist(spark: SparkSession): Boolean = { - val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination) + val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, None) !CompatibleSparkUtilProvider.hasMetadata(spark, destination) || fileCatalog.allFiles().isEmpty } + + private def getVersions(spark: SparkSession, ColumnDate: String, ColumnVersion: String, reportDate: String): Seq[Int] = { + val fileCatalog: MetadataLogFileIndex = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, Some(StructType(Seq( + StructField(ColumnDate, StringType, nullable = true), + StructField(ColumnVersion, IntegerType, nullable = true) + )))) + + fileCatalog.partitionSpec().partitions.map { partition => + val row: InternalRow = partition.values + (row.getString(0), row.getInt(1)) + } + .filter { case (date, _) => date == reportDate } + .map { case (_, version) => version } + .toList + } } object AddDateVersionTransformer extends StreamTransformerFactory with AddDateVersionTransformerAttributes { diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/MetadataLogUtil.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/MetadataLogUtil.scala index 53994cdd..9db479a6 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/MetadataLogUtil.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/MetadataLogUtil.scala @@ -60,7 +60,7 @@ object MetadataLogUtil { } private def getMetadataLogFiles(spark: SparkSession, rootPath: String): Try[Set[String]] = { - val metadataLogFileIndex = 
CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, rootPath) + val metadataLogFileIndex = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, rootPath, None) val parquetFilesArr = metadataLogFileIndex.inputFiles val parquetFiles = parquetFilesArr.toSet if (parquetFiles.size != parquetFilesArr.length) {