Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 154 additions & 1 deletion clients/cli/cmd/internal/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"time"

"github.com/phrase/phrase-cli/cmd/internal/paths"
Expand All @@ -19,6 +20,7 @@ import (

"github.com/antihax/optional"
"github.com/phrase/phrase-go/v4"
"golang.org/x/sync/errgroup"
)

const (
Expand All @@ -27,6 +29,8 @@ const (
asyncRetryCount = 360 // 30 minutes
)

const maxParallelDownloads = 4 // Phrase API allows max 4 concurrent requests

var (
Config *phrase.Config
errNotModified = errors.New("not modified")
Expand All @@ -38,6 +42,7 @@ type PullCommand struct {
UseLocalBranchName bool
Async bool
Cache bool
Parallel bool
}

var Auth context.Context
Expand Down Expand Up @@ -101,7 +106,15 @@ func (cmd *PullCommand) Run(config *phrase.Config) error {
}

for _, target := range targets {
err := target.Pull(client, cmd.Async, cache)
var err error
if cmd.Parallel && !cmd.Async {
err = target.PullParallel(client)
} else {
if cmd.Parallel && cmd.Async {
print.Warn("--parallel is not supported with --async, ignoring parallel")
}
err = target.Pull(client, cmd.Async, cache)
}
if err != nil {
return err
}
Expand Down Expand Up @@ -167,6 +180,12 @@ func (target *Target) Pull(client *phrase.APIClient, async bool, cache *Download
return nil
}

type downloadResult struct {
message string
path string
errMsg string
}

func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFile *LocaleFile, async bool, cache *DownloadCache) error {
localVarOptionals := phrase.LocaleDownloadOpts{}

Expand Down Expand Up @@ -207,6 +226,140 @@ func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFil
return target.downloadSynchronously(client, localeFile, localVarOptionals, cache)
}

func (target *Target) PullParallel(client *phrase.APIClient) error {
if err := target.CheckPreconditions(); err != nil {
return err
}

localeFiles, err := target.LocaleFiles()
if err != nil {
return err
}

// Ensure all destination files/dirs exist before parallel downloads
for _, lf := range localeFiles {
if err := createFile(lf.Path); err != nil {
return err
}
}

results := make([]downloadResult, len(localeFiles))
var rateMu sync.RWMutex

ctx, cancel := context.WithTimeout(context.Background(), timeoutInMinutes)
defer cancel()
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(maxParallelDownloads)

for i, lf := range localeFiles {
g.Go(func() error {
if ctx.Err() != nil {
return ctx.Err()
}

opts, err := target.buildDownloadOpts(lf)
if err != nil {
err = fmt.Errorf("%s for %s", err, lf.Path)
results[i] = downloadResult{errMsg: err.Error()}
return err
}

err = target.downloadWithRateGate(client, lf, opts, &rateMu)
if err != nil {
if openapiError, ok := err.(phrase.GenericOpenAPIError); ok {
print.Warn("API response: %s", openapiError.Body())
}
err = fmt.Errorf("%s for %s", err, lf.Path)
results[i] = downloadResult{errMsg: err.Error()}
return err
}

results[i] = downloadResult{
message: lf.Message(),
path: lf.RelPath(),
}
return nil
})
}

waitErr := g.Wait()

// Print results in original order: successes and failures
var skipCount int
for _, r := range results {
if r.path != "" {
print.Success("Downloaded %s to %s", r.message, r.path)
} else if r.errMsg != "" {
print.Failure("Failed %s", r.errMsg)
} else {
skipCount++
}
}
if skipCount > 0 {
print.Warn("%d download(s) skipped due to earlier failure", skipCount)
}

return waitErr
}

// downloadWithRateGate downloads a locale file with rate-limit coordination.
// Uses RWMutex as a broadcast gate: workers take a read lock (cheap, concurrent),
// and a rate-limited worker takes the write lock to pause everyone until reset.
func (target *Target) downloadWithRateGate(client *phrase.APIClient, localeFile *LocaleFile, opts phrase.LocaleDownloadOpts, gate *sync.RWMutex) error {
// Read-lock gate: blocks only when a writer (rate-limited worker) holds it
gate.RLock()
gate.RUnlock()

file, response, err := client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts)
if err != nil {
if response != nil && response.Rate.Remaining == 0 {
// TryLock ensures only one worker handles the rate limit pause.
// Others will block on their next RLock until the pause is over.
if gate.TryLock() {
waitForRateLimit(response.Rate)
gate.Unlock()
} else {
// Another worker is already pausing; wait for it
gate.RLock()
gate.RUnlock()
}

file, _, err = client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts)
if err != nil {
return err
}
} else {
return err
}
}
return copyToDestination(file, localeFile.Path)
}

