Skip to content

Commit 704c3ac

Browse files
authored
Merge pull request #235 from deploymenttheory/dev-dw
testing retries
2 parents 778ea08 + 076b699 commit 704c3ac

File tree

2 files changed

+127
-24
lines changed

2 files changed

+127
-24
lines changed
Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,17 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]
9797
ctx, cancel := context.WithTimeout(context.Background(), c.clientConfig.ClientOptions.Timeout.CustomTimeout.Duration())
9898
defer cancel()
9999

100-
body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
101-
if err != nil {
100+
var body io.Reader
101+
var contentType string
102+
103+
// Create multipart body in a function to ensure it runs again on retry
104+
createBody := func() error {
105+
var err error
106+
body, contentType, err = createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
107+
return err
108+
}
109+
110+
if err := createBody(); err != nil {
102111
log.Error("Failed to create streaming multipart request body", zap.Error(err))
103112
return nil, err
104113
}
@@ -125,6 +134,20 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]
125134
for attempt := 1; attempt <= maxRetries; attempt++ {
126135
startTime := time.Now()
127136

137+
// Create a new request for each retry
138+
if attempt > 1 {
139+
if err := createBody(); err != nil {
140+
log.Error("Failed to recreate streaming multipart request body", zap.Error(err))
141+
return nil, err
142+
}
143+
req, err = http.NewRequestWithContext(ctx, method, url, body)
144+
if err != nil {
145+
log.Error("Failed to create HTTP request on retry", zap.Error(err))
146+
return nil, err
147+
}
148+
req.Header.Set("Content-Type", contentType)
149+
}
150+
128151
resp, requestErr = c.httpClient.Do(req)
129152
duration := time.Since(startTime)
130153

httpclient/multipartrequest.go.back

Lines changed: 102 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/textproto"
1111
"os"
1212
"path/filepath"
13+
"sync"
1314
"time"
1415

1516
"github.com/deploymenttheory/go-api-http-client/authenticationhandler"
@@ -20,6 +21,13 @@ import (
2021
"go.uber.org/zap"
2122
)
2223

24+
// UploadState represents the state of an upload operation, including the last uploaded byte.
25+
// This struct is used to track the progress of file uploads for resumable uploads and to resume uploads from the last uploaded byte.
26+
type UploadState struct {
27+
LastUploadedByte int64
28+
sync.Mutex
29+
}
30+
2331
// DoMultiPartRequest creates and executes a multipart/form-data HTTP request for file uploads and form fields.
2432
// This function handles constructing the multipart request body, setting the necessary headers, and executing the request.
2533
// It supports custom content types and headers for each part of the multipart request, and handles authentication and
@@ -83,18 +91,18 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]
8391

8492
log.Info("Executing multipart file upload request", zap.String("method", method), zap.String("endpoint", endpoint))
8593

86-
// Call the helper function to create a streaming multipart request body
87-
body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
88-
if err != nil {
89-
return nil, err
90-
}
91-
9294
url := c.APIHandler.ConstructAPIResourceEndpoint(endpoint, log)
9395

94-
// Create a context with timeout
96+
// Create a context with timeout based on the custom timeout duration
9597
ctx, cancel := context.WithTimeout(context.Background(), c.clientConfig.ClientOptions.Timeout.CustomTimeout.Duration())
9698
defer cancel()
9799

100+
body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
101+
if err != nil {
102+
log.Error("Failed to create streaming multipart request body", zap.Error(err))
103+
return nil, err
104+
}
105+
98106
req, err := http.NewRequestWithContext(ctx, method, url, body)
99107
if err != nil {
100108
log.Error("Failed to create HTTP request", zap.Error(err))
@@ -109,26 +117,68 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]
109117
headerHandler.SetRequestHeaders(endpoint)
110118
headerHandler.LogHeaders(c.clientConfig.ClientOptions.Logging.HideSensitiveData)
111119

112-
startTime := time.Now()
120+
var resp *http.Response
121+
var requestErr error
113122

