From 654a596485efe89996dd1a9ed8bc8d8cc5de0b67 Mon Sep 17 00:00:00 2001 From: Santwana Verma Date: Mon, 15 Dec 2025 10:16:50 +0530 Subject: [PATCH 1/4] Fixes statement name generation for CP Flink --- internal/flink/command_statement_create_onprem.go | 2 +- pkg/flink/internal/store/store_onprem.go | 2 +- pkg/flink/types/statement.go | 13 +++++++++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/internal/flink/command_statement_create_onprem.go b/internal/flink/command_statement_create_onprem.go index 985421ea5f..d6405eff7a 100644 --- a/internal/flink/command_statement_create_onprem.go +++ b/internal/flink/command_statement_create_onprem.go @@ -49,7 +49,7 @@ func (c *command) newStatementCreateCommandOnPrem() *cobra.Command { func (c *command) statementCreateOnPrem(cmd *cobra.Command, args []string) error { // Flink statement name can be automatically generated or provided by the user - name := types.GenerateStatementName() + name := types.GenerateStatementNameForOnPrem() if len(args) == 1 { name = args[0] } diff --git a/pkg/flink/internal/store/store_onprem.go b/pkg/flink/internal/store/store_onprem.go index eb9526c66f..f7c8185cda 100644 --- a/pkg/flink/internal/store/store_onprem.go +++ b/pkg/flink/internal/store/store_onprem.go @@ -60,7 +60,7 @@ func (s *StoreOnPrem) ProcessStatement(statement string) (*types.ProcessedStatem return result, sErr } - statementName := s.Properties.GetOrDefault(config.KeyStatementName, types.GenerateStatementName()) + statementName := s.Properties.GetOrDefault(config.KeyStatementName, types.GenerateStatementNameForOnPrem()) if len(statementName) > 45 { // on-prem name length limit statementName = statementName[0:45] } diff --git a/pkg/flink/types/statement.go b/pkg/flink/types/statement.go index 7b395dbd0d..5c7ebbea35 100644 --- a/pkg/flink/types/statement.go +++ b/pkg/flink/types/statement.go @@ -1,6 +1,8 @@ package types import ( + "crypto/rand" + "encoding/hex" "fmt" "time" @@ -14,3 +16,14 @@ func GenerateStatementName() string { id := uuid.New().String() return fmt.Sprintf("%s-%s-%s-%s", clientName, date, localTime, id) } + +func GenerateStatementNameForOnPrem() string { + clientName := "cli" + date := time.Now().Format("20060102") // 8 chars + localTime := time.Now().Format("150405") // 6 chars + // 12 random bytes => 24 hex chars + b := make([]byte, 12) + _, _ = rand.Read(b) + randomHex := hex.EncodeToString(b) + return fmt.Sprintf("%s-%s-%s-%s", clientName, date, localTime, randomHex) +} From a934b7c1281760f617b2c1ec12c19f17d5516222 Mon Sep 17 00:00:00 2001 From: Santwana Verma Date: Mon, 15 Dec 2025 10:33:27 +0530 Subject: [PATCH 2/4] Addresses Copilot's review comments --- pkg/flink/types/statement.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/flink/types/statement.go b/pkg/flink/types/statement.go index 5c7ebbea35..4322fd8626 100644 --- a/pkg/flink/types/statement.go +++ b/pkg/flink/types/statement.go @@ -19,11 +19,14 @@ func GenerateStatementName() string { func GenerateStatementNameForOnPrem() string { clientName := "cli" - date := time.Now().Format("20060102") // 8 chars - localTime := time.Now().Format("150405") // 6 chars + timeNow := time.Now() + date := timeNow.Format("20060102") // 8 chars + localTime := timeNow.Format("150405") // 6 chars // 12 random bytes => 24 hex chars b := make([]byte, 12) - _, _ = rand.Read(b) + if _, err := rand.Read(b); err != nil { + panic(fmt.Sprintf("unable to generate random bytes for statment name: %v", err)) + } randomHex := hex.EncodeToString(b) return fmt.Sprintf("%s-%s-%s-%s", clientName, date, localTime, randomHex) } From 5140dc484380faa763ec9d26d93ffbc6d7ba8f24 Mon Sep 17 00:00:00 2001 From: Santwana Verma Date: Mon, 15 Dec 2025 11:57:22 +0530 Subject: [PATCH 3/4] Fix linting error --- pkg/flink/types/statement.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/flink/types/statement.go b/pkg/flink/types/statement.go index 4322fd8626..73cb9870a9 100644 --- a/pkg/flink/types/statement.go +++ b/pkg/flink/types/statement.go @@ -25,7 +25,7 @@ func GenerateStatementNameForOnPrem() string { // 12 random bytes => 24 hex chars b := make([]byte, 12) if _, err := rand.Read(b); err != nil { - panic(fmt.Sprintf("unable to generate random bytes for statment name: %v", err)) + panic(fmt.Sprintf("unable to generate random bytes for statement name: %v", err)) } randomHex := hex.EncodeToString(b) return fmt.Sprintf("%s-%s-%s-%s", clientName, date, localTime, randomHex) From fbdbd165f7bc83f34b346e17b19ec6b260421e3d Mon Sep 17 00:00:00 2001 From: Santwana Verma Date: Fri, 2 Jan 2026 11:22:30 +0530 Subject: [PATCH 4/4] Address review comments --- pkg/flink/internal/store/store_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/flink/internal/store/store_test.go b/pkg/flink/internal/store/store_test.go index 699cae8218..a6d612f19f 100644 --- a/pkg/flink/internal/store/store_test.go +++ b/pkg/flink/internal/store/store_test.go @@ -55,6 +55,13 @@ func (s *StoreTestSuite) TestGenerateStatementName() { } } +func (s *StoreTestSuite) TestGenerateStatementNameForOnPrem() { + statementRegex := `^cli-\d{8}-\d{6}-[a-f0-9]{24}$` + for i := 0; i < 10; i++ { + s.Require().Regexp(statementRegex, types.GenerateStatementNameForOnPrem()) + } +} + func TestStoreProcessLocalStatement(t *testing.T) { // Create new stores stores := make([]types.StoreInterface, 2)