Skip to content
Merged
204 changes: 181 additions & 23 deletions pkg/virtualkubelet/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/url"
"os"
"regexp"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -829,33 +830,74 @@ func remoteExecutionHandleProjectedSource(
// https://kubernetes.io/docs/concepts/workloads/pods/downward-api/
// See URL doc above, that describe what type of DownwardAPI to expect from volume. For now, only FieldRef is supported.
// The rest are ignored.
for _, item := range source.DownwardAPI.Items {
switch {

case item.FieldRef != nil:
switch item.FieldRef.FieldPath {
case "metadata.name":
projectedVolume.Data[item.Path] = pod.Name

case "metadata.namespace":
projectedVolume.Data[item.Path] = pod.Namespace

case "metadata.uid":
projectedVolume.Data[item.Path] = string(pod.UID)

// TODO implement DownwardAPI annotation and label if needed.
err := populateProjectedVolumeFromDownwardAPI(ctx, pod, source.DownwardAPI.Items, projectedVolume)
if err != nil {
return err
}
}
return nil
}

default:
log.G(ctx).Warningf("in pod %s unsupported DownwardAPI FieldPath %s in InterLink, ignoring this source...", pod.Name, item.FieldRef.FieldPath)
}
func formatDownwardAPIMetadataMap(data map[string]string) string {
if len(data) == 0 {
return ""
}
keys := make([]string, len(data))
i := 0
for k := range data {
keys[i] = k
i++
}
sort.Strings(keys)
var b strings.Builder
for _, k := range keys {
b.WriteString(k)
b.WriteString("=")
b.WriteString(strconv.Quote(data[k]))
b.WriteString("\n")
}
return b.String()
}

case item.ResourceFieldRef != nil:
// TODO implement DownwardAPI resourceFieldRef if needed.
log.G(ctx).Warningf("in pod %s unsupported DownwardAPI resourceFieldRef in InterLink, ignoring this source...", pod.Name)
func resolveDownwardAPIFieldPath(pod *v1.Pod, fieldPath string) (string, bool) {
switch fieldPath {
case "metadata.name":
return pod.Name, true
case "metadata.namespace":
return pod.Namespace, true
case "metadata.uid":
return string(pod.UID), true
case "metadata.labels":
return formatDownwardAPIMetadataMap(pod.Labels), true
case "metadata.annotations":
return formatDownwardAPIMetadataMap(pod.Annotations), true
case "spec.nodeName":
return pod.Spec.NodeName, true
case "spec.serviceAccountName":
return pod.Spec.ServiceAccountName, true
case "status.podIP":
return pod.Status.PodIP, true
case "status.hostIP":
return pod.Status.HostIP, true
default:
return "", false
}
}

default:
log.G(ctx).Warningf("in pod %s unsupported unknown DownwardAPI in InterLink, ignoring this source...", pod.Name)
func populateProjectedVolumeFromDownwardAPI(ctx context.Context, pod *v1.Pod, items []v1.DownwardAPIVolumeFile, projectedVolume *v1.ConfigMap) error {
for _, item := range items {
switch {
case item.FieldRef != nil:
if value, ok := resolveDownwardAPIFieldPath(pod, item.FieldRef.FieldPath); ok {
projectedVolume.Data[item.Path] = value
} else {
log.G(ctx).Warningf("in pod %s unsupported DownwardAPI FieldPath %s in InterLink, ignoring this source...", pod.Name, item.FieldRef.FieldPath)
}
case item.ResourceFieldRef != nil:
// TODO implement DownwardAPI resourceFieldRef if needed.
log.G(ctx).Warningf("in pod %s unsupported DownwardAPI resourceFieldRef in InterLink, ignoring this source...", pod.Name)
default:
log.G(ctx).Warningf("in pod %s unsupported unknown DownwardAPI in InterLink, ignoring this source...", pod.Name)
}
}
return nil
Expand Down Expand Up @@ -916,6 +958,46 @@ func remoteExecutionHandleVolumes(ctx context.Context, p *Provider, pod *v1.Pod,
log.G(ctx).Debug("ProjectedVolumeMaps len: ", len(req.ProjectedVolumeMaps))
}

case volume.DownwardAPI != nil:
if p.config.DisableProjectedVolumes {
log.G(ctx).Warning("Flag DisableProjectedVolumes set to true, so not handling DownwardAPI Volume: ", volume)
break
}

projectedVolume := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: volume.Name},
Data: make(map[string]string),
}
log.G(ctx).Debug("Adding to PodCreateRequests the downwardAPI volume ", volume.Name)

