From 6c130f4ab2c3d35b69904ffdd07057b567d579e5 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Wed, 25 Feb 2026 20:20:34 -0800 Subject: [PATCH 1/2] Fix --- .../org/apache/texera/web/TexeraWebApplication.scala | 4 ++++ .../apache/texera/service/util/LargeBinaryManager.scala | 9 +++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala b/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala index 4264a9ca180..8e81f465fdc 100644 --- a/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala +++ b/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala @@ -27,6 +27,7 @@ import io.dropwizard.setup.{Bootstrap, Environment} import io.dropwizard.websockets.WebsocketBundle import org.apache.texera.amber.config.StorageConfig import org.apache.texera.amber.engine.common.Utils +import org.apache.texera.service.util.LargeBinaryManager import org.apache.texera.amber.util.ObjectMapperUtils import org.apache.texera.auth.SessionUser import org.apache.texera.dao.SqlServer @@ -104,6 +105,9 @@ class TexeraWebApplication StorageConfig.jdbcPassword ) + // ensure the large-binary S3 bucket exists before any workflow execution attempts to use it + LargeBinaryManager.initialize() + // redirect all 404 to index page, according to Angular routing requirements val eph = new ErrorPageErrorHandler eph.addErrorPage(404, "/") diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index 211d7d3b757..7886328fd56 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -32,6 +32,13 @@ import java.util.UUID object LargeBinaryManager extends LazyLogging { private val DEFAULT_BUCKET = "texera-large-binaries" + /** + * Ensures the large-binary bucket exists. Should be called once at service startup. + */ + def initialize(): Unit = { + S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET) + } + /** * Creates a new LargeBinary reference. * The actual data upload happens separately via LargeBinaryOutputStream. @@ -39,8 +46,6 @@ object LargeBinaryManager extends LazyLogging { * @return S3 URI string for the new LargeBinary (format: s3://bucket/key) */ def create(): String = { - S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET) - val objectKey = s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}" val uri = s"s3://$DEFAULT_BUCKET/$objectKey" From 9d39bd9fce41ad0790eb70ae98c898ff2dd1126e Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 19 Mar 2026 10:22:10 -0700 Subject: [PATCH 2/2] Address comments --- .../org/apache/texera/web/TexeraWebApplication.scala | 4 ---- .../apache/texera/service/util/LargeBinaryManager.scala | 9 +-------- .../scala/org/apache/texera/service/FileService.scala | 3 +++ 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala b/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala index 8e81f465fdc..4264a9ca180 100644 --- a/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala +++ b/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala @@ -27,7 +27,6 @@ import io.dropwizard.setup.{Bootstrap, Environment} import io.dropwizard.websockets.WebsocketBundle import org.apache.texera.amber.config.StorageConfig import org.apache.texera.amber.engine.common.Utils -import org.apache.texera.service.util.LargeBinaryManager import org.apache.texera.amber.util.ObjectMapperUtils import org.apache.texera.auth.SessionUser import org.apache.texera.dao.SqlServer @@ -105,9 +104,6 @@ class TexeraWebApplication StorageConfig.jdbcPassword ) - // ensure the large-binary S3 bucket exists before any workflow execution attempts to use it - LargeBinaryManager.initialize() - // redirect all 404 to index page, according to Angular routing requirements val eph = new ErrorPageErrorHandler eph.addErrorPage(404, "/") diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index 7886328fd56..b23edb7ae92 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -30,14 +30,7 @@ import java.util.UUID * normal tuple size limits. */ object LargeBinaryManager extends LazyLogging { - private val DEFAULT_BUCKET = "texera-large-binaries" - - /** - * Ensures the large-binary bucket exists. Should be called once at service startup. - */ - def initialize(): Unit = { - S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET) - } + val DEFAULT_BUCKET: String = "texera-large-binaries" /** * Creates a new LargeBinary reference. diff --git a/file-service/src/main/scala/org/apache/texera/service/FileService.scala b/file-service/src/main/scala/org/apache/texera/service/FileService.scala index a92eaca511a..37a6f3f7e8d 100644 --- a/file-service/src/main/scala/org/apache/texera/service/FileService.scala +++ b/file-service/src/main/scala/org/apache/texera/service/FileService.scala @@ -37,6 +37,7 @@ import org.apache.texera.service.resource.{ HealthCheckResource } import org.apache.texera.service.util.S3StorageClient +import org.apache.texera.service.util.LargeBinaryManager import org.eclipse.jetty.server.session.SessionHandler import java.nio.file.Path @@ -63,6 +64,8 @@ class FileService extends Application[FileServiceConfiguration] with LazyLogging // check if the texera dataset bucket exists, if not create it S3StorageClient.createBucketIfNotExist(StorageConfig.lakefsBucketName) + // ensure the large-binary S3 bucket exists before any workflow execution attempts to use it + S3StorageClient.createBucketIfNotExist(LargeBinaryManager.DEFAULT_BUCKET) // check if we can connect to the lakeFS service LakeFSStorageClient.healthCheck()