Skip to content

Commit d93d302

Browse files
authored
feat(CLI): add --parallel flag to phrase pull for concurrent downloads (#1067)
1 parent 07b3182 commit d93d302

5 files changed

Lines changed: 205 additions & 1 deletion

File tree

clients/cli/cmd/internal/pull.go

Lines changed: 154 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"path/filepath"
1212
"reflect"
1313
"strings"
14+
"sync"
1415
"time"
1516

1617
"github.com/phrase/phrase-cli/cmd/internal/paths"
@@ -19,6 +20,7 @@ import (
1920

2021
"github.com/antihax/optional"
2122
"github.com/phrase/phrase-go/v4"
23+
"golang.org/x/sync/errgroup"
2224
)
2325

2426
const (
@@ -27,6 +29,8 @@ const (
2729
asyncRetryCount = 360 // 30 minutes
2830
)
2931

32+
const maxParallelDownloads = 4 // Phrase API allows max 4 concurrent requests
33+
3034
var (
3135
Config *phrase.Config
3236
errNotModified = errors.New("not modified")
@@ -38,6 +42,7 @@ type PullCommand struct {
3842
UseLocalBranchName bool
3943
Async bool
4044
Cache bool
45+
Parallel bool
4146
}
4247

4348
var Auth context.Context
@@ -101,7 +106,15 @@ func (cmd *PullCommand) Run(config *phrase.Config) error {
101106
}
102107

103108
for _, target := range targets {
104-
err := target.Pull(client, cmd.Async, cache)
109+
var err error
110+
if cmd.Parallel && !cmd.Async {
111+
err = target.PullParallel(client)
112+
} else {
113+
if cmd.Parallel && cmd.Async {
114+
print.Warn("--parallel is not supported with --async, ignoring parallel")
115+
}
116+
err = target.Pull(client, cmd.Async, cache)
117+
}
105118
if err != nil {
106119
return err
107120
}
@@ -167,6 +180,12 @@ func (target *Target) Pull(client *phrase.APIClient, async bool, cache *Download
167180
return nil
168181
}
169182

183+
type downloadResult struct {
184+
message string
185+
path string
186+
errMsg string
187+
}
188+
170189
func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFile *LocaleFile, async bool, cache *DownloadCache) error {
171190
localVarOptionals := phrase.LocaleDownloadOpts{}
172191

@@ -207,6 +226,140 @@ func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFil
207226
return target.downloadSynchronously(client, localeFile, localVarOptionals, cache)
208227
}
209228

229+
func (target *Target) PullParallel(client *phrase.APIClient) error {
230+
if err := target.CheckPreconditions(); err != nil {
231+
return err
232+
}
233+
234+
localeFiles, err := target.LocaleFiles()
235+
if err != nil {
236+
return err
237+
}
238+
239+
// Ensure all destination files/dirs exist before parallel downloads
240+
for _, lf := range localeFiles {
241+
if err := createFile(lf.Path); err != nil {
242+
return err
243+
}
244+
}
245+
246+
results := make([]downloadResult, len(localeFiles))
247+
var rateMu sync.RWMutex
248+
249+
ctx, cancel := context.WithTimeout(context.Background(), timeoutInMinutes)
250+
defer cancel()
251+
g, ctx := errgroup.WithContext(ctx)
252+
g.SetLimit(maxParallelDownloads)
253+
254+
for i, lf := range localeFiles {
255+
g.Go(func() error {
256+
if ctx.Err() != nil {
257+
return ctx.Err()
258+
}
259+
260+
opts, err := target.buildDownloadOpts(lf)
261+
if err != nil {
262+
err = fmt.Errorf("%s for %s", err, lf.Path)
263+
results[i] = downloadResult{errMsg: err.Error()}
264+
return err
265+
}
266+
267+
err = target.downloadWithRateGate(client, lf, opts, &rateMu)
268+
if err != nil {
269+
if openapiError, ok := err.(phrase.GenericOpenAPIError); ok {
270+
print.Warn("API response: %s", openapiError.Body())
271+
}
272+
err = fmt.Errorf("%s for %s", err, lf.Path)
273+
results[i] = downloadResult{errMsg: err.Error()}
274+
return err
275+
}
276+
277+
results[i] = downloadResult{
278+
message: lf.Message(),
279+
path: lf.RelPath(),
280+
}
281+
return nil
282+
})
283+
}
284+
285+
waitErr := g.Wait()
286+
287+
// Print results in original order: successes and failures
288+
var skipCount int
289+
for _, r := range results {
290+
if r.path != "" {
291+
print.Success("Downloaded %s to %s", r.message, r.path)
292+
} else if r.errMsg != "" {
293+
print.Failure("Failed %s", r.errMsg)
294+
} else {
295+
skipCount++
296+
}
297+
}
298+
if skipCount > 0 {
299+
print.Warn("%d download(s) skipped due to earlier failure", skipCount)
300+
}
301+
302+
return waitErr
303+
}
304+
305+
// downloadWithRateGate downloads a locale file with rate-limit coordination.
306+
// Uses RWMutex as a broadcast gate: workers take a read lock (cheap, concurrent),
307+
// and a rate-limited worker takes the write lock to pause everyone until reset.
308+
func (target *Target) downloadWithRateGate(client *phrase.APIClient, localeFile *LocaleFile, opts phrase.LocaleDownloadOpts, gate *sync.RWMutex) error {
309+
// Read-lock gate: blocks only when a writer (rate-limited worker) holds it
310+
gate.RLock()
311+
gate.RUnlock()
312+
313+
file, response, err := client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts)
314+
if err != nil {
315+
if response != nil && response.Rate.Remaining == 0 {
316+
// TryLock ensures only one worker handles the rate limit pause.
317+
// Others will block on their next RLock until the pause is over.
318+
if gate.TryLock() {
319+
waitForRateLimit(response.Rate)
320+
gate.Unlock()
321+
} else {
322+
// Another worker is already pausing; wait for it
323+
gate.RLock()
324+
gate.RUnlock()
325+
}
326+
327+
file, _, err = client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts)
328+
if err != nil {
329+
return err
330+
}
331+
} else {
332+
return err
333+
}
334+
}
335+
return copyToDestination(file, localeFile.Path)
336+
}
337+
338+
// buildDownloadOpts prepares the LocaleDownloadOpts for a locale file download.
339+
func (target *Target) buildDownloadOpts(localeFile *LocaleFile) (phrase.LocaleDownloadOpts, error) {
340+
opts := phrase.LocaleDownloadOpts{}
341+
342+
if target.Params != nil {
343+
opts = target.Params.LocaleDownloadOpts
344+
translationKeyPrefix, err := placeholders.ResolveTranslationKeyPrefix(target.Params.TranslationKeyPrefix, localeFile.Path)
345+
if err != nil {
346+
return opts, err
347+
}
348+
opts.TranslationKeyPrefix = translationKeyPrefix
349+
}
350+
351+
if opts.FileFormat.Value() == "" {
352+
opts.FileFormat = optional.NewString(localeFile.FileFormat)
353+
}
354+
355+
if localeFile.Tag != "" {
356+
opts.Tags = optional.NewString(localeFile.Tag)
357+
opts.Tag = optional.EmptyString()
358+
}
359+
360+
return opts, nil
361+
}
362+
210363
func (target *Target) downloadAsynchronously(client *phrase.APIClient, localeFile *LocaleFile, downloadOpts phrase.LocaleDownloadOpts) error {
211364
localeDownloadCreateParams := asyncDownloadParams(downloadOpts)
212365

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package internal
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func TestBuildDownloadOpts_DefaultFileFormat(t *testing.T) {
8+
target := &Target{
9+
File: "locales/<locale_name>.json",
10+
ProjectID: "proj1",
11+
}
12+
localeFile := &LocaleFile{
13+
FileFormat: "json",
14+
Tag: "",
15+
}
16+
17+
opts, err := target.buildDownloadOpts(localeFile)
18+
if err != nil {
19+
t.Fatalf("unexpected error: %v", err)
20+
}
21+
if opts.FileFormat.Value() != "json" {
22+
t.Errorf("expected file format 'json', got %q", opts.FileFormat.Value())
23+
}
24+
}
25+
26+
func TestBuildDownloadOpts_TagHandling(t *testing.T) {
27+
target := &Target{
28+
File: "locales/<locale_name>/<tag>.json",
29+
ProjectID: "proj1",
30+
}
31+
localeFile := &LocaleFile{
32+
FileFormat: "json",
33+
Tag: "web",
34+
}
35+
36+
opts, err := target.buildDownloadOpts(localeFile)
37+
if err != nil {
38+
t.Fatalf("unexpected error: %v", err)
39+
}
40+
if opts.Tags.Value() != "web" {
41+
t.Errorf("expected tags 'web', got %q", opts.Tags.Value())
42+
}
43+
if opts.Tag.Value() != "" {
44+
t.Errorf("expected tag to be empty string, got %q", opts.Tag.Value())
45+
}
46+
}

clients/cli/cmd/pull.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func initPull() {
2222
UseLocalBranchName: params.GetBool("use-local-branch-name"),
2323
Async: params.GetBool("async"),
2424
Cache: params.GetBool("cache"),
25+
Parallel: params.GetBool("parallel"),
2526
}
2627
err := cmdPull.Run(Config)
2728
if err != nil {
@@ -35,5 +36,6 @@ func initPull() {
3536
AddFlag(pullCmd, "bool", "use-local-branch-name", "", "use local branch name", false)
3637
AddFlag(pullCmd, "bool", "async", "a", "use asynchronous locale downloads (recommended for large number of keys)", false)
3738
AddFlag(pullCmd, "bool", "cache", "", "cache ETags locally to skip unchanged downloads (sync mode only)", false)
39+
AddFlag(pullCmd, "bool", "parallel", "p", "download locale files in parallel (max 4 concurrent requests)", false)
3840
params.BindPFlags(pullCmd.Flags())
3941
}

clients/cli/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ require (
3737
github.com/pelletier/go-toml v1.2.0 // indirect
3838
github.com/spf13/jwalterweatherman v1.1.0 // indirect
3939
github.com/stretchr/testify v1.9.0 // indirect
40+
golang.org/x/sync v0.12.0
4041
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
4142
)

clients/cli/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
372372
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
373373
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
374374
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
375+
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
376+
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
375377
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
376378
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
377379
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

0 commit comments

Comments
 (0)