Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions client/indexd/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

"github.com/calypr/data-client/common"
"github.com/calypr/data-client/drs"
"github.com/calypr/data-client/upload"

Check failure on line 11 in client/indexd/register.go

View workflow job for this annotation

GitHub Actions / build

github.com/calypr/data-client@v0.0.0-20260210235422-eab5bbef7b2f: replacement directory /Users/walsbr/calypr/data-client does not exist

Check failure on line 11 in client/indexd/register.go

View workflow job for this annotation

GitHub Actions / build

github.com/calypr/data-client@v0.0.0-20260210235422-eab5bbef7b2f: replacement directory /Users/walsbr/calypr/data-client does not exist
localCommon "github.com/calypr/git-drs/common"
"github.com/calypr/git-drs/drsmap"
)

Expand Down Expand Up @@ -48,6 +49,24 @@
}
cl.Logger.InfoContext(ctx, fmt.Sprintf("indexd record registration complete for oid %s", oid))

// Check if this is a remote URL (no upload needed, delete the sentinel file)
cl.Logger.InfoContext(ctx, fmt.Sprintf("Check if this is a remote URL %v %v", drsObject.Aliases, drsObject))
for _, alias := range drsObject.Aliases {
if alias == "git-drs-remote-url:true" {

lfsObjPath, pathErr := drsmap.GetObjectPath(localCommon.LFS_OBJS_PATH, oid)
if pathErr != nil {
return nil, fmt.Errorf("compute LFS object path for oid %s: %w", oid, pathErr)
}

if rmErr := os.Remove(lfsObjPath); rmErr != nil && !os.IsNotExist(rmErr) {
return nil, fmt.Errorf("delete LFS object %s: %w", lfsObjPath, rmErr)
}
cl.Logger.InfoContext(ctx, fmt.Sprintf("Object was remote-url, updated index, removed local LFS object: %s", lfsObjPath))
return drsObject, nil
}
}

// Now attempt to upload the file if not already available
cl.Logger.InfoContext(ctx, fmt.Sprintf("checking if oid %s is already downloadable", oid))
downloadable, err := cl.isFileDownloadable(ctx, drsObject)
Expand Down
232 changes: 230 additions & 2 deletions cloud/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,237 @@ package cloud
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/calypr/data-client/drs"
"github.com/calypr/git-drs/lfs"
"golang.org/x/oauth2/google"
)

// Download downloads the S3 object to a temporary file while computing its SHA256 hash.
// Download extracts the first non-empty access URL from a
// DRSObject, performs a HEAD preflight for that URL, downloads object bytes to
// a temporary file, and returns the computed SHA256 and temporary file path.
func Download(ctx context.Context, drsObj *drs.DRSObject) (string, string, error) {
if ctx == nil {
ctx = context.Background()
}
if drsObj == nil {
return "", "", fmt.Errorf("drs object is nil")
}

rawURL, err := firstAccessURL(drsObj)
if err != nil {
return "", "", err
}

headMeta, err := HeadObject(ctx, rawURL)
if err != nil {
return "", "", fmt.Errorf("head preflight failed: %w", err)
}

rc, err := openObjectReader(ctx, rawURL)
if err != nil {
return "", "", err
}
defer rc.Close()

etag := headMeta.ETag
subdir1, subdir2 := "xx", "yy"
if len(etag) >= 4 {
subdir1 = etag[0:2]
subdir2 = etag[2:4]
}
objName := etag
if objName == "" {
objName = "unknown-etag"
}
Comment on lines +49 to +57
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The temp file name/directory is derived directly from the remote object's ETag. ETags can contain characters that are problematic in filenames (e.g., base64 + / = on GCS) and may lead to unexpected subdirectories or invalid paths. Use a safe filename (e.g., a random name or the computed SHA256) and keep ETag only as metadata/logging.

Suggested change
subdir1, subdir2 := "xx", "yy"
if len(etag) >= 4 {
subdir1 = etag[0:2]
subdir2 = etag[2:4]
}
objName := etag
if objName == "" {
objName = "unknown-etag"
}
// Derive a filesystem-safe name from the ETag to avoid using the raw value
// directly in path components (ETags may contain characters like '+', '/', '=').
objName := "unknown-etag"
if etag != "" {
sum := sha256.Sum256([]byte(etag))
objName = hex.EncodeToString(sum[:])
}
subdir1, subdir2 := "xx", "yy"
if len(objName) >= 4 {
subdir1 = objName[0:2]
subdir2 = objName[2:4]
}

Copilot uses AI. Check for mistakes.
tmpDir := filepath.Join(os.TempDir(), "git-drs", "tmp-objects", subdir1, subdir2)
if err := os.MkdirAll(tmpDir, 0755); err != nil {
return "", "", fmt.Errorf("mkdir %s: %w", tmpDir, err)
}
tmpPath := filepath.Join(tmpDir, objName)
tmpFile, err := os.Create(tmpPath)
if err != nil {
return "", "", fmt.Errorf("create %s: %w", tmpPath, err)
}

cleanup := true
defer func() {
_ = tmpFile.Close()
if cleanup {
_ = os.Remove(tmpPath)
}
}()

h := sha256.New()
if _, err := io.Copy(io.MultiWriter(tmpFile, h), rc); err != nil {
return "", "", fmt.Errorf("download object: %w", err)
}
if err := tmpFile.Sync(); err != nil {
return "", "", fmt.Errorf("sync temp file: %w", err)
}
if err := tmpFile.Close(); err != nil {
return "", "", fmt.Errorf("close temp file: %w", err)
}

computedSHA := fmt.Sprintf("%x", h.Sum(nil))

_, lfsRoot, err := lfs.GetGitRootDirectories(ctx)
if err != nil {
return "", "", fmt.Errorf("get git root directories: %w", err)
}

oid := computedSHA // sha of sentinel drsObj.Checksums.SHA256
dstDir := filepath.Join(lfsRoot, "objects", oid[0:2], oid[2:4])
dstPath := filepath.Join(dstDir, oid)

if err := os.MkdirAll(dstDir, 0755); err != nil {
return "", "", fmt.Errorf("mkdir %s: %w", dstDir, err)
}

if err := os.Rename(tmpPath, dstPath); err != nil {
return "", "", fmt.Errorf("rename %s to %s: %w", tmpPath, dstPath, err)
}
cleanup = false
return computedSHA, dstPath, nil

}

