Skip to content

Commit 26f976d

Browse files
author
av8ramit
authored
Merge pull request tensorflow#8065 from jhseu/r1.0_cherrypicks
1.0 cherrypicks
2 parents 29a6b46 + 2924e92 commit 26f976d

19 files changed

+770
-489
lines changed

tensorflow/core/platform/cloud/BUILD

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ cc_library(
3838
":google_auth_provider",
3939
":http_request",
4040
":retrying_file_system",
41+
":retrying_utils",
4142
":time_util",
4243
"//tensorflow/core:framework_headers_lib",
4344
"//tensorflow/core:lib_internal",
@@ -87,6 +88,7 @@ cc_library(
8788
deps = [
8889
":http_request",
8990
":oauth_client",
91+
":retrying_utils",
9092
"//tensorflow/core:lib",
9193
"//tensorflow/core:lib_internal",
9294
"@jsoncpp_git//:jsoncpp",
@@ -110,6 +112,20 @@ cc_library(
110112
],
111113
)
112114

115+
cc_library(
116+
name = "retrying_utils",
117+
srcs = [
118+
"retrying_utils.cc",
119+
],
120+
hdrs = [
121+
"retrying_utils.h",
122+
],
123+
deps = [
124+
"//tensorflow/core:framework_headers_lib",
125+
"//tensorflow/core:lib_internal",
126+
],
127+
)
128+
113129
cc_library(
114130
name = "retrying_file_system",
115131
srcs = [
@@ -119,6 +135,7 @@ cc_library(
119135
"retrying_file_system.h",
120136
],
121137
deps = [
138+
":retrying_utils",
122139
"//tensorflow/core:framework_headers_lib",
123140
"//tensorflow/core:lib_internal",
124141
],
@@ -223,3 +240,16 @@ tf_cc_test(
223240
"//tensorflow/core:test_main",
224241
],
225242
)
243+
244+
tf_cc_test(
245+
name = "retrying_utils_test",
246+
size = "small",
247+
srcs = ["retrying_utils_test.cc"],
248+
deps = [
249+
":retrying_utils",
250+
"//tensorflow/core:lib",
251+
"//tensorflow/core:lib_internal",
252+
"//tensorflow/core:test",
253+
"//tensorflow/core:test_main",
254+
],
255+
)

tensorflow/core/platform/cloud/gcs_file_system.cc

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ limitations under the License.
3030
#include "tensorflow/core/lib/strings/numbers.h"
3131
#include "tensorflow/core/lib/strings/str_util.h"
3232
#include "tensorflow/core/platform/cloud/google_auth_provider.h"
33+
#include "tensorflow/core/platform/cloud/retrying_utils.h"
3334
#include "tensorflow/core/platform/cloud/time_util.h"
3435
#include "tensorflow/core/platform/env.h"
3536
#include "tensorflow/core/platform/mutex.h"
@@ -240,7 +241,11 @@ class GcsRandomAccessFile : public RandomAccessFile {
240241
buffer_.reserve(desired_buffer_size);
241242
}
242243

244+
// Shift the offset and clear the buffer so that the state stays
245+
// consistent if loading from GCS fails.
243246
buffer_start_offset_ = offset;
247+
buffer_.clear();
248+
244249
TF_RETURN_IF_ERROR(LoadBufferFromGCS());
245250

246251
// Set the results.
@@ -302,13 +307,13 @@ class GcsWritableFile : public WritableFile {
302307
GcsWritableFile(const string& bucket, const string& object,
303308
AuthProvider* auth_provider,
304309
HttpRequest::Factory* http_request_factory,
305-
int32 max_upload_attempts)
310+
int64 initial_retry_delay_usec)
306311
: bucket_(bucket),
307312
object_(object),
308313
auth_provider_(auth_provider),
309314
http_request_factory_(http_request_factory),
310315
sync_needed_(true),
311-
max_upload_attempts_(max_upload_attempts) {
316+
initial_retry_delay_usec_(initial_retry_delay_usec) {
312317
if (GetTmpFilename(&tmp_content_filename_).ok()) {
313318
outfile_.open(tmp_content_filename_,
314319
std::ofstream::binary | std::ofstream::app);
@@ -324,13 +329,13 @@ class GcsWritableFile : public WritableFile {
324329
AuthProvider* auth_provider,
325330
const string& tmp_content_filename,
326331
HttpRequest::Factory* http_request_factory,
327-
int32 max_upload_attempts)
332+
int64 initial_retry_delay_usec)
328333
: bucket_(bucket),
329334
object_(object),
330335
auth_provider_(auth_provider),
331336
http_request_factory_(http_request_factory),
332337
sync_needed_(true),
333-
max_upload_attempts_(max_upload_attempts) {
338+
initial_retry_delay_usec_(initial_retry_delay_usec) {
334339
tmp_content_filename_ = tmp_content_filename;
335340
outfile_.open(tmp_content_filename_,
336341
std::ofstream::binary | std::ofstream::app);
@@ -388,41 +393,40 @@ class GcsWritableFile : public WritableFile {
388393
string session_uri;
389394
TF_RETURN_IF_ERROR(CreateNewUploadSession(&session_uri));
390395
uint64 already_uploaded = 0;
391-
for (int attempt = 0; attempt < max_upload_attempts_; attempt++) {
392-
if (attempt > 0) {
393-
bool completed;
394-
TF_RETURN_IF_ERROR(RequestUploadSessionStatus(session_uri, &completed,
395-
&already_uploaded));
396-
if (completed) {
397-
// It's unclear why UploadToSession didn't return OK in the previous
398-
// attempt, but GCS reports that the file is fully uploaded,
399-
// so succeed.
400-
return Status::OK();
401-
}
402-
}
403-
const Status upload_status =
404-
UploadToSession(session_uri, already_uploaded);
405-
if (upload_status.ok()) {
396+
bool first_attempt = true;
397+
const Status upload_status = RetryingUtils::CallWithRetries(
398+
[&first_attempt, &already_uploaded, &session_uri, this]() {
399+
if (!first_attempt) {
400+
bool completed;
401+
TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
402+
session_uri, &completed, &already_uploaded));
403+
if (completed) {
404+
// It's unclear why UploadToSession didn't return OK in the
405+
// previous attempt, but GCS reports that the file is fully
406+
// uploaded, so succeed.
407+
return Status::OK();
408+
}
409+
}
410+
first_attempt = false;
411+
return UploadToSession(session_uri, already_uploaded);
412+
},
413+
initial_retry_delay_usec_);
414+
switch (upload_status.code()) {
415+
case errors::Code::OK:
406416
return Status::OK();
407-
}
408-
switch (upload_status.code()) {
409-
case errors::Code::NOT_FOUND:
410-
// GCS docs recommend retrying the whole upload. We're relying on the
411-
// RetryingFileSystem to retry the Sync() call.
412-
return errors::Unavailable("Could not upload gs://", bucket_, "/",
413-
object_);
414-
case errors::Code::UNAVAILABLE:
415-
// The upload can be resumed, but GCS docs recommend an exponential
416-
// back-off.
417-
Env::Default()->SleepForMicroseconds(kUploadRetryDelayMicros
418-
<< attempt);
419-
break;
420-
default:
421-
// Something unexpected happen, fail.
422-
return upload_status;
423-
}
417+
case errors::Code::NOT_FOUND:
418+
// GCS docs recommend retrying the whole upload. We're relying on the
419+
// RetryingFileSystem to retry the Sync() call.
420+
return errors::Unavailable("Could not upload gs://", bucket_, "/",
421+
object_);
422+
case errors::Code::UNAVAILABLE:
423+
// Return ABORTED so that RetryingFileSystem doesn't retry again.
424+
return errors::Aborted("Upload gs://", bucket_, "/", object_,
425+
" failed.");
426+
default:
427+
// Something unexpected happen, fail.
428+
return upload_status;
424429
}
425-
return errors::Aborted("Upload gs://", bucket_, "/", object_, " failed.");
426430
}
427431

428432
Status CheckWritable() const {
@@ -571,7 +575,7 @@ class GcsWritableFile : public WritableFile {
571575
std::ofstream outfile_;
572576
HttpRequest::Factory* http_request_factory_;
573577
bool sync_needed_; // whether there is buffered data that needs to be synced
574-
int32 max_upload_attempts_;
578+
int64 initial_retry_delay_usec_;
575579
};
576580

577581
class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
@@ -594,11 +598,11 @@ GcsFileSystem::GcsFileSystem()
594598
GcsFileSystem::GcsFileSystem(
595599
std::unique_ptr<AuthProvider> auth_provider,
596600
std::unique_ptr<HttpRequest::Factory> http_request_factory,
597-
size_t read_ahead_bytes, int32 max_upload_attempts)
601+
size_t read_ahead_bytes, int64 initial_retry_delay_usec)
598602
: auth_provider_(std::move(auth_provider)),
599603
http_request_factory_(std::move(http_request_factory)),
600604
read_ahead_bytes_(read_ahead_bytes),
601-
max_upload_attempts_(max_upload_attempts) {}
605+
initial_retry_delay_usec_(initial_retry_delay_usec) {}
602606

603607
Status GcsFileSystem::NewRandomAccessFile(
604608
const string& fname, std::unique_ptr<RandomAccessFile>* result) {
@@ -616,7 +620,7 @@ Status GcsFileSystem::NewWritableFile(const string& fname,
616620
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
617621
result->reset(new GcsWritableFile(bucket, object, auth_provider_.get(),
618622
http_request_factory_.get(),
619-
max_upload_attempts_));
623+
initial_retry_delay_usec_));
620624
return Status::OK();
621625
}
622626

@@ -656,7 +660,7 @@ Status GcsFileSystem::NewAppendableFile(const string& fname,
656660
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
657661
result->reset(new GcsWritableFile(
658662
bucket, object, auth_provider_.get(), old_content_filename,
659-
http_request_factory_.get(), max_upload_attempts_));
663+
http_request_factory_.get(), initial_retry_delay_usec_));
660664
return Status::OK();
661665
}
662666

tensorflow/core/platform/cloud/gcs_file_system.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class GcsFileSystem : public FileSystem {
3535
GcsFileSystem();
3636
GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider,
3737
std::unique_ptr<HttpRequest::Factory> http_request_factory,
38-
size_t read_ahead_bytes, int32 max_upload_attempts);
38+
size_t read_ahead_bytes, int64 initial_retry_delay_usec);
3939

4040
Status NewRandomAccessFile(
4141
const string& filename,
@@ -114,9 +114,8 @@ class GcsFileSystem : public FileSystem {
114114
// RandomAccessFile implementation. Defaults to 256Mb.
115115
const size_t read_ahead_bytes_ = 256 * 1024 * 1024;
116116

117-
// The max number of attempts to upload a file to GCS using the resumable
118-
// upload API.
119-
const int32 max_upload_attempts_ = 5;
117+
// The initial delay for exponential backoffs when retrying failed calls.
118+
const int64 initial_retry_delay_usec_ = 1000000L;
120119

121120
TF_DISALLOW_COPY_AND_ASSIGN(GcsFileSystem);
122121
};

0 commit comments

Comments
 (0)