Summary
Add lightweight APIs to Delta, Iceberg, and Parquet source readers that can efficiently detect whether new data exists since a given version — without performing a full file listing. This is a prerequisite for efficient streaming companion sync.
Priority: P1
Motivation
The current companion sync always obtains the full file list, either via `getAllFiles()` or a distributed log read, and then runs an anti-join against the previous state to detect changes. For a streaming mode that polls every few seconds, this is too expensive. We need a fast "has anything changed?" check.
Proposed API
DeltaTableReader
```java
/**
 * Return the current Delta table version (latest commit number).
 * Cost: a single read of _delta_log/_last_checkpoint plus a LIST of the few JSON commit files after it.
 */
long getCurrentVersion(String tablePath);

/**
 * Return files added/removed between fromVersion (exclusive) and toVersion (inclusive).
 * Reads only the relevant JSON commit files in Delta's commit log, which is much
 * cheaper than listing all files and running an anti-join.
 *
 * @return VersionDiff with added/removed file lists and version range
 */
VersionDiff getChangesBetween(String tablePath, long fromVersion, long toVersion);

/**
 * Arrow FFI variant of getChangesBetween for large changelogs.
 *
 * @param partitionFilter optional filter used to skip changes in irrelevant partitions
 */
int getChangesBetweenArrowFfi(
    String tablePath, long fromVersion, long toVersion,
    long[] arrayAddrs, long[] schemaAddrs,
    PartitionFilter partitionFilter
);
```

IcebergTableReader
```java
/**
 * Return the current snapshot ID.
 * Cost: reads metadata.json only.
 */
long getCurrentSnapshotId(String tablePath);

/**
 * Return files added/removed between two snapshot IDs.
 * Uses Iceberg's incremental scan, which reads only the relevant manifest files.
 */
VersionDiff getChangesBetween(String tablePath, long fromSnapshotId, long toSnapshotId);
```

ParquetTableReader
```java
/**
 * Return a fingerprint of the directory listing (e.g., a hash of file count + total size + latest mtime).
 * Cost: a single (possibly paginated) LIST call.
 */
String getDirectoryFingerprint(String tablePath);
```

VersionDiff (return type)
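```java
import java.util.List;

public class VersionDiff {
    long fromVersion;
    long toVersion;
    List<CompanionSourceFile> addedFiles;
    List<String> removedFilePaths;
    /** True if addedFiles or removedFilePaths is non-empty. */
    boolean hasChanges() {
        return (addedFiles != null && !addedFiles.isEmpty())
            || (removedFilePaths != null && !removedFilePaths.isEmpty());
    }
}
```

Key Design Considerations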
- Delta: Changes are available directly from the commit JSON files (`_delta_log/000000N.json`). Reading just the commit files between `lastSyncedVersion` and `currentVersion` gives the exact adds/removes without a full file listing. This is the same mechanism Delta Lake uses for streaming.
- Iceberg: An incremental scan between snapshots reads only the new manifest files. Iceberg already supports this via its `IncrementalAppendScan` / `IncrementalChangelogScan` APIs. Note that `IncrementalAppendScan` only reports appended files, so detecting removals requires the changelog scan.
- Parquet: No version tracking is available. A directory fingerprint (file count + total bytes + latest modification time) can serve as a cheap "has anything changed?" signal.
- PartitionFilter support: The `getChangesBetween` variants should accept an optional `PartitionFilter` to skip reading changes for irrelevant partitions (matching the existing WHERE clause support).
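The Delta bullet implies a fold over the commit files in the range (fromVersion, toVersion]. A minimal sketch, assuming each commit JSON has already been parsed into its add and remove paths (the `DeltaCommitDiff`, `Commit`, and `Diff` names are hypothetical and not part of the proposed API):

```java
import java.util.*;

/** Hypothetical sketch: fold pre-parsed Delta commits into net adds/removes. */
public class DeltaCommitDiff {
    /** Simplified model of one commit file: the paths of its add and remove actions. */
    record Commit(long version, List<String> addPaths, List<String> removePaths) {}

    record Diff(long fromVersion, long toVersion, Set<String> added, Set<String> removed) {
        boolean hasChanges() { return !added.isEmpty() || !removed.isEmpty(); }
    }

    /** Folds commits with fromVersion < version <= toVersion, in version order. */
    static Diff diff(List<Commit> commits, long fromVersion, long toVersion) {
        List<Commit> inRange = new ArrayList<>();
        for (Commit c : commits) {
            if (c.version() > fromVersion && c.version() <= toVersion) inRange.add(c);
        }
        inRange.sort(Comparator.comparingLong(Commit::version));

        Set<String> added = new LinkedHashSet<>();
        Set<String> removed = new LinkedHashSet<>();
        for (Commit c : inRange) {
            // Apply removes before adds, so a remove + add of the same path in one
            // commit (a file rewritten in place) ends up in both sets.
            for (String p : c.removePaths()) {
                added.remove(p);   // added earlier in the range, now gone again
                removed.add(p);    // conservative: may name a path never synced
            }
            for (String p : c.addPaths()) {
                added.add(p);
            }
        }
        return new Diff(fromVersion, toVersion, added, removed);
    }

    public static void main(String[] args) {
        List<Commit> log = List.of(
            new Commit(1, List.of("a.parquet"), List.of()),
            new Commit(2, List.of("b.parquet"), List.of("a.parquet")),
            new Commit(3, List.of("c.parquet"), List.of()));
        Diff d = diff(log, 1, 3);
        System.out.println(d.added() + " " + d.removed());
    }
}
```

Running `main` prints `[b.parquet, c.parquet] [a.parquet]`. The removed set is deliberately conservative: a file both added and removed inside the range appears only in `removed`, so the consumer should treat removal of an unknown path as a no-op.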
Expected Impact
- Enables <100ms "has anything changed?" checks for Delta/Iceberg
- Eliminates full file listing on every streaming poll cycle
- Provides exact changeset (adds/removes) for surgical incremental sync
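The per-poll saving comes from putting the expensive work behind the cheap check. A minimal sketch of such a gate, assuming a generic change token (a Delta version, an Iceberg snapshot ID, or a Parquet fingerprint string); `ChangeGate`, `probe`, and `sync` are hypothetical names:

```java
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/**
 * Hypothetical poll gate: calls the expensive sync only when a cheap change token
 * (e.g. the result of getCurrentVersion or getDirectoryFingerprint) has moved.
 */
public class ChangeGate<T> {
    private final Supplier<T> probe;          // cheap "has anything changed?" check
    private final BiConsumer<T, T> sync;      // expensive work, given (lastSynced, current)
    private T lastSynced;

    public ChangeGate(Supplier<T> probe, BiConsumer<T, T> sync, T lastSynced) {
        this.probe = probe;
        this.sync = sync;
        this.lastSynced = lastSynced;
    }

    /** Runs one poll cycle; returns true if a sync was performed. */
    public boolean pollOnce() {
        T current = probe.get();
        if (Objects.equals(current, lastSynced)) {
            return false;                     // unchanged: no listing, no diff
        }
        sync.accept(lastSynced, current);     // e.g. getChangesBetween(path, last, current)
        lastSynced = current;                 // advance only after sync returns normally
        return true;
    }

    public static void main(String[] args) {
        long[] version = {5};
        ChangeGate<Long> gate = new ChangeGate<>(
            () -> version[0],
            (from, to) -> System.out.println("sync " + from + " -> " + to),
            5L);
        System.out.println(gate.pollOnce());  // token unchanged
        version[0] = 7;
        System.out.println(gate.pollOnce());  // token moved, sync runs
    }
}
```

Because `lastSynced` only advances after `sync` returns, a failed sync is retried from the same version on the next poll. For Parquet, where the token is a fingerprint rather than a version, `sync` would still need a listing and anti-join; the gate only skips that work when nothing moved.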
Related
- indextables/indextables_spark streaming sync issue (to be linked)
- Existing: `DeltaTableReader.listFiles()`, `IcebergTableReader.readManifestFileArrowFfi()`
- Existing: `DistributedSourceScanner.getSnapshotInfo()`