// firstAccessURL returns the first non-empty access URL declared on drsObj,
// or an error when the object exposes no access methods or no usable URL.
func firstAccessURL(drsObj *drs.DRSObject) (string, error) {
	if len(drsObj.AccessMethods) == 0 {
		return "", fmt.Errorf("drs object has no access methods")
	}
	for _, method := range drsObj.AccessMethods {
		if trimmed := strings.TrimSpace(method.AccessURL.URL); trimmed != "" {
			return trimmed, nil
		}
	}
	return "", fmt.Errorf("drs object has no access URL")
}

// openObjectReader dispatches on the cloud platform detected from rawURL and
// returns a streaming reader over the object bytes. The caller must close the
// returned reader.
func openObjectReader(ctx context.Context, rawURL string) (io.ReadCloser, error) {
	platform := DetectPlatform(rawURL)
	switch platform {
	case PlatformS3:
		// S3 credentials and endpoint configuration come from the standard
		// AWS environment variables.
		return AgentFetchReader(ctx, S3ObjectParameters{
			S3URL:        rawURL,
			AWSAccessKey: os.Getenv(AWS_KEY_ENV_VAR),
			AWSSecretKey: os.Getenv(AWS_SECRET_ENV_VAR),
			AWSRegion:    os.Getenv(AWS_REGION_ENV_VAR),
			AWSEndpoint:  os.Getenv(AWS_ENDPOINT_URL_ENV_VAR),
		})
	case PlatformGCS:
		return openGCSReader(ctx, rawURL)
	case PlatformAzure:
		return openAzureReader(ctx, rawURL)
	}
	return nil, fmt.Errorf("unsupported URL: cannot detect cloud platform for %q", rawURL)
}

// openGCSReader streams a GCS object through the JSON API using application
// default credentials scoped to read-only storage access. The caller must
// close the returned reader.
func openGCSReader(ctx context.Context, rawURL string) (io.ReadCloser, error) {
	bucket, key, err := parseGCSURL(rawURL)
	if err != nil {
		return nil, fmt.Errorf("GCS: %w", err)
	}

	client, err := google.DefaultClient(ctx, "https://www.googleapis.com/auth/devstorage.read_only")
	if err != nil {
		return nil, fmt.Errorf("GCS: build credentials (is %s set?): %w", GCSCredentialsEnvVar, err)
	}

	// alt=media requests the raw object bytes rather than object metadata.
	endpoint := fmt.Sprintf("%s/storage/v1/b/%s/o/%s?alt=media",
		gcsAPIBase,
		url.PathEscape(bucket),
		url.PathEscape(key),
	)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("GCS: build request: %w", err)
	}

	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("GCS: request failed: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		_ = resp.Body.Close()
		return nil, fmt.Errorf("GCS: HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
	}
	return resp.Body, nil
}

