Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ void fetchOnce() throws Exception {
(bytes, throwable) -> {
if (throwable != null) {
LOG.error(
"Failed to download remote log segment file {}.",
"Failed to download remote log segment file {} for bucket {}.",
fsPathAndFileName.getFileName(),
request.segment.tableBucket(),
ExceptionUtils.stripExecutionException(throwable));
// release the semaphore for the failed request
prefetchSemaphore.release();
Expand All @@ -178,8 +179,9 @@ void fetchOnce() throws Exception {
scannerMetricGroup.remoteFetchErrorCount().inc();
} else {
LOG.info(
"Successfully downloaded remote log segment file {} to local cost {} ms.",
"Successfully downloaded remote log segment file {} for bucket {} to local cost {} ms.",
fsPathAndFileName.getFileName(),
request.segment.tableBucket(),
System.currentTimeMillis() - startTime);
File localFile =
new File(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ void testFetchWithSchemaChange() throws Exception {
DATA1_TABLE_INFO.getNumBuckets(),
DATA1_TABLE_INFO.getProperties(),
DATA1_TABLE_INFO.getCustomProperties(),
DATA1_TABLE_INFO.getRemoteDataDir().orElse(null),
DATA1_TABLE_INFO.getComment().orElse(null),
DATA1_TABLE_INFO.getCreatedTime(),
DATA1_TABLE_INFO.getModifiedTime()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,44 @@ public class ConfigOptions {
"The directory used for storing the kv snapshot data files and remote log for log tiered storage "
+ " in a Fluss supported filesystem.");

public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
key("remote.data.dirs")
.stringType()
.asList()
.defaultValues()
.withDescription(
"The directories used for storing the kv snapshot data files and remote log for log tiered storage "
+ " in a Fluss supported filesystem. "
+ "This is a list of remote data directory paths. "
+ "Example: `remote.data.dirs: oss://bucket1/fluss-remote-data, oss://bucket2/fluss-remote-data`.");

public static final ConfigOption<RemoteDataDirStrategy> REMOTE_DATA_DIRS_STRATEGY =
key("remote.data.dirs.strategy")
.enumType(RemoteDataDirStrategy.class)
.defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
.withDescription(
"The strategy for selecting the remote data directory from `"
+ REMOTE_DATA_DIRS.key()
+ "`.");

public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS =
key("remote.data.dirs.weights")
.intType()
.asList()
.defaultValues()
.withDescription(
"The weights of the remote data directories. "
+ "This is a list of weights corresponding to the `"
+ REMOTE_DATA_DIRS.key()
+ "` in the same order. When `"
+ REMOTE_DATA_DIRS_STRATEGY.key()
+ "` is set to `"
+ RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN
+ "`, this must be configured, and its size must be equal to `"
+ REMOTE_DATA_DIRS.key()
+ "`; otherwise, it will be ignored."
+ "Example: `remote.data.dir.weights: 1, 2`");

public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
key("remote.fs.write-buffer-size")
.memoryType()
Expand Down Expand Up @@ -2058,4 +2096,10 @@ private static class ConfigOptionsHolder {
public static ConfigOption<?> getConfigOption(String key) {
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
}

/** Remote data dir select strategy for Fluss. */
public enum RemoteDataDirStrategy {
ROUND_ROBIN,
WEIGHTED_ROUND_ROBIN
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.exception.IllegalConfigurationException;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Utilities of Fluss {@link ConfigOptions}. */
@Internal
Expand Down Expand Up @@ -76,4 +78,97 @@ static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
}
return options;
}

public static void validateCoordinatorConfigs(Configuration conf) {
validServerConfigs(conf);

if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be greater than or equal 1.",
ConfigOptions.DEFAULT_REPLICATION_FACTOR.key()));
}

if (conf.get(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS) < 1) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be greater than or equal 1.",
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()));
}

if (conf.get(ConfigOptions.SERVER_IO_POOL_SIZE) < 1) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be greater than or equal 1.",
ConfigOptions.SERVER_IO_POOL_SIZE.key()));
}

// validate remote.data.dirs
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
if (remoteDataDirs.size() != weights.size()) {
throw new IllegalConfigurationException(
String.format(
"The size of '%s' (%d) must match the size of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
ConfigOptions.REMOTE_DATA_DIRS.key(),
remoteDataDirs.size(),
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
weights.size()));
}
// validate all weights are positive
for (int i = 0; i < weights.size(); i++) {
if (weights.get(i) < 0) {
throw new IllegalConfigurationException(
String.format(
"All weights in '%s' must be no less than 0, but found %d at index %d.",
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
weights.get(i),
i));
}
}
}
}
}

public static void validateTabletConfigs(Configuration conf) {
validServerConfigs(conf);

Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
if (!serverId.isPresent()) {
throw new IllegalConfigurationException(
String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID));
}

if (serverId.get() < 0) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be greater than or equal 0.",
ConfigOptions.TABLET_SERVER_ID.key()));
}

if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be greater than or equal 1.",
ConfigOptions.BACKGROUND_THREADS.key()));
}

if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be less than or equal %d bytes.",
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
}
}