Comment on lines +961 to +972
err := populateProjectedVolumeFromDownwardAPI(ctx, pod, volume.DownwardAPI.Items, &projectedVolume)
if err != nil {
return err
}

req.ProjectedVolumeMaps = append(req.ProjectedVolumeMaps, projectedVolume)

Comment thread
dciangot marked this conversation as resolved.
for i := range pod.Spec.Volumes {
if pod.Spec.Volumes[i].Name != volume.Name {
continue
}

pod.Spec.Volumes[i].VolumeSource = v1.VolumeSource{
Projected: &v1.ProjectedVolumeSource{
DefaultMode: volume.DownwardAPI.DefaultMode,
Sources: []v1.VolumeProjection{
{
DownwardAPI: &v1.DownwardAPIProjection{
Items: volume.DownwardAPI.Items,
},
},
},
},
}

break
}

case volume.Secret != nil:
scrt, err := p.clientSet.CoreV1().Secrets(pod.Namespace).Get(ctx, volume.Secret.SecretName, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -1082,6 +1164,75 @@ func resolveEnvRefs(
}
}

func resolveEnvFromRefs(
ctx context.Context,
p *Provider,
pod *v1.Pod,
container *v1.Container,
) {
if len(container.EnvFrom) == 0 {
return
}

explicitEnv := make(map[string]struct{}, len(container.Env))
envIndex := make(map[string]int, len(container.Env))
for i := range container.Env {
explicitEnv[container.Env[i].Name] = struct{}{}
envIndex[container.Env[i].Name] = i
}

upsert := func(name, value string) {
if _, isExplicit := explicitEnv[name]; isExplicit {
return
}
if idx, ok := envIndex[name]; ok {
container.Env[idx].Value = value
container.Env[idx].ValueFrom = nil
return
}
container.Env = append(container.Env, v1.EnvVar{Name: name, Value: value})
envIndex[name] = len(container.Env) - 1
}

for _, envFrom := range container.EnvFrom {
prefix := envFrom.Prefix

if cmRef := envFrom.ConfigMapRef; cmRef != nil {
cm, err := p.clientSet.CoreV1().
ConfigMaps(pod.Namespace).
Get(ctx, cmRef.Name, metav1.GetOptions{})
if err != nil {
if cmRef.Optional != nil && *cmRef.Optional {
continue
}
log.G(ctx).Errorf("resolving envFrom ConfigMap %s/%s: %v", pod.Namespace, cmRef.Name, err)
continue
}
for key, value := range cm.Data {
upsert(prefix+key, value)
}
}

if secretRef := envFrom.SecretRef; secretRef != nil {
secret, err := p.clientSet.CoreV1().
Secrets(pod.Namespace).
Get(ctx, secretRef.Name, metav1.GetOptions{})
if err != nil {
if secretRef.Optional != nil && *secretRef.Optional {
continue
}
log.G(ctx).Errorf("resolving envFrom Secret %s/%s: %v", pod.Namespace, secretRef.Name, err)
continue
}
for key, value := range secret.Data {
upsert(prefix+key, string(value))
}
}
}

container.EnvFrom = nil
}

// RemoteExecution is called by the VK everytime a Pod is being registered or deleted to/from the VK.
// Depending on the mode (CREATE/DELETE), it performs different actions, making different REST calls.
// Note: for the CREATE mode, the function gets stuck up to 5 minutes waiting for every missing ConfigMap/Secret.
Expand Down Expand Up @@ -1130,6 +1281,13 @@ func RemoteExecution(ctx context.Context, config Config, p *Provider, pod *v1.Po

addKubernetesServicesEnvVars(ctx, config, podToOffload)

for i := range podToOffload.Spec.InitContainers {
resolveEnvFromRefs(ctx, p, podToOffload, &podToOffload.Spec.InitContainers[i])
}
for i := range podToOffload.Spec.Containers {
resolveEnvFromRefs(ctx, p, podToOffload, &podToOffload.Spec.Containers[i])
}

if config.SkipDownwardAPIResolution {
log.G(ctx).Info("SkipDownwardAPIResolution is set to true")
for i := range podToOffload.Spec.InitContainers {
Expand Down
Loading
Loading