Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,29 @@ public void open() {
public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> streamRecord)
throws Exception {
TableBucketWriteResult<WriteResult> tableBucketWriteResult = streamRecord.getValue();

// Check if this is a failure marker from upstream Reader
if (tableBucketWriteResult.isFailedMarker()) {
long failedTableId = tableBucketWriteResult.tableBucket().getTableId();
String failReason = tableBucketWriteResult.failReason();
LOG.info(
"Received failure marker for table {}. Reason: {}. "
+ "Cleaning up collected write results for this table.",
failedTableId,
failReason);

// Clean up any partially collected write results for the failed table
List<TableBucketWriteResult<WriteResult>> removedResults =
collectedTableBucketWriteResults.remove(failedTableId);
if (removedResults != null) {
LOG.info(
"Cleaned up {} collected write results for failed table {}.",
removedResults.size(),
failedTableId);
}
return;
}

TableBucket tableBucket = tableBucketWriteResult.tableBucket();
long tableId = tableBucket.getTableId();
registerTableBucketWriteResult(tableId, tableBucketWriteResult);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.tiering.event;

import org.apache.flink.api.connector.source.SourceEvent;

/**
* SourceEvent used by TieringSourceEnumerator to notify all TieringSourceReaders that a specific
* table's tiering has failed. Upon receiving this event, the readers should clear their internal
* state for the failed table and continue processing other tables.
*/
public class TableTieringFailedEvent implements SourceEvent {

private static final long serialVersionUID = 1L;

private final long tableId;

private final String failReason;

public TableTieringFailedEvent(long tableId, String failReason) {
this.tableId = tableId;
this.failReason = failReason;
}

public long getTableId() {
return tableId;
}

public String getFailReason() {
return failReason;
}

@Override
public String toString() {
return "TableTieringFailedEvent{"
+ "tableId="
+ tableId
+ ", failReason='"
+ failReason
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
@Nullable private final String partitionName;

// will be null when no any data write, such as for tiering an empty log split
// or when this is a failure marker
@Nullable private final WriteResult writeResult;

// the end offset of tiering, should be the last tiered record's offset + 1
Expand All @@ -57,6 +58,12 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
// for the round of tiering is finished
private final int numberOfWriteResults;

// indicates whether this is a failure marker rather than an actual write result
private final boolean failedMarker;

// the reason for failure when this is a failure marker
@Nullable private final String failReason;

public TableBucketWriteResult(
TablePath tablePath,
TableBucket tableBucket,
Expand All @@ -65,13 +72,53 @@ public TableBucketWriteResult(
long logEndOffset,
long maxTimestamp,
int numberOfWriteResults) {
this(
tablePath,
tableBucket,
partitionName,
writeResult,
logEndOffset,
maxTimestamp,
numberOfWriteResults,
false,
null);
}

private TableBucketWriteResult(
TablePath tablePath,
TableBucket tableBucket,
@Nullable String partitionName,
@Nullable WriteResult writeResult,
long logEndOffset,
long maxTimestamp,
int numberOfWriteResults,
boolean failedMarker,
@Nullable String failReason) {
this.tablePath = tablePath;
this.tableBucket = tableBucket;
this.partitionName = partitionName;
this.writeResult = writeResult;
this.logEndOffset = logEndOffset;
this.maxTimestamp = maxTimestamp;
this.numberOfWriteResults = numberOfWriteResults;
this.failedMarker = failedMarker;
this.failReason = failReason;
}

/**
* Creates a failure marker result that indicates tiering for a table has failed.
*
* @param tableId the ID of the failed table
* @param failReason the reason for the failure
* @param <WriteResult> the type of write result
* @return a failure marker TableBucketWriteResult
*/
public static <WriteResult> TableBucketWriteResult<WriteResult> failedMarker(
long tableId, String failReason) {
// Use a dummy TableBucket with only tableId for the failure marker
TableBucket dummyBucket = new TableBucket(tableId, -1);
return new TableBucketWriteResult<>(
null, dummyBucket, null, null, -1, -1, -1, true, failReason);
}

public TablePath tablePath() {
Expand Down Expand Up @@ -103,4 +150,13 @@ public long logEndOffset() {
public long maxTimestamp() {
return maxTimestamp;
}

public boolean isFailedMarker() {
return failedMarker;
}

@Nullable
public String failReason() {
return failReason;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class TableBucketWriteResultSerializer<WriteResult>
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

private static final int CURRENT_VERSION = 1;
private static final int CURRENT_VERSION = 2;

private final org.apache.fluss.lake.serializer.SimpleVersionedSerializer<WriteResult>
writeResultSerializer;
Expand All @@ -53,44 +53,58 @@ public int getVersion() {
public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResult)
throws IOException {
final DataOutputSerializer out = SERIALIZER_CACHE.get();
// serialize table path
TablePath tablePath = tableBucketWriteResult.tablePath();
out.writeUTF(tablePath.getDatabaseName());
out.writeUTF(tablePath.getTableName());

// serialize bucket
TableBucket tableBucket = tableBucketWriteResult.tableBucket();
out.writeLong(tableBucket.getTableId());
// write partition
if (tableBucket.getPartitionId() != null) {
out.writeBoolean(true);
out.writeLong(tableBucket.getPartitionId());
out.writeUTF(tableBucketWriteResult.partitionName());
} else {
out.writeBoolean(false);
}
out.writeInt(tableBucket.getBucket());

// serialize write result
WriteResult writeResult = tableBucketWriteResult.writeResult();
if (writeResult == null) {
// write -1 to mark write result as null
out.writeInt(-1);
// serialize failed marker flag first
out.writeBoolean(tableBucketWriteResult.isFailedMarker());

if (tableBucketWriteResult.isFailedMarker()) {
// for failed marker, only serialize tableId and failReason
out.writeLong(tableBucketWriteResult.tableBucket().getTableId());
String failReason = tableBucketWriteResult.failReason();
out.writeBoolean(failReason != null);
if (failReason != null) {
out.writeUTF(failReason);
}
} else {
byte[] serializeBytes = writeResultSerializer.serialize(writeResult);
out.writeInt(serializeBytes.length);
out.write(serializeBytes);
// serialize table path
TablePath tablePath = tableBucketWriteResult.tablePath();
out.writeUTF(tablePath.getDatabaseName());
out.writeUTF(tablePath.getTableName());

// serialize bucket
TableBucket tableBucket = tableBucketWriteResult.tableBucket();
out.writeLong(tableBucket.getTableId());
// write partition
if (tableBucket.getPartitionId() != null) {
out.writeBoolean(true);
out.writeLong(tableBucket.getPartitionId());
out.writeUTF(tableBucketWriteResult.partitionName());
} else {
out.writeBoolean(false);
}
out.writeInt(tableBucket.getBucket());

// serialize write result
WriteResult writeResult = tableBucketWriteResult.writeResult();
if (writeResult == null) {
// write -1 to mark write result as null
out.writeInt(-1);
} else {
byte[] serializeBytes = writeResultSerializer.serialize(writeResult);
out.writeInt(serializeBytes.length);
out.write(serializeBytes);
}

// serialize log end offset
out.writeLong(tableBucketWriteResult.logEndOffset());

// serialize max timestamp
out.writeLong(tableBucketWriteResult.maxTimestamp());

// serialize number of write results
out.writeInt(tableBucketWriteResult.numberOfWriteResults());
}

// serialize log end offset
out.writeLong(tableBucketWriteResult.logEndOffset());

// serialize max timestamp
out.writeLong(tableBucketWriteResult.maxTimestamp());

// serialize number of write results
out.writeInt(tableBucketWriteResult.numberOfWriteResults());

final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
Expand All @@ -99,9 +113,17 @@ public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResu
@Override
public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] serialized)
throws IOException {
if (version != CURRENT_VERSION) {
if (version == 1) {
return deserializeV1(serialized);
} else if (version == CURRENT_VERSION) {
return deserializeV2(serialized);
} else {
throw new IOException("Unknown version " + version);
}
}

private TableBucketWriteResult<WriteResult> deserializeV1(byte[] serialized)
throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
// deserialize table path
String databaseName = in.readUTF();
Expand All @@ -125,7 +147,7 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria
if (writeResultLength >= 0) {
byte[] writeResultBytes = new byte[writeResultLength];
in.readFully(writeResultBytes);
writeResult = writeResultSerializer.deserialize(version, writeResultBytes);
writeResult = writeResultSerializer.deserialize(1, writeResultBytes);
} else {
writeResult = null;
}
Expand All @@ -145,4 +167,64 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria
maxTimestamp,
numberOfWriteResults);
}

private TableBucketWriteResult<WriteResult> deserializeV2(byte[] serialized)
throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);

// read failed marker flag
boolean isFailedMarker = in.readBoolean();

if (isFailedMarker) {
// deserialize failed marker
long tableId = in.readLong();
String failReason = null;
if (in.readBoolean()) {
failReason = in.readUTF();
}
return TableBucketWriteResult.failedMarker(tableId, failReason);
} else {
// deserialize table path
String databaseName = in.readUTF();
String tableName = in.readUTF();
TablePath tablePath = new TablePath(databaseName, tableName);

// deserialize bucket
long tableId = in.readLong();
Long partitionId = null;
String partitionName = null;
if (in.readBoolean()) {
partitionId = in.readLong();
partitionName = in.readUTF();
}
int bucketId = in.readInt();
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);

// deserialize write result
int writeResultLength = in.readInt();
WriteResult writeResult;
if (writeResultLength >= 0) {
byte[] writeResultBytes = new byte[writeResultLength];
in.readFully(writeResultBytes);
writeResult = writeResultSerializer.deserialize(CURRENT_VERSION, writeResultBytes);
} else {
writeResult = null;
}

// deserialize log end offset
long logEndOffset = in.readLong();
// deserialize max timestamp
long maxTimestamp = in.readLong();
// deserialize number of write results
int numberOfWriteResults = in.readInt();
return new TableBucketWriteResult<>(
tablePath,
tableBucket,
partitionName,
writeResult,
logEndOffset,
maxTimestamp,
numberOfWriteResults);
}
}
}
Loading