diff --git a/api/v1beta1/temporalnamespace_types.go b/api/v1beta1/temporalnamespace_types.go index 1ab2d45a..7a7e4c87 100644 --- a/api/v1beta1/temporalnamespace_types.go +++ b/api/v1beta1/temporalnamespace_types.go @@ -67,6 +67,14 @@ type TemporalNamespaceSpec struct { // If not set, the default cluster configuration is used. // +optional Archival *TemporalNamespaceArchivalSpec `json:"archival,omitempty"` + // CustomSearchAttributes is an optional mapping of custom search attribute names to types. + // Supported types: Text, Keyword, Int, Double, Bool, DateTime, KeywordList. + // +optional + CustomSearchAttributes map[string]string `json:"customSearchAttributes,omitempty"` + // AllowSearchAttributeDeletion makes the controller remove custom search attributes + // from the Temporal server if they are not present in the spec. + // +optional + AllowSearchAttributeDeletion bool `json:"allowSearchAttributeDeletion,omitempty"` } // TemporalNamespaceStatus defines the observed state of Namespace. diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index ce8d20cb..6f33f6af 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -1845,6 +1845,13 @@ func (in *TemporalNamespaceSpec) DeepCopyInto(out *TemporalNamespaceSpec) { *out = new(TemporalNamespaceArchivalSpec) (*in).DeepCopyInto(*out) } + if in.CustomSearchAttributes != nil { + in, out := &in.CustomSearchAttributes, &out.CustomSearchAttributes + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalNamespaceSpec. diff --git a/config/crd/bases/temporal.io_temporalnamespaces.yaml b/config/crd/bases/temporal.io_temporalnamespaces.yaml index 37ec3aaa..66bee4b3 100644 --- a/config/crd/bases/temporal.io_temporalnamespaces.yaml +++ b/config/crd/bases/temporal.io_temporalnamespaces.yaml @@ -50,6 +50,11 @@ spec: AllowDeletion makes the controller delete the Temporal namespace if the CRD is deleted. type: boolean + allowSearchAttributeDeletion: + description: |- + AllowSearchAttributeDeletion makes the controller remove custom search attributes + from the Temporal server if they are not present in the spec. + type: boolean archival: description: |- Archival is a per-namespace archival configuration. @@ -130,6 +135,13 @@ spec: items: type: string type: array + customSearchAttributes: + additionalProperties: + type: string + description: |- + CustomSearchAttributes is an optional mapping of custom search attribute names to types. + Supported types: Text, Keyword, Int, Double, Bool, DateTime, KeywordList. + type: object data: additionalProperties: type: string diff --git a/controllers/temporalnamespace_controller.go b/controllers/temporalnamespace_controller.go index cfae9fd2..83d3ab15 100644 --- a/controllers/temporalnamespace_controller.go +++ b/controllers/temporalnamespace_controller.go @@ -136,6 +136,21 @@ func (r *TemporalNamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Re } } + // Reconcile custom search attributes if any are configured or deletion is allowed. + if len(namespace.Spec.CustomSearchAttributes) > 0 || namespace.Spec.AllowSearchAttributeDeletion { + clusterClient, err := temporal.GetClusterClient(ctx, r.Client, cluster) + if err != nil { + err = fmt.Errorf("can't create cluster client for search attributes: %w", err) + return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) + } + defer clusterClient.Close() + + if err := temporal.ReconcileSearchAttributes(ctx, clusterClient.OperatorService(), namespace); err != nil { + err = fmt.Errorf("can't reconcile search attributes for \"%s\" namespace: %w", namespace.GetName(), err) + return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) + } + } + logger.Info("Successfully reconciled namespace", "namespace", namespace.GetName()) v1beta1.SetTemporalNamespaceReady(namespace, metav1.ConditionTrue, v1beta1.TemporalNamespaceCreatedReason, "Namespace successfully created") diff --git a/pkg/temporal/search_attributes.go b/pkg/temporal/search_attributes.go new file mode 100644 index 00000000..86b749e5 --- /dev/null +++ b/pkg/temporal/search_attributes.go @@ -0,0 +1,182 @@ +// Licensed to Alexandre VILAIN under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Alexandre VILAIN licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package temporal + +import ( + "context" + "fmt" + + "github.com/alexandrevilain/temporal-operator/api/v1beta1" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/operatorservice/v1" + "google.golang.org/grpc" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// searchAttributeTypes maps user-facing type names to Temporal IndexedValueType. +var searchAttributeTypes = map[string]enums.IndexedValueType{ + "Text": enums.INDEXED_VALUE_TYPE_TEXT, + "Keyword": enums.INDEXED_VALUE_TYPE_KEYWORD, + "Int": enums.INDEXED_VALUE_TYPE_INT, + "Double": enums.INDEXED_VALUE_TYPE_DOUBLE, + "Bool": enums.INDEXED_VALUE_TYPE_BOOL, + "DateTime": enums.INDEXED_VALUE_TYPE_DATETIME, + "KeywordList": enums.INDEXED_VALUE_TYPE_KEYWORD_LIST, +} + +// searchAttributeTypeNames is the reverse mapping from IndexedValueType to string. +var searchAttributeTypeNames = map[enums.IndexedValueType]string{ + enums.INDEXED_VALUE_TYPE_TEXT: "Text", + enums.INDEXED_VALUE_TYPE_KEYWORD: "Keyword", + enums.INDEXED_VALUE_TYPE_INT: "Int", + enums.INDEXED_VALUE_TYPE_DOUBLE: "Double", + enums.INDEXED_VALUE_TYPE_BOOL: "Bool", + enums.INDEXED_VALUE_TYPE_DATETIME: "DateTime", + enums.INDEXED_VALUE_TYPE_KEYWORD_LIST: "KeywordList", +} + +// SearchAttributeTypeFromString converts a user-facing type name to its IndexedValueType. +func SearchAttributeTypeFromString(s string) (enums.IndexedValueType, error) { + t, ok := searchAttributeTypes[s] + if !ok { + return enums.INDEXED_VALUE_TYPE_UNSPECIFIED, fmt.Errorf("invalid search attribute type %q: valid types are Text, Keyword, Int, Double, Bool, DateTime, KeywordList", s) + } + return t, nil +} + +// SearchAttributeTypeToString converts an IndexedValueType to its user-facing name. +func SearchAttributeTypeToString(t enums.IndexedValueType) (string, error) { + name, ok := searchAttributeTypeNames[t] + if !ok { + return "", fmt.Errorf("unknown IndexedValueType: %v", t) + } + return name, nil +} + +// OperatorServiceClient is an interface for the Temporal OperatorService gRPC methods +// needed by search attribute reconciliation. It is satisfied by the client returned +// from temporalclient.Client.OperatorService(). +type OperatorServiceClient interface { + ListSearchAttributes(ctx context.Context, in *operatorservice.ListSearchAttributesRequest, opts ...grpc.CallOption) (*operatorservice.ListSearchAttributesResponse, error) + AddSearchAttributes(ctx context.Context, in *operatorservice.AddSearchAttributesRequest, opts ...grpc.CallOption) (*operatorservice.AddSearchAttributesResponse, error) + RemoveSearchAttributes(ctx context.Context, in *operatorservice.RemoveSearchAttributesRequest, opts ...grpc.CallOption) (*operatorservice.RemoveSearchAttributesResponse, error) +} + +// parseDesiredAttributes converts the spec's string type map into typed IndexedValueType map. +func parseDesiredAttributes(spec map[string]string) (map[string]enums.IndexedValueType, error) { + desired := make(map[string]enums.IndexedValueType, len(spec)) + for name, typeStr := range spec { + t, err := SearchAttributeTypeFromString(typeStr) + if err != nil { + return nil, fmt.Errorf("search attribute %q: %w", name, err) + } + desired[name] = t + } + return desired, nil +} + +// computeAttributesToAdd returns attributes present in desired but not in existing, +// and returns an error if any existing attribute has a type mismatch. +func computeAttributesToAdd(desired, existing map[string]enums.IndexedValueType) (map[string]enums.IndexedValueType, error) { + toAdd := make(map[string]enums.IndexedValueType) + for name, desiredType := range desired { + existingType, exists := existing[name] + if !exists { + toAdd[name] = desiredType + continue + } + if existingType != desiredType { + existingTypeName, _ := SearchAttributeTypeToString(existingType) + desiredTypeName, _ := SearchAttributeTypeToString(desiredType) + return nil, fmt.Errorf("search attribute %q has type %s on server but %s in spec; Temporal does not allow type changes", name, existingTypeName, desiredTypeName) + } + } + return toAdd, nil +} + +// computeAttributesToRemove returns attribute names present in existing but not in desired. +func computeAttributesToRemove(desired, existing map[string]enums.IndexedValueType) []string { + var toRemove []string + for name := range existing { + if _, inSpec := desired[name]; !inSpec { + toRemove = append(toRemove, name) + } + } + return toRemove +} + +// ReconcileSearchAttributes ensures the custom search attributes on the Temporal server +// match the desired state declared in the TemporalNamespace spec. +func ReconcileSearchAttributes(ctx context.Context, operatorSvc OperatorServiceClient, namespace *v1beta1.TemporalNamespace) error { + logger := log.FromContext(ctx) + nsName := namespace.GetName() + + listResp, err := operatorSvc.ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{ + Namespace: nsName, + }) + if err != nil { + return fmt.Errorf("listing search attributes: %w", err) + } + + existing := listResp.GetCustomAttributes() + logger.V(1).Info("Listed existing custom search attributes", "namespace", nsName, "count", len(existing)) + + desired, err := parseDesiredAttributes(namespace.Spec.CustomSearchAttributes) + if err != nil { + return err + } + + toAdd, err := computeAttributesToAdd(desired, existing) + if err != nil { + return err + } + + var toRemove []string + if namespace.Spec.AllowSearchAttributeDeletion { + toRemove = computeAttributesToRemove(desired, existing) + } + + if len(toAdd) == 0 && len(toRemove) == 0 { + logger.V(1).Info("Search attributes are up to date", "namespace", nsName) + return nil + } + + if len(toAdd) > 0 { + logger.Info("Adding search attributes", "namespace", nsName, "count", len(toAdd)) + _, err := operatorSvc.AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: toAdd, + Namespace: nsName, + }) + if err != nil { + return fmt.Errorf("adding search attributes: %w", err) + } + } + + if len(toRemove) > 0 { + logger.Info("Removing search attributes", "namespace", nsName, "count", len(toRemove), "attributes", toRemove) + _, err := operatorSvc.RemoveSearchAttributes(ctx, &operatorservice.RemoveSearchAttributesRequest{ + SearchAttributes: toRemove, + Namespace: nsName, + }) + if err != nil { + return fmt.Errorf("removing search attributes: %w", err) + } + } + + return nil +} diff --git a/pkg/temporal/search_attributes_test.go b/pkg/temporal/search_attributes_test.go new file mode 100644 index 00000000..72d212a8 --- /dev/null +++ b/pkg/temporal/search_attributes_test.go @@ -0,0 +1,208 @@ +// Licensed to Alexandre VILAIN under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Alexandre VILAIN licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package temporal + +import ( + "context" + "testing" + + "github.com/alexandrevilain/temporal-operator/api/v1beta1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/operatorservice/v1" + "google.golang.org/grpc" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const testNamespace = "test-ns" + +// mockOperatorServiceClient is a mock for testing. +type mockOperatorServiceClient struct { + listResponse *operatorservice.ListSearchAttributesResponse + listError error + addError error + removeError error + + addCalled bool + removeCalled bool + addRequest *operatorservice.AddSearchAttributesRequest + removeRequest *operatorservice.RemoveSearchAttributesRequest +} + +func (m *mockOperatorServiceClient) ListSearchAttributes(_ context.Context, _ *operatorservice.ListSearchAttributesRequest, _ ...grpc.CallOption) (*operatorservice.ListSearchAttributesResponse, error) { + return m.listResponse, m.listError +} + +func (m *mockOperatorServiceClient) AddSearchAttributes(_ context.Context, req *operatorservice.AddSearchAttributesRequest, _ ...grpc.CallOption) (*operatorservice.AddSearchAttributesResponse, error) { + m.addCalled = true + m.addRequest = req + return &operatorservice.AddSearchAttributesResponse{}, m.addError +} + +func (m *mockOperatorServiceClient) RemoveSearchAttributes(_ context.Context, req *operatorservice.RemoveSearchAttributesRequest, _ ...grpc.CallOption) (*operatorservice.RemoveSearchAttributesResponse, error) { + m.removeCalled = true + m.removeRequest = req + return &operatorservice.RemoveSearchAttributesResponse{}, m.removeError +} + +func newNamespace(attrs map[string]string, allowDeletion bool) *v1beta1.TemporalNamespace { + return &v1beta1.TemporalNamespace{ + ObjectMeta: metav1.ObjectMeta{Name: testNamespace}, + Spec: v1beta1.TemporalNamespaceSpec{ + CustomSearchAttributes: attrs, + AllowSearchAttributeDeletion: allowDeletion, + }, + } +} + +func TestReconcileSearchAttributes(t *testing.T) { + ctx := context.Background() + + t.Run("add new attributes", func(t *testing.T) { + mock := &mockOperatorServiceClient{ + listResponse: &operatorservice.ListSearchAttributesResponse{ + CustomAttributes: map[string]enums.IndexedValueType{}, + }, + } + ns := newNamespace(map[string]string{ + "CustomerId": "Keyword", + "OrderTotal": "Double", + }, false) + + err := ReconcileSearchAttributes(ctx, mock, ns) + require.NoError(t, err) + assert.True(t, mock.addCalled) + assert.False(t, mock.removeCalled) + assert.Equal(t, map[string]enums.IndexedValueType{ + "CustomerId": enums.INDEXED_VALUE_TYPE_KEYWORD, + "OrderTotal": enums.INDEXED_VALUE_TYPE_DOUBLE, + }, mock.addRequest.SearchAttributes) + assert.Equal(t, testNamespace, mock.addRequest.Namespace) + }) + + t.Run("remove stale with flag on", func(t *testing.T) { + mock := &mockOperatorServiceClient{ + listResponse: &operatorservice.ListSearchAttributesResponse{ + CustomAttributes: map[string]enums.IndexedValueType{ + "OldAttr": enums.INDEXED_VALUE_TYPE_TEXT, + }, + }, + } + ns := newNamespace(map[string]string{}, true) + + err := ReconcileSearchAttributes(ctx, mock, ns) + require.NoError(t, err) + assert.False(t, mock.addCalled) + assert.True(t, mock.removeCalled) + assert.Equal(t, []string{"OldAttr"}, mock.removeRequest.SearchAttributes) + }) + + t.Run("no removal with flag off", func(t *testing.T) { + mock := &mockOperatorServiceClient{ + listResponse: &operatorservice.ListSearchAttributesResponse{ + CustomAttributes: map[string]enums.IndexedValueType{ + "OldAttr": enums.INDEXED_VALUE_TYPE_TEXT, + }, + }, + } + ns := newNamespace(map[string]string{}, false) + + err := ReconcileSearchAttributes(ctx, mock, ns) + require.NoError(t, err) + assert.False(t, mock.addCalled) + assert.False(t, mock.removeCalled) + }) + + t.Run("no changes when spec matches server", func(t *testing.T) { + mock := &mockOperatorServiceClient{ + listResponse: &operatorservice.ListSearchAttributesResponse{ + CustomAttributes: map[string]enums.IndexedValueType{ + "CustomerId": enums.INDEXED_VALUE_TYPE_KEYWORD, + }, + }, + } + ns := newNamespace(map[string]string{ + "CustomerId": "Keyword", + }, false) + + err := ReconcileSearchAttributes(ctx, mock, ns) + require.NoError(t, err) + assert.False(t, mock.addCalled) + assert.False(t, mock.removeCalled) + }) + + t.Run("mixed add and remove", func(t *testing.T) { + mock := &mockOperatorServiceClient{ + listResponse: &operatorservice.ListSearchAttributesResponse{ + CustomAttributes: map[string]enums.IndexedValueType{ + "Existing": enums.INDEXED_VALUE_TYPE_KEYWORD, + "OldAttr": enums.INDEXED_VALUE_TYPE_TEXT, + }, + }, + } + ns := newNamespace(map[string]string{ + "Existing": "Keyword", + "NewAttr": "Bool", + }, true) + + err := ReconcileSearchAttributes(ctx, mock, ns) + require.NoError(t, err) + assert.True(t, mock.addCalled) + assert.True(t, mock.removeCalled) + assert.Equal(t, map[string]enums.IndexedValueType{ + "NewAttr": enums.INDEXED_VALUE_TYPE_BOOL, + }, mock.addRequest.SearchAttributes) + assert.Equal(t, []string{"OldAttr"}, mock.removeRequest.SearchAttributes) + }) + + t.Run("type mismatch returns error", func(t *testing.T) { + mock := &mockOperatorServiceClient{ + listResponse: &operatorservice.ListSearchAttributesResponse{ + CustomAttributes: map[string]enums.IndexedValueType{ + "CustomerId": enums.INDEXED_VALUE_TYPE_TEXT, + }, + }, + } + ns := newNamespace(map[string]string{ + "CustomerId": "Keyword", + }, false) + + err := ReconcileSearchAttributes(ctx, mock, ns) + assert.Error(t, err) + assert.Contains(t, err.Error(), "does not allow type changes") + assert.False(t, mock.addCalled) + assert.False(t, mock.removeCalled) + }) + + t.Run("invalid type string returns error", func(t *testing.T) { + mock := &mockOperatorServiceClient{ + listResponse: &operatorservice.ListSearchAttributesResponse{ + CustomAttributes: map[string]enums.IndexedValueType{}, + }, + } + ns := newNamespace(map[string]string{ + "CustomerId": "InvalidType", + }, false) + + err := ReconcileSearchAttributes(ctx, mock, ns) + assert.Error(t, err) + assert.Contains(t, err.Error(), "invalid search attribute type") + assert.False(t, mock.addCalled) + }) +}