From 460c06404e3825e6588b1f376a90ee1aa3f3f903 Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Thu, 19 Feb 2026 18:56:52 -0500 Subject: [PATCH 1/3] create metadata service --- cmd/deployment-tracker/main.go | 10 +- internal/controller/controller.go | 206 ++--------------------------- internal/metadata/metadata.go | 203 ++++++++++++++++++++++++++++ internal/metadata/metadata_test.go | 1 + 4 files changed, 225 insertions(+), 195 deletions(-) create mode 100644 internal/metadata/metadata.go create mode 100644 internal/metadata/metadata_test.go diff --git a/cmd/deployment-tracker/main.go b/cmd/deployment-tracker/main.go index b447e5c..bbd655f 100644 --- a/cmd/deployment-tracker/main.go +++ b/cmd/deployment-tracker/main.go @@ -13,7 +13,8 @@ import ( "time" "github.com/github/deployment-tracker/internal/controller" - "k8s.io/client-go/metadata" + "github.com/github/deployment-tracker/internal/metadata" + k8smetadata "k8s.io/client-go/metadata" "github.com/prometheus/client_golang/prometheus/promhttp" "k8s.io/client-go/kubernetes" @@ -114,13 +115,16 @@ func main() { } // Create metadata client - metadataClient, err := metadata.NewForConfig(k8sCfg) + metadataClient, err := k8smetadata.NewForConfig(k8sCfg) if err != nil { slog.Error("Error creating Kubernetes metadata client", "error", err) os.Exit(1) } + // Create MetadataService + metadataService := metadata.NewMetadataService(metadataClient) + // Start the metrics server var promSrv = &http.Server{ Addr: ":" + metricsPort, @@ -160,7 +164,7 @@ func main() { cancel() }() - cntrl, err := controller.New(clientset, metadataClient, namespace, excludeNamespaces, &cntrlCfg) + cntrl, err := controller.New(clientset, metadataService, namespace, excludeNamespaces, &cntrlCfg) if err != nil { slog.Error("Failed to create controller", "error", err) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 4670a80..75ff19c 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -8,15 +8,12 @@ import ( "slices" "strings" "time" - "unicode/utf8" + "github.com/github/deployment-tracker/internal/metadata" "github.com/github/deployment-tracker/pkg/deploymentrecord" "github.com/github/deployment-tracker/pkg/dtmetrics" "github.com/github/deployment-tracker/pkg/ociutil" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" amcache "k8s.io/apimachinery/pkg/util/cache" - "k8s.io/client-go/metadata" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -34,14 +31,6 @@ const ( EventCreated = "CREATED" // EventDeleted indicates that a pod has been deleted. EventDeleted = "DELETED" - // MetadataAnnotationPrefix is the annotation key prefix for deployment record metadata like runtime risk and tags. - MetadataAnnotationPrefix = "metadata.github.com/" - // RuntimeRisksAnnotationKey is the tag key for runtime risks. Comes after MetadataAnnotationPrefix. - RuntimeRisksAnnotationKey = "runtime-risks" - // MaxCustomTags is the maximum number of custom tags per deployment record. - MaxCustomTags = 5 - // MaxCustomTagLength is the maximum length for a custom tag key or value. - MaxCustomTagLength = 100 ) type ttlCache interface { @@ -57,20 +46,14 @@ type PodEvent struct { DeletedPod *corev1.Pod // Only populated for delete events } -// AggregatePodMetadata represents combined metadata for a pod and its ownership hierarchy. -type AggregatePodMetadata struct { - RuntimeRisks map[deploymentrecord.RuntimeRisk]bool - Tags map[string]string -} - // Controller is the Kubernetes controller for tracking deployments. type Controller struct { - clientset kubernetes.Interface - metadataClient metadata.Interface - podInformer cache.SharedIndexInformer - workqueue workqueue.TypedRateLimitingInterface[PodEvent] - apiClient *deploymentrecord.Client - cfg *Config + clientset kubernetes.Interface + metadataService *metadata.MetadataService + podInformer cache.SharedIndexInformer + workqueue workqueue.TypedRateLimitingInterface[PodEvent] + apiClient *deploymentrecord.Client + cfg *Config // best effort cache to avoid redundant posts // post requests are idempotent, so if this cache fails due to // restarts or other events, nothing will break. @@ -78,7 +61,7 @@ type Controller struct { } // New creates a new deployment tracker controller. -func New(clientset kubernetes.Interface, metadataClient metadata.Interface, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) { +func New(clientset kubernetes.Interface, metadataService *metadata.MetadataService, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) { // Create informer factory factory := createInformerFactory(clientset, namespace, excludeNamespaces) @@ -111,7 +94,7 @@ func New(clientset kubernetes.Interface, metadataClient metadata.Interface, name cntrl := &Controller{ clientset: clientset, - metadataClient: metadataClient, + metadataService: metadataService, podInformer: podInformer, workqueue: queue, apiClient: apiClient, @@ -363,9 +346,9 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { var lastErr error // Gather aggregate metadata for adds/updates - var aggPodMetadata *AggregatePodMetadata + var aggPodMetadata *metadata.AggregatePodMetadata if status != deploymentrecord.StatusDecommissioned { - aggPodMetadata = c.aggregateMetadata(ctx, podToPartialMetadata(pod)) + aggPodMetadata = c.metadataService.AggregatePodMetadata(ctx, podToPartialMetadata(pod)) } // Record info for each container in the pod @@ -405,7 +388,7 @@ func (c *Controller) deploymentExists(ctx context.Context, namespace, name strin } // recordContainer records a single container's deployment info. -func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *AggregatePodMetadata) error { +func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *metadata.AggregatePodMetadata) error { var cacheKey string dn := getARDeploymentName(pod, container, c.cfg.Template) @@ -521,7 +504,7 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta case deploymentrecord.StatusDecommissioned: cacheKey = getCacheKey(EventDeleted, dn, digest) c.observedDeployments.Set(cacheKey, true, 2*time.Minute) - // If there was a previous created event, remove that + // If there was a previous create event, remove that cacheKey = getCacheKey(EventCreated, dn, digest) c.observedDeployments.Delete(cacheKey) default: @@ -531,95 +514,6 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta return nil } -// aggregateMetadata returns aggregated metadata for a pod and its owners. -func (c *Controller) aggregateMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata { - aggMetadata := &AggregatePodMetadata{ - RuntimeRisks: make(map[deploymentrecord.RuntimeRisk]bool), - Tags: make(map[string]string), - } - queue := []*metav1.PartialObjectMetadata{obj} - visited := make(map[types.UID]bool) - - for len(queue) > 0 { - current := queue[0] - queue = queue[1:] - - if visited[current.GetUID()] { - slog.Warn("Already visited object, skipping to avoid cycles", - "UID", current.GetUID(), - "name", current.GetName(), - ) - continue - } - visited[current.GetUID()] = true - - extractMetadataFromObject(current, aggMetadata) - c.addOwnersToQueue(ctx, current, &queue) - } - - return aggMetadata -} - -// addOwnersToQueue takes a current object and looks up its owners, adding them to the queue for processing -// to collect their metadata. -func (c *Controller) addOwnersToQueue(ctx context.Context, current *metav1.PartialObjectMetadata, queue *[]*metav1.PartialObjectMetadata) { - ownerRefs := current.GetOwnerReferences() - - for _, owner := range ownerRefs { - ownerObj, err := c.getOwnerMetadata(ctx, current.GetNamespace(), owner) - if err != nil { - slog.Warn("Failed to get owner object for metadata collection", - "namespace", current.GetNamespace(), - "owner_kind", owner.Kind, - "owner_name", owner.Name, - "error", err, - ) - continue - } - - if ownerObj == nil { - continue - } - - *queue = append(*queue, ownerObj) - } -} - -// getOwnerMetadata retrieves partial object metadata for an owner ref. -func (c *Controller) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) { - gvr := schema.GroupVersionResource{ - Group: "apps", - Version: "v1", - } - - switch owner.Kind { - case "ReplicaSet": - gvr.Resource = "replicasets" - case "Deployment": - gvr.Resource = "deployments" - default: - slog.Debug("Unsupported owner kind for runtime risk collection", - "kind", owner.Kind, - "name", owner.Name, - ) - return nil, nil - } - - obj, err := c.metadataClient.Resource(gvr).Namespace(namespace).Get(ctx, owner.Name, metav1.GetOptions{}) - if err != nil { - if k8serrors.IsNotFound(err) { - slog.Debug("Owner object not found for metadata collection", - "namespace", namespace, - "owner_kind", owner.Kind, - "owner_name", owner.Name, - ) - return nil, nil - } - return nil, err - } - return obj, nil -} - func getCacheKey(ev, dn, digest string) string { return ev + "||" + dn + "||" + digest } @@ -676,7 +570,7 @@ func createInformerFactory(clientset kubernetes.Interface, namespace string, exc // getARDeploymentName converts the pod's metadata into the correct format // for the deployment name for the artifact registry (this is not the same -// as the K8s deployment's name! +// as the K8s deployment's name!) // The deployment name must unique within logical, physical environment and // the cluster. func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl string) string { @@ -728,78 +622,6 @@ func getDeploymentName(pod *corev1.Pod) string { return "" } -// extractMetadataFromObject extracts metadata from an object. -func extractMetadataFromObject(obj *metav1.PartialObjectMetadata, aggPodMetadata *AggregatePodMetadata) { - annotations := obj.GetAnnotations() - - // Extract runtime risks - if risks, exists := annotations[MetadataAnnotationPrefix+RuntimeRisksAnnotationKey]; exists { - for _, risk := range strings.Split(risks, ",") { - r := deploymentrecord.ValidateRuntimeRisk(risk) - if r != "" { - aggPodMetadata.RuntimeRisks[r] = true - } - } - } - - // Extract tags by sorted keys to ensure tags are deterministic - // if over the limit and some are dropped, the same ones will be dropped each time. - keys := make([]string, 0, len(annotations)) - for key := range annotations { - keys = append(keys, key) - } - slices.Sort(keys) - - for _, key := range keys { - if len(aggPodMetadata.Tags) >= MaxCustomTags { - break - } - - if strings.HasPrefix(key, MetadataAnnotationPrefix) { - tagKey := strings.TrimPrefix(key, MetadataAnnotationPrefix) - tagValue := annotations[key] - - if RuntimeRisksAnnotationKey == tagKey { - // ignore runtime risks for custom tags - continue - } - if utf8.RuneCountInString(tagKey) > MaxCustomTagLength || utf8.RuneCountInString(tagValue) > MaxCustomTagLength { - slog.Warn("Tag key or value exceeds max length, skipping", - "object_name", obj.GetName(), - "kind", obj.Kind, - "tag_key", tagKey, - "tag_value", tagValue, - "key_length", utf8.RuneCountInString(tagKey), - "value_length", utf8.RuneCountInString(tagValue), - "max_length", MaxCustomTagLength, - ) - continue - } - if tagKey == "" { - slog.Warn("Tag key is empty, skipping", - "object_name", obj.GetName(), - "kind", obj.Kind, - "annotation", key, - "tag_key", tagKey, - "tag_value", tagValue, - ) - continue - } - if _, exists := aggPodMetadata.Tags[tagKey]; exists { - slog.Debug("Duplicate tag key found, skipping", - "object_name", obj.GetName(), - "kind", obj.Kind, - "tag_key", tagKey, - "existing_value", aggPodMetadata.Tags[tagKey], - "new_value", tagValue, - ) - continue - } - aggPodMetadata.Tags[tagKey] = tagValue - } - } -} - func podToPartialMetadata(pod *corev1.Pod) *metav1.PartialObjectMetadata { return &metav1.PartialObjectMetadata{ TypeMeta: metav1.TypeMeta{ diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go new file mode 100644 index 0000000..803b2b0 --- /dev/null +++ b/internal/metadata/metadata.go @@ -0,0 +1,203 @@ +package metadata + +import ( + "context" + "log/slog" + "slices" + "strings" + "unicode/utf8" + + "github.com/github/deployment-tracker/pkg/deploymentrecord" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + k8smetadata "k8s.io/client-go/metadata" +) + +const ( + // MetadataAnnotationPrefix is the annotation key prefix for deployment record metadata like runtime risk and tags. + MetadataAnnotationPrefix = "metadata.github.com/" + // RuntimeRisksAnnotationKey is the tag key for runtime risks. Comes after MetadataAnnotationPrefix. + RuntimeRisksAnnotationKey = "runtime-risks" + // MaxCustomTags is the maximum number of custom tags per deployment record. + MaxCustomTags = 5 + // MaxCustomTagLength is the maximum length for a custom tag key or value. + MaxCustomTagLength = 100 +) + +type MetadataService struct { + metadataClient k8smetadata.Interface +} + +// AggregatePodMetadata represents combined metadata for a pod and its ownership hierarchy. +type AggregatePodMetadata struct { + RuntimeRisks map[deploymentrecord.RuntimeRisk]bool + Tags map[string]string +} + +func NewMetadataService(metadataClient k8smetadata.Interface) *MetadataService { + return &MetadataService{ + metadataClient: metadataClient, + } +} + +func (ms *MetadataService) AggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata { + aggMetadata := &AggregatePodMetadata{ + RuntimeRisks: make(map[deploymentrecord.RuntimeRisk]bool), + Tags: make(map[string]string), + } + queue := []*metav1.PartialObjectMetadata{obj} + visited := make(map[types.UID]bool) + + for len(queue) > 0 { + current := queue[0] + queue = queue[1:] + + if visited[current.GetUID()] { + slog.Warn("Already visited object, skipping to avoid cycles", + "UID", current.GetUID(), + "name", current.GetName(), + ) + continue + } + visited[current.GetUID()] = true + + extractMetadataFromObject(current, aggMetadata) + ms.addOwnersToQueue(ctx, current, &queue) + } + + return aggMetadata +} + +// addOwnersToQueue takes a current object and looks up its owners, adding them to the queue for processing +// to collect their metadata. +func (ms *MetadataService) addOwnersToQueue(ctx context.Context, current *metav1.PartialObjectMetadata, queue *[]*metav1.PartialObjectMetadata) { + ownerRefs := current.GetOwnerReferences() + + for _, owner := range ownerRefs { + ownerObj, err := ms.getOwnerMetadata(ctx, current.GetNamespace(), owner) + if err != nil { + slog.Warn("Failed to get owner object for metadata collection", + "namespace", current.GetNamespace(), + "owner_kind", owner.Kind, + "owner_name", owner.Name, + "error", err, + ) + continue + } + + if ownerObj == nil { + continue + } + + *queue = append(*queue, ownerObj) + } +} + +// getOwnerMetadata retrieves partial object metadata for an owner ref. +func (ms *MetadataService) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) { + gvr := schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + } + + switch owner.Kind { + case "ReplicaSet": + gvr.Resource = "replicasets" + case "Deployment": + gvr.Resource = "deployments" + default: + slog.Debug("Unsupported owner kind for runtime risk collection", + "kind", owner.Kind, + "name", owner.Name, + ) + return nil, nil + } + + obj, err := ms.metadataClient.Resource(gvr).Namespace(namespace).Get(ctx, owner.Name, metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + slog.Debug("Owner object not found for metadata collection", + "namespace", namespace, + "owner_kind", owner.Kind, + "owner_name", owner.Name, + ) + return nil, nil + } + return nil, err + } + return obj, nil +} + +// extractMetadataFromObject extracts metadata from an object. +func extractMetadataFromObject(obj *metav1.PartialObjectMetadata, aggPodMetadata *AggregatePodMetadata) { + annotations := obj.GetAnnotations() + + // Extract runtime risks + if risks, exists := annotations[MetadataAnnotationPrefix+RuntimeRisksAnnotationKey]; exists { + for _, risk := range strings.Split(risks, ",") { + r := deploymentrecord.ValidateRuntimeRisk(risk) + if r != "" { + aggPodMetadata.RuntimeRisks[r] = true + } + } + } + + // Extract tags by sorted keys to ensure tags are deterministic + // if over the limit and some are dropped, the same ones will be dropped each time. + keys := make([]string, 0, len(annotations)) + for key := range annotations { + keys = append(keys, key) + } + slices.Sort(keys) + + for _, key := range keys { + if len(aggPodMetadata.Tags) >= MaxCustomTags { + break + } + + if strings.HasPrefix(key, MetadataAnnotationPrefix) { + tagKey := strings.TrimPrefix(key, MetadataAnnotationPrefix) + tagValue := annotations[key] + + if RuntimeRisksAnnotationKey == tagKey { + // ignore runtime risks for custom tags + continue + } + if utf8.RuneCountInString(tagKey) > MaxCustomTagLength || utf8.RuneCountInString(tagValue) > MaxCustomTagLength { + slog.Warn("Tag key or value exceeds max length, skipping", + "object_name", obj.GetName(), + "kind", obj.Kind, + "tag_key", tagKey, + "tag_value", tagValue, + "key_length", utf8.RuneCountInString(tagKey), + "value_length", utf8.RuneCountInString(tagValue), + "max_length", MaxCustomTagLength, + ) + continue + } + if tagKey == "" { + slog.Warn("Tag key is empty, skipping", + "object_name", obj.GetName(), + "kind", obj.Kind, + "annotation", key, + "tag_key", tagKey, + "tag_value", tagValue, + ) + continue + } + if _, exists := aggPodMetadata.Tags[tagKey]; exists { + slog.Debug("Duplicate tag key found, skipping", + "object_name", obj.GetName(), + "kind", obj.Kind, + "tag_key", tagKey, + "existing_value", aggPodMetadata.Tags[tagKey], + "new_value", tagValue, + ) + continue + } + aggPodMetadata.Tags[tagKey] = tagValue + } + } +} diff --git a/internal/metadata/metadata_test.go b/internal/metadata/metadata_test.go new file mode 100644 index 0000000..82c4846 --- /dev/null +++ b/internal/metadata/metadata_test.go @@ -0,0 +1 @@ +package metadata From 103437d73131338c0f14d4e1c207a8622be46d18 Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Fri, 20 Feb 2026 12:59:50 -0500 Subject: [PATCH 2/3] rename to aggregator, add testing --- cmd/deployment-tracker/main.go | 6 +- internal/controller/controller.go | 22 +- internal/metadata/metadata.go | 21 +- internal/metadata/metadata_test.go | 380 +++++++++++++++++++++++++++++ 4 files changed, 408 insertions(+), 21 deletions(-) diff --git a/cmd/deployment-tracker/main.go b/cmd/deployment-tracker/main.go index bbd655f..441d42c 100644 --- a/cmd/deployment-tracker/main.go +++ b/cmd/deployment-tracker/main.go @@ -122,8 +122,8 @@ func main() { os.Exit(1) } - // Create MetadataService - metadataService := metadata.NewMetadataService(metadataClient) + // Create metadata aggregator + metadataAggregator := metadata.NewAggregator(metadataClient) // Start the metrics server var promSrv = &http.Server{ @@ -164,7 +164,7 @@ func main() { cancel() }() - cntrl, err := controller.New(clientset, metadataService, namespace, excludeNamespaces, &cntrlCfg) + cntrl, err := controller.New(clientset, metadataAggregator, namespace, excludeNamespaces, &cntrlCfg) if err != nil { slog.Error("Failed to create controller", "error", err) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 75ff19c..4420eee 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -39,6 +39,10 @@ type ttlCache interface { Delete(k any) } +type podMetadataAggregator interface { + AggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata +} + // PodEvent represents a pod event to be processed. type PodEvent struct { Key string @@ -48,12 +52,12 @@ type PodEvent struct { // Controller is the Kubernetes controller for tracking deployments. type Controller struct { - clientset kubernetes.Interface - metadataService *metadata.MetadataService - podInformer cache.SharedIndexInformer - workqueue workqueue.TypedRateLimitingInterface[PodEvent] - apiClient *deploymentrecord.Client - cfg *Config + clientset kubernetes.Interface + metadataAggregator podMetadataAggregator + podInformer cache.SharedIndexInformer + workqueue workqueue.TypedRateLimitingInterface[PodEvent] + apiClient *deploymentrecord.Client + cfg *Config // best effort cache to avoid redundant posts // post requests are idempotent, so if this cache fails due to // restarts or other events, nothing will break. @@ -61,7 +65,7 @@ type Controller struct { } // New creates a new deployment tracker controller. -func New(clientset kubernetes.Interface, metadataService *metadata.MetadataService, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) { +func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregator, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) { // Create informer factory factory := createInformerFactory(clientset, namespace, excludeNamespaces) @@ -94,7 +98,7 @@ func New(clientset kubernetes.Interface, metadataService *metadata.MetadataServi cntrl := &Controller{ clientset: clientset, - metadataService: metadataService, + metadataAggregator: metadataAggregator, podInformer: podInformer, workqueue: queue, apiClient: apiClient, @@ -348,7 +352,7 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { // Gather aggregate metadata for adds/updates var aggPodMetadata *metadata.AggregatePodMetadata if status != deploymentrecord.StatusDecommissioned { - aggPodMetadata = c.metadataService.AggregatePodMetadata(ctx, podToPartialMetadata(pod)) + aggPodMetadata = c.metadataAggregator.AggregatePodMetadata(ctx, podToPartialMetadata(pod)) } // Record info for each container in the pod diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go index 803b2b0..e42ae2f 100644 --- a/internal/metadata/metadata.go +++ b/internal/metadata/metadata.go @@ -26,7 +26,8 @@ const ( MaxCustomTagLength = 100 ) -type MetadataService struct { +// Aggregator uses the Kubernetes metadata client to aggregate metadata for a pod and its ownership hierarchy. +type Aggregator struct { metadataClient k8smetadata.Interface } @@ -36,13 +37,15 @@ type AggregatePodMetadata struct { Tags map[string]string } -func NewMetadataService(metadataClient k8smetadata.Interface) *MetadataService { - return &MetadataService{ +// NewAggregator creates a new Aggregator with a Kubernetes metadata client. +func NewAggregator(metadataClient k8smetadata.Interface) *Aggregator { + return &Aggregator{ metadataClient: metadataClient, } } -func (ms *MetadataService) AggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata { +// AggregatePodMetadata takes a pod's partial object metadata and traverses its ownership hierarchy to return AggregatePodMetadata. +func (m *Aggregator) AggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata { aggMetadata := &AggregatePodMetadata{ RuntimeRisks: make(map[deploymentrecord.RuntimeRisk]bool), Tags: make(map[string]string), @@ -64,7 +67,7 @@ func (ms *MetadataService) AggregatePodMetadata(ctx context.Context, obj *metav1 visited[current.GetUID()] = true extractMetadataFromObject(current, aggMetadata) - ms.addOwnersToQueue(ctx, current, &queue) + m.addOwnersToQueue(ctx, current, &queue) } return aggMetadata @@ -72,11 +75,11 @@ func (ms *MetadataService) AggregatePodMetadata(ctx context.Context, obj *metav1 // addOwnersToQueue takes a current object and looks up its owners, adding them to the queue for processing // to collect their metadata. -func (ms *MetadataService) addOwnersToQueue(ctx context.Context, current *metav1.PartialObjectMetadata, queue *[]*metav1.PartialObjectMetadata) { +func (m *Aggregator) addOwnersToQueue(ctx context.Context, current *metav1.PartialObjectMetadata, queue *[]*metav1.PartialObjectMetadata) { ownerRefs := current.GetOwnerReferences() for _, owner := range ownerRefs { - ownerObj, err := ms.getOwnerMetadata(ctx, current.GetNamespace(), owner) + ownerObj, err := m.getOwnerMetadata(ctx, current.GetNamespace(), owner) if err != nil { slog.Warn("Failed to get owner object for metadata collection", "namespace", current.GetNamespace(), @@ -96,7 +99,7 @@ func (ms *MetadataService) addOwnersToQueue(ctx context.Context, current *metav1 } // getOwnerMetadata retrieves partial object metadata for an owner ref. -func (ms *MetadataService) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) { +func (m *Aggregator) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) { gvr := schema.GroupVersionResource{ Group: "apps", Version: "v1", @@ -115,7 +118,7 @@ func (ms *MetadataService) getOwnerMetadata(ctx context.Context, namespace strin return nil, nil } - obj, err := ms.metadataClient.Resource(gvr).Namespace(namespace).Get(ctx, owner.Name, metav1.GetOptions{}) + obj, err := m.metadataClient.Resource(gvr).Namespace(namespace).Get(ctx, owner.Name, metav1.GetOptions{}) if err != nil { if k8serrors.IsNotFound(err) { slog.Debug("Owner object not found for metadata collection", diff --git a/internal/metadata/metadata_test.go b/internal/metadata/metadata_test.go index 82c4846..98a8cec 100644 --- a/internal/metadata/metadata_test.go +++ b/internal/metadata/metadata_test.go @@ -1 +1,381 @@ package metadata + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/github/deployment-tracker/pkg/deploymentrecord" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + metadatafake "k8s.io/client-go/metadata/fake" + k8stesting "k8s.io/client-go/testing" +) + +func newPartialObject(uid, name, kind string, annotations map[string]string, owners []metav1.OwnerReference) *metav1.PartialObjectMetadata { + return &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{Kind: kind, APIVersion: "apps/v1"}, + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(uid), + Name: name, + Namespace: "default", + Annotations: annotations, + OwnerReferences: owners, + }, + } +} + +func ownerRef(kind, name, uid string) metav1.OwnerReference { + return metav1.OwnerReference{ + APIVersion: "apps/v1", + Kind: kind, + Name: name, + UID: types.UID(uid), + } +} + +func TestExtractMetadataFromObject(t *testing.T) { + tests := []struct { + name string + annotations map[string]string + expectedRuntimeRisks map[deploymentrecord.RuntimeRisk]bool + expectedTags map[string]string + }{ + { + name: "no annotations", + annotations: nil, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{}, + }, + { + name: "single runtime risk", + annotations: map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "critical-resource", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.CriticalResource: true, + }, + expectedTags: map[string]string{}, + }, + { + name: "multiple runtime risks", + annotations: map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "critical-resource,internet-exposed,sensitive-data", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.CriticalResource: true, + deploymentrecord.InternetExposed: true, + deploymentrecord.SensitiveData: true, + }, + expectedTags: map[string]string{}, + }, + { + name: "multiple runtime risks with spaces and capitals", + annotations: map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "Critical-Resource, internet-exposed, sensitive-data", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.CriticalResource: true, + deploymentrecord.InternetExposed: true, + deploymentrecord.SensitiveData: true, + }, + expectedTags: map[string]string{}, + }, + { + name: "invalid runtime risks are ignored", + annotations: map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "critical-resource,not-a-risk", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.CriticalResource: true, + }, + expectedTags: map[string]string{}, + }, + { + name: "custom tags extracted", + annotations: map[string]string{ + MetadataAnnotationPrefix + "team": "platform", + MetadataAnnotationPrefix + "env": "prod", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{"team": "platform", "env": "prod"}, + }, + { + name: "runtime risks annotation excluded from tags", + annotations: map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "critical-resource", + MetadataAnnotationPrefix + "team": "platform", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.CriticalResource: true, + }, + expectedTags: map[string]string{"team": "platform"}, + }, + { + name: "other annotations are ignored", + annotations: map[string]string{ + "app.kubernetes.io/name": "my-app", + MetadataAnnotationPrefix + "team": "platform", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{"team": "platform"}, + }, + { + name: "max custom tags of 5 enforced", + annotations: map[string]string{ + MetadataAnnotationPrefix + "tag1": "1", + MetadataAnnotationPrefix + "tag2": "2", + MetadataAnnotationPrefix + "tag3": "3", + MetadataAnnotationPrefix + "tag4": "4", + MetadataAnnotationPrefix + "tag5": "5", + MetadataAnnotationPrefix + "tag6": "6", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{"tag1": "1", "tag2": "2", "tag3": "3", "tag4": "4", "tag5": "5"}, + }, + { + name: "tag key exceeding max length is skipped", + annotations: map[string]string{ + MetadataAnnotationPrefix + strings.Repeat("k", MaxCustomTagLength+1): "val", + MetadataAnnotationPrefix + "good": "val", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{"good": "val"}, + }, + { + name: "tag value exceeding max length is skipped", + annotations: map[string]string{ + MetadataAnnotationPrefix + "key": strings.Repeat("v", MaxCustomTagLength+1), + MetadataAnnotationPrefix + "good": "val", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{"good": "val"}, + }, + { + name: "empty tag key after prefix is skipped", + annotations: map[string]string{ + MetadataAnnotationPrefix: "value", + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + obj := newPartialObject("uid-1", "test-pod", "Pod", tt.annotations, nil) + agg := &AggregatePodMetadata{ + RuntimeRisks: make(map[deploymentrecord.RuntimeRisk]bool), + Tags: make(map[string]string), + } + + extractMetadataFromObject(obj, agg) + + if len(agg.RuntimeRisks) != len(tt.expectedRuntimeRisks) { + t.Errorf("RuntimeRisks count = %d, expected %d", len(agg.RuntimeRisks), len(tt.expectedRuntimeRisks)) + } + for risk := range tt.expectedRuntimeRisks { + if !agg.RuntimeRisks[risk] { + t.Errorf("missing expected runtime risk %q", risk) + } + } + + if len(agg.Tags) != len(tt.expectedTags) { + t.Errorf("Tags count = %d, expected %d\ngot: %v\nexpected: %v", len(agg.Tags), len(tt.expectedTags), agg.Tags, tt.expectedTags) + } + for k, v := range tt.expectedTags { + if agg.Tags[k] != v { + t.Errorf("Tags[%q] = %q, expected %q", k, agg.Tags[k], v) + } + } + }) + } +} + +func TestAggregatePodMetadata(t *testing.T) { + tests := []struct { + name string + pod *metav1.PartialObjectMetadata + clusterObjects []runtime.Object + expectedRuntimeRisks map[deploymentrecord.RuntimeRisk]bool + expectedTags map[string]string + }{ + { + name: "pod with no owners", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "platform", + }, nil), + clusterObjects: nil, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{"team": "platform"}, + }, + { + name: "aggregates through full deployment ownership chain: pod -> replicaset -> deployment", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "platform", + }, []metav1.OwnerReference{ownerRef("ReplicaSet", "my-rs", "rs-1")}), + clusterObjects: []runtime.Object{ + newPartialObject("rs-1", "my-rs", "ReplicaSet", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "internet-exposed", + }, []metav1.OwnerReference{ownerRef("Deployment", "my-deploy", "deploy-1")}), + newPartialObject("deploy-1", "my-deploy", "Deployment", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "sensitive-data", + MetadataAnnotationPrefix + "env": "prod", + }, nil), + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.InternetExposed: true, + deploymentrecord.SensitiveData: true, + }, + expectedTags: map[string]string{"team": "platform", "env": "prod"}, + }, + { + name: "unsupported owner kind is skipped", + pod: newPartialObject("pod-1", "my-pod", "Pod", nil, + []metav1.OwnerReference{ownerRef("ServiceAccount", "my-sa", "sa-1")}), + clusterObjects: nil, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{}, + }, + { + name: "missing owner is skipped gracefully", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "platform", + }, []metav1.OwnerReference{ownerRef("ReplicaSet", "gone-rs", "rs-gone")}), + clusterObjects: nil, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{"team": "platform"}, + }, + { + name: "duplicate tags from owner do not overwrite pod tags", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "from-pod", + }, []metav1.OwnerReference{ownerRef("ReplicaSet", "my-rs", "rs-1")}), + clusterObjects: []runtime.Object{ + newPartialObject("rs-1", "my-rs", "ReplicaSet", map[string]string{ + MetadataAnnotationPrefix + "team": "from-rs", + }, nil), + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{}, + expectedTags: map[string]string{"team": "from-pod"}, + }, + { + name: "runtime risks are merged across hierarchy", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "critical-resource", + }, []metav1.OwnerReference{ownerRef("ReplicaSet", "my-rs", "rs-1")}), + clusterObjects: []runtime.Object{ + newPartialObject("rs-1", "my-rs", "ReplicaSet", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "critical-resource,internet-exposed", + }, nil), + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.CriticalResource: true, + deploymentrecord.InternetExposed: true, + }, + expectedTags: map[string]string{}, + }, + { + name: "cycle detection in ownership chain is handled correctly", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "platform", + }, []metav1.OwnerReference{ownerRef("ReplicaSet", "my-rs", "rs-1")}), + clusterObjects: []runtime.Object{ + newPartialObject("rs-1", "my-rs", "ReplicaSet", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "critical-resource", + }, []metav1.OwnerReference{ownerRef("Deployment", "my-deploy", "deploy-1")}), + newPartialObject("deploy-1", "my-deploy", "Deployment", nil, + []metav1.OwnerReference{ownerRef("ReplicaSet", "my-rs", "rs-1")}), + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.CriticalResource: true, + }, + expectedTags: map[string]string{"team": "platform"}, + }, + { + name: "multiple owner references are all traversed", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "platform", + }, []metav1.OwnerReference{ + ownerRef("ReplicaSet", "rs-1", "rs-1-uid"), + ownerRef("ReplicaSet", "rs-2", "rs-2-uid"), + }), + clusterObjects: []runtime.Object{ + newPartialObject("rs-1-uid", "rs-1", "ReplicaSet", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "critical-resource", + MetadataAnnotationPrefix + "org": "engineering", + }, nil), + newPartialObject("rs-2-uid", "rs-2", "ReplicaSet", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "internet-exposed", + MetadataAnnotationPrefix + "env": "prod", + }, nil), + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.CriticalResource: true, + deploymentrecord.InternetExposed: true, + }, + expectedTags: map[string]string{"team": "platform", "org": "engineering", "env": "prod"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheme := metadatafake.NewTestScheme() + _ = metav1.AddMetaToScheme(scheme) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}, &metav1.PartialObjectMetadata{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, &metav1.PartialObjectMetadata{}) + + fakeClient := metadatafake.NewSimpleMetadataClient(scheme, tt.clusterObjects...) + m := NewAggregator(fakeClient) + + result := m.AggregatePodMetadata(context.Background(), tt.pod) + + if len(result.RuntimeRisks) != len(tt.expectedRuntimeRisks) { + t.Errorf("RuntimeRisks count = %d, expected %d", len(result.RuntimeRisks), len(tt.expectedRuntimeRisks)) + } + for risk := range tt.expectedRuntimeRisks { + if !result.RuntimeRisks[risk] { + t.Errorf("missing expected runtime risk %q", risk) + } + } + + if len(result.Tags) != len(tt.expectedTags) { + t.Errorf("Tags count = %d, expected %d\ngot: %v\nexpected: %v", len(result.Tags), len(tt.expectedTags), result.Tags, tt.expectedTags) + } + for k, v := range tt.expectedTags { + if result.Tags[k] != v { + t.Errorf("Tags[%q] = %q, expected %q", k, result.Tags[k], v) + } + } + }) + } +} + +func TestAggregatePodMetadata_OwnerFetchError(t *testing.T) { + scheme := metadatafake.NewTestScheme() + _ = metav1.AddMetaToScheme(scheme) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}, &metav1.PartialObjectMetadata{}) + + fakeClient := metadatafake.NewSimpleMetadataClient(scheme) + fakeClient.PrependReactor("get", "replicasets", func(_ k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, errors.New("simulated API server error") + }) + + m := NewAggregator(fakeClient) + + pod := newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "platform", + }, []metav1.OwnerReference{ownerRef("ReplicaSet", "my-rs", "rs-1")}) + + result := m.AggregatePodMetadata(context.Background(), pod) + + if len(result.RuntimeRisks) != 0 { + t.Errorf("expected no runtime risks, got %v", result.RuntimeRisks) + } + if len(result.Tags) != 1 || result.Tags["team"] != "platform" { + t.Errorf("expected only pod-level tags, got %v", result.Tags) + } +} From d9014fd9d3631322b049a471e9a12d139fd3be05 Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Fri, 20 Feb 2026 13:43:14 -0500 Subject: [PATCH 3/3] address comments --- internal/controller/controller.go | 4 ++-- internal/metadata/metadata.go | 6 +++--- internal/metadata/metadata_test.go | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 4420eee..ef3e80c 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -40,7 +40,7 @@ type ttlCache interface { } type podMetadataAggregator interface { - AggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata + BuildAggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata } // PodEvent represents a pod event to be processed. @@ -352,7 +352,7 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { // Gather aggregate metadata for adds/updates var aggPodMetadata *metadata.AggregatePodMetadata if status != deploymentrecord.StatusDecommissioned { - aggPodMetadata = c.metadataAggregator.AggregatePodMetadata(ctx, podToPartialMetadata(pod)) + aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) } // Record info for each container in the pod diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go index e42ae2f..039c192 100644 --- a/internal/metadata/metadata.go +++ b/internal/metadata/metadata.go @@ -44,8 +44,8 @@ func NewAggregator(metadataClient k8smetadata.Interface) *Aggregator { } } -// AggregatePodMetadata takes a pod's partial object metadata and traverses its ownership hierarchy to return AggregatePodMetadata. -func (m *Aggregator) AggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata { +// BuildAggregatePodMetadata takes a pod's partial object metadata and traverses its ownership hierarchy to return AggregatePodMetadata. +func (m *Aggregator) BuildAggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata { aggMetadata := &AggregatePodMetadata{ RuntimeRisks: make(map[deploymentrecord.RuntimeRisk]bool), Tags: make(map[string]string), @@ -111,7 +111,7 @@ func (m *Aggregator) getOwnerMetadata(ctx context.Context, namespace string, own case "Deployment": gvr.Resource = "deployments" default: - slog.Debug("Unsupported owner kind for runtime risk collection", + slog.Debug("Unsupported owner kind for metadata collection", "kind", owner.Kind, "name", owner.Name, ) diff --git a/internal/metadata/metadata_test.go b/internal/metadata/metadata_test.go index 98a8cec..02e0d23 100644 --- a/internal/metadata/metadata_test.go +++ b/internal/metadata/metadata_test.go @@ -195,7 +195,7 @@ func TestExtractMetadataFromObject(t *testing.T) { } } -func TestAggregatePodMetadata(t *testing.T) { +func TestBuildAggregatePodMetadata(t *testing.T) { tests := []struct { name string pod *metav1.PartialObjectMetadata @@ -331,7 +331,7 @@ func TestAggregatePodMetadata(t *testing.T) { fakeClient := metadatafake.NewSimpleMetadataClient(scheme, tt.clusterObjects...) m := NewAggregator(fakeClient) - result := m.AggregatePodMetadata(context.Background(), tt.pod) + result := m.BuildAggregatePodMetadata(context.Background(), tt.pod) if len(result.RuntimeRisks) != len(tt.expectedRuntimeRisks) { t.Errorf("RuntimeRisks count = %d, expected %d", len(result.RuntimeRisks), len(tt.expectedRuntimeRisks)) @@ -354,7 +354,7 @@ func TestAggregatePodMetadata(t *testing.T) { } } -func TestAggregatePodMetadata_OwnerFetchError(t *testing.T) { +func TestBuildAggregatePodMetadata_OwnerFetchError(t *testing.T) { scheme := metadatafake.NewTestScheme() _ = metav1.AddMetaToScheme(scheme) scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}, &metav1.PartialObjectMetadata{}) @@ -370,7 +370,7 @@ func TestAggregatePodMetadata_OwnerFetchError(t *testing.T) { MetadataAnnotationPrefix + "team": "platform", }, []metav1.OwnerReference{ownerRef("ReplicaSet", "my-rs", "rs-1")}) - result := m.AggregatePodMetadata(context.Background(), pod) + result := m.BuildAggregatePodMetadata(context.Background(), pod) if len(result.RuntimeRisks) != 0 { t.Errorf("expected no runtime risks, got %v", result.RuntimeRisks)