Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.github.dfa1.vortex.reader.array;

import io.github.dfa1.vortex.core.VortexException;

/// Package-private helper for the {@link LazyDateTimePartsLongArray} record.
///
/// {@code days}, {@code seconds} and {@code subseconds} children can each be one of
/// the four signed-integer typed array interfaces; the writer picks the narrowest
/// ptype that fits. {@link #readLong(Array, long)} centralises the per-row read so
/// the record itself stays compact.
final class DateTimePartsArrays {

private DateTimePartsArrays() {
}

/// Reads {@code arr[i]} as a signed long. Recurses through {@link MaskedArray};
/// throws on null cells so callers don't silently get garbage for nullable
/// columns.
///
/// @param arr source typed Array
/// @param i row index
/// @return cell value as long
/// @throws VortexException for null cells or unsupported array types
static long readLong(Array arr, long i) {
return switch (arr) {
case ByteArray a -> a.getByte(i);
case ShortArray a -> a.getShort(i);
case IntArray a -> a.getInt(i);
case LongArray a -> a.getLong(i);
case MaskedArray a -> {
if (!a.isValid(i)) {
throw new VortexException("DateTimeParts: null cell at index " + i);
}
yield readLong(a.inner(), i);
}
default -> throw new VortexException(
"DateTimeParts: unsupported child array type: " + arr.getClass().getSimpleName());
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.github.dfa1.vortex.reader.array;

import io.github.dfa1.vortex.core.DType;

import java.util.function.LongBinaryOperator;
import java.util.function.LongConsumer;

/// Lazy {@code vortex.datetimeparts} reassembly as a {@link LongArray}.
///
/// The encoding splits each raw epoch count into three children — {@code days},
/// {@code seconds} (within the day) and {@code subseconds} (within the second).
/// Reconstruction is
///
/// ```
/// raw = days * unitsPerDay + seconds * unitsPerSecond + subseconds
/// ```
///
/// where {@code unitsPerSecond} = `TimeUnit.divisor()` and
/// {@code unitsPerDay} = `86_400 * unitsPerSecond`. The reassembled long carries the
/// same epoch count the downstream extension decoder
/// ({@code TimestampExtensionDecoder}, {@code DateExtensionDecoder}, etc.) expects;
/// no buffer materialisation occurs at construction time.
///
/// The record's {@link #dtype()} is the parent Extension dtype (e.g.
/// {@code vortex.timestamp}) so it slots transparently into the extension-decode
/// pipeline. Children may be any signed integer typed Array
/// ({@link ByteArray}/{@link ShortArray}/{@link IntArray}/{@link LongArray}); the
/// per-row {@link DateTimePartsArrays#readLong} switch handles widening.
///
/// @param dtype logical element type (typically a {@code DType.Extension})
/// @param length total logical row count
/// @param daysArr per-row signed days
/// @param secondsArr per-row signed seconds within the day
/// @param subsecondsArr per-row signed sub-second count
/// @param unitsPerDay multiplier for the days component (= 86_400 × unitsPerSecond)
/// @param unitsPerSecond multiplier for the seconds component (= unit divisor)
public record LazyDateTimePartsLongArray(
DType dtype, long length,
Array daysArr, Array secondsArr, Array subsecondsArr,
long unitsPerDay, long unitsPerSecond)
implements LongArray {

@Override
public long getLong(long i) {
return DateTimePartsArrays.readLong(daysArr, i) * unitsPerDay
+ DateTimePartsArrays.readLong(secondsArr, i) * unitsPerSecond
+ DateTimePartsArrays.readLong(subsecondsArr, i);
}

@Override
public void forEachLong(LongConsumer c) {
long n = length;
for (long i = 0; i < n; i++) {
c.accept(getLong(i));
}
}

@Override
public long fold(long identity, LongBinaryOperator op) {
long acc = identity;
long n = length;
for (long i = 0; i < n; i++) {
acc = op.applyAsLong(acc, getLong(i));
}
return acc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@
import io.github.dfa1.vortex.core.DType;
import io.github.dfa1.vortex.core.PType;
import io.github.dfa1.vortex.core.VortexException;
import io.github.dfa1.vortex.reader.array.Array;
import io.github.dfa1.vortex.reader.array.GenericArray;
import io.github.dfa1.vortex.encoding.EncodingId;
import io.github.dfa1.vortex.encoding.TimeUnit;
import io.github.dfa1.vortex.proto.DateTimePartsMetadata;
import io.github.dfa1.vortex.reader.array.Array;
import io.github.dfa1.vortex.reader.array.LazyDateTimePartsLongArray;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;

/// Read-only decoder for {@code vortex.datetimeparts}.
///
/// Reassembles the three children (days, seconds, subseconds) into a
/// {@link LazyDateTimePartsLongArray} of epoch counts in the extension's
/// {@link TimeUnit}. No per-row materialisation happens at decode time —
/// the downstream extension decoder reads the reassembled long via the
/// lazy {@code getLong} accessor.
public final class DateTimePartsEncodingDecoder implements EncodingDecoder {

private static final long SECONDS_PER_DAY = 86_400L;

/// Public no-arg constructor required by {@link java.util.ServiceLoader}.
public DateTimePartsEncodingDecoder() {
}
Expand Down Expand Up @@ -52,7 +61,27 @@ public Array decode(DecodeContext ctx) {
Array seconds = ctx.decodeChild(1, new DType.Primitive(secondsPtype, false), ctx.rowCount());
Array subseconds = ctx.decodeChild(2, new DType.Primitive(subsecondsPtype, false), ctx.rowCount());

return new GenericArray(ctx.dtype(), ctx.rowCount(), new MemorySegment[0],
new Array[]{days, seconds, subseconds});
if (!(ctx.dtype() instanceof DType.Extension ext)) {
throw new VortexException(EncodingId.VORTEX_DATETIMEPARTS,
"expected Extension dtype, got " + ctx.dtype());
}
long unitsPerSecond = readUnitsPerSecond(ext);
long unitsPerDay = SECONDS_PER_DAY * unitsPerSecond;

return new LazyDateTimePartsLongArray(ctx.dtype(), ctx.rowCount(),
days, seconds, subseconds, unitsPerDay, unitsPerSecond);
}

/// Returns {@code TimeUnit.divisor()} for the extension's declared time unit, or
/// {@code 1} when the unit is {@link TimeUnit#Days} (days carry no sub-second
/// component; seconds and subseconds children are expected to be zero).
private static long readUnitsPerSecond(DType.Extension ext) {
ByteBuffer extMeta = ext.metadata();
if (extMeta == null || !extMeta.hasRemaining()) {
throw new VortexException(EncodingId.VORTEX_DATETIMEPARTS,
"extension " + ext.extensionId() + " missing TimeUnit metadata byte");
}
TimeUnit unit = TimeUnit.fromTag(extMeta.get(extMeta.position()));
return unit == TimeUnit.Days ? 1L : unit.divisor();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.github.dfa1.vortex.reader.array;

import io.github.dfa1.vortex.core.DType;
import io.github.dfa1.vortex.core.PType;
import org.junit.jupiter.api.Test;

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;

import static org.assertj.core.api.Assertions.assertThat;

/// Unit tests for {@link LazyDateTimePartsLongArray}. Verifies the
/// {@code days * unitsPerDay + seconds * unitsPerSecond + subseconds}
/// reassembly across the supported time units, and the widening read path
/// that lets each child use whatever signed-integer ptype the encoder picked.
class LazyDateTimePartsLongArrayTest {

private static final DType I64 = new DType.Primitive(PType.I64, false);
private static final DType I32 = new DType.Primitive(PType.I32, false);
// Decoder constructs records carrying the parent Extension dtype; use I64 here
// as a stand-in since the record never inspects dtype semantics directly.

@Test
void millisecondsReassembly() {
// Given 2 rows of arbitrary (days, seconds_in_day, subseconds) for ms unit.
// unitsPerSecond = 1_000; unitsPerDay = 86_400_000.
// Row 0: days=20_000 -> 2024-12-31 area, seconds=12345, subseconds=678 -> 1_728_012_345_678
try (Arena arena = Arena.ofConfined()) {
LongArray days = longArray(arena, 20_000L, 0L);
LongArray seconds = longArray(arena, 12_345L, 0L);
LongArray subseconds = longArray(arena, 678L, 0L);
long unitsPerSecond = 1_000L;
long unitsPerDay = 86_400L * unitsPerSecond;

var sut = new LazyDateTimePartsLongArray(I64, 2,
days, seconds, subseconds, unitsPerDay, unitsPerSecond);

assertThat(sut.getLong(0)).isEqualTo(
20_000L * unitsPerDay + 12_345L * unitsPerSecond + 678L);
assertThat(sut.getLong(1)).isZero();
}
}

@Test
void widensFromNarrowerChildPtypes() {
// Days as I32, seconds as I32, subseconds as I64 — encoder is free to pick.
try (Arena arena = Arena.ofConfined()) {
IntArray days = intArray(arena, 1);
IntArray seconds = intArray(arena, 2);
LongArray subseconds = longArray(arena, 3L);
long ups = 1_000_000_000L; // nanos
long upd = 86_400L * ups;

var sut = new LazyDateTimePartsLongArray(I64, 1,
days, seconds, subseconds, upd, ups);

assertThat(sut.getLong(0)).isEqualTo(1L * upd + 2L * ups + 3L);
}
}

@Test
void foldSumsAllRows() {
try (Arena arena = Arena.ofConfined()) {
LongArray days = longArray(arena, 1L, 2L, 3L);
LongArray seconds = longArray(arena, 0L, 0L, 0L);
LongArray subseconds = longArray(arena, 0L, 0L, 0L);
long ups = 1L;
long upd = 86_400L;

var sut = new LazyDateTimePartsLongArray(I64, 3,
days, seconds, subseconds, upd, ups);

long sum = sut.fold(0L, Long::sum);
// 1*86400 + 2*86400 + 3*86400 = 6*86400
assertThat(sum).isEqualTo(6L * upd);
}
}

private static LongArray longArray(Arena arena, long... vs) {
MemorySegment seg = arena.allocate(vs.length * 8L, 8);
for (int i = 0; i < vs.length; i++) {
seg.setAtIndex(ValueLayout.JAVA_LONG, i, vs[i]);
}
return new MaterializedLongArray(I64, vs.length, seg.asReadOnly());
}

private static IntArray intArray(Arena arena, int... vs) {
MemorySegment seg = arena.allocate(vs.length * 4L, 4);
for (int i = 0; i < vs.length; i++) {
seg.setAtIndex(ValueLayout.JAVA_INT, i, vs[i]);
}
return new MaterializedIntArray(I32, vs.length, seg.asReadOnly());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.github.dfa1.vortex.core.DType;
import io.github.dfa1.vortex.core.PType;
import io.github.dfa1.vortex.reader.array.GenericArray;
import io.github.dfa1.vortex.reader.array.LongArray;
import io.github.dfa1.vortex.reader.decode.ArrayNode;
import io.github.dfa1.vortex.encoding.DTypes;
Expand Down Expand Up @@ -104,15 +103,10 @@ void roundTrip_milliseconds_preservesDaysSecondsSubseconds() {
MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new);
DecodeContext ctx = new DecodeContext(
toArrayNode(result.rootNode()), EXT_TIMESTAMP_MS, 1, bufs, REGISTRY, Arena.global());
GenericArray decoded = (GenericArray) DECODER.decode(ctx);
LongArray decoded = (LongArray) DECODER.decode(ctx);

assertThat(decoded.length()).isEqualTo(1);
LongArray days = (LongArray) decoded.child(0);
LongArray seconds = (LongArray) decoded.child(1);
LongArray subseconds = (LongArray) decoded.child(2);
assertThat(days.getLong(0)).isEqualTo(1L);
assertThat(seconds.getLong(0)).isEqualTo(3723L);
assertThat(subseconds.getLong(0)).isEqualTo(456L);
assertThat(decoded.getLong(0)).isEqualTo(ts);
}

@Test
Expand All @@ -126,14 +120,9 @@ void roundTrip_nanoseconds_preservesSubsecondPrecision() {
MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new);
DecodeContext ctx = new DecodeContext(
toArrayNode(result.rootNode()), EXT_TIMESTAMP_NS, 1, bufs, REGISTRY, Arena.global());
GenericArray decoded = (GenericArray) DECODER.decode(ctx);

LongArray days = (LongArray) decoded.child(0);
LongArray seconds = (LongArray) decoded.child(1);
LongArray subseconds = (LongArray) decoded.child(2);
assertThat(days.getLong(0)).isEqualTo(1L);
assertThat(seconds.getLong(0)).isEqualTo(3723L);
assertThat(subseconds.getLong(0)).isEqualTo(456_789_123L);
LongArray decoded = (LongArray) DECODER.decode(ctx);

assertThat(decoded.getLong(0)).isEqualTo(ts);
}

@Test
Expand All @@ -145,14 +134,9 @@ void roundTrip_epoch_allZero() {
MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new);
DecodeContext ctx = new DecodeContext(
toArrayNode(result.rootNode()), EXT_TIMESTAMP_MS, 1, bufs, REGISTRY, Arena.global());
GenericArray decoded = (GenericArray) DECODER.decode(ctx);

LongArray days = (LongArray) decoded.child(0);
LongArray seconds = (LongArray) decoded.child(1);
LongArray subseconds = (LongArray) decoded.child(2);
assertThat(days.getLong(0)).isEqualTo(0L);
assertThat(seconds.getLong(0)).isEqualTo(0L);
assertThat(subseconds.getLong(0)).isEqualTo(0L);
LongArray decoded = (LongArray) DECODER.decode(ctx);

assertThat(decoded.getLong(0)).isZero();
}

@Test
Expand All @@ -165,17 +149,12 @@ void roundTrip_multipleTimestamps_allRowsPreserved() {
MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new);
DecodeContext ctx = new DecodeContext(
toArrayNode(result.rootNode()), EXT_TIMESTAMP_MS, 4, bufs, REGISTRY, Arena.global());
GenericArray decoded = (GenericArray) DECODER.decode(ctx);
LongArray decoded = (LongArray) DECODER.decode(ctx);

assertThat(decoded.length()).isEqualTo(4);
LongArray days = (LongArray) decoded.child(0);
assertThat(days.getLong(0)).isEqualTo(0L);
assertThat(days.getLong(1)).isEqualTo(1L);
assertThat(days.getLong(2)).isEqualTo(1L);
assertThat(days.getLong(3)).isEqualTo(1L);
LongArray subseconds = (LongArray) decoded.child(2);
assertThat(subseconds.getLong(2)).isEqualTo(0L);
assertThat(subseconds.getLong(3)).isEqualTo(1L);
for (int i = 0; i < timestamps.length; i++) {
assertThat(decoded.getLong(i)).as("row %d", i).isEqualTo(timestamps[i]);
}
}

@ParameterizedTest
Expand All @@ -189,14 +168,9 @@ void roundTrip_allUnits_epochIsZero(TimeUnit unit) {
MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new);
DecodeContext ctx = new DecodeContext(
toArrayNode(result.rootNode()), dtype, 1, bufs, REGISTRY, Arena.global());
GenericArray decoded = (GenericArray) DECODER.decode(ctx);

LongArray days = (LongArray) decoded.child(0);
LongArray seconds = (LongArray) decoded.child(1);
LongArray subseconds = (LongArray) decoded.child(2);
assertThat(days.getLong(0)).isZero();
assertThat(seconds.getLong(0)).isZero();
assertThat(subseconds.getLong(0)).isZero();
LongArray decoded = (LongArray) DECODER.decode(ctx);

assertThat(decoded.getLong(0)).isZero();
}

@Test
Expand Down