diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java index 49afd5c44f..8c2cc80a75 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java @@ -124,6 +124,29 @@ public void open() { public void processElement(StreamRecord> streamRecord) throws Exception { TableBucketWriteResult tableBucketWriteResult = streamRecord.getValue(); + + // Check if this is a failure marker from upstream Reader + if (tableBucketWriteResult.isFailedMarker()) { + long failedTableId = tableBucketWriteResult.tableBucket().getTableId(); + String failReason = tableBucketWriteResult.failReason(); + LOG.info( + "Received failure marker for table {}. Reason: {}. " + + "Cleaning up collected write results for this table.", + failedTableId, + failReason); + + // Clean up any partially collected write results for the failed table + List> removedResults = + collectedTableBucketWriteResults.remove(failedTableId); + if (removedResults != null) { + LOG.info( + "Cleaned up {} collected write results for failed table {}.", + removedResults.size(), + failedTableId); + } + return; + } + TableBucket tableBucket = tableBucketWriteResult.tableBucket(); long tableId = tableBucket.getTableId(); registerTableBucketWriteResult(tableId, tableBucketWriteResult); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TableTieringFailedEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TableTieringFailedEvent.java new file mode 100644 index 0000000000..1a521e1cc8 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TableTieringFailedEvent.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * SourceEvent used by TieringSourceEnumerator to notify all TieringSourceReaders that a specific + * table's tiering has failed. Upon receiving this event, the readers should clear their internal + * state for the failed table and continue processing other tables. + */ +public class TableTieringFailedEvent implements SourceEvent { + + private static final long serialVersionUID = 1L; + + private final long tableId; + + private final String failReason; + + public TableTieringFailedEvent(long tableId, String failReason) { + this.tableId = tableId; + this.failReason = failReason; + } + + public long getTableId() { + return tableId; + } + + public String getFailReason() { + return failReason; + } + + @Override + public String toString() { + return "TableTieringFailedEvent{" + + "tableId=" + + tableId + + ", failReason='" + + failReason + + '\'' + + '}'; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java index ba648f2977..cbfe08c609 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java @@ -43,6 +43,7 @@ public class TableBucketWriteResult implements Serializable { @Nullable private final String partitionName; // will be null when no any data write, such as for tiering an empty log split + // or when this is a failure marker @Nullable private final WriteResult writeResult; // the end offset of tiering, should be the last tiered record's offset + 1 @@ -57,6 +58,12 @@ public class TableBucketWriteResult implements Serializable { // for the round of tiering is finished private final int numberOfWriteResults; + // indicates whether this is a failure marker rather than an actual write result + private final boolean failedMarker; + + // the reason for failure when this is a failure marker + @Nullable private final String failReason; + public TableBucketWriteResult( TablePath tablePath, TableBucket tableBucket, @@ -65,6 +72,28 @@ public TableBucketWriteResult( long logEndOffset, long maxTimestamp, int numberOfWriteResults) { + this( + tablePath, + tableBucket, + partitionName, + writeResult, + logEndOffset, + maxTimestamp, + numberOfWriteResults, + false, + null); + } + + private TableBucketWriteResult( + TablePath tablePath, + TableBucket tableBucket, + @Nullable String partitionName, + @Nullable WriteResult writeResult, + long logEndOffset, + long maxTimestamp, + int numberOfWriteResults, + boolean failedMarker, + @Nullable String failReason) { this.tablePath = tablePath; this.tableBucket = tableBucket; this.partitionName = partitionName; @@ -72,6 +101,24 @@ public TableBucketWriteResult( this.logEndOffset = logEndOffset; this.maxTimestamp = maxTimestamp; this.numberOfWriteResults = numberOfWriteResults; + this.failedMarker = failedMarker; + this.failReason = failReason; + } + + /** + * Creates a failure marker result that indicates tiering for a table has failed. + * + * @param tableId the ID of the failed table + * @param failReason the reason for the failure + * @param the type of write result + * @return a failure marker TableBucketWriteResult + */ + public static TableBucketWriteResult failedMarker( + long tableId, String failReason) { + // Use a dummy TableBucket with only tableId for the failure marker + TableBucket dummyBucket = new TableBucket(tableId, -1); + return new TableBucketWriteResult<>( + null, dummyBucket, null, null, -1, -1, -1, true, failReason); } public TablePath tablePath() { @@ -103,4 +150,13 @@ public long logEndOffset() { public long maxTimestamp() { return maxTimestamp; } + + public boolean isFailedMarker() { + return failedMarker; + } + + @Nullable + public String failReason() { + return failReason; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java index 3651760955..3fef93efa6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java @@ -33,7 +33,7 @@ public class TableBucketWriteResultSerializer private static final ThreadLocal SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); - private static final int CURRENT_VERSION = 1; + private static final int CURRENT_VERSION = 2; private final org.apache.fluss.lake.serializer.SimpleVersionedSerializer writeResultSerializer; @@ -53,44 +53,58 @@ public int getVersion() { public byte[] serialize(TableBucketWriteResult tableBucketWriteResult) throws IOException { final DataOutputSerializer out = SERIALIZER_CACHE.get(); - // serialize table path - TablePath tablePath = tableBucketWriteResult.tablePath(); - out.writeUTF(tablePath.getDatabaseName()); - out.writeUTF(tablePath.getTableName()); - - // serialize bucket - TableBucket tableBucket = tableBucketWriteResult.tableBucket(); - out.writeLong(tableBucket.getTableId()); - // write partition - if (tableBucket.getPartitionId() != null) { - out.writeBoolean(true); - out.writeLong(tableBucket.getPartitionId()); - out.writeUTF(tableBucketWriteResult.partitionName()); - } else { - out.writeBoolean(false); - } - out.writeInt(tableBucket.getBucket()); - // serialize write result - WriteResult writeResult = tableBucketWriteResult.writeResult(); - if (writeResult == null) { - // write -1 to mark write result as null - out.writeInt(-1); + // serialize failed marker flag first + out.writeBoolean(tableBucketWriteResult.isFailedMarker()); + + if (tableBucketWriteResult.isFailedMarker()) { + // for failed marker, only serialize tableId and failReason + out.writeLong(tableBucketWriteResult.tableBucket().getTableId()); + String failReason = tableBucketWriteResult.failReason(); + out.writeBoolean(failReason != null); + if (failReason != null) { + out.writeUTF(failReason); + } } else { - byte[] serializeBytes = writeResultSerializer.serialize(writeResult); - out.writeInt(serializeBytes.length); - out.write(serializeBytes); + // serialize table path + TablePath tablePath = tableBucketWriteResult.tablePath(); + out.writeUTF(tablePath.getDatabaseName()); + out.writeUTF(tablePath.getTableName()); + + // serialize bucket + TableBucket tableBucket = tableBucketWriteResult.tableBucket(); + out.writeLong(tableBucket.getTableId()); + // write partition + if (tableBucket.getPartitionId() != null) { + out.writeBoolean(true); + out.writeLong(tableBucket.getPartitionId()); + out.writeUTF(tableBucketWriteResult.partitionName()); + } else { + out.writeBoolean(false); + } + out.writeInt(tableBucket.getBucket()); + + // serialize write result + WriteResult writeResult = tableBucketWriteResult.writeResult(); + if (writeResult == null) { + // write -1 to mark write result as null + out.writeInt(-1); + } else { + byte[] serializeBytes = writeResultSerializer.serialize(writeResult); + out.writeInt(serializeBytes.length); + out.write(serializeBytes); + } + + // serialize log end offset + out.writeLong(tableBucketWriteResult.logEndOffset()); + + // serialize max timestamp + out.writeLong(tableBucketWriteResult.maxTimestamp()); + + // serialize number of write results + out.writeInt(tableBucketWriteResult.numberOfWriteResults()); } - // serialize log end offset - out.writeLong(tableBucketWriteResult.logEndOffset()); - - // serialize max timestamp - out.writeLong(tableBucketWriteResult.maxTimestamp()); - - // serialize number of write results - out.writeInt(tableBucketWriteResult.numberOfWriteResults()); - final byte[] result = out.getCopyOfBuffer(); out.clear(); return result; @@ -99,9 +113,17 @@ public byte[] serialize(TableBucketWriteResult tableBucketWriteResu @Override public TableBucketWriteResult deserialize(int version, byte[] serialized) throws IOException { - if (version != CURRENT_VERSION) { + if (version == 1) { + return deserializeV1(serialized); + } else if (version == CURRENT_VERSION) { + return deserializeV2(serialized); + } else { throw new IOException("Unknown version " + version); } + } + + private TableBucketWriteResult deserializeV1(byte[] serialized) + throws IOException { final DataInputDeserializer in = new DataInputDeserializer(serialized); // deserialize table path String databaseName = in.readUTF(); @@ -125,7 +147,7 @@ public TableBucketWriteResult deserialize(int version, byte[] seria if (writeResultLength >= 0) { byte[] writeResultBytes = new byte[writeResultLength]; in.readFully(writeResultBytes); - writeResult = writeResultSerializer.deserialize(version, writeResultBytes); + writeResult = writeResultSerializer.deserialize(1, writeResultBytes); } else { writeResult = null; } @@ -145,4 +167,64 @@ public TableBucketWriteResult deserialize(int version, byte[] seria maxTimestamp, numberOfWriteResults); } + + private TableBucketWriteResult deserializeV2(byte[] serialized) + throws IOException { + final DataInputDeserializer in = new DataInputDeserializer(serialized); + + // read failed marker flag + boolean isFailedMarker = in.readBoolean(); + + if (isFailedMarker) { + // deserialize failed marker + long tableId = in.readLong(); + String failReason = null; + if (in.readBoolean()) { + failReason = in.readUTF(); + } + return TableBucketWriteResult.failedMarker(tableId, failReason); + } else { + // deserialize table path + String databaseName = in.readUTF(); + String tableName = in.readUTF(); + TablePath tablePath = new TablePath(databaseName, tableName); + + // deserialize bucket + long tableId = in.readLong(); + Long partitionId = null; + String partitionName = null; + if (in.readBoolean()) { + partitionId = in.readLong(); + partitionName = in.readUTF(); + } + int bucketId = in.readInt(); + TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); + + // deserialize write result + int writeResultLength = in.readInt(); + WriteResult writeResult; + if (writeResultLength >= 0) { + byte[] writeResultBytes = new byte[writeResultLength]; + in.readFully(writeResultBytes); + writeResult = writeResultSerializer.deserialize(CURRENT_VERSION, writeResultBytes); + } else { + writeResult = null; + } + + // deserialize log end offset + long logEndOffset = in.readLong(); + // deserialize max timestamp + long maxTimestamp = in.readLong(); + // deserialize number of write results + int numberOfWriteResults = in.readInt(); + return new TableBucketWriteResult<>( + tablePath, + tableBucket, + partitionName, + writeResult, + logEndOffset, + maxTimestamp, + numberOfWriteResults); + } + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index a6b14b320d..663c611848 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -19,17 +19,28 @@ import org.apache.fluss.annotation.Internal; import org.apache.fluss.client.Connection; +import org.apache.fluss.flink.tiering.event.FailedTieringEvent; +import org.apache.fluss.flink.tiering.event.TableTieringFailedEvent; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.core.io.InputStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; /** A {@link SourceReader} that read records from Fluss and write to lake. */ @Internal @@ -40,18 +51,61 @@ public final class TieringSourceReader TieringSplit, TieringSplitState> { + private static final Logger LOG = LoggerFactory.getLogger(TieringSourceReader.class); + private final Connection connection; + private final TieringSplitReader splitReader; + // Queue to store failed table info detected from SplitReader + private final Queue failedTableQueue; + + // Queue to store failure markers that need to be sent to downstream Committer + // These markers are generated when receiving TableTieringFailedEvent from Enumerator + private final Queue> failedMarkersForCommitter; public TieringSourceReader( SourceReaderContext context, Connection connection, LakeTieringFactory lakeTieringFactory) { + this( + context, + connection, + new FutureCompletingBlockingQueue<>(), + new ConcurrentLinkedQueue<>(), + lakeTieringFactory); + } + + private TieringSourceReader( + SourceReaderContext context, + Connection connection, + FutureCompletingBlockingQueue>> + elementsQueue, + Queue failedTableQueue, + LakeTieringFactory lakeTieringFactory) { + this( + context, + connection, + elementsQueue, + new TieringSplitReader<>(connection, lakeTieringFactory, failedTableQueue), + failedTableQueue); + } + + private TieringSourceReader( + SourceReaderContext context, + Connection connection, + FutureCompletingBlockingQueue>> + elementsQueue, + TieringSplitReader splitReader, + Queue failedTableQueue) { super( - () -> new TieringSplitReader<>(connection, lakeTieringFactory), + elementsQueue, + () -> splitReader, new TableBucketWriteResultEmitter<>(), context.getConfiguration(), context); this.connection = connection; + this.splitReader = splitReader; + this.failedTableQueue = failedTableQueue; + this.failedMarkersForCommitter = new ConcurrentLinkedQueue<>(); } @Override @@ -62,11 +116,78 @@ public void start() { } } + @Override + public InputStatus pollNext(ReaderOutput> output) + throws Exception { + // Check for failed tables and send events to Enumerator + processFailedTables(); + + // Emit any pending failure markers to the downstream Committer + emitFailedMarkersToCommitter(output); + + return super.pollNext(output); + } + + /** + * Processes any failed tables detected from the SplitReader and sends FailedTieringEvent to the + * Enumerator. + */ + private void processFailedTables() { + TieringSplitReader.FailedTableInfo failedTable; + while ((failedTable = failedTableQueue.poll()) != null) { + LOG.info( + "Detected table {} tiering failure, sending FailedTieringEvent to Enumerator. Reason: {}", + failedTable.getTableId(), + failedTable.getFailReason()); + context.sendSourceEventToCoordinator( + new FailedTieringEvent(failedTable.getTableId(), failedTable.getFailReason())); + } + } + + /** Emits any pending failure markers to the downstream Committer. */ + private void emitFailedMarkersToCommitter( + ReaderOutput> output) { + TableBucketWriteResult failedMarker; + while ((failedMarker = failedMarkersForCommitter.poll()) != null) { + LOG.info( + "Emitting failure marker for table {} to downstream Committer.", + failedMarker.tableBucket().getTableId()); + output.collect(failedMarker); + } + } + @Override protected void onSplitFinished(Map finishedSplitIds) { context.sendSplitRequest(); } + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + if (sourceEvent instanceof TableTieringFailedEvent) { + TableTieringFailedEvent failedEvent = (TableTieringFailedEvent) sourceEvent; + long failedTableId = failedEvent.getTableId(); + String failReason = failedEvent.getFailReason(); + + LOG.info( + "Received TableTieringFailedEvent from Enumerator for table {}. Reason: {}", + failedTableId, + failReason); + + // Notify the SplitReader to clean up state for the failed table + splitReader.notifyTableTieringFailed(failedTableId); + + // Create a failure marker and queue it for sending to downstream Committer + TableBucketWriteResult failedMarker = + TableBucketWriteResult.failedMarker(failedTableId, failReason); + failedMarkersForCommitter.offer(failedMarker); + LOG.info( + "Queued failure marker for table {} to be sent to downstream Committer.", + failedTableId); + } else { + super.handleSourceEvents(sourceEvent); + } + } + @Override public List snapshotState(long checkpointId) { // we return empty list to make source reader be stateless diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index b9fe79e3d3..251a7e5a78 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -33,6 +33,7 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.ExceptionUtils; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -54,6 +55,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -94,8 +96,21 @@ public class TieringSplitReader private final Map currentTableStoppingOffsets; private final Set currentTableEmptyLogSplits; + // table ids that have been marked as failed by Enumerator, should be cleaned from pending state + private final Set failedTableIds; + + // Queue to store failed table info for the SourceReader to send to Enumerator + private final Queue failedTableQueue; + public TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory) { + this(connection, lakeTieringFactory, new ConcurrentLinkedQueue<>()); + } + + public TieringSplitReader( + Connection connection, + LakeTieringFactory lakeTieringFactory, + Queue failedTableQueue) { this.lakeTieringFactory = lakeTieringFactory; // owned by TieringSourceReader this.connection = connection; @@ -106,10 +121,15 @@ public TieringSplitReader( this.currentTableSplitsByBucket = new HashMap<>(); this.lakeWriters = new HashMap<>(); this.currentPendingSnapshotSplits = new ArrayDeque<>(); + this.failedTableIds = new HashSet<>(); + this.failedTableQueue = failedTableQueue; } @Override public RecordsWithSplitIds> fetch() throws IOException { + // first clean up any failed tables notified from Enumerator + cleanUpFailedTables(); + // check empty splits if (!currentTableEmptyLogSplits.isEmpty()) { LOG.info("Empty split(s) {} finished.", currentTableEmptyLogSplits); @@ -124,18 +144,27 @@ public RecordsWithSplitIds> fetch() throws I // may read snapshot firstly if (currentSnapshotSplitReader != null) { - CloseableIterator recordIterator = currentSnapshotSplitReader.readBatch(); - if (recordIterator == null) { - LOG.info("Split {} is finished", currentSnapshotSplit.splitId()); - return finishCurrentSnapshotSplit(); - } else { - return forSnapshotSplitRecords( - currentSnapshotSplit.getTableBucket(), recordIterator); + try { + CloseableIterator recordIterator = + currentSnapshotSplitReader.readBatch(); + if (recordIterator == null) { + LOG.info("Split {} is finished", currentSnapshotSplit.splitId()); + return finishCurrentSnapshotSplit(); + } else { + return forSnapshotSplitRecords( + currentSnapshotSplit.getTableBucket(), recordIterator); + } + } catch (Exception e) { + return handleTableTieringException(e); } } else { if (currentLogScanner != null) { - ScanRecords scanRecords = currentLogScanner.poll(POLL_TIMEOUT); - return forLogRecords(scanRecords); + try { + ScanRecords scanRecords = currentLogScanner.poll(POLL_TIMEOUT); + return forLogRecords(scanRecords); + } catch (Exception e) { + return handleTableTieringException(e); + } } else { return emptyTableBucketWriteResultWithSplitIds(); } @@ -441,6 +470,133 @@ private void finishCurrentTable() throws IOException { currentTableSplitsByBucket.clear(); } + /** Cleans up the state for tables that have been marked as failed by the Enumerator. */ + private void cleanUpFailedTables() { + if (failedTableIds.isEmpty()) { + return; + } + + // clean up pending splits for failed tables + for (Long failedTableId : failedTableIds) { + pendingTieringSplits.remove(failedTableId); + pendingTieringTables.remove(failedTableId); + + // if the failed table is the current table being processed, clean it up + if (currentTableId != null && currentTableId.equals(failedTableId)) { + LOG.info( + "Cleaning up current table {} (id={}) as it has been marked as failed.", + currentTablePath, + currentTableId); + try { + cleanupCurrentTableState(); + } catch (Exception e) { + LOG.warn( + "Error while cleaning up failed table {} state: {}", + failedTableId, + e.getMessage()); + } + } + } + failedTableIds.clear(); + } + + public void notifyTableTieringFailed(long tableId) { + LOG.info("Received notification that table {} tiering has failed.", tableId); + failedTableIds.add(tableId); + } + + /** + * Handles an exception that occurred during table tiering. + * + * @param e the exception that occurred + * @return an empty result to indicate no records are available for this fetch cycle + */ + private RecordsWithSplitIds> handleTableTieringException( + Exception e) { + if (currentTableId == null) { + // no current table, re-throw the exception + LOG.error("Exception occurred but no current table is being processed.", e); + throw new RuntimeException(e); + } + + LOG.error( + "Tiering failed for table {} (id={}). Error: {}. " + + "The table will be marked as failed and skipped.", + currentTablePath, + currentTableId, + e.getMessage(), + e); + + // store the failure info in the queue + long failedTableId = currentTableId; + String failReason = ExceptionUtils.stringifyException(e); + failedTableQueue.offer(new FailedTableInfo(failedTableId, failReason)); + + // clean up current table state + try { + cleanupCurrentTableState(); + } catch (Exception cleanupException) { + LOG.warn( + "Error while cleaning up state for failed table {}: {}", + failedTableId, + cleanupException.getMessage()); + } + + return emptyTableBucketWriteResultWithSplitIds(); + } + + private void cleanupCurrentTableState() { + // close lake writers for current table + for (LakeWriter writer : lakeWriters.values()) { + try { + writer.close(); + } catch (Exception e) { + LOG.warn("Error closing lake writer: {}", e.getMessage()); + } + } + lakeWriters.clear(); + + // close current snapshot split reader + if (currentSnapshotSplitReader != null) { + try { + currentSnapshotSplitReader.close(); + } catch (Exception e) { + LOG.warn("Error closing snapshot split reader: {}", e.getMessage()); + } + currentSnapshotSplitReader = null; + } + currentSnapshotSplit = null; + + // close current log scanner + if (currentLogScanner != null) { + try { + currentLogScanner.close(); + } catch (Exception e) { + LOG.warn("Error closing log scanner: {}", e.getMessage()); + } + currentLogScanner = null; + } + + // close current table + if (currentTable != null) { + try { + currentTable.close(); + } catch (Exception e) { + LOG.warn("Error closing table: {}", e.getMessage()); + } + currentTable = null; + } + + // clear current table state + currentTableId = null; + currentTablePath = null; + currentTableNumberOfSplits = null; + currentPendingSnapshotSplits.clear(); + currentTableStoppingOffsets.clear(); + currentTableEmptyLogSplits.clear(); + currentTableSplitsByBucket.clear(); + } + @Override public void wakeUp() { if (currentLogScanner != null) { @@ -507,6 +663,25 @@ private TableBucketWriteResult toTableBucketWriteResult( numberOfSplits); } + /** Info class to hold failed table information. */ + public static class FailedTableInfo { + private final long tableId; + private final String failReason; + + public FailedTableInfo(long tableId, String failReason) { + this.tableId = tableId; + this.failReason = failReason; + } + + public long getTableId() { + return tableId; + } + + public String getFailReason() { + return failReason; + } + } + private class TableBucketWriteResultWithSplitIds implements RecordsWithSplitIds> { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index c1c26390a3..7763e80608 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -26,6 +26,7 @@ import org.apache.fluss.flink.metrics.FlinkMetricRegistry; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; +import org.apache.fluss.flink.tiering.event.TableTieringFailedEvent; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; @@ -261,6 +262,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } else { failedTableEpochs.put(failedTableId, tieringEpoch); } + + // Broadcast TableTieringFailedEvent to all other readers + broadcastTableTieringFailedEvent(subtaskId, failedTableId, failedEvent.failReason()); } if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) { @@ -270,6 +274,30 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } } + /** + * Broadcasts a TableTieringFailedEvent to all readers except the source reader that reported + * the failure. This allows other readers to clean up their state for the failed table. + * + * @param sourceSubtaskId the subtask ID of the reader that reported the failure + * @param failedTableId the ID of the failed table + * @param failReason the reason for the failure + */ + private void broadcastTableTieringFailedEvent( + int sourceSubtaskId, long failedTableId, String failReason) { + TableTieringFailedEvent failedEvent = + new TableTieringFailedEvent(failedTableId, failReason); + + for (Integer readerSubtaskId : context.registeredReaders().keySet()) { + if (readerSubtaskId != sourceSubtaskId) { + LOG.info( + "Sending TableTieringFailedEvent for table {} to reader {}.", + failedTableId, + readerSubtaskId); + context.sendEventToSourceReader(readerSubtaskId, failedEvent); + } + } + } + private void handleSourceReaderFailOver() { LOG.info( "Handling source reader fail over, mark current tiering table epoch {} as failed.", diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java index bdecfa8fab..272cf8a58b 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java @@ -376,6 +376,134 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { mockMissingCommittedLakeSnapshot)); } + @Test + void testCommitFailsWhenTableRecreated() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_commit_fails_when_table_recreated"); + long originalTableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numberOfWriteResults = 3; + + // Send write results for the first bucket + TableBucket tableBucket = new TableBucket(originalTableId, 0); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, tableBucket, 1, 1, 1L, numberOfWriteResults)); + + // Drop and recreate the table with the same path + admin.dropTable(tablePath, true).get(); + long newTableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + + // Verify that the table id has changed + assertThat(newTableId).isNotEqualTo(originalTableId); + + // Try to commit the remaining write results - should fail because table was recreated + for (int bucket = 1; bucket < numberOfWriteResults; bucket++) { + tableBucket = new TableBucket(originalTableId, bucket); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, + tableBucket, + bucket, + bucket, + (long) bucket, + numberOfWriteResults)); + } + + // Verify that a FailedTieringEvent was sent with the expected error message + List operatorEvents = mockOperatorEventGateway.getEventsSent(); + SourceEventWrapper sourceEventWrapper = + (SourceEventWrapper) operatorEvents.get(operatorEvents.size() - 1); + FailedTieringEvent failedTieringEvent = + (FailedTieringEvent) sourceEventWrapper.getSourceEvent(); + assertThat(failedTieringEvent.getTableId()).isEqualTo(originalTableId); + assertThat(failedTieringEvent.failReason()) + .contains("different from the table id") + .contains("dropped and recreated during tiering"); + } + + @Test + void testFailedMarkerCleansUpCollectedWriteResults() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_failed_marker"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numberOfWriteResults = 3; + + // Send partial write results for the table + TableBucket t1b0 = new TableBucket(tableId, 0); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, t1b0, 1, 11, 21L, numberOfWriteResults)); + + TableBucket t1b1 = new TableBucket(tableId, 1); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, t1b1, 2, 12, 22L, numberOfWriteResults)); + + // Verify no lake snapshot yet (not all write results received) + verifyNoLakeSnapshot(tablePath); + + // Now send a failure marker for the table + StreamRecord> failedMarkerRecord = + new StreamRecord<>(TableBucketWriteResult.failedMarker(tableId, "Test failure")); + committerOperator.processElement(failedMarkerRecord); + + // Verify no lake snapshot after failure marker (state should be cleaned) + verifyNoLakeSnapshot(tablePath); + + // Now send new write results for a fresh tiering round + // This verifies that the old partial state was cleaned up and doesn't interfere + for (int bucket = 0; bucket < 3; bucket++) { + TableBucket tableBucket = new TableBucket(tableId, bucket); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, tableBucket, bucket + 10, bucket + 100, bucket + 1000L, 3)); + } + + // Verify lake snapshot was created with the new write results + Map expectedLogEndOffsets = new HashMap<>(); + expectedLogEndOffsets.put(new TableBucket(tableId, 0), 100L); + expectedLogEndOffsets.put(new TableBucket(tableId, 1), 101L); + expectedLogEndOffsets.put(new TableBucket(tableId, 2), 102L); + verifyLakeSnapshot(tablePath, tableId, 1, expectedLogEndOffsets); + } + + @Test + void testFailedMarkerForNonExistentTableDoesNothing() throws Exception { + // Create a table + TablePath tablePath = TablePath.of("fluss", "test_failed_marker_nonexistent"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + + // Send a failure marker for a non-existent table (no collected state) + long nonExistentTableId = tableId + 9999; + StreamRecord> failedMarkerRecord = + new StreamRecord<>( + TableBucketWriteResult.failedMarker( + nonExistentTableId, "Test failure for non-existent table")); + + // This should not throw an exception + committerOperator.processElement(failedMarkerRecord); + + // The operator should continue working normally + // Send write results for the actual table + int numberOfWriteResults = 3; + for (int bucket = 0; bucket < 3; bucket++) { + TableBucket tableBucket = new TableBucket(tableId, bucket); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, + tableBucket, + bucket, + bucket, + (long) bucket, + numberOfWriteResults)); + } + + // Verify lake snapshot was created + Map expectedLogEndOffsets = new HashMap<>(); + expectedLogEndOffsets.put(new TableBucket(tableId, 0), 0L); + expectedLogEndOffsets.put(new TableBucket(tableId, 1), 1L); + expectedLogEndOffsets.put(new TableBucket(tableId, 2), 2L); + verifyLakeSnapshot(tablePath, tableId, 1, expectedLogEndOffsets); + } + private StreamRecord> createTableBucketWriteResultStreamRecord( TablePath tablePath, @@ -461,50 +589,6 @@ private void verifyLakeSnapshot( assertThat(failedTieringEvent.failReason()).contains(failedReason); } - @Test - void testCommitFailsWhenTableRecreated() throws Exception { - TablePath tablePath = TablePath.of("fluss", "test_commit_fails_when_table_recreated"); - long originalTableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); - int numberOfWriteResults = 3; - - // Send write results for the first bucket - TableBucket tableBucket = new TableBucket(originalTableId, 0); - committerOperator.processElement( - createTableBucketWriteResultStreamRecord( - tablePath, tableBucket, 1, 1, 1L, numberOfWriteResults)); - - // Drop and recreate the table with the same path - admin.dropTable(tablePath, true).get(); - long newTableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); - - // Verify that the table id has changed - assertThat(newTableId).isNotEqualTo(originalTableId); - - // Try to commit the remaining write results - should fail because table was recreated - for (int bucket = 1; bucket < numberOfWriteResults; bucket++) { - tableBucket = new TableBucket(originalTableId, bucket); - committerOperator.processElement( - createTableBucketWriteResultStreamRecord( - tablePath, - tableBucket, - bucket, - bucket, - (long) bucket, - numberOfWriteResults)); - } - - // Verify that a FailedTieringEvent was sent with the expected error message - List operatorEvents = mockOperatorEventGateway.getEventsSent(); - SourceEventWrapper sourceEventWrapper = - (SourceEventWrapper) operatorEvents.get(operatorEvents.size() - 1); - FailedTieringEvent failedTieringEvent = - (FailedTieringEvent) sourceEventWrapper.getSourceEvent(); - assertThat(failedTieringEvent.getTableId()).isEqualTo(originalTableId); - assertThat(failedTieringEvent.failReason()) - .contains("different from the table id") - .contains("dropped and recreated during tiering"); - } - private CommittedLakeSnapshot mockCommittedLakeSnapshot( long tableId, TablePath tablePath, int snapshotId, Map logEndOffsets) throws Exception { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java index dbb40eae17..31c9816488 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java @@ -21,6 +21,7 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -60,6 +61,8 @@ void testSerializeAndDeserialize(boolean isPartitioned) throws Exception { assertThat(deserializedWriteResult.getWriteResult()) .isEqualTo(testingWriteResult.getWriteResult()); assertThat(deserialized.numberOfWriteResults()).isEqualTo(20); + assertThat(deserialized.isFailedMarker()).isFalse(); + assertThat(deserialized.failReason()).isNull(); // verify when writeResult is null tableBucketWriteResult = @@ -74,5 +77,48 @@ void testSerializeAndDeserialize(boolean isPartitioned) throws Exception { assertThat(deserialized.partitionName()).isEqualTo(partitionName); assertThat(deserialized.writeResult()).isNull(); assertThat(deserialized.numberOfWriteResults()).isEqualTo(30); + assertThat(deserialized.isFailedMarker()).isFalse(); + } + + @Test + void testSerializeAndDeserializeFailedMarker() throws Exception { + long tableId = 12345L; + String failReason = "Test failure reason"; + + TableBucketWriteResult failedMarker = + TableBucketWriteResult.failedMarker(tableId, failReason); + + // test serialize and deserialize + byte[] serialized = tableBucketWriteResultSerializer.serialize(failedMarker); + TableBucketWriteResult deserialized = + tableBucketWriteResultSerializer.deserialize( + tableBucketWriteResultSerializer.getVersion(), serialized); + + assertThat(deserialized.isFailedMarker()).isTrue(); + assertThat(deserialized.tableBucket().getTableId()).isEqualTo(tableId); + assertThat(deserialized.failReason()).isEqualTo(failReason); + + // verify other fields are set to expected values for failed markers + assertThat(deserialized.tablePath()).isNull(); + assertThat(deserialized.writeResult()).isNull(); + assertThat(deserialized.tableBucket().getBucket()).isEqualTo(-1); + } + + @Test + void testSerializeAndDeserializeFailedMarkerWithNullReason() throws Exception { + long tableId = 67890L; + + TableBucketWriteResult failedMarker = + TableBucketWriteResult.failedMarker(tableId, null); + + // test serialize and deserialize + byte[] serialized = tableBucketWriteResultSerializer.serialize(failedMarker); + TableBucketWriteResult deserialized = + tableBucketWriteResultSerializer.deserialize( + tableBucketWriteResultSerializer.getVersion(), serialized); + + assertThat(deserialized.isFailedMarker()).isTrue(); + assertThat(deserialized.tableBucket().getTableId()).isEqualTo(tableId); + assertThat(deserialized.failReason()).isNull(); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index 15f62f3d1b..f545e80d49 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -20,6 +20,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; +import org.apache.fluss.flink.tiering.event.TableTieringFailedEvent; import org.apache.fluss.flink.tiering.source.TieringTestBase; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; @@ -31,6 +32,7 @@ import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket; import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitsAssignment; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -594,6 +596,22 @@ void testHandleFailedTieringTableEvent() throws Throwable { context.getSplitsAssignmentSequence().clear(); enumerator.handleSourceEvent(1, new FailedTieringEvent(tableId, "test_reason")); + // verify failure event is broadcast to other readers + Map> sentSourceEvents = context.getSentSourceEvent(); + assertThat(sentSourceEvents).hasSize(numSubtasks - 1); + for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) { + if (subtaskId == 1) { + assertThat(sentSourceEvents).doesNotContainKey(subtaskId); + continue; + } + List sourceEvents = sentSourceEvents.get(subtaskId); + assertThat(sourceEvents).hasSize(1); + SourceEvent sourceEvent = sourceEvents.get(0); + TableTieringFailedEvent failedEvent = (TableTieringFailedEvent) sourceEvent; + assertThat(failedEvent.getTableId()).isEqualTo(tableId); + assertThat(failedEvent.getFailReason()).isEqualTo("test_reason"); + } + // request tiering table splits, should get splits for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) { enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);