Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 122 additions & 31 deletions internal/flink/command_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ func AddConnectionSecretFlags(cmd *cobra.Command) {
cmd.Flags().String("service-key", "", fmt.Sprintf("Specify service key for the type: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["service-key"], "or")))
cmd.Flags().String("username", "", fmt.Sprintf("Specify username for the type: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["username"], "or")))
cmd.Flags().String("password", "", fmt.Sprintf("Specify password for the type: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["password"], "or")))
cmd.Flags().String("auth-type", "", fmt.Sprintf("Specify authentication type for the type : %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["auth-type"], "or")))
cmd.Flags().String("bearer-token", "", fmt.Sprintf("Specify bearer token for BEARER authentication type: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["bearer-token"], "or")))
cmd.Flags().String("oauth2-token-endpoint", "", fmt.Sprintf("Specify oauth2 token endpoint: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["oauth2-token-endpoint"], "or")))
cmd.Flags().String("oauth2-client-id", "", fmt.Sprintf("Specify oauth2 client id: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["oauth2-client-id"], "or")))
cmd.Flags().String("oauth2-client-secret", "", fmt.Sprintf("Specify oauth2 client secret: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["oauth2-client-secret"], "or")))
cmd.Flags().String("oauth2-scope", "", fmt.Sprintf("Specify oauth2 scope: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["oauth2-scope"], "or")))
}

func validateConnectionType(connectionType string) error {
Expand All @@ -100,54 +106,139 @@ func validateConnectionSecrets(cmd *cobra.Command, connectionType string) (map[s
var connectionSecrets []string
connectionSecrets = append(connectionSecrets, flink.ConnectionTypeSecretMapping[connectionType]...)

for key := range flink.ConnectionSecretTypeMapping {
secret, err := cmd.Flags().GetString(key)
secretMap := map[string]string{}
var requiredSecretKeys []string
var optionalSecretKeys []string

dynamicKey, hasDynamicKey := flink.ConnectionTypeDynamicKeyMapping[connectionType]
if hasDynamicKey {
dynamicKeyValue, err := cmd.Flags().GetString(dynamicKey)
if err != nil {
return nil, err
}
if secret != "" && !slices.Contains(connectionSecrets, key) {
return nil, errors.NewErrorWithSuggestions(fmt.Sprintf("%s is invalid for connection %s.", key, connectionType), fmt.Sprintf("Valid secret types are %s.", utils.ArrayToCommaDelimitedString(connectionSecrets, "or")))
if dynamicKeyValue == "" {
return nil, fmt.Errorf("must provide %s for connection %s", dynamicKey, connectionType)
}
}

requiredSecretKeys := flink.ConnectionRequiredSecretMapping[connectionType]
var optionalSecretKeys []string
for _, secretKey := range flink.ConnectionTypeSecretMapping[connectionType] {
if !slices.Contains(requiredSecretKeys, secretKey) {
optionalSecretKeys = append(optionalSecretKeys, secretKey)
backendKey, ok := flink.ConnectionSecretBackendKeyMapping[dynamicKey]
if !ok {
return nil, fmt.Errorf(`backend key not found for "%s"`, dynamicKey)
}
secretMap[backendKey] = dynamicKeyValue

requiredSecretKeys, exists := flink.ConnectionDynamicRequiredSecretMapping[connectionType][dynamicKeyValue]
if !exists {
validTypes := make([]string, 0, len(flink.ConnectionDynamicRequiredSecretMapping[connectionType]))
for k := range flink.ConnectionDynamicRequiredSecretMapping[connectionType] {
validTypes = append(validTypes, k)
}
return nil, errors.NewErrorWithSuggestions(
fmt.Sprintf("invalid %s %s for connection %s", dynamicKey, dynamicKeyValue, connectionType),
fmt.Sprintf("Valid types are %s.", utils.ArrayToCommaDelimitedString(validTypes, "or")),
)
}
}

secretMap := map[string]string{}
for _, requiredKey := range requiredSecretKeys {
secret, err := cmd.Flags().GetString(requiredKey)
if err != nil {
return nil, err
allPossibleKeys, exists := flink.ConnectionDynamicSecretMapping[connectionType][dynamicKeyValue]
if exists {
connectionSecrets = append(connectionSecrets, allPossibleKeys...)
}

for key := range flink.ConnectionSecretTypeMapping {
secret, err := cmd.Flags().GetString(key)
if err != nil {
return nil, err
}
if secret != "" && !slices.Contains(connectionSecrets, key) {
return nil, errors.NewErrorWithSuggestions(
fmt.Sprintf("%s is invalid for connection %s with %s %s.", key, connectionType, dynamicKey, dynamicKeyValue),
fmt.Sprintf("Valid secret types are %s.", utils.ArrayToCommaDelimitedString(connectionSecrets, "or")),
)
}
}
if secret == "" {
return nil, fmt.Errorf("must provide %s for type %s", requiredKey, connectionType)

for _, secretKey := range allPossibleKeys {
if !slices.Contains(requiredSecretKeys, secretKey) {
optionalSecretKeys = append(optionalSecretKeys, secretKey)
}
}
backendKey, ok := flink.ConnectionSecretBackendKeyMapping[requiredKey]
if !ok {
return nil, fmt.Errorf(`backend key not found for "%s"`, requiredKey)

for _, requiredKey := range requiredSecretKeys {
secret, err := cmd.Flags().GetString(requiredKey)
if err != nil {
return nil, err
}
if secret == "" {
return nil, fmt.Errorf("must provide %s for %s %s on connection %s", requiredKey, dynamicKey, dynamicKeyValue, connectionType)
}
backendKey, ok := flink.ConnectionSecretBackendKeyMapping[requiredKey]
if !ok {
return nil, fmt.Errorf(`backend key not found for "%s"`, requiredKey)
}
secretMap[backendKey] = secret
}
secretMap[backendKey] = secret
}

for _, optionalSecretKey := range optionalSecretKeys {
secret, err := cmd.Flags().GetString(optionalSecretKey)
if err != nil {
return nil, err
for _, optionalKey := range optionalSecretKeys {
secret, err := cmd.Flags().GetString(optionalKey)
if err != nil {
return nil, err
}
if secret != "" {
backendKey, ok := flink.ConnectionSecretBackendKeyMapping[optionalKey]
if !ok {
return nil, fmt.Errorf(`backend key not found for "%s"`, optionalKey)
}
secretMap[backendKey] = secret
}
}
} else {
for key := range flink.ConnectionSecretTypeMapping {
secret, err := cmd.Flags().GetString(key)
if err != nil {
return nil, err
}
if secret != "" && !slices.Contains(connectionSecrets, key) {
return nil, errors.NewErrorWithSuggestions(
fmt.Sprintf("%s is invalid for connection %s.", key, connectionType),
fmt.Sprintf("Valid secret types are %s.", utils.ArrayToCommaDelimitedString(connectionSecrets, "or")),
)
}
}

backendKey, ok := flink.ConnectionSecretBackendKeyMapping[optionalSecretKey]
if !ok {
return nil, fmt.Errorf("backend key not found for %s", optionalSecretKey)
requiredSecretKeys = flink.ConnectionRequiredSecretMapping[connectionType]
for _, secretKey := range flink.ConnectionTypeSecretMapping[connectionType] {
if !slices.Contains(requiredSecretKeys, secretKey) {
optionalSecretKeys = append(optionalSecretKeys, secretKey)
}
}

if secret != "" {
for _, requiredKey := range requiredSecretKeys {
secret, err := cmd.Flags().GetString(requiredKey)
if err != nil {
return nil, err
}
if secret == "" {
return nil, fmt.Errorf("must provide %s for type %s", requiredKey, connectionType)
}
backendKey, ok := flink.ConnectionSecretBackendKeyMapping[requiredKey]
if !ok {
return nil, fmt.Errorf(`backend key not found for "%s"`, requiredKey)
}
secretMap[backendKey] = secret
}

for _, optionalKey := range optionalSecretKeys {
secret, err := cmd.Flags().GetString(optionalKey)
if err != nil {
return nil, err
}
if secret != "" {
backendKey, ok := flink.ConnectionSecretBackendKeyMapping[optionalKey]
if !ok {
return nil, fmt.Errorf("backend key not found for %s", optionalKey)
}
secretMap[backendKey] = secret
}
}
}

return secretMap, nil
Expand Down
66 changes: 51 additions & 15 deletions pkg/flink/utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package flink

var (
ConnectionTypes = []string{"openai", "azureml", "azureopenai", "bedrock", "sagemaker", "googleai", "vertexai", "mongodb", "elastic", "pinecone", "couchbase", "confluent_jdbc"}
ConnectionTypes = []string{"openai", "azureml", "azureopenai", "bedrock", "sagemaker", "googleai", "vertexai", "mongodb", "elastic", "pinecone", "couchbase", "confluent_jdbc", "mcp_server"}
ConnectionTypeSecretMapping = map[string][]string{
"openai": {"api-key"},
"azureml": {"api-key"},
Expand All @@ -15,16 +15,23 @@ var (
"pinecone": {"api-key"},
"couchbase": {"username", "password"},
"confluent_jdbc": {"username", "password"},
"mcp_server": {"auth-type"},
}

ConnectionSecretTypeMapping = map[string][]string{
"api-key": {"openai", "azureml", "azureopenai", "googleai", "elastic", "pinecone"},
"aws-access-key": {"bedrock", "sagemaker"},
"aws-secret-key": {"bedrock", "sagemaker"},
"aws-session-token": {"bedrock", "sagemaker"},
"service-key": {"vertexai"},
"username": {"mongodb", "couchbase", "confluent_jdbc"},
"password": {"mongodb", "couchbase", "confluent_jdbc"},
"api-key": {"openai", "azureml", "azureopenai", "googleai", "elastic", "pinecone", "mcp_server"},
"aws-access-key": {"bedrock", "sagemaker"},
"aws-secret-key": {"bedrock", "sagemaker"},
"aws-session-token": {"bedrock", "sagemaker"},
"service-key": {"vertexai"},
"username": {"mongodb", "couchbase", "confluent_jdbc"},
"password": {"mongodb", "couchbase", "confluent_jdbc"},
"auth-type": {"mcp_server"},
"bearer-token": {"mcp_server"},
"oauth2-token-endpoint": {"mcp_server"},
"oauth2-client-secret": {"mcp_server"},
"oauth2-client-id": {"mcp_server"},
"oauth2-scope": {"mcp_server"},
}

ConnectionRequiredSecretMapping = map[string][]string{
Expand All @@ -40,14 +47,43 @@ var (
"pinecone": {"api-key"},
"couchbase": {"username", "password"},
"confluent_jdbc": {"username", "password"},
"mcp_server": {"auth-type"},
}
ConnectionSecretBackendKeyMapping = map[string]string{
"api-key": "API_KEY",
"aws-access-key": "AWS_ACCESS_KEY_ID",
"aws-secret-key": "AWS_SECRET_ACCESS_KEY",
"aws-session-token": "AWS_SESSION_TOKEN",
"service-key": "SERVICE_KEY",
"username": "USERNAME",
"password": "PASSWORD",
"api-key": "API_KEY",
"aws-access-key": "AWS_ACCESS_KEY_ID",
"aws-secret-key": "AWS_SECRET_ACCESS_KEY",
"aws-session-token": "AWS_SESSION_TOKEN",
"service-key": "SERVICE_KEY",
"username": "USERNAME",
"password": "PASSWORD",
"auth-type": "AUTH_TYPE",
"bearer-token": "BEARER_TOKEN",
"oauth2-token-endpoint": "OAUTH2_TOKEN_ENDPOINT",
"oauth2-client-secret": "OAUTH2_CLIENT_SECRET",
"oauth2-client-id": "OAUTH2_CLIENT_ID",
"oauth2-scope": "OAUTH2_SCOPE",
}

ConnectionTypeDynamicKeyMapping = map[string]string{
"mcp_server": "auth-type",
}

ConnectionDynamicRequiredSecretMapping = map[string]map[string][]string{
"mcp_server": {
"NO_AUTH": {},
"API_KEY": {"api-key"},
"BEARER": {"bearer-token"},
"OAUTH2": {"oauth2-token-endpoint", "oauth2-client-id", "oauth2-client-secret", "oauth2-scope"},
},
}

ConnectionDynamicSecretMapping = map[string]map[string][]string{
"mcp_server": {
"NO_AUTH": {},
"API_KEY": {"api-key"},
"BEARER": {"bearer-token"},
"OAUTH2": {"oauth2-token-endpoint", "oauth2-client-id", "oauth2-client-secret", "oauth2-scope"},
},
}
)