// buildDownloadOpts prepares the LocaleDownloadOpts for a locale file download.
func (target *Target) buildDownloadOpts(localeFile *LocaleFile) (phrase.LocaleDownloadOpts, error) {
opts := phrase.LocaleDownloadOpts{}

if target.Params != nil {
opts = target.Params.LocaleDownloadOpts
translationKeyPrefix, err := placeholders.ResolveTranslationKeyPrefix(target.Params.TranslationKeyPrefix, localeFile.Path)
if err != nil {
return opts, err
}
opts.TranslationKeyPrefix = translationKeyPrefix
}

if opts.FileFormat.Value() == "" {
opts.FileFormat = optional.NewString(localeFile.FileFormat)
}

if localeFile.Tag != "" {
opts.Tags = optional.NewString(localeFile.Tag)
opts.Tag = optional.EmptyString()
}

return opts, nil
}

func (target *Target) downloadAsynchronously(client *phrase.APIClient, localeFile *LocaleFile, downloadOpts phrase.LocaleDownloadOpts) error {
localeDownloadCreateParams := asyncDownloadParams(downloadOpts)

Expand Down
46 changes: 46 additions & 0 deletions clients/cli/cmd/internal/pull_parallel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package internal

import (
"testing"
)

func TestBuildDownloadOpts_DefaultFileFormat(t *testing.T) {
target := &Target{
File: "locales/<locale_name>.json",
ProjectID: "proj1",
}
localeFile := &LocaleFile{
FileFormat: "json",
Tag: "",
}

opts, err := target.buildDownloadOpts(localeFile)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if opts.FileFormat.Value() != "json" {
t.Errorf("expected file format 'json', got %q", opts.FileFormat.Value())
}
}

func TestBuildDownloadOpts_TagHandling(t *testing.T) {
target := &Target{
File: "locales/<locale_name>/<tag>.json",
ProjectID: "proj1",
}
localeFile := &LocaleFile{
FileFormat: "json",
Tag: "web",
}

opts, err := target.buildDownloadOpts(localeFile)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if opts.Tags.Value() != "web" {
t.Errorf("expected tags 'web', got %q", opts.Tags.Value())
}
if opts.Tag.Value() != "" {
t.Errorf("expected tag to be empty string, got %q", opts.Tag.Value())
}
}
2 changes: 2 additions & 0 deletions clients/cli/cmd/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func initPull() {
UseLocalBranchName: params.GetBool("use-local-branch-name"),
Async: params.GetBool("async"),
Cache: params.GetBool("cache"),
Parallel: params.GetBool("parallel"),
}
err := cmdPull.Run(Config)
if err != nil {
Expand All @@ -35,5 +36,6 @@ func initPull() {
AddFlag(pullCmd, "bool", "use-local-branch-name", "", "use local branch name", false)
AddFlag(pullCmd, "bool", "async", "a", "use asynchronous locale downloads (recommended for large number of keys)", false)
AddFlag(pullCmd, "bool", "cache", "", "cache ETags locally to skip unchanged downloads (sync mode only)", false)
AddFlag(pullCmd, "bool", "parallel", "p", "download locale files in parallel (max 4 concurrent requests)", false)
params.BindPFlags(pullCmd.Flags())
}
1 change: 1 addition & 0 deletions clients/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ require (
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/sync v0.12.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
)
2 changes: 2 additions & 0 deletions clients/cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
Loading