Lightweight version-change detection API for streaming companion sync #107

@schenksj

Summary

Add lightweight APIs to the Delta, Iceberg, and Parquet source readers that efficiently detect whether new data exists since a given version, without performing a full file listing. This is a prerequisite for efficient streaming companion sync.

Priority: P1

Motivation

The current companion sync always calls getAllFiles() or performs a distributed log read to get the full file list, then runs an anti-join to detect changes. For a streaming mode that polls every few seconds, this is too expensive. We need a fast "has anything changed?" check.

Proposed API

DeltaTableReader

/**
 * Return the current Delta table version (latest commit number).
 * Cost: one read of _delta_log/_last_checkpoint plus a listing of the few JSON commit files after it.
 */
long getCurrentVersion(String tablePath);

/**
 * Return files added/removed between fromVersion (exclusive) and toVersion (inclusive).
 * Uses Delta's commit log directly: reads only the relevant JSON commit files.
 * Much cheaper than listing all files + anti-join.
 *
 * @return VersionDiff with added/removed file lists and version range
 */
VersionDiff getChangesBetween(String tablePath, long fromVersion, long toVersion);

/**
 * Arrow FFI variant of getChangesBetween for large changelogs.
 */
int getChangesBetweenArrowFfi(
    String tablePath, long fromVersion, long toVersion,
    long[] arrayAddrs, long[] schemaAddrs,
    PartitionFilter partitionFilter
);
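A poll cycle built on these two calls might look like the sketch below. It is only an illustration under stated assumptions: VersionSource, SimpleDiff, and CompanionPoller are hypothetical stand-ins (not existing classes), and only the method names getCurrentVersion / getChangesBetween and their parameter order come from the proposal above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for DeltaTableReader: only the two method names and
// their parameter order are taken from the proposal.
interface VersionSource {
    long getCurrentVersion(String tablePath);
    SimpleDiff getChangesBetween(String tablePath, long fromVersion, long toVersion);
}

// Simplified stand-in for VersionDiff that carries file paths only.
final class SimpleDiff {
    final long fromVersion;
    final long toVersion;
    final List<String> added;
    final List<String> removed;

    SimpleDiff(long fromVersion, long toVersion, List<String> added, List<String> removed) {
        this.fromVersion = fromVersion;
        this.toVersion = toVersion;
        this.added = added;
        this.removed = removed;
    }
}

final class CompanionPoller {
    private final VersionSource source;
    private final String tablePath;
    private long lastSyncedVersion;

    CompanionPoller(VersionSource source, String tablePath, long lastSyncedVersion) {
        this.source = source;
        this.tablePath = tablePath;
        this.lastSyncedVersion = lastSyncedVersion;
    }

    /** One poll cycle: empty when the table version has not moved. */
    Optional<SimpleDiff> pollOnce() {
        long current = source.getCurrentVersion(tablePath);
        if (current <= lastSyncedVersion) {
            return Optional.empty(); // cheap path: no commit files are read
        }
        SimpleDiff diff = source.getChangesBetween(tablePath, lastSyncedVersion, current);
        lastSyncedVersion = current; // advance only after the diff was fetched
        return Optional.of(diff);
    }

    long lastSyncedVersion() {
        return lastSyncedVersion;
    }

    public static void main(String[] args) {
        final long[] tableVersion = {5};
        // In-memory stub that pretends each poll range added one file.
        VersionSource stub = new VersionSource() {
            @Override public long getCurrentVersion(String path) {
                return tableVersion[0];
            }
            @Override public SimpleDiff getChangesBetween(String path, long from, long to) {
                return new SimpleDiff(from, to,
                        Arrays.asList("part-" + to + ".parquet"),
                        Collections.<String>emptyList());
            }
        };
        CompanionPoller poller = new CompanionPoller(stub, "s3://bucket/table", 5);
        System.out.println(poller.pollOnce().isPresent()); // table still at the synced version
        tableVersion[0] = 7;
        System.out.println(poller.pollOnce().isPresent()); // two new commits to fetch
    }
}
```

The point of the split is that the common "nothing changed" case costs only the getCurrentVersion call; getChangesBetween runs only when the version has advanced.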

IcebergTableReader

/**
 * Return the current snapshot ID.
 * Cost: read the current table metadata file (*.metadata.json) only.
 */
long getCurrentSnapshotId(String tablePath);

/**
 * Return files added/removed between two snapshot IDs.
 * Uses Iceberg's incremental scan (reads only relevant manifest files).
 */
VersionDiff getChangesBetween(String tablePath, long fromSnapshotId, long toSnapshotId);

ParquetTableReader

/**
 * Return a fingerprint of the directory listing (e.g., hash of file count + total size + latest mtime).
 * Cost: single LIST call.
 */
String getDirectoryFingerprint(String tablePath);
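As a rough sketch of what such a fingerprint could compute, the class below hashes file count, total size, and latest modification time with SHA-256. It is an assumption-laden illustration: DirectoryFingerprint is a hypothetical name, and it lists a local directory through java.nio, whereas the real reader would presumably issue one LIST call against the object store and feed the same three statistics into the hash.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class DirectoryFingerprint {

    /** Fingerprint of the regular files directly inside dir (non-recursive). */
    static String fingerprint(Path dir) {
        long count = 0;
        long totalBytes = 0;
        long latestMtime = 0;
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                BasicFileAttributes attrs = Files.readAttributes(entry, BasicFileAttributes.class);
                if (!attrs.isRegularFile()) {
                    continue; // skip subdirectories and other non-file entries
                }
                count++;
                totalBytes += attrs.size();
                latestMtime = Math.max(latestMtime, attrs.lastModifiedTime().toMillis());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return fromStats(count, totalBytes, latestMtime);
    }

    /** Hash of the three listing statistics, as a 64-character hex string. */
    static String fromStats(long fileCount, long totalBytes, long latestMtimeMillis) {
        String key = fileCount + ":" + totalBytes + ":" + latestMtimeMillis;
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

A caller would store the last fingerprint and compare it with the new one on each poll. A fingerprint built from these three statistics is a heuristic: a rewrite that preserves count, total size, and latest mtime would go undetected.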

VersionDiff (return type)

public class VersionDiff {
    long fromVersion;
    long toVersion;
    List<CompanionSourceFile> addedFiles;
    List<String> removedFilePaths;

    // true if addedFiles or removedFilePaths is non-empty
    boolean hasChanges() {
        return !addedFiles.isEmpty() || !removedFilePaths.isEmpty();
    }
}
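When a range spans several commits, the per-commit actions have to be folded into one net diff. The sketch below shows one possible folding rule, under the assumption that a file both added and removed inside the range (for example by compaction) should appear in neither list. FileAction and NetChanges are hypothetical helper names, not part of the proposal.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of one add/remove action from a commit.
final class FileAction {
    final boolean isAdd;
    final String path;

    private FileAction(boolean isAdd, String path) {
        this.isAdd = isAdd;
        this.path = path;
    }

    static FileAction add(String path) {
        return new FileAction(true, path);
    }

    static FileAction remove(String path) {
        return new FileAction(false, path);
    }
}

final class NetChanges {
    // Insertion-ordered so the result is deterministic.
    final Set<String> added = new LinkedHashSet<>();
    final Set<String> removed = new LinkedHashSet<>();

    /** Apply the actions of one commit; commits must be applied in version order. */
    void apply(List<FileAction> commitActions) {
        for (FileAction action : commitActions) {
            if (action.isAdd) {
                added.add(action.path);
            } else if (!added.remove(action.path)) {
                // The file predates the range, so the companion must drop it.
                removed.add(action.path);
            }
            // Otherwise the file was added and removed inside the range:
            // it was never synced, so it cancels out.
        }
    }

    boolean hasChanges() {
        return !added.isEmpty() || !removed.isEmpty();
    }
}
```

With this rule, a range whose only activity is writing and then compacting away small files still reports the compacted output as added, but never asks the companion to index the short-lived inputs.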

Key Design Considerations

  1. Delta: Changes are available directly from the commit JSON files, named by version zero-padded to 20 digits (e.g. _delta_log/00000000000000000010.json). Reading just the commit files after lastSyncedVersion up to currentVersion gives exact adds/removes without a full file listing. This is the same mechanism Delta Lake uses for streaming.

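  2. Iceberg: An incremental scan between snapshots reads only the manifests added in that range. Iceberg already supports this via its IncrementalAppendScan / IncrementalChangelogScan APIs. IncrementalAppendScan covers appended data only, so removed files would need IncrementalChangelogScan or a direct manifest comparison.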

  3. Parquet: No version tracking is available. A directory fingerprint (file count + total bytes + latest modification time) can serve as a cheap "has anything changed?" signal. It cannot say which files changed, so a detected change still requires a full listing.

  4. PartitionFilter support: The getChangesBetween variants should accept an optional PartitionFilter to skip reading changes for irrelevant partitions (matching the existing WHERE clause support). In the signatures above, only the Arrow FFI variant takes one so far.
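To make item 1 concrete, the sketch below derives the commit file paths that a getChangesBetween call would have to read for a given version range. DeltaCommitPaths is a hypothetical helper name; the 20-digit zero-padded file naming follows the Delta log layout, and the exclusive/inclusive bounds match the Javadoc in the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class DeltaCommitPaths {

    /** Path of the JSON commit file for one table version. */
    static String commitFile(String tablePath, long version) {
        if (version < 0) {
            throw new IllegalArgumentException("version must be >= 0: " + version);
        }
        // Tolerate a trailing slash on the table path.
        String base = tablePath.endsWith("/")
                ? tablePath.substring(0, tablePath.length() - 1)
                : tablePath;
        // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale.
        return String.format(Locale.ROOT, "%s/_delta_log/%020d.json", base, version);
    }

    /** Commit files for versions in (fromVersion, toVersion], in ascending order. */
    static List<String> commitFilesBetween(String tablePath, long fromVersion, long toVersion) {
        if (fromVersion > toVersion) {
            throw new IllegalArgumentException("fromVersion must not exceed toVersion");
        }
        List<String> files = new ArrayList<>();
        for (long v = fromVersion + 1; v <= toVersion; v++) {
            files.add(commitFile(tablePath, v));
        }
        return files;
    }
}
```

For a poll that finds the table two versions ahead, this yields exactly two small JSON files to read. If older commit files have already been removed by log cleanup, the reader would need to fall back to a full listing.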

Expected Impact

  • Enables <100ms "has anything changed?" checks for Delta/Iceberg
  • Eliminates full file listing on every streaming poll cycle
  • Provides exact changeset (adds/removes) for surgical incremental sync

Related

  • indextables/indextables_spark streaming sync issue (to be linked)
  • Existing: DeltaTableReader.listFiles(), IcebergTableReader.readManifestFileArrowFfi()
  • Existing: DistributedSourceScanner.getSnapshotInfo()

Labels: enhancement (New feature or request)
