diff --git a/internal/flink/command_connection.go b/internal/flink/command_connection.go index c6414d20d0..ce417813d0 100644 --- a/internal/flink/command_connection.go +++ b/internal/flink/command_connection.go @@ -87,6 +87,12 @@ func AddConnectionSecretFlags(cmd *cobra.Command) { cmd.Flags().String("service-key", "", fmt.Sprintf("Specify service key for the type: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["service-key"], "or"))) cmd.Flags().String("username", "", fmt.Sprintf("Specify username for the type: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["username"], "or"))) cmd.Flags().String("password", "", fmt.Sprintf("Specify password for the type: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["password"], "or"))) + cmd.Flags().String("auth-type", "", fmt.Sprintf("Specify authentication type for the type : %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["auth-type"], "or"))) + cmd.Flags().String("bearer-token", "", fmt.Sprintf("Specify bearer token for BEARER authentication type: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["bearer-token"], "or"))) + cmd.Flags().String("oauth2-token-endpoint", "", fmt.Sprintf("Specify oauth2 token endpoint: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["oauth2-token-endpoint"], "or"))) + cmd.Flags().String("oauth2-client-id", "", fmt.Sprintf("Specify oauth2 client id: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["oauth2-client-id"], "or"))) + cmd.Flags().String("oauth2-client-secret", "", fmt.Sprintf("Specify oauth2 client secret: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["oauth2-client-secret"], "or"))) + cmd.Flags().String("oauth2-scope", "", fmt.Sprintf("Specify oauth2 scope: %s.", utils.ArrayToCommaDelimitedString(flink.ConnectionSecretTypeMapping["oauth2-scope"], "or"))) } func validateConnectionType(connectionType string) error { @@ -100,54 +106,139 @@ func validateConnectionSecrets(cmd *cobra.Command, connectionType string) (map[s var connectionSecrets []string connectionSecrets = append(connectionSecrets, flink.ConnectionTypeSecretMapping[connectionType]...) - for key := range flink.ConnectionSecretTypeMapping { - secret, err := cmd.Flags().GetString(key) + secretMap := map[string]string{} + var requiredSecretKeys []string + var optionalSecretKeys []string + + dynamicKey, hasDynamicKey := flink.ConnectionTypeDynamicKeyMapping[connectionType] + if hasDynamicKey { + dynamicKeyValue, err := cmd.Flags().GetString(dynamicKey) if err != nil { return nil, err } - if secret != "" && !slices.Contains(connectionSecrets, key) { - return nil, errors.NewErrorWithSuggestions(fmt.Sprintf("%s is invalid for connection %s.", key, connectionType), fmt.Sprintf("Valid secret types are %s.", utils.ArrayToCommaDelimitedString(connectionSecrets, "or"))) + if dynamicKeyValue == "" { + return nil, fmt.Errorf("must provide %s for connection %s", dynamicKey, connectionType) } - } - requiredSecretKeys := flink.ConnectionRequiredSecretMapping[connectionType] - var optionalSecretKeys []string - for _, secretKey := range flink.ConnectionTypeSecretMapping[connectionType] { - if !slices.Contains(requiredSecretKeys, secretKey) { - optionalSecretKeys = append(optionalSecretKeys, secretKey) + backendKey, ok := flink.ConnectionSecretBackendKeyMapping[dynamicKey] + if !ok { + return nil, fmt.Errorf(`backend key not found for "%s"`, dynamicKey) + } + secretMap[backendKey] = dynamicKeyValue + + requiredSecretKeys, exists := flink.ConnectionDynamicRequiredSecretMapping[connectionType][dynamicKeyValue] + if !exists { + validTypes := make([]string, 0, len(flink.ConnectionDynamicRequiredSecretMapping[connectionType])) + for k := range flink.ConnectionDynamicRequiredSecretMapping[connectionType] { + validTypes = append(validTypes, k) + } + return nil, errors.NewErrorWithSuggestions( + fmt.Sprintf("invalid %s %s for connection %s", dynamicKey, dynamicKeyValue, connectionType), + fmt.Sprintf("Valid types are %s.", utils.ArrayToCommaDelimitedString(validTypes, "or")), + ) } - } - secretMap := map[string]string{} - for _, requiredKey := range requiredSecretKeys { - secret, err := cmd.Flags().GetString(requiredKey) - if err != nil { - return nil, err + allPossibleKeys, exists := flink.ConnectionDynamicSecretMapping[connectionType][dynamicKeyValue] + if exists { + connectionSecrets = append(connectionSecrets, allPossibleKeys...) + } + + for key := range flink.ConnectionSecretTypeMapping { + secret, err := cmd.Flags().GetString(key) + if err != nil { + return nil, err + } + if secret != "" && !slices.Contains(connectionSecrets, key) { + return nil, errors.NewErrorWithSuggestions( + fmt.Sprintf("%s is invalid for connection %s with %s %s.", key, connectionType, dynamicKey, dynamicKeyValue), + fmt.Sprintf("Valid secret types are %s.", utils.ArrayToCommaDelimitedString(connectionSecrets, "or")), + ) + } } - if secret == "" { - return nil, fmt.Errorf("must provide %s for type %s", requiredKey, connectionType) + + for _, secretKey := range allPossibleKeys { + if !slices.Contains(requiredSecretKeys, secretKey) { + optionalSecretKeys = append(optionalSecretKeys, secretKey) + } } - backendKey, ok := flink.ConnectionSecretBackendKeyMapping[requiredKey] - if !ok { - return nil, fmt.Errorf(`backend key not found for "%s"`, requiredKey) + + for _, requiredKey := range requiredSecretKeys { + secret, err := cmd.Flags().GetString(requiredKey) + if err != nil { + return nil, err + } + if secret == "" { + return nil, fmt.Errorf("must provide %s for %s %s on connection %s", requiredKey, dynamicKey, dynamicKeyValue, connectionType) + } + backendKey, ok := flink.ConnectionSecretBackendKeyMapping[requiredKey] + if !ok { + return nil, fmt.Errorf(`backend key not found for "%s"`, requiredKey) + } + secretMap[backendKey] = secret } - secretMap[backendKey] = secret - } - for _, optionalSecretKey := range optionalSecretKeys { - secret, err := cmd.Flags().GetString(optionalSecretKey) - if err != nil { - return nil, err + for _, optionalKey := range optionalSecretKeys { + secret, err := cmd.Flags().GetString(optionalKey) + if err != nil { + return nil, err + } + if secret != "" { + backendKey, ok := flink.ConnectionSecretBackendKeyMapping[optionalKey] + if !ok { + return nil, fmt.Errorf(`backend key not found for "%s"`, optionalKey) + } + secretMap[backendKey] = secret + } + } + } else { + for key := range flink.ConnectionSecretTypeMapping { + secret, err := cmd.Flags().GetString(key) + if err != nil { + return nil, err + } + if secret != "" && !slices.Contains(connectionSecrets, key) { + return nil, errors.NewErrorWithSuggestions( + fmt.Sprintf("%s is invalid for connection %s.", key, connectionType), + fmt.Sprintf("Valid secret types are %s.", utils.ArrayToCommaDelimitedString(connectionSecrets, "or")), + ) + } } - backendKey, ok := flink.ConnectionSecretBackendKeyMapping[optionalSecretKey] - if !ok { - return nil, fmt.Errorf("backend key not found for %s", optionalSecretKey) + requiredSecretKeys = flink.ConnectionRequiredSecretMapping[connectionType] + for _, secretKey := range flink.ConnectionTypeSecretMapping[connectionType] { + if !slices.Contains(requiredSecretKeys, secretKey) { + optionalSecretKeys = append(optionalSecretKeys, secretKey) + } } - if secret != "" { + for _, requiredKey := range requiredSecretKeys { + secret, err := cmd.Flags().GetString(requiredKey) + if err != nil { + return nil, err + } + if secret == "" { + return nil, fmt.Errorf("must provide %s for type %s", requiredKey, connectionType) + } + backendKey, ok := flink.ConnectionSecretBackendKeyMapping[requiredKey] + if !ok { + return nil, fmt.Errorf(`backend key not found for "%s"`, requiredKey) + } secretMap[backendKey] = secret } + + for _, optionalKey := range optionalSecretKeys { + secret, err := cmd.Flags().GetString(optionalKey) + if err != nil { + return nil, err + } + if secret != "" { + backendKey, ok := flink.ConnectionSecretBackendKeyMapping[optionalKey] + if !ok { + return nil, fmt.Errorf("backend key not found for %s", optionalKey) + } + secretMap[backendKey] = secret + } + } } return secretMap, nil diff --git a/pkg/flink/utils.go b/pkg/flink/utils.go index 0e36979b29..c5465b4e6c 100644 --- a/pkg/flink/utils.go +++ b/pkg/flink/utils.go @@ -1,7 +1,7 @@ package flink var ( - ConnectionTypes = []string{"openai", "azureml", "azureopenai", "bedrock", "sagemaker", "googleai", "vertexai", "mongodb", "elastic", "pinecone", "couchbase", "confluent_jdbc"} + ConnectionTypes = []string{"openai", "azureml", "azureopenai", "bedrock", "sagemaker", "googleai", "vertexai", "mongodb", "elastic", "pinecone", "couchbase", "confluent_jdbc", "mcp_server"} ConnectionTypeSecretMapping = map[string][]string{ "openai": {"api-key"}, "azureml": {"api-key"}, @@ -15,16 +15,23 @@ var ( "pinecone": {"api-key"}, "couchbase": {"username", "password"}, "confluent_jdbc": {"username", "password"}, + "mcp_server": {"auth-type"}, } ConnectionSecretTypeMapping = map[string][]string{ - "api-key": {"openai", "azureml", "azureopenai", "googleai", "elastic", "pinecone"}, - "aws-access-key": {"bedrock", "sagemaker"}, - "aws-secret-key": {"bedrock", "sagemaker"}, - "aws-session-token": {"bedrock", "sagemaker"}, - "service-key": {"vertexai"}, - "username": {"mongodb", "couchbase", "confluent_jdbc"}, - "password": {"mongodb", "couchbase", "confluent_jdbc"}, + "api-key": {"openai", "azureml", "azureopenai", "googleai", "elastic", "pinecone", "mcp_server"}, + "aws-access-key": {"bedrock", "sagemaker"}, + "aws-secret-key": {"bedrock", "sagemaker"}, + "aws-session-token": {"bedrock", "sagemaker"}, + "service-key": {"vertexai"}, + "username": {"mongodb", "couchbase", "confluent_jdbc"}, + "password": {"mongodb", "couchbase", "confluent_jdbc"}, + "auth-type": {"mcp_server"}, + "bearer-token": {"mcp_server"}, + "oauth2-token-endpoint": {"mcp_server"}, + "oauth2-client-secret": {"mcp_server"}, + "oauth2-client-id": {"mcp_server"}, + "oauth2-scope": {"mcp_server"}, } ConnectionRequiredSecretMapping = map[string][]string{ @@ -40,14 +47,43 @@ var ( "pinecone": {"api-key"}, "couchbase": {"username", "password"}, "confluent_jdbc": {"username", "password"}, + "mcp_server": {"auth-type"}, } ConnectionSecretBackendKeyMapping = map[string]string{ - "api-key": "API_KEY", - "aws-access-key": "AWS_ACCESS_KEY_ID", - "aws-secret-key": "AWS_SECRET_ACCESS_KEY", - "aws-session-token": "AWS_SESSION_TOKEN", - "service-key": "SERVICE_KEY", - "username": "USERNAME", - "password": "PASSWORD", + "api-key": "API_KEY", + "aws-access-key": "AWS_ACCESS_KEY_ID", + "aws-secret-key": "AWS_SECRET_ACCESS_KEY", + "aws-session-token": "AWS_SESSION_TOKEN", + "service-key": "SERVICE_KEY", + "username": "USERNAME", + "password": "PASSWORD", + "auth-type": "AUTH_TYPE", + "bearer-token": "BEARER_TOKEN", + "oauth2-token-endpoint": "OAUTH2_TOKEN_ENDPOINT", + "oauth2-client-secret": "OAUTH2_CLIENT_SECRET", + "oauth2-client-id": "OAUTH2_CLIENT_ID", + "oauth2-scope": "OAUTH2_SCOPE", + } + + ConnectionTypeDynamicKeyMapping = map[string]string{ + "mcp_server": "auth-type", + } + + ConnectionDynamicRequiredSecretMapping = map[string]map[string][]string{ + "mcp_server": { + "NO_AUTH": {}, + "API_KEY": {"api-key"}, + "BEARER": {"bearer-token"}, + "OAUTH2": {"oauth2-token-endpoint", "oauth2-client-id", "oauth2-client-secret", "oauth2-scope"}, + }, + } + + ConnectionDynamicSecretMapping = map[string]map[string][]string{ + "mcp_server": { + "NO_AUTH": {}, + "API_KEY": {"api-key"}, + "BEARER": {"bearer-token"}, + "OAUTH2": {"oauth2-token-endpoint", "oauth2-client-id", "oauth2-client-secret", "oauth2-scope"}, + }, } )