private static void validServerConfigs(Configuration conf) {
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
throw new IllegalConfigurationException(
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
package org.apache.fluss.metadata;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.fs.FsPath;

import javax.annotation.Nullable;

import java.util.Objects;
import java.util.Optional;

/**
* Information of a partition metadata, includes the partition's name and the partition id that
Expand All @@ -31,10 +35,17 @@
public class PartitionInfo {
private final long partitionId;
private final ResolvedPartitionSpec partitionSpec;
private final @Nullable FsPath remoteDataDir;

public PartitionInfo(long partitionId, ResolvedPartitionSpec partitionSpec) {
this(partitionId, partitionSpec, null);
}

public PartitionInfo(
long partitionId, ResolvedPartitionSpec partitionSpec, @Nullable FsPath remoteDataDir) {
this.partitionId = partitionId;
this.partitionSpec = partitionSpec;
this.remoteDataDir = remoteDataDir;
}

/** Get the partition id. The id is globally unique in the Fluss cluster. */
Expand All @@ -58,6 +69,10 @@ public PartitionSpec getPartitionSpec() {
return partitionSpec.toPartitionSpec();
}

public Optional<FsPath> getRemoteDataDir() {
return Optional.ofNullable(remoteDataDir);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -67,16 +82,25 @@ public boolean equals(Object o) {
return false;
}
PartitionInfo that = (PartitionInfo) o;
return partitionId == that.partitionId && Objects.equals(partitionSpec, that.partitionSpec);
return partitionId == that.partitionId
&& Objects.equals(partitionSpec, that.partitionSpec)
&& Objects.equals(remoteDataDir, that.remoteDataDir);
}

@Override
public int hashCode() {
return Objects.hash(partitionId, partitionSpec);
return Objects.hash(partitionId, partitionSpec, remoteDataDir);
}

@Override
public String toString() {
return "Partition{name='" + getPartitionName() + '\'' + ", id=" + partitionId + '}';
return "Partition{name='"
+ getPartitionName()
+ '\''
+ ", id="
+ partitionId
+ ", remoteDataDir="
+ remoteDataDir
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -59,6 +60,7 @@ public final class TableInfo {
private final Configuration properties;
private final TableConfig tableConfig;
private final Configuration customProperties;
private final @Nullable FsPath remoteDataDir;
private final @Nullable String comment;

private final long createdTime;
Expand All @@ -74,6 +76,7 @@ public TableInfo(
int numBuckets,
Configuration properties,
Configuration customProperties,
@Nullable FsPath remoteDataDir,
@Nullable String comment,
long createdTime,
long modifiedTime) {
Expand All @@ -90,6 +93,7 @@ public TableInfo(
this.properties = properties;
this.tableConfig = new TableConfig(properties);
this.customProperties = customProperties;
this.remoteDataDir = remoteDataDir;
this.comment = comment;
this.createdTime = createdTime;
this.modifiedTime = modifiedTime;
Expand Down Expand Up @@ -263,6 +267,11 @@ public Configuration getCustomProperties() {
return customProperties;
}

/** Returns the remote data directory of the table. */
public Optional<FsPath> getRemoteDataDir() {
return Optional.ofNullable(remoteDataDir);
}

/** Returns the comment/description of the table. */
public Optional<String> getComment() {
return Optional.ofNullable(comment);
Expand Down Expand Up @@ -308,12 +317,23 @@ public TableDescriptor toTableDescriptor() {
.build();
}

public static TableInfo of(
TablePath tablePath,
long tableId,
int schemaId,
TableDescriptor tableDescriptor,
long createdTime,
long modifiedTime) {
return of(tablePath, tableId, schemaId, tableDescriptor, null, createdTime, modifiedTime);
}

/** Utility to create a {@link TableInfo} from a {@link TableDescriptor} and other metadata. */
public static TableInfo of(
TablePath tablePath,
long tableId,
int schemaId,
TableDescriptor tableDescriptor,
String remoteDataDir,
long createdTime,
long modifiedTime) {
Schema schema = tableDescriptor.getSchema();
Expand All @@ -335,6 +355,7 @@ public static TableInfo of(
numBuckets,
Configuration.fromMap(tableDescriptor.getProperties()),
Configuration.fromMap(tableDescriptor.getCustomProperties()),
remoteDataDir == null ? null : new FsPath(remoteDataDir),
tableDescriptor.getComment().orElse(null),
createdTime,
modifiedTime);
Expand All @@ -358,6 +379,7 @@ public boolean equals(Object o) {
&& Objects.equals(partitionKeys, that.partitionKeys)
&& Objects.equals(properties, that.properties)
&& Objects.equals(customProperties, that.customProperties)
&& Objects.equals(remoteDataDir, that.remoteDataDir)
&& Objects.equals(comment, that.comment);
}

Expand All @@ -376,6 +398,7 @@ public int hashCode() {
numBuckets,
properties,
customProperties,
remoteDataDir,
comment);
}

Expand All @@ -402,6 +425,8 @@ public String toString() {
+ properties
+ ", customProperties="
+ customProperties
+ ", remoteDataDir="
+ remoteDataDir
+ ", comment='"
+ comment
+ '\''
Expand Down
Loading