diff --git a/clients/cli/cmd/internal/pull.go b/clients/cli/cmd/internal/pull.go index f33d9744..216cf5de 100644 --- a/clients/cli/cmd/internal/pull.go +++ b/clients/cli/cmd/internal/pull.go @@ -11,6 +11,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "time" "github.com/phrase/phrase-cli/cmd/internal/paths" @@ -19,6 +20,7 @@ import ( "github.com/antihax/optional" "github.com/phrase/phrase-go/v4" + "golang.org/x/sync/errgroup" ) const ( @@ -27,6 +29,8 @@ const ( asyncRetryCount = 360 // 30 minutes ) +const maxParallelDownloads = 4 // Phrase API allows max 4 concurrent requests + var ( Config *phrase.Config errNotModified = errors.New("not modified") @@ -38,6 +42,7 @@ type PullCommand struct { UseLocalBranchName bool Async bool Cache bool + Parallel bool } var Auth context.Context @@ -101,7 +106,15 @@ func (cmd *PullCommand) Run(config *phrase.Config) error { } for _, target := range targets { - err := target.Pull(client, cmd.Async, cache) + var err error + if cmd.Parallel && !cmd.Async { + err = target.PullParallel(client) + } else { + if cmd.Parallel && cmd.Async { + print.Warn("--parallel is not supported with --async, ignoring parallel") + } + err = target.Pull(client, cmd.Async, cache) + } if err != nil { return err } @@ -167,6 +180,12 @@ func (target *Target) Pull(client *phrase.APIClient, async bool, cache *Download return nil } +type downloadResult struct { + message string + path string + errMsg string +} + func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFile *LocaleFile, async bool, cache *DownloadCache) error { localVarOptionals := phrase.LocaleDownloadOpts{} @@ -207,6 +226,140 @@ func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFil return target.downloadSynchronously(client, localeFile, localVarOptionals, cache) } +func (target *Target) PullParallel(client *phrase.APIClient) error { + if err := target.CheckPreconditions(); err != nil { + return err + } + + localeFiles, err := target.LocaleFiles() + if err != nil { + return err + } + + // Ensure all destination files/dirs exist before parallel downloads + for _, lf := range localeFiles { + if err := createFile(lf.Path); err != nil { + return err + } + } + + results := make([]downloadResult, len(localeFiles)) + var rateMu sync.RWMutex + + ctx, cancel := context.WithTimeout(context.Background(), timeoutInMinutes) + defer cancel() + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(maxParallelDownloads) + + for i, lf := range localeFiles { + g.Go(func() error { + if ctx.Err() != nil { + return ctx.Err() + } + + opts, err := target.buildDownloadOpts(lf) + if err != nil { + err = fmt.Errorf("%s for %s", err, lf.Path) + results[i] = downloadResult{errMsg: err.Error()} + return err + } + + err = target.downloadWithRateGate(client, lf, opts, &rateMu) + if err != nil { + if openapiError, ok := err.(phrase.GenericOpenAPIError); ok { + print.Warn("API response: %s", openapiError.Body()) + } + err = fmt.Errorf("%s for %s", err, lf.Path) + results[i] = downloadResult{errMsg: err.Error()} + return err + } + + results[i] = downloadResult{ + message: lf.Message(), + path: lf.RelPath(), + } + return nil + }) + } + + waitErr := g.Wait() + + // Print results in original order: successes and failures + var skipCount int + for _, r := range results { + if r.path != "" { + print.Success("Downloaded %s to %s", r.message, r.path) + } else if r.errMsg != "" { + print.Failure("Failed %s", r.errMsg) + } else { + skipCount++ + } + } + if skipCount > 0 { + print.Warn("%d download(s) skipped due to earlier failure", skipCount) + } + + return waitErr +} + +// downloadWithRateGate downloads a locale file with rate-limit coordination. +// Uses RWMutex as a broadcast gate: workers take a read lock (cheap, concurrent), +// and a rate-limited worker takes the write lock to pause everyone until reset. +func (target *Target) downloadWithRateGate(client *phrase.APIClient, localeFile *LocaleFile, opts phrase.LocaleDownloadOpts, gate *sync.RWMutex) error { + // Read-lock gate: blocks only when a writer (rate-limited worker) holds it + gate.RLock() + gate.RUnlock() + + file, response, err := client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts) + if err != nil { + if response != nil && response.Rate.Remaining == 0 { + // TryLock ensures only one worker handles the rate limit pause. + // Others will block on their next RLock until the pause is over. + if gate.TryLock() { + waitForRateLimit(response.Rate) + gate.Unlock() + } else { + // Another worker is already pausing; wait for it + gate.RLock() + gate.RUnlock() + } + + file, _, err = client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts) + if err != nil { + return err + } + } else { + return err + } + } + return copyToDestination(file, localeFile.Path) +} + +// buildDownloadOpts prepares the LocaleDownloadOpts for a locale file download. +func (target *Target) buildDownloadOpts(localeFile *LocaleFile) (phrase.LocaleDownloadOpts, error) { + opts := phrase.LocaleDownloadOpts{} + + if target.Params != nil { + opts = target.Params.LocaleDownloadOpts + translationKeyPrefix, err := placeholders.ResolveTranslationKeyPrefix(target.Params.TranslationKeyPrefix, localeFile.Path) + if err != nil { + return opts, err + } + opts.TranslationKeyPrefix = translationKeyPrefix + } + + if opts.FileFormat.Value() == "" { + opts.FileFormat = optional.NewString(localeFile.FileFormat) + } + + if localeFile.Tag != "" { + opts.Tags = optional.NewString(localeFile.Tag) + opts.Tag = optional.EmptyString() + } + + return opts, nil +} + func (target *Target) downloadAsynchronously(client *phrase.APIClient, localeFile *LocaleFile, downloadOpts phrase.LocaleDownloadOpts) error { localeDownloadCreateParams := asyncDownloadParams(downloadOpts) diff --git a/clients/cli/cmd/internal/pull_parallel_test.go b/clients/cli/cmd/internal/pull_parallel_test.go new file mode 100644 index 00000000..51dee7ff --- /dev/null +++ b/clients/cli/cmd/internal/pull_parallel_test.go @@ -0,0 +1,46 @@ +package internal + +import ( + "testing" +) + +func TestBuildDownloadOpts_DefaultFileFormat(t *testing.T) { + target := &Target{ + File: "locales/.json", + ProjectID: "proj1", + } + localeFile := &LocaleFile{ + FileFormat: "json", + Tag: "", + } + + opts, err := target.buildDownloadOpts(localeFile) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if opts.FileFormat.Value() != "json" { + t.Errorf("expected file format 'json', got %q", opts.FileFormat.Value()) + } +} + +func TestBuildDownloadOpts_TagHandling(t *testing.T) { + target := &Target{ + File: "locales//.json", + ProjectID: "proj1", + } + localeFile := &LocaleFile{ + FileFormat: "json", + Tag: "web", + } + + opts, err := target.buildDownloadOpts(localeFile) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if opts.Tags.Value() != "web" { + t.Errorf("expected tags 'web', got %q", opts.Tags.Value()) + } + if opts.Tag.Value() != "" { + t.Errorf("expected tag to be empty string, got %q", opts.Tag.Value()) + } +} diff --git a/clients/cli/cmd/pull.go b/clients/cli/cmd/pull.go index 3511cca5..e60b8ebd 100644 --- a/clients/cli/cmd/pull.go +++ b/clients/cli/cmd/pull.go @@ -22,6 +22,7 @@ func initPull() { UseLocalBranchName: params.GetBool("use-local-branch-name"), Async: params.GetBool("async"), Cache: params.GetBool("cache"), + Parallel: params.GetBool("parallel"), } err := cmdPull.Run(Config) if err != nil { @@ -35,5 +36,6 @@ func initPull() { AddFlag(pullCmd, "bool", "use-local-branch-name", "", "use local branch name", false) AddFlag(pullCmd, "bool", "async", "a", "use asynchronous locale downloads (recommended for large number of keys)", false) AddFlag(pullCmd, "bool", "cache", "", "cache ETags locally to skip unchanged downloads (sync mode only)", false) + AddFlag(pullCmd, "bool", "parallel", "p", "download locale files in parallel (max 4 concurrent requests)", false) params.BindPFlags(pullCmd.Flags()) } diff --git a/clients/cli/go.mod b/clients/cli/go.mod index 89360c8e..98267912 100644 --- a/clients/cli/go.mod +++ b/clients/cli/go.mod @@ -37,5 +37,6 @@ require ( github.com/pelletier/go-toml v1.2.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/stretchr/testify v1.9.0 // indirect + golang.org/x/sync v0.12.0 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect ) diff --git a/clients/cli/go.sum b/clients/cli/go.sum index 6b1a9c91..76a512db 100644 --- a/clients/cli/go.sum +++ b/clients/cli/go.sum @@ -372,6 +372,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=