From 2efb2dbc90775765362e17ed27d6bd6f2d66a646 Mon Sep 17 00:00:00 2001 From: Thibaut Etienne Date: Thu, 19 Mar 2026 15:11:10 +0100 Subject: [PATCH 1/2] feat(CLI): add --parallel flag to phrase pull for concurrent downloads Download locale files using up to 4 concurrent requests (matching the Phrase API concurrency limit) via errgroup. Results are collected and printed in order after all downloads complete for clean output. A shared mutex coordinates rate-limit pauses across all workers. Only supported in sync mode; --parallel with --async warns and ignores. --- clients/cli/cmd/internal/pull.go | 155 +++++++++++++++++- .../cli/cmd/internal/pull_parallel_test.go | 142 ++++++++++++++++ clients/cli/cmd/pull.go | 2 + clients/cli/go.mod | 1 + clients/cli/go.sum | 2 + 5 files changed, 301 insertions(+), 1 deletion(-) create mode 100644 clients/cli/cmd/internal/pull_parallel_test.go diff --git a/clients/cli/cmd/internal/pull.go b/clients/cli/cmd/internal/pull.go index f33d9744..216cf5de 100644 --- a/clients/cli/cmd/internal/pull.go +++ b/clients/cli/cmd/internal/pull.go @@ -11,6 +11,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "time" "github.com/phrase/phrase-cli/cmd/internal/paths" @@ -19,6 +20,7 @@ import ( "github.com/antihax/optional" "github.com/phrase/phrase-go/v4" + "golang.org/x/sync/errgroup" ) const ( @@ -27,6 +29,8 @@ const ( asyncRetryCount = 360 // 30 minutes ) +const maxParallelDownloads = 4 // Phrase API allows max 4 concurrent requests + var ( Config *phrase.Config errNotModified = errors.New("not modified") @@ -38,6 +42,7 @@ type PullCommand struct { UseLocalBranchName bool Async bool Cache bool + Parallel bool } var Auth context.Context @@ -101,7 +106,15 @@ func (cmd *PullCommand) Run(config *phrase.Config) error { } for _, target := range targets { - err := target.Pull(client, cmd.Async, cache) + var err error + if cmd.Parallel && !cmd.Async { + err = target.PullParallel(client) + } else { + if cmd.Parallel && cmd.Async { + print.Warn("--parallel is not supported with --async, ignoring parallel") + } + err = target.Pull(client, cmd.Async, cache) + } if err != nil { return err } @@ -167,6 +180,12 @@ func (target *Target) Pull(client *phrase.APIClient, async bool, cache *Download return nil } +type downloadResult struct { + message string + path string + errMsg string +} + func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFile *LocaleFile, async bool, cache *DownloadCache) error { localVarOptionals := phrase.LocaleDownloadOpts{} @@ -207,6 +226,140 @@ func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFil return target.downloadSynchronously(client, localeFile, localVarOptionals, cache) } +func (target *Target) PullParallel(client *phrase.APIClient) error { + if err := target.CheckPreconditions(); err != nil { + return err + } + + localeFiles, err := target.LocaleFiles() + if err != nil { + return err + } + + // Ensure all destination files/dirs exist before parallel downloads + for _, lf := range localeFiles { + if err := createFile(lf.Path); err != nil { + return err + } + } + + results := make([]downloadResult, len(localeFiles)) + var rateMu sync.RWMutex + + ctx, cancel := context.WithTimeout(context.Background(), timeoutInMinutes) + defer cancel() + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(maxParallelDownloads) + + for i, lf := range localeFiles { + g.Go(func() error { + if ctx.Err() != nil { + return ctx.Err() + } + + opts, err := target.buildDownloadOpts(lf) + if err != nil { + err = fmt.Errorf("%s for %s", err, lf.Path) + results[i] = downloadResult{errMsg: err.Error()} + return err + } + + err = target.downloadWithRateGate(client, lf, opts, &rateMu) + if err != nil { + if openapiError, ok := err.(phrase.GenericOpenAPIError); ok { + print.Warn("API response: %s", openapiError.Body()) + } + err = fmt.Errorf("%s for %s", err, lf.Path) + results[i] = downloadResult{errMsg: err.Error()} + return err + } + + results[i] = downloadResult{ + message: lf.Message(), + path: lf.RelPath(), + } + return nil + }) + } + + waitErr := g.Wait() + + // Print results in original order: successes and failures + var skipCount int + for _, r := range results { + if r.path != "" { + print.Success("Downloaded %s to %s", r.message, r.path) + } else if r.errMsg != "" { + print.Failure("Failed %s", r.errMsg) + } else { + skipCount++ + } + } + if skipCount > 0 { + print.Warn("%d download(s) skipped due to earlier failure", skipCount) + } + + return waitErr +} + +// downloadWithRateGate downloads a locale file with rate-limit coordination. +// Uses RWMutex as a broadcast gate: workers take a read lock (cheap, concurrent), +// and a rate-limited worker takes the write lock to pause everyone until reset. +func (target *Target) downloadWithRateGate(client *phrase.APIClient, localeFile *LocaleFile, opts phrase.LocaleDownloadOpts, gate *sync.RWMutex) error { + // Read-lock gate: blocks only when a writer (rate-limited worker) holds it + gate.RLock() + gate.RUnlock() + + file, response, err := client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts) + if err != nil { + if response != nil && response.Rate.Remaining == 0 { + // TryLock ensures only one worker handles the rate limit pause. + // Others will block on their next RLock until the pause is over. + if gate.TryLock() { + waitForRateLimit(response.Rate) + gate.Unlock() + } else { + // Another worker is already pausing; wait for it + gate.RLock() + gate.RUnlock() + } + + file, _, err = client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts) + if err != nil { + return err + } + } else { + return err + } + } + return copyToDestination(file, localeFile.Path) +} + +// buildDownloadOpts prepares the LocaleDownloadOpts for a locale file download. +func (target *Target) buildDownloadOpts(localeFile *LocaleFile) (phrase.LocaleDownloadOpts, error) { + opts := phrase.LocaleDownloadOpts{} + + if target.Params != nil { + opts = target.Params.LocaleDownloadOpts + translationKeyPrefix, err := placeholders.ResolveTranslationKeyPrefix(target.Params.TranslationKeyPrefix, localeFile.Path) + if err != nil { + return opts, err + } + opts.TranslationKeyPrefix = translationKeyPrefix + } + + if opts.FileFormat.Value() == "" { + opts.FileFormat = optional.NewString(localeFile.FileFormat) + } + + if localeFile.Tag != "" { + opts.Tags = optional.NewString(localeFile.Tag) + opts.Tag = optional.EmptyString() + } + + return opts, nil +} + func (target *Target) downloadAsynchronously(client *phrase.APIClient, localeFile *LocaleFile, downloadOpts phrase.LocaleDownloadOpts) error { localeDownloadCreateParams := asyncDownloadParams(downloadOpts) diff --git a/clients/cli/cmd/internal/pull_parallel_test.go b/clients/cli/cmd/internal/pull_parallel_test.go new file mode 100644 index 00000000..85b09e51 --- /dev/null +++ b/clients/cli/cmd/internal/pull_parallel_test.go @@ -0,0 +1,142 @@ +package internal + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestRateGate_AllowsConcurrentReaders(t *testing.T) { + var gate sync.RWMutex + var concurrent int64 + var maxConcurrent int64 + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + // Simulate the read-lock gate pattern from downloadWithRateGate + gate.RLock() + gate.RUnlock() + + c := atomic.AddInt64(&concurrent, 1) + for { + old := atomic.LoadInt64(&maxConcurrent) + if c <= old || atomic.CompareAndSwapInt64(&maxConcurrent, old, c) { + break + } + } + + time.Sleep(time.Millisecond) + atomic.AddInt64(&concurrent, -1) + }() + } + wg.Wait() + + // RLock is shared, so all goroutines should run concurrently + if maxConcurrent < 2 { + t.Errorf("expected concurrent execution with RLock gate, max concurrent was %d", maxConcurrent) + } +} + +func TestRateGate_WriteLockBlocksAllReaders(t *testing.T) { + var gate sync.RWMutex + ready := make(chan struct{}, 4) + + // Simulate a rate-limited worker holding the write lock + gate.Lock() + + var wg sync.WaitGroup + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ready <- struct{}{} + gate.RLock() + gate.RUnlock() + }() + } + + // Wait for all goroutines to start + for i := 0; i < 4; i++ { + <-ready + } + time.Sleep(10 * time.Millisecond) + + // Release the write lock (simulating rate limit wait done) + gate.Unlock() + wg.Wait() +} + +func TestRateGate_TryLockPreventsDoubleWait(t *testing.T) { + var gate sync.RWMutex + var waitCount int64 + + // Simulate two workers hitting rate limit simultaneously + gate.Lock() // first worker takes the write lock + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Second worker tries TryLock, should fail + if gate.TryLock() { + atomic.AddInt64(&waitCount, 1) + gate.Unlock() + } + }() + + time.Sleep(10 * time.Millisecond) + atomic.AddInt64(&waitCount, 1) // first worker counts + gate.Unlock() + wg.Wait() + + // Only 1 worker should have done the wait (the first one) + if atomic.LoadInt64(&waitCount) != 1 { + t.Errorf("expected exactly 1 rate limit wait, got %d", waitCount) + } +} + +func TestBuildDownloadOpts_DefaultFileFormat(t *testing.T) { + target := &Target{ + File: "locales/.json", + ProjectID: "proj1", + } + localeFile := &LocaleFile{ + FileFormat: "json", + Tag: "", + } + + opts, err := target.buildDownloadOpts(localeFile) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if opts.FileFormat.Value() != "json" { + t.Errorf("expected file format 'json', got %q", opts.FileFormat.Value()) + } +} + +func TestBuildDownloadOpts_TagHandling(t *testing.T) { + target := &Target{ + File: "locales//.json", + ProjectID: "proj1", + } + localeFile := &LocaleFile{ + FileFormat: "json", + Tag: "web", + } + + opts, err := target.buildDownloadOpts(localeFile) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if opts.Tags.Value() != "web" { + t.Errorf("expected tags 'web', got %q", opts.Tags.Value()) + } + if opts.Tag.Value() != "" { + t.Errorf("expected tag to be empty string, got %q", opts.Tag.Value()) + } +} diff --git a/clients/cli/cmd/pull.go b/clients/cli/cmd/pull.go index 3511cca5..e60b8ebd 100644 --- a/clients/cli/cmd/pull.go +++ b/clients/cli/cmd/pull.go @@ -22,6 +22,7 @@ func initPull() { UseLocalBranchName: params.GetBool("use-local-branch-name"), Async: params.GetBool("async"), Cache: params.GetBool("cache"), + Parallel: params.GetBool("parallel"), } err := cmdPull.Run(Config) if err != nil { @@ -35,5 +36,6 @@ func initPull() { AddFlag(pullCmd, "bool", "use-local-branch-name", "", "use local branch name", false) AddFlag(pullCmd, "bool", "async", "a", "use asynchronous locale downloads (recommended for large number of keys)", false) AddFlag(pullCmd, "bool", "cache", "", "cache ETags locally to skip unchanged downloads (sync mode only)", false) + AddFlag(pullCmd, "bool", "parallel", "p", "download locale files in parallel (max 4 concurrent requests)", false) params.BindPFlags(pullCmd.Flags()) } diff --git a/clients/cli/go.mod b/clients/cli/go.mod index 89360c8e..98267912 100644 --- a/clients/cli/go.mod +++ b/clients/cli/go.mod @@ -37,5 +37,6 @@ require ( github.com/pelletier/go-toml v1.2.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/stretchr/testify v1.9.0 // indirect + golang.org/x/sync v0.12.0 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect ) diff --git a/clients/cli/go.sum b/clients/cli/go.sum index 6b1a9c91..76a512db 100644 --- a/clients/cli/go.sum +++ b/clients/cli/go.sum @@ -372,6 +372,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= From 56eed2ce4d3596503988c69578a5acc44678d306 Mon Sep 17 00:00:00 2001 From: Thibaut Etienne Date: Thu, 26 Mar 2026 16:02:57 +0100 Subject: [PATCH 2/2] Remove stdlib-testing tests from pull_parallel_test.go Drop TestRateGate_* tests that only verified sync.RWMutex standard library behavior without testing any PR code. --- .../cli/cmd/internal/pull_parallel_test.go | 96 ------------------- 1 file changed, 96 deletions(-) diff --git a/clients/cli/cmd/internal/pull_parallel_test.go b/clients/cli/cmd/internal/pull_parallel_test.go index 85b09e51..51dee7ff 100644 --- a/clients/cli/cmd/internal/pull_parallel_test.go +++ b/clients/cli/cmd/internal/pull_parallel_test.go @@ -1,105 +1,9 @@ package internal import ( - "sync" - "sync/atomic" "testing" - "time" ) -func TestRateGate_AllowsConcurrentReaders(t *testing.T) { - var gate sync.RWMutex - var concurrent int64 - var maxConcurrent int64 - - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - // Simulate the read-lock gate pattern from downloadWithRateGate - gate.RLock() - gate.RUnlock() - - c := atomic.AddInt64(&concurrent, 1) - for { - old := atomic.LoadInt64(&maxConcurrent) - if c <= old || atomic.CompareAndSwapInt64(&maxConcurrent, old, c) { - break - } - } - - time.Sleep(time.Millisecond) - atomic.AddInt64(&concurrent, -1) - }() - } - wg.Wait() - - // RLock is shared, so all goroutines should run concurrently - if maxConcurrent < 2 { - t.Errorf("expected concurrent execution with RLock gate, max concurrent was %d", maxConcurrent) - } -} - -func TestRateGate_WriteLockBlocksAllReaders(t *testing.T) { - var gate sync.RWMutex - ready := make(chan struct{}, 4) - - // Simulate a rate-limited worker holding the write lock - gate.Lock() - - var wg sync.WaitGroup - for i := 0; i < 4; i++ { - wg.Add(1) - go func() { - defer wg.Done() - ready <- struct{}{} - gate.RLock() - gate.RUnlock() - }() - } - - // Wait for all goroutines to start - for i := 0; i < 4; i++ { - <-ready - } - time.Sleep(10 * time.Millisecond) - - // Release the write lock (simulating rate limit wait done) - gate.Unlock() - wg.Wait() -} - -func TestRateGate_TryLockPreventsDoubleWait(t *testing.T) { - var gate sync.RWMutex - var waitCount int64 - - // Simulate two workers hitting rate limit simultaneously - gate.Lock() // first worker takes the write lock - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - // Second worker tries TryLock, should fail - if gate.TryLock() { - atomic.AddInt64(&waitCount, 1) - gate.Unlock() - } - }() - - time.Sleep(10 * time.Millisecond) - atomic.AddInt64(&waitCount, 1) // first worker counts - gate.Unlock() - wg.Wait() - - // Only 1 worker should have done the wait (the first one) - if atomic.LoadInt64(&waitCount) != 1 { - t.Errorf("expected exactly 1 rate limit wait, got %d", waitCount) - } -} - func TestBuildDownloadOpts_DefaultFileFormat(t *testing.T) { target := &Target{ File: "locales/.json",