// openAzureReader streams an Azure blob over HTTPS and returns the response
// body as a reader; the caller must close it.
//
// Credential resolution, all from environment variables:
//   - account name: parsed from the URL, overridden by AzureAccountEnvVar;
//   - auth: a storage key (AzureKeyEnvVar), a SAS token (AzureSASTokenEnvVar),
//     or a connection string (AzureConnStringEnvVar) consulted only to fill in
//     a still-missing account or key.
//
// Shared-key signing is applied only when a storage key is present and no SAS
// token is set; with a SAS token the token itself authorizes the request.
func openAzureReader(ctx context.Context, rawURL string) (io.ReadCloser, error) {
	account, container, blob, err := parseAzureURL(rawURL)
	if err != nil {
		return nil, fmt.Errorf("Azure: %w", err)
	}

	// Environment override takes precedence over the account parsed from URL.
	if v := os.Getenv(AzureAccountEnvVar); v != "" {
		account = v
	}
	storageKey := os.Getenv(AzureKeyEnvVar)
	sasToken := os.Getenv(AzureSASTokenEnvVar)
	// The connection string is only parsed when account or key is missing.
	if connStr := os.Getenv(AzureConnStringEnvVar); connStr != "" && (account == "" || storageKey == "") {
		account, storageKey = parseAzureConnString(connStr)
	}

	if account == "" {
		return nil, fmt.Errorf("Azure: %s must be set", AzureAccountEnvVar)
	}
	if storageKey == "" && sasToken == "" {
		return nil, fmt.Errorf("Azure: one of %s, %s, or %s must be set",
			AzureKeyEnvVar, AzureSASTokenEnvVar, AzureConnStringEnvVar)
	}

	// azureBlobBase is a package-level override of the endpoint — presumably
	// for tests/emulators (confirm); default is the public blob endpoint.
	base := azureBlobBase
	if base == "" {
		base = fmt.Sprintf("https://%s.blob.core.windows.net", account)
	}
	blobEndpoint := fmt.Sprintf("%s/%s/%s", base, container, blob)

	reqURL := blobEndpoint
	if sasToken != "" {
		// Accept tokens stored with or without the leading '?'.
		reqURL = blobEndpoint + "?" + strings.TrimPrefix(sasToken, "?")
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
	if err != nil {
		return nil, fmt.Errorf("Azure: build request: %w", err)
	}
	// Headers must be set before signing: shared-key signatures cover the
	// x-ms-* headers on the request.
	req.Header.Set("x-ms-date", time.Now().UTC().Format(http.TimeFormat))
	req.Header.Set("x-ms-version", azureBlobAPIVersion)

	if storageKey != "" && sasToken == "" {
		if err := signAzureSharedKey(req, account, container, blob, storageKey); err != nil {
			return nil, fmt.Errorf("Azure: sign request: %w", err)
		}
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("Azure: GET failed: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		_ = resp.Body.Close()
		return nil, fmt.Errorf("Azure: HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
	}
	return resp.Body, nil
}

// DownloadOLD downloads the S3 object to a temporary file while computing its SHA256 hash.
// It returns the computed SHA256 hash, the temporary path, and any error encountered.
//
// Deprecated: use Download instead.
func Download(ctx context.Context, info *S3Object, s3Input S3ObjectParameters, lfsRoot string) (string, string, error) {
func DownloadOLD(ctx context.Context, info *S3Object, s3Input S3ObjectParameters, lfsRoot string) (string, string, error) {
// 2) object destination
etag := info.ETag
subdir1, subdir2 := "xx", "yy"
Expand Down Expand Up @@ -91,3 +313,9 @@ func Download(ctx context.Context, info *S3Object, s3Input S3ObjectParameters, l
computedSHA := fmt.Sprintf("%x", h.Sum(nil))
return computedSHA, tmpObj, nil
}

// GetSHA256 returns the hex-encoded SHA256 digest of the input string.
func GetSHA256(s string) string {
	hasher := sha256.New()
	hasher.Write([]byte(s))
	return hex.EncodeToString(hasher.Sum(nil))
}
Loading
Loading