
Commit c494582

Taylor Robie authored and reedwm committed
Move evaluation to .evaluate() (tensorflow#5413)

* move evaluation from numpy to tensorflow
* fix syntax error
* don't use sigmoid to convert logits. there is too much precision loss.
* WIP: add logit metrics
* continue refactor of NCF evaluation
* fix syntax error
* fix bugs in eval loss calculation
* fix eval loss reweighting
* remove numpy based metric calculations
* fix logging hooks
* fix sigmoid to softmax bug
* fix comment
* catch rare PIPE error and address some PR comments
* fix metric test and address PR comments
* delint and fix python2
* fix test and address PR comments
* extend eval to TPUs
1 parent f3be93a · commit c494582

9 files changed, +363 −238 lines
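
The commit message's claim that converting logits with a sigmoid loses too much precision is easy to demonstrate in isolation: in float32 the sigmoid saturates to exactly 1.0 for moderately large logits, so ranking on probabilities collapses ties that the raw logits would still resolve. A minimal standalone illustration (not code from this commit):

import numpy as np

# Three items whose raw logits clearly order them...
logits = np.array([18.0, 25.0, 40.0], dtype=np.float32)

# ...but whose float32 sigmoid outputs are all exactly 1.0, because
# 1 / (1 + exp(-18)) differs from 1.0 by less than float32 epsilon.
probs = 1.0 / (1.0 + np.exp(-logits))

print(probs)                 # [1. 1. 1.] -- indistinguishable after saturation
print(np.argsort(-logits))   # [2 1 0]    -- raw logits still rank correctly
assert probs[0] == probs[2]  # top-k over probs cannot break this tie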

official/recommendation/constants.py (+6)
@@ -52,6 +52,12 @@ def __init__(self, data_dir, cache_id=None):
 # when performing evaluation.
 NUM_EVAL_NEGATIVES = 999
 
+# keys for evaluation metrics
+TOP_K = 10  # Top-k list for evaluation
+HR_KEY = "HR"
+NDCG_KEY = "NDCG"
+DUPLICATE_MASK = "duplicate_mask"
+
 # ==============================================================================
 # == Subprocess Data Generation ================================================
 # ==============================================================================
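
The new keys name the hit rate (HR@10) and normalized discounted cumulative gain (NDCG@10) metrics that evaluation now computes inside .evaluate(). As a rough sketch of what those metrics mean, here is a NumPy illustration, not the commit's TensorFlow implementation; it assumes the true item sits in column 0 of each user's row of 1 + NUM_EVAL_NEGATIVES candidates:

import numpy as np

NUM_EVAL_NEGATIVES = 999
TOP_K = 10

def hr_and_ndcg(logits, dupe_mask, top_k=TOP_K):
  """logits, dupe_mask: arrays of shape (num_users, 1 + NUM_EVAL_NEGATIVES)."""
  # Push duplicated negatives below every real candidate so they cannot
  # spuriously outrank the true item.
  logits = np.where(dupe_mask.astype(bool), -np.inf, logits)
  # Rank of the true item (column 0) = number of negatives scored above it.
  rank = np.sum(logits[:, 1:] > logits[:, :1], axis=1)
  in_top_k = rank < top_k
  hr = in_top_k.mean()                                                # HR@K
  ndcg = np.where(in_top_k, np.log(2) / np.log(rank + 2), 0.0).mean() # NDCG@K
  return hr, ndcg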

official/recommendation/data_async_generation.py (+17 −3)
@@ -124,7 +124,7 @@ def _process_shard(args):
   return users_out, items_out, labels_out
 
 
-def _construct_record(users, items, labels=None):
+def _construct_record(users, items, labels=None, dupe_mask=None):
   """Convert NumPy arrays into a TFRecords entry."""
   feature_dict = {
       movielens.USER_COLUMN: tf.train.Feature(
@@ -136,6 +136,10 @@ def _construct_record(users, items, labels=None):
     feature_dict["labels"] = tf.train.Feature(
         bytes_list=tf.train.BytesList(value=[memoryview(labels).tobytes()]))
 
+  if dupe_mask is not None:
+    feature_dict[rconst.DUPLICATE_MASK] = tf.train.Feature(
+        bytes_list=tf.train.BytesList(value=[memoryview(dupe_mask).tobytes()]))
+
   return tf.train.Example(
       features=tf.train.Features(feature=feature_dict)).SerializeToString()
 
@@ -305,6 +309,9 @@ def _construct_training_records(
 def _construct_eval_record(cache_paths, eval_batch_size):
   """Convert Eval data to a single TFRecords file."""
 
+  # Later logic assumes that all items for a given user are in the same batch.
+  assert not eval_batch_size % (rconst.NUM_EVAL_NEGATIVES + 1)
+
   log_msg("Beginning construction of eval TFRecords file.")
   raw_fpath = cache_paths.eval_raw_file
   intermediate_fpath = cache_paths.eval_record_template_temp
@@ -332,9 +339,16 @@ def _construct_eval_record(cache_paths, eval_batch_size):
   num_batches = users.shape[0]
   with tf.python_io.TFRecordWriter(intermediate_fpath) as writer:
     for i in range(num_batches):
+      batch_users = users[i, :]
+      batch_items = items[i, :]
+      dupe_mask = stat_utils.mask_duplicates(
+          batch_items.reshape(-1, rconst.NUM_EVAL_NEGATIVES + 1),
+          axis=1).flatten().astype(np.int8)
+
       batch_bytes = _construct_record(
-          users=users[i, :],
-          items=items[i, :]
+          users=batch_users,
+          items=batch_items,
+          dupe_mask=dupe_mask
       )
       writer.write(batch_bytes)
   tf.gfile.Rename(intermediate_fpath, dest_fpath)
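
stat_utils.mask_duplicates itself is not shown in this diff; from how it is called here, it returns an array of the input's shape marking values that repeat along an axis. A plausible stand-in for reference (hypothetical, row-wise case only; the real helper may be vectorized differently):

import numpy as np

def mask_duplicates(x, axis=1):
  """Mark entries that repeat an earlier value in their row with 1."""
  assert axis == 1  # this sketch only handles the row-wise case used above
  out = np.zeros_like(x, dtype=np.int8)
  for i, row in enumerate(x):
    seen = set()
    for j, val in enumerate(row):
      if val in seen:
        out[i, j] = 1
      seen.add(val)
  return out

# Each eval batch is reshaped to (users, 1 + NUM_EVAL_NEGATIVES), so the mask
# flags negatives that collide with another candidate for the same user:
items = np.array([[7, 3, 7, 5], [2, 2, 2, 9]])
print(mask_duplicates(items, axis=1))  # [[0 0 1 0], [0 1 1 0]]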

official/recommendation/data_preprocessing.py (+15 −4)
@@ -27,6 +27,7 @@
 import os
 import pickle
 import signal
+import socket
 import subprocess
 import time
 import timeit
@@ -399,10 +400,14 @@ def _shutdown(proc):
   """Convenience function to cleanly shut down async generation process."""
 
   tf.logging.info("Shutting down train data creation subprocess.")
-  proc.send_signal(signal.SIGINT)
-  time.sleep(1)
-  if proc.returncode is not None:
-    return  # SIGINT was handled successfully within 1 sec
+  try:
+    proc.send_signal(signal.SIGINT)
+    time.sleep(1)
+    if proc.returncode is not None:
+      return  # SIGINT was handled successfully within 1 sec
+
+  except socket.error:
+    pass
 
   # Otherwise another second of grace period and then forcibly kill the process.
   time.sleep(1)
@@ -493,6 +498,8 @@ def make_deserialize(params, batch_size, training=False):
   }
   if training:
     feature_map["labels"] = tf.FixedLenFeature([], dtype=tf.string)
+  else:
+    feature_map[rconst.DUPLICATE_MASK] = tf.FixedLenFeature([], dtype=tf.string)
 
   def deserialize(examples_serialized):
     """Called by Dataset.map() to convert batches of records to tensors."""
@@ -506,13 +513,17 @@
     items = tf.cast(items, tf.int32)  # TPU doesn't allow uint16 infeed.
 
     if not training:
+      dupe_mask = tf.reshape(tf.cast(tf.decode_raw(
+          features[rconst.DUPLICATE_MASK], tf.int8), tf.bool), (batch_size,))
       return {
           movielens.USER_COLUMN: users,
          movielens.ITEM_COLUMN: items,
+          rconst.DUPLICATE_MASK: dupe_mask,
       }
 
     labels = tf.reshape(tf.cast(tf.decode_raw(
         features["labels"], tf.int8), tf.bool), (batch_size,))
+
     return {
         movielens.USER_COLUMN: users,
         movielens.ITEM_COLUMN: items,
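
The duplicate mask travels as raw int8 bytes: _construct_record stores memoryview(dupe_mask).tobytes(), and deserialize recovers it with tf.decode_raw followed by a cast to bool. The round trip, mirrored in plain NumPy to show the byte layout (illustrative only):

import numpy as np

mask = np.array([0, 1, 0, 0, 1], dtype=np.int8)
raw = memoryview(mask).tobytes()             # what _construct_record serializes
decoded = np.frombuffer(raw, dtype=np.int8)  # what tf.decode_raw recovers

assert (decoded == mask).all()
assert decoded.astype(bool).tolist() == [False, True, False, False, True]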

official/recommendation/data_test.py (+3 −2)
@@ -36,6 +36,7 @@
 NUM_ITEMS = 2000
 NUM_PTS = 50000
 BATCH_SIZE = 2048
+EVAL_BATCH_SIZE = 4000
 NUM_NEG = 4
 
 
@@ -112,8 +113,8 @@ def drain_dataset(self, dataset, g):
   def test_end_to_end(self):
     ncf_dataset, _ = data_preprocessing.instantiate_pipeline(
         dataset=DATASET, data_dir=self.temp_data_dir,
-        batch_size=BATCH_SIZE, eval_batch_size=BATCH_SIZE, num_data_readers=2,
-        num_neg=NUM_NEG)
+        batch_size=BATCH_SIZE, eval_batch_size=EVAL_BATCH_SIZE,
+        num_data_readers=2, num_neg=NUM_NEG)
 
     g = tf.Graph()
     with g.as_default():
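
The test switches to a dedicated EVAL_BATCH_SIZE because of the new assertion in _construct_eval_record: the eval batch size must be a multiple of NUM_EVAL_NEGATIVES + 1 = 1000 so that each user's candidates land in a single batch. The old value would now fail:

NUM_EVAL_NEGATIVES = 999

assert not 4000 % (NUM_EVAL_NEGATIVES + 1)  # EVAL_BATCH_SIZE = 4000 passes
assert 2048 % (NUM_EVAL_NEGATIVES + 1)      # BATCH_SIZE = 2048 would trip it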
