Skip to content

Commit 2fa4000

Browse files
committed
feat(CLI): add --parallel flag to phrase pull for concurrent downloads
Download locale files using up to 4 concurrent requests (matching the Phrase API concurrency limit) via errgroup. Results are collected and printed in order after all downloads complete for clean output. A shared mutex coordinates rate-limit pauses across all workers. Only supported in sync mode; --parallel with --async warns and ignores.
1 parent 4f6cb6d commit 2fa4000

File tree

5 files changed

+263
-16
lines changed

5 files changed

+263
-16
lines changed

clients/cli/cmd/internal/pull.go

Lines changed: 143 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"path/filepath"
1111
"reflect"
1212
"strings"
13+
"sync"
1314
"time"
1415

1516
"github.com/phrase/phrase-cli/cmd/internal/paths"
@@ -18,6 +19,7 @@ import (
1819

1920
"github.com/antihax/optional"
2021
"github.com/phrase/phrase-go/v4"
22+
"golang.org/x/sync/errgroup"
2123
)
2224

2325
const (
@@ -26,13 +28,16 @@ const (
2628
asyncRetryCount = 360 // 30 minutes
2729
)
2830

31+
const maxParallelDownloads = 4 // Phrase API allows max 4 concurrent requests
32+
2933
var Config *phrase.Config
3034

3135
type PullCommand struct {
3236
phrase.Config
3337
Branch string
3438
UseLocalBranchName bool
3539
Async bool
40+
Parallel bool
3641
}
3742

3843
var Auth context.Context
@@ -82,7 +87,15 @@ func (cmd *PullCommand) Run(config *phrase.Config) error {
8287
}
8388

8489
for _, target := range targets {
85-
err := target.Pull(client, cmd.Async)
90+
var err error
91+
if cmd.Parallel && !cmd.Async {
92+
err = target.PullParallel(client)
93+
} else {
94+
if cmd.Parallel && cmd.Async {
95+
print.Warn("--parallel is not supported with --async, ignoring parallel")
96+
}
97+
err = target.Pull(client, cmd.Async)
98+
}
8699
if err != nil {
87100
return err
88101
}
@@ -146,25 +159,144 @@ func (target *Target) Pull(client *phrase.APIClient, async bool) error {
146159
return nil
147160
}
148161

149-
func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFile *LocaleFile, async bool) error {
150-
localVarOptionals := phrase.LocaleDownloadOpts{}
162+
type downloadResult struct {
163+
message string
164+
path string
165+
errMsg string
166+
}
167+
168+
func (target *Target) PullParallel(client *phrase.APIClient) error {
169+
if err := target.CheckPreconditions(); err != nil {
170+
return err
171+
}
172+
173+
localeFiles, err := target.LocaleFiles()
174+
if err != nil {
175+
return err
176+
}
177+
178+
// Ensure all destination files/dirs exist before parallel downloads
179+
for _, lf := range localeFiles {
180+
if err := createFile(lf.Path); err != nil {
181+
return err
182+
}
183+
}
184+
185+
results := make([]downloadResult, len(localeFiles))
186+
var rateMu sync.Mutex
187+
188+
ctx, cancel := context.WithTimeout(context.Background(), timeoutInMinutes)
189+
defer cancel()
190+
g, ctx := errgroup.WithContext(ctx)
191+
g.SetLimit(maxParallelDownloads)
192+
193+
for i, lf := range localeFiles {
194+
g.Go(func() error {
195+
if ctx.Err() != nil {
196+
return ctx.Err()
197+
}
198+
199+
opts, err := target.buildDownloadOpts(lf)
200+
if err != nil {
201+
err = fmt.Errorf("%s for %s", err, lf.Path)
202+
results[i] = downloadResult{errMsg: err.Error()}
203+
return err
204+
}
205+
206+
err = target.downloadWithRateLimitRetry(client, lf, opts, &rateMu)
207+
if err != nil {
208+
if openapiError, ok := err.(phrase.GenericOpenAPIError); ok {
209+
print.Warn("API response: %s", openapiError.Body())
210+
}
211+
err = fmt.Errorf("%s for %s", err, lf.Path)
212+
results[i] = downloadResult{errMsg: err.Error()}
213+
return err
214+
}
215+
216+
results[i] = downloadResult{
217+
message: lf.Message(),
218+
path: lf.RelPath(),
219+
}
220+
return nil
221+
})
222+
}
223+
224+
waitErr := g.Wait()
225+
226+
// Print results in original order: successes and failures
227+
var skipCount int
228+
for _, r := range results {
229+
if r.path != "" {
230+
print.Success("Downloaded %s to %s", r.message, r.path)
231+
} else if r.errMsg != "" {
232+
print.Failure("Failed %s", r.errMsg)
233+
} else {
234+
skipCount++
235+
}
236+
}
237+
if skipCount > 0 {
238+
print.Warn("%d download(s) skipped due to earlier failure", skipCount)
239+
}
240+
241+
return waitErr
242+
}
243+
244+
// downloadWithRateLimitRetry downloads a locale file with rate-limit coordination.
245+
// The shared mutex pauses all workers when any worker hits the API rate limit.
246+
func (target *Target) downloadWithRateLimitRetry(client *phrase.APIClient, localeFile *LocaleFile, opts phrase.LocaleDownloadOpts, rateMu *sync.Mutex) error {
247+
// Gate: block if another worker is waiting out a rate limit
248+
rateMu.Lock()
249+
rateMu.Unlock()
250+
251+
file, response, err := client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts)
252+
if err != nil {
253+
if response != nil && response.Rate.Remaining == 0 {
254+
// Hold the mutex while waiting for rate limit reset;
255+
// this blocks other workers from making requests.
256+
rateMu.Lock()
257+
waitForRateLimit(response.Rate)
258+
rateMu.Unlock()
259+
260+
file, _, err = client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts)
261+
if err != nil {
262+
return err
263+
}
264+
} else {
265+
return err
266+
}
267+
}
268+
return copyToDestination(file, localeFile.Path)
269+
}
270+
271+
// buildDownloadOpts prepares the LocaleDownloadOpts for a locale file download.
272+
func (target *Target) buildDownloadOpts(localeFile *LocaleFile) (phrase.LocaleDownloadOpts, error) {
273+
opts := phrase.LocaleDownloadOpts{}
151274

152275
if target.Params != nil {
153-
localVarOptionals = target.Params.LocaleDownloadOpts
276+
opts = target.Params.LocaleDownloadOpts
154277
translationKeyPrefix, err := placeholders.ResolveTranslationKeyPrefix(target.Params.TranslationKeyPrefix, localeFile.Path)
155278
if err != nil {
156-
return err
279+
return opts, err
157280
}
158-
localVarOptionals.TranslationKeyPrefix = translationKeyPrefix
281+
opts.TranslationKeyPrefix = translationKeyPrefix
159282
}
160283

161-
if localVarOptionals.FileFormat.Value() == "" {
162-
localVarOptionals.FileFormat = optional.NewString(localeFile.FileFormat)
284+
if opts.FileFormat.Value() == "" {
285+
opts.FileFormat = optional.NewString(localeFile.FileFormat)
163286
}
164287

165288
if localeFile.Tag != "" {
166-
localVarOptionals.Tags = optional.NewString(localeFile.Tag)
167-
localVarOptionals.Tag = optional.EmptyString()
289+
opts.Tags = optional.NewString(localeFile.Tag)
290+
opts.Tag = optional.EmptyString()
291+
}
292+
293+
return opts, nil
294+
}
295+
296+
func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFile *LocaleFile, async bool) error {
297+
localVarOptionals, err := target.buildDownloadOpts(localeFile)
298+
if err != nil {
299+
return err
168300
}
169301

170302
debugFprintln("Target file pattern:", target.File)
@@ -182,9 +314,8 @@ func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFil
182314

183315
if async {
184316
return target.downloadAsynchronously(client, localeFile, localVarOptionals)
185-
} else {
186-
return target.downloadSynchronously(client, localeFile, localVarOptionals)
187317
}
318+
return target.downloadSynchronously(client, localeFile, localVarOptionals)
188319
}
189320