114-
resp, err := c.httpClient.Do(req)
115-
if err != nil {
116-
log.Error("Failed to send request", zap.String("method", method), zap.String("endpoint", endpoint), zap.Error(err))
117-
return nil, err
118-
}
123+
// Retry logic
124+
maxRetries := 3
125+
for attempt := 1; attempt <= maxRetries; attempt++ {
126+
startTime := time.Now()
127+
128+
resp, requestErr = c.httpClient.Do(req)
129+
duration := time.Since(startTime)
119130

120-
duration := time.Since(startTime)
121-
log.Debug("Request sent successfully", zap.String("method", method), zap.String("endpoint", endpoint), zap.Int("status_code", resp.StatusCode), zap.Duration("duration", duration))
131+
if requestErr != nil {
132+
log.Error("Failed to send request", zap.String("method", method), zap.String("endpoint", endpoint), zap.Error(requestErr))
133+
if attempt < maxRetries {
134+
log.Info("Retrying request", zap.Int("attempt", attempt))
135+
time.Sleep(2 * time.Second)
136+
continue
137+
}
138+
return nil, requestErr
139+
}
140+
141+
log.Debug("Request sent successfully", zap.String("method", method), zap.String("endpoint", endpoint), zap.Int("status_code", resp.StatusCode), zap.Duration("duration", duration))
142+
143+
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
144+
return resp, response.HandleAPISuccessResponse(resp, out, log)
145+
}
122146

123-
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
124-
return resp, response.HandleAPISuccessResponse(resp, out, log)
147+
// If status code indicates a server error, retry
148+
if resp.StatusCode >= 500 && attempt < maxRetries {
149+
log.Info("Retrying request due to server error", zap.Int("status_code", resp.StatusCode), zap.Int("attempt", attempt))
150+
time.Sleep(2 * time.Second)
151+
continue
152+
}
153+
154+
return resp, response.HandleAPIErrorResponse(resp, log)
125155
}
126156

127-
return resp, response.HandleAPIErrorResponse(resp, log)
157+
return resp, requestErr
128158
}
129159

