@@ -430,77 +430,54 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
   # pool underlying the training generation doesn't starve other processes.
   num_workers = int(multiprocessing.cpu_count() * 0.75) or 1

+  flags_ = {
+      "data_dir": data_dir,
+      "cache_id": ncf_dataset.cache_paths.cache_id,
+      "num_neg": num_neg,
+      "num_train_positives": ncf_dataset.num_train_positives,
+      "num_items": ncf_dataset.num_items,
+      "num_readers": ncf_dataset.num_data_readers,
+      "epochs_per_cycle": epochs_per_cycle,
+      "train_batch_size": batch_size,
+      "eval_batch_size": eval_batch_size,
+      "num_workers": num_workers,
+      # This allows the training input function to guarantee batch size and
+      # significantly improves performance. (~5% increase in examples/sec on
+      # GPU, and needed for TPU XLA.)
+      "spillover": True,
+      "redirect_logs": use_subprocess,
+      "use_tf_logging": not use_subprocess,
+  }
+  if ncf_dataset.deterministic:
+    flags_["seed"] = stat_utils.random_int32()
+  # We write to a temp file then atomically rename it to the final file,
+  # because writing directly to the final file can cause the async data
+  # generation process to read a partially written flag file.
+  flagfile_temp = os.path.join(flags.FLAGS.data_dir, rconst.FLAGFILE_TEMP)
+  tf.logging.info("Preparing flagfile for async data generation in {} ..."
+                  .format(flagfile_temp))
+  with tf.gfile.Open(flagfile_temp, "w") as f:
+    for k, v in six.iteritems(flags_):
+      f.write("--{}={}\n".format(k, v))
+  flagfile = os.path.join(data_dir, rconst.FLAGFILE)
+  tf.gfile.Rename(flagfile_temp, flagfile)
+  tf.logging.info(
+      "Wrote flagfile for async data generation in {}."
+      .format(flagfile))
+
   if use_subprocess:
     tf.logging.info("Creating training file subprocess.")
-
     subproc_env = os.environ.copy()
-
     # The subprocess uses TensorFlow for tf.gfile, but it does not need GPU
     # resources and by default will try to allocate GPU memory. This would cause
     # contention with the main training process.
     subproc_env["CUDA_VISIBLE_DEVICES"] = ""
-
     subproc_args = popen_helper.INVOCATION + [
-        "--data_dir", data_dir,
-        "--cache_id", str(ncf_dataset.cache_paths.cache_id),
-        "--num_neg", str(num_neg),
-        "--num_train_positives", str(ncf_dataset.num_train_positives),
-        "--num_items", str(ncf_dataset.num_items),
-        "--num_readers", str(ncf_dataset.num_data_readers),
-        "--epochs_per_cycle", str(epochs_per_cycle),
-        "--train_batch_size", str(batch_size),
-        "--eval_batch_size", str(eval_batch_size),
-        "--num_workers", str(num_workers),
-        # This allows the training input function to guarantee batch size and
-        # significantly improves performance. (~5% increase in examples/sec on
-        # GPU, and needed for TPU XLA.)
-        "--spillover", "True",
-        "--redirect_logs", "True"
-    ]
-    if ncf_dataset.deterministic:
-      subproc_args.extend(["--seed", str(int(stat_utils.random_int32()))])
-
+        "--data_dir", data_dir]
     tf.logging.info(
         "Generation subprocess command: {}".format(" ".join(subproc_args)))
-
     proc = subprocess.Popen(args=subproc_args, shell=False, env=subproc_env)

-  else:
-    # We write to a temp file then atomically rename it to the final file,
-    # because writing directly to the final file can cause the data generation
-    # async process to read a partially written JSON file.
-    command_file_temp = os.path.join(data_dir, rconst.COMMAND_FILE_TEMP)
-    tf.logging.info("Generation subprocess command at {} ..."
-                    .format(command_file_temp))
-    with tf.gfile.Open(command_file_temp, "w") as f:
-      command = {
-          "data_dir": data_dir,
-          "cache_id": ncf_dataset.cache_paths.cache_id,
-          "num_neg": num_neg,
-          "num_train_positives": ncf_dataset.num_train_positives,
-          "num_items": ncf_dataset.num_items,
-          "num_readers": ncf_dataset.num_data_readers,
-          "epochs_per_cycle": epochs_per_cycle,
-          "train_batch_size": batch_size,
-          "eval_batch_size": eval_batch_size,
-          "num_workers": num_workers,
-          # This allows the training input function to guarantee batch size and
-          # significantly improves performance. (~5% increase in examples/sec on
-          # GPU, and needed for TPU XLA.)
-          "spillover": True,
-          "redirect_logs": False
-      }
-      if ncf_dataset.deterministic:
-        command["seed"] = stat_utils.random_int32()
-
-      json.dump(command, f)
-    command_file = os.path.join(data_dir, rconst.COMMAND_FILE)
-    tf.gfile.Rename(command_file_temp, command_file)
-
-    tf.logging.info(
-        "Generation subprocess command saved to: {}"
-        .format(command_file))
-
   cleanup_called = {"finished": False}
   @atexit.register
   def cleanup():