@@ -30,6 +30,7 @@ limitations under the License.
30
30
#include " tensorflow/core/lib/strings/numbers.h"
31
31
#include " tensorflow/core/lib/strings/str_util.h"
32
32
#include " tensorflow/core/platform/cloud/google_auth_provider.h"
33
+ #include " tensorflow/core/platform/cloud/retrying_utils.h"
33
34
#include " tensorflow/core/platform/cloud/time_util.h"
34
35
#include " tensorflow/core/platform/env.h"
35
36
#include " tensorflow/core/platform/mutex.h"
@@ -240,7 +241,11 @@ class GcsRandomAccessFile : public RandomAccessFile {
240
241
buffer_.reserve (desired_buffer_size);
241
242
}
242
243
244
+ // Shift the offset and clear the buffer so that the state stays
245
+ // consistent if loading from GCS fails.
243
246
buffer_start_offset_ = offset;
247
+ buffer_.clear ();
248
+
244
249
TF_RETURN_IF_ERROR (LoadBufferFromGCS ());
245
250
246
251
// Set the results.
@@ -302,13 +307,13 @@ class GcsWritableFile : public WritableFile {
302
307
GcsWritableFile (const string& bucket, const string& object,
303
308
AuthProvider* auth_provider,
304
309
HttpRequest::Factory* http_request_factory,
305
- int32 max_upload_attempts )
310
+ int64 initial_retry_delay_usec )
306
311
: bucket_(bucket),
307
312
object_ (object),
308
313
auth_provider_(auth_provider),
309
314
http_request_factory_(http_request_factory),
310
315
sync_needed_(true ),
311
- max_upload_attempts_(max_upload_attempts ) {
316
+ initial_retry_delay_usec_(initial_retry_delay_usec ) {
312
317
if (GetTmpFilename (&tmp_content_filename_).ok ()) {
313
318
outfile_.open (tmp_content_filename_,
314
319
std::ofstream::binary | std::ofstream::app);
@@ -324,13 +329,13 @@ class GcsWritableFile : public WritableFile {
324
329
AuthProvider* auth_provider,
325
330
const string& tmp_content_filename,
326
331
HttpRequest::Factory* http_request_factory,
327
- int32 max_upload_attempts )
332
+ int64 initial_retry_delay_usec )
328
333
: bucket_(bucket),
329
334
object_(object),
330
335
auth_provider_(auth_provider),
331
336
http_request_factory_(http_request_factory),
332
337
sync_needed_(true ),
333
- max_upload_attempts_(max_upload_attempts ) {
338
+ initial_retry_delay_usec_(initial_retry_delay_usec ) {
334
339
tmp_content_filename_ = tmp_content_filename;
335
340
outfile_.open (tmp_content_filename_,
336
341
std::ofstream::binary | std::ofstream::app);
@@ -388,41 +393,40 @@ class GcsWritableFile : public WritableFile {
388
393
string session_uri;
389
394
TF_RETURN_IF_ERROR (CreateNewUploadSession (&session_uri));
390
395
uint64 already_uploaded = 0 ;
391
- for (int attempt = 0 ; attempt < max_upload_attempts_; attempt++) {
392
- if (attempt > 0 ) {
393
- bool completed;
394
- TF_RETURN_IF_ERROR (RequestUploadSessionStatus (session_uri, &completed,
395
- &already_uploaded));
396
- if (completed) {
397
- // It's unclear why UploadToSession didn't return OK in the previous
398
- // attempt, but GCS reports that the file is fully uploaded,
399
- // so succeed.
400
- return Status::OK ();
401
- }
402
- }
403
- const Status upload_status =
404
- UploadToSession (session_uri, already_uploaded);
405
- if (upload_status.ok ()) {
396
+ bool first_attempt = true ;
397
+ const Status upload_status = RetryingUtils::CallWithRetries (
398
+ [&first_attempt, &already_uploaded, &session_uri, this ]() {
399
+ if (!first_attempt) {
400
+ bool completed;
401
+ TF_RETURN_IF_ERROR (RequestUploadSessionStatus (
402
+ session_uri, &completed, &already_uploaded));
403
+ if (completed) {
404
+ // It's unclear why UploadToSession didn't return OK in the
405
+ // previous attempt, but GCS reports that the file is fully
406
+ // uploaded, so succeed.
407
+ return Status::OK ();
408
+ }
409
+ }
410
+ first_attempt = false ;
411
+ return UploadToSession (session_uri, already_uploaded);
412
+ },
413
+ initial_retry_delay_usec_);
414
+ switch (upload_status.code ()) {
415
+ case errors::Code::OK:
406
416
return Status::OK ();
407
- }
408
- switch (upload_status.code ()) {
409
- case errors::Code::NOT_FOUND:
410
- // GCS docs recommend retrying the whole upload. We're relying on the
411
- // RetryingFileSystem to retry the Sync() call.
412
- return errors::Unavailable (" Could not upload gs://" , bucket_, " /" ,
413
- object_);
414
- case errors::Code::UNAVAILABLE:
415
- // The upload can be resumed, but GCS docs recommend an exponential
416
- // back-off.
417
- Env::Default ()->SleepForMicroseconds (kUploadRetryDelayMicros
418
- << attempt);
419
- break ;
420
- default :
421
- // Something unexpected happen, fail.
422
- return upload_status;
423
- }
417
+ case errors::Code::NOT_FOUND:
418
+ // GCS docs recommend retrying the whole upload. We're relying on the
419
+ // RetryingFileSystem to retry the Sync() call.
420
+ return errors::Unavailable (" Could not upload gs://" , bucket_, " /" ,
421
+ object_);
422
+ case errors::Code::UNAVAILABLE:
423
+ // Return ABORTED so that RetryingFileSystem doesn't retry again.
424
+ return errors::Aborted (" Upload gs://" , bucket_, " /" , object_,
425
+ " failed." );
426
+ default :
427
+ // Something unexpected happened; fail.
428
+ return upload_status;
424
429
}
425
- return errors::Aborted (" Upload gs://" , bucket_, " /" , object_, " failed." );
426
430
}
427
431
428
432
Status CheckWritable () const {
@@ -571,7 +575,7 @@ class GcsWritableFile : public WritableFile {
571
575
std::ofstream outfile_;
572
576
HttpRequest::Factory* http_request_factory_;
573
577
bool sync_needed_; // whether there is buffered data that needs to be synced
574
- int32 max_upload_attempts_ ;
578
+ int64 initial_retry_delay_usec_ ;
575
579
};
576
580
577
581
class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
@@ -594,11 +598,11 @@ GcsFileSystem::GcsFileSystem()
594
598
GcsFileSystem::GcsFileSystem (
595
599
std::unique_ptr<AuthProvider> auth_provider,
596
600
std::unique_ptr<HttpRequest::Factory> http_request_factory,
597
- size_t read_ahead_bytes, int32 max_upload_attempts )
601
+ size_t read_ahead_bytes, int64 initial_retry_delay_usec )
598
602
: auth_provider_(std::move(auth_provider)),
599
603
http_request_factory_(std::move(http_request_factory)),
600
604
read_ahead_bytes_(read_ahead_bytes),
601
- max_upload_attempts_(max_upload_attempts ) {}
605
+ initial_retry_delay_usec_(initial_retry_delay_usec ) {}
602
606
603
607
Status GcsFileSystem::NewRandomAccessFile (
604
608
const string& fname, std::unique_ptr<RandomAccessFile>* result) {
@@ -616,7 +620,7 @@ Status GcsFileSystem::NewWritableFile(const string& fname,
616
620
TF_RETURN_IF_ERROR (ParseGcsPath (fname, false , &bucket, &object));
617
621
result->reset (new GcsWritableFile (bucket, object, auth_provider_.get (),
618
622
http_request_factory_.get (),
619
- max_upload_attempts_ ));
623
+ initial_retry_delay_usec_ ));
620
624
return Status::OK ();
621
625
}
622
626
@@ -656,7 +660,7 @@ Status GcsFileSystem::NewAppendableFile(const string& fname,
656
660
TF_RETURN_IF_ERROR (ParseGcsPath (fname, false , &bucket, &object));
657
661
result->reset (new GcsWritableFile (
658
662
bucket, object, auth_provider_.get (), old_content_filename,
659
- http_request_factory_.get (), max_upload_attempts_ ));
663
+ http_request_factory_.get (), initial_retry_delay_usec_ ));
660
664
return Status::OK ();
661
665
}
662
666
0 commit comments