190321
func (target *Target) downloadAsynchronously(client *phrase.APIClient, localeFile *LocaleFile, downloadOpts phrase.LocaleDownloadOpts) error {
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package internal
2+
3+
import (
4+
"sync"
5+
"sync/atomic"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestRateLimitMutex_AllowsConcurrentWhenUnlocked(t *testing.T) {
11+
var mu sync.Mutex
12+
var concurrent int64
13+
var maxConcurrent int64
14+
15+
var wg sync.WaitGroup
16+
for i := 0; i < 10; i++ {
17+
wg.Add(1)
18+
go func() {
19+
defer wg.Done()
20+
21+
// Simulate the gate pattern from downloadWithRateLimitRetry
22+
mu.Lock()
23+
mu.Unlock()
24+
25+
c := atomic.AddInt64(&concurrent, 1)
26+
for {
27+
old := atomic.LoadInt64(&maxConcurrent)
28+
if c <= old || atomic.CompareAndSwapInt64(&maxConcurrent, old, c) {
29+
break
30+
}
31+
}
32+
33+
time.Sleep(time.Millisecond)
34+
atomic.AddInt64(&concurrent, -1)
35+
}()
36+
}
37+
wg.Wait()
38+
39+
if maxConcurrent < 2 {
40+
t.Errorf("expected concurrent execution when mutex is free, max concurrent was %d", maxConcurrent)
41+
}
42+
}
43+
44+
func TestRateLimitMutex_BlocksAllWorkersWhenHeld(t *testing.T) {
45+
var mu sync.Mutex
46+
ready := make(chan struct{}, 4)
47+
48+
// Simulate a rate-limited worker holding the mutex
49+
mu.Lock()
50+
51+
var wg sync.WaitGroup
52+
for i := 0; i < 4; i++ {
53+
wg.Add(1)
54+
go func() {
55+
defer wg.Done()
56+
ready <- struct{}{}
57+
mu.Lock()
58+
mu.Unlock()
59+
}()
60+
}
61+
62+
// Wait for all goroutines to start
63+
for i := 0; i < 4; i++ {
64+
<-ready
65+
}
66+
// Give goroutines time to reach the mu.Lock() call
67+
time.Sleep(10 * time.Millisecond)
68+
69+
// Release the lock (simulating rate limit wait done)
70+
mu.Unlock()
71+
wg.Wait()
72+
}
73+
74+
func TestBuildDownloadOpts_DefaultFileFormat(t *testing.T) {
75+
target := &Target{
76+
File: "locales/<locale_name>.json",
77+
ProjectID: "proj1",
78+
}
79+
localeFile := &LocaleFile{
80+
FileFormat: "json",
81+
Tag: "",
82+
}
83+
84+
opts, err := target.buildDownloadOpts(localeFile)
85+
if err != nil {
86+
t.Fatalf("unexpected error: %v", err)
87+
}
88+
if opts.FileFormat.Value() != "json" {
89+
t.Errorf("expected file format 'json', got %q", opts.FileFormat.Value())
90+
}
91+
}
92+
93+
func TestBuildDownloadOpts_TagHandling(t *testing.T) {
94+
target := &Target{
95+
File: "locales/<locale_name>/<tag>.json",
96+
ProjectID: "proj1",
97+
}
98+
localeFile := &LocaleFile{
99+
FileFormat: "json",
100+
Tag: "web",
101+
}
102+
103+
opts, err := target.buildDownloadOpts(localeFile)
104+
if err != nil {
105+
t.Fatalf("unexpected error: %v", err)
106+
}
107+
if opts.Tags.Value() != "web" {
108+
t.Errorf("expected tags 'web', got %q", opts.Tags.Value())
109+
}
110+
if opts.Tag.Value() != "" {
111+
t.Errorf("expected tag to be empty string, got %q", opts.Tag.Value())
112+
}
113+
}

clients/cli/cmd/pull.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func initPull() {
2121
Branch: params.GetString("branch"),
2222
UseLocalBranchName: params.GetBool("use-local-branch-name"),
2323
Async: params.GetBool("async"),
24+
Parallel: params.GetBool("parallel"),
2425
}
2526
err := cmdPull.Run(Config)
2627
if err != nil {
@@ -33,5 +34,6 @@ func initPull() {
3334
AddFlag(pullCmd, "string", "branch", "b", "branch", false)
3435
AddFlag(pullCmd, "bool", "use-local-branch-name", "", "use local branch name", false)
3536
AddFlag(pullCmd, "bool", "async", "a", "use asynchronous locale downloads (recommended for large number of keys)", false)
37+
AddFlag(pullCmd, "bool", "parallel", "p", "download locale files in parallel (max 4 concurrent requests)", false)
3638
params.BindPFlags(pullCmd.Flags())
3739
}

clients/cli/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ require (
3737
github.com/pelletier/go-toml v1.2.0 // indirect
3838
github.com/spf13/jwalterweatherman v1.1.0 // indirect
3939
github.com/stretchr/testify v1.9.0 // indirect
40+
golang.org/x/sync v0.12.0
4041
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
4142
)

clients/cli/go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,8 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
215215
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
216216
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
217217
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
218-
github.com/phrase/phrase-go/v4 v4.18.1 h1:y1sv4z8ufEQB+kJA8ymSiH8nRAvH8gGoVSB5/7jvYEQ=
219-
github.com/phrase/phrase-go/v4 v4.18.1/go.mod h1:4XplKvrbHS2LDaXfFp9xrVDtO5xk2WHFm0htutwwd8c=
220-
github.com/phrase/phrase-go/v4 v4.19.0 h1:tNliCxO/0SMu2viLE9idzADBUoY9C6CqrDmp3ntgpQI=
221-
github.com/phrase/phrase-go/v4 v4.19.0/go.mod h1:4XplKvrbHS2LDaXfFp9xrVDtO5xk2WHFm0htutwwd8c=
218+
github.com/phrase/phrase-go/v4 v4.20.0 h1:UTgos4elHzv83XbC6hK2ulDVEvvi1215r8NzE8jC3bc=
219+
github.com/phrase/phrase-go/v4 v4.20.0/go.mod h1:4XplKvrbHS2LDaXfFp9xrVDtO5xk2WHFm0htutwwd8c=
222220
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
223221
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
224222
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -370,6 +368,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
370368
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
371369
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
372370
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
371+
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
372+
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
373373
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
374374
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
375375
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

0 commit comments

Comments
 (0)