130160
// createStreamingMultipartRequestBody creates a streaming multipart request body with the provided files and form fields.
131161
// This function constructs the body of a multipart/form-data request using an io.Pipe, allowing the request to be sent in chunks.
162+
// It supports custom content types and headers for each part of the multipart request, and logs the process for debugging
163+
// and monitoring purposes.
164+
165+
// Parameters:
166+
// - files: A map where the key is the field name and the value is a slice of file paths to be included in the request.
167+
// Each file path corresponds to a file that will be included in the multipart request.
168+
// - formDataFields: A map of additional form fields to be included in the multipart request, where the key is the field name
169+
// and the value is the field value. These are regular form fields that accompany the file uploads.
170+
// - fileContentTypes: A map specifying the content type for each file part. The key is the field name and the value is the
171+
// content type (e.g., "image/jpeg").
172+
// - formDataPartHeaders: A map specifying custom headers for each part of the multipart form data. The key is the field name
173+
// and the value is an http.Header containing the headers for that part.
174+
// - log: An instance of a logger implementing the logger.Logger interface, used to log informational messages, warnings,
175+
// and errors encountered during the construction of the multipart request body.
176+
177+
// Returns:
178+
// - io.Reader: The constructed multipart request body reader. This reader streams the multipart form data payload ready to be sent.
179+
// - string: The content type of the multipart request body. This includes the boundary string used by the multipart writer.
180+
// - error: An error object indicating failure during the construction of the multipart request body. This could be due to issues
181+
// such as file reading errors or multipart writer errors.
132182
func createStreamingMultipartRequestBody(files map[string][]string, formDataFields map[string]string, fileContentTypes map[string]string, formDataPartHeaders map[string]http.Header, log logger.Logger) (io.Reader, string, error) {
133183
pr, pw := io.Pipe()
134184
writer := multipart.NewWriter(pw)
@@ -216,7 +266,8 @@ func addFilePart(writer *multipart.Writer, fieldName, filePath string, fileConte
216266
}
217267

218268
progressLogger := logUploadProgress(file, fileSize.Size(), log)
219-
if err := chunkFileUpload(file, encoder, log, progressLogger); err != nil {
269+
uploadState := &UploadState{}
270+
if err := chunkFileUpload(file, encoder, log, progressLogger, uploadState); err != nil {
220271
log.Error("Failed to copy file content", zap.String("filePath", filePath), zap.Error(err))
221272
return err
222273
}
@@ -278,25 +329,43 @@ func setFormDataPartHeader(fieldname, filename, contentType string, customHeader
278329

279330
// chunkFileUpload reads the file upload into chunks and writes it to the writer.
280331
// This function reads the file in chunks and writes it to the provided writer, allowing for progress logging during the upload.
281-
// chunk size is set to 1024 KB (1 MB) by default.
332+
// The chunk size is set to 8192 KB (8 MB) by default. This is a common chunk size used for file uploads to cloud storage services.
333+
334+
// Azure Blob Storage has a minimum chunk size of 4 MB and a maximum of 100 MB for block blobs.
335+
// GCP Cloud Storage has a minimum chunk size of 256 KB and a maximum of 5 GB.
336+
// AWS S3 has a minimum chunk size of 5 MB and a maximum of 5 GB.
337+
338+
// The function also calculates the total number of chunks and logs the chunk number during the upload process.
282339

283340
// Parameters:
284341
// - file: The file to be uploaded.
285342
// - writer: The writer to which the file content will be written.
286343
// - log: An instance of a logger implementing the logger.Logger interface, used to log informational messages, warnings,
287344
// and errors encountered during the file upload.
288345
// - updateProgress: A function to update the upload progress, typically used for logging purposes.
346+
// - uploadState: A pointer to an UploadState struct used to track the progress of the file upload for resumable uploads.
289347

290348
// Returns:
291349
// - error: An error object indicating failure during the file upload. This could be due to issues such as file reading errors
292350
// or writer errors.
293-
func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateProgress func(int64)) error {
294-
const chunkSize = 1024 * 1024 // 1024 bytes * 1024 (1 MB)
351+
func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateProgress func(int64), uploadState *UploadState) error {
352+
const chunkSize = 8 * 1024 * 1024 // 8 MB
295353
buffer := make([]byte, chunkSize)
296354
totalWritten := int64(0)
297355
chunkWritten := int64(0)
298356
fileName := filepath.Base(file.Name())
299357

358+
// Seek to the last uploaded byte
359+
file.Seek(uploadState.LastUploadedByte, io.SeekStart)
360+
361+
// Calculate the total number of chunks
362+
fileInfo, err := file.Stat()
363+
if err != nil {
364+
return fmt.Errorf("failed to get file info: %v", err)
365+
}
366+
totalChunks := (fileInfo.Size() + chunkSize - 1) / chunkSize
367+
currentChunk := uploadState.LastUploadedByte / chunkSize
368+
300369
for {
301370
n, err := file.Read(buffer)
302371
if err != nil && err != io.EOF {
@@ -308,6 +377,10 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP
308377

309378
written, err := writer.Write(buffer[:n])
310379
if err != nil {
380+
// Save the state before returning the error
381+
uploadState.Lock()
382+
uploadState.LastUploadedByte += totalWritten
383+
uploadState.Unlock()
311384
return err
312385
}
313386

@@ -316,8 +389,11 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP
316389
updateProgress(int64(written))
317390

318391
if chunkWritten >= chunkSize {
392+
currentChunk++
319393
log.Debug("File Upload Chunk Sent",
320394
zap.String("file_name", fileName),
395+
zap.Int64("chunk_number", currentChunk),
396+
zap.Int64("total_chunks", totalChunks),
321397
zap.Int64("kb_sent", chunkWritten/1024),
322398
zap.Int64("total_kb_sent", totalWritten/1024))
323399
chunkWritten = 0
@@ -326,8 +402,11 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP
326402

327403
// Log any remaining bytes that were written but didn't reach the log threshold
328404
if chunkWritten > 0 {
405+
currentChunk++
329406
log.Debug("Final Upload Chunk Sent",
330407
zap.String("file_name", fileName),
408+
zap.Int64("chunk_number", currentChunk),
409+
zap.Int64("total_chunks", totalChunks),
331410
zap.Int64("kb_sent", chunkWritten/1024),
332411
zap.Int64("total_kb_sent", totalWritten/1024))
333412
}
@@ -346,6 +425,7 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP
346425

347426
// Returns:
348427
// - func(int64): A function that takes the number of bytes written as an argument and logs the upload progress.
428+
// logUploadProgress logs the upload progress based on the percentage of the total file size.
349429
func logUploadProgress(file *os.File, fileSize int64, log logger.Logger) func(int64) {
350430
var uploaded int64 = 0
351431
const logInterval = 5 // Log every 5% increment

0 commit comments

Comments
 (0)