This repository was archived by the owner on Nov 28, 2025. It is now read-only.

Possible cause of failures in the 'Propagate' / 'write data locally' test in LocalWriteSuite #164

@simonzhaoms

Description


```scala
private def writePartitionLocal(
    index: Int,
    part: Iterator[(BytesWritable, NullWritable)],
    localPath: String,
    mode: SaveMode): Iterator[Int] = {
  val dir = new File(localPath)
  if (dir.exists()) {
    if (mode == SaveMode.ErrorIfExists) {
      throw new IllegalStateException(
        s"LocalPath $localPath already exists. SaveMode: ErrorIfExists.")
    }
    if (mode == SaveMode.Ignore) {
      return Iterator.empty
    }
  }
  // Make the directory if it does not exist
  dir.mkdirs()
  // The path to the partition file.
  val filePath = localPath + s"/part-" + String.format("%05d", java.lang.Integer.valueOf(index))
  val fos = new DataOutputStream(new FileOutputStream(filePath))
  var count = 0
  try {
    val tfw = new TFRecordWriter(fos)
    for ((bw, _) <- part) {
      tfw.write(bw.getBytes)
      count += 1
    }
  } finally {
    fos.close()
  }
  Iterator(count)
}

// Working around the closure variable captures.
private def writePartitionLocalFun(
    localPath: String,
    mode: SaveMode): (Int, Iterator[(BytesWritable, NullWritable)]) => Iterator[Int] = {
  def mapFun(index: Int, part: Iterator[(BytesWritable, NullWritable)]) = {
    writePartitionLocal(index, part, localPath, mode)
  }
  mapFun
}
```

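The `if (dir.exists())` check in `writePartitionLocal` (line 179 of the source file) can make every partition after the first one fail when the data has two or more partitions, as in the test below. `writePartitionLocalFun` applies `writePartitionLocal` to each partition (line 211 of the source file), and every call receives the same `localPath`. The first partition to run creates the directory with `dir.mkdirs()`, so each later partition finds that the directory already exists. Under `SaveMode.ErrorIfExists`, which is Spark's default and is what the test uses since it never sets a mode, those partitions throw an `IllegalStateException`; under `SaveMode.Ignore` they are silently skipped.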
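A minimal, Spark-free sketch of this failure mode follows. It assumes partitions run one after another; in Spark's local mode tasks may run concurrently on separate threads, so the real outcome also depends on timing. All names here (`Mode`, `PartitionCheckSketch`, `writePartitionChecked`, `writePartitionUnchecked`, `runPartitions`, `freshSavePath`) are hypothetical, and `Mode` only stands in for Spark's `SaveMode`. It also shows one possible alternative, not the connector's actual fix: do the existence check once before any partition is written, then let each partition just call `mkdirs()`.

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files
import scala.util.Try

// Hypothetical stand-in for org.apache.spark.sql.SaveMode so that the sketch runs without Spark.
sealed trait Mode
case object ErrorIfExists extends Mode
case object Ignore extends Mode

object PartitionCheckSketch {
  // Same control flow as writePartitionLocal: the existence check runs inside every partition.
  def writePartitionChecked(index: Int, records: Seq[Array[Byte]], localPath: String, mode: Mode): Int = {
    val dir = new File(localPath)
    if (dir.exists()) {
      if (mode == ErrorIfExists) {
        throw new IllegalStateException(
          s"LocalPath $localPath already exists. SaveMode: ErrorIfExists.")
      }
      if (mode == Ignore) {
        return 0 // this partition is silently skipped
      }
    }
    writePartitionUnchecked(index, records, localPath)
  }

  // Hypothetical alternative: no existence check here; create the directory (a no-op if it
  // already exists) and write this partition's file.
  def writePartitionUnchecked(index: Int, records: Seq[Array[Byte]], localPath: String): Int = {
    val dir = new File(localPath)
    dir.mkdirs()
    val out = new FileOutputStream(new File(dir, f"part-$index%05d"))
    try records.foreach(r => out.write(r)) finally out.close()
    records.size
  }

  // Single-threaded stand-in for mapPartitionsWithIndex: applies `write` to each partition in
  // order, always with the same output directory, and records each partition's outcome.
  def runPartitions(partitions: Seq[Seq[Array[Byte]]],
                    write: (Int, Seq[Array[Byte]]) => Int): Seq[Try[Int]] =
    partitions.zipWithIndex.map { case (part, index) => Try(write(index, part)) }

  // A fresh output path whose last component does not exist yet, like savePath in the test.
  def freshSavePath(): String =
    new File(Files.createTempDirectory("propagate-sketch").toFile, "testResult").getAbsolutePath

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("a".getBytes("UTF-8")), Seq("b".getBytes("UTF-8"), "c".getBytes("UTF-8")))

    // Check inside every partition: partition 0 creates the directory, partition 1 then fails.
    val checkedPath = freshSavePath()
    println(runPartitions(parts, (i, p) => writePartitionChecked(i, p, checkedPath, ErrorIfExists)))

    // Check once up front, before any partition runs, then write without re-checking.
    val uncheckedPath = freshSavePath()
    if (new File(uncheckedPath).exists()) {
      throw new IllegalStateException(
        s"LocalPath $uncheckedPath already exists. SaveMode: ErrorIfExists.")
    }
    println(runPartitions(parts, (i, p) => writePartitionUnchecked(i, p, uncheckedPath)))
  }
}
```

With the per-partition check, the first partition succeeds and the second fails with `IllegalStateException`; with the single up-front check, both partitions are written. Note that with `writeLocality` set to `local`, each partition is written to the local disk of the machine that runs it, so a single check on the driver would only see the driver's disk. A real fix would need to account for that.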

```scala
"Propagate" should {
  "write data locally" in {
    // Create a dataframe with 2 partitions
    val rdd = spark.sparkContext.parallelize(testRows, numSlices = 2)
    val df = spark.createDataFrame(rdd, schema)
    // Write the partitions onto the local hard drive. Since it is going to be the
    // local file system, the partitions will be written in the same directory of the
    // same machine.
    // In a distributed setting though, two different machines would each hold a single
    // partition.
    val localPath = Files.createTempDirectory("spark-connector-propagate").toAbsolutePath.toString
    val savePath = localPath + "/testResult"
    df.write.format("tfrecords")
      .option("recordType", "Example")
      .option("writeLocality", "local")
      .save(savePath)
    // Read again this directory, this time using the Hadoop file readers, it should
    // return the same data.
    // This only works in this test and does not hold in general, because the partitions
    // will be written on the workers. Everything runs locally for tests.
    val df2 = spark.read.format("tfrecords").option("recordType", "Example")
      .load(savePath).sort("id").select("id", "IntegerTypeLabel", "LongTypeLabel",
        "FloatTypeLabel", "DoubleTypeLabel", "VectorLabel", "name") // Correct column order.
    assert(df2.collect().toSeq === testRows.toSeq)
  }
}
```

The resulting exception should be similar to the one reported in #141 (comment).
