from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import json

import tensorflow as tf
from tensorflow import feature_column as fc

# for python 2.x
#import sys
#reload(sys)
#sys.setdefaultencoding("utf-8")

flags = tf.app.flags
flags.DEFINE_string("model_dir", "./model_dir", "Base directory for the model.")
flags.DEFINE_string("output_model", "./model_output", "Directory to export the final model for serving.")
flags.DEFINE_string("train_data", "data/samples", "Path to the training data.")
flags.DEFINE_string("eval_data", "data/eval", "Path to the evaluation data.")
flags.DEFINE_string("hidden_units", "512,256,128", "Comma-separated list of the number of units in each hidden layer of the NN")
flags.DEFINE_integer("train_steps", 10000,
                     "Number of (global) training steps to perform")
flags.DEFINE_integer("batch_size", 512, "Training batch size")
flags.DEFINE_integer("shuffle_buffer_size", 10000, "Dataset shuffle buffer size")
flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
flags.DEFINE_float("dropout_rate", 0.25, "Dropout rate")
flags.DEFINE_integer("num_parallel_readers", 5, "Number of parallel readers for the training data")
flags.DEFINE_integer("save_checkpoints_steps", 5000, "Save checkpoints every this many steps")
flags.DEFINE_string("ps_hosts", "s-xiasha-10-2-176-43.hx:2222",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "s-xiasha-10-2-176-42.hx:2223,s-xiasha-10-2-176-44.hx:2224",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("job_name", None, "Job name: worker or ps")
flags.DEFINE_integer("task_index", None,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the master worker task that performs the variable "
                     "initialization")
flags.DEFINE_boolean("run_on_cluster", False, "Whether the cluster info needs to be passed in as input")

FLAGS = flags.FLAGS
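
# An illustrative local run, assuming this file is saved as esmm.py (the
# script name and data paths are hypothetical; the TFRecord inputs must match
# the columns built in create_feature_columns below):
#   python esmm.py --train_data=data/samples --eval_data=data/eval \
#       --model_dir=./model_dir --hidden_units=512,256,128 --batch_size=512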
my_feature_columns = []


def set_tfconfig_environ():
  if "TF_CLUSTER_DEF" in os.environ:
    cluster = json.loads(os.environ["TF_CLUSTER_DEF"])
    task_index = int(os.environ["TF_INDEX"])
    task_type = os.environ["TF_ROLE"]

    tf_config = dict()
    worker_num = len(cluster["worker"])
    if task_type == "ps":
      tf_config["task"] = {"index": task_index, "type": task_type}
      FLAGS.job_name = "ps"
      FLAGS.task_index = task_index
    else:
      if task_index == 0:
        tf_config["task"] = {"index": 0, "type": "chief"}
      else:
        tf_config["task"] = {"index": task_index - 1, "type": task_type}
      FLAGS.job_name = "worker"
      FLAGS.task_index = task_index

    # Promote the first worker to "chief" so tf.estimator has a coordinator.
    if worker_num == 1:
      cluster["chief"] = cluster["worker"]
      del cluster["worker"]
    else:
      cluster["chief"] = [cluster["worker"][0]]
      del cluster["worker"][0]

    tf_config["cluster"] = cluster
    os.environ["TF_CONFIG"] = json.dumps(tf_config)
    print("TF_CONFIG", json.loads(os.environ["TF_CONFIG"]))

  if "INPUT_FILE_LIST" in os.environ:
    INPUT_PATH = json.loads(os.environ["INPUT_FILE_LIST"])
    if INPUT_PATH:
      print("input path:", INPUT_PATH)
      FLAGS.train_data = INPUT_PATH.get(FLAGS.train_data)
      FLAGS.eval_data = INPUT_PATH.get(FLAGS.eval_data)
    else:  # for ps
      print("load input path failed.")
      FLAGS.train_data = None
      FLAGS.eval_data = None


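# For reference: set_tfconfig_environ() rewrites the scheduler-provided
# TF_CLUSTER_DEF / TF_ROLE / TF_INDEX variables into the TF_CONFIG layout that
# tf.estimator expects, promoting worker 0 to "chief". A sketch with made-up
# hosts:
#   TF_CLUSTER_DEF = {"ps": ["ps0:2222"], "worker": ["w0:2223", "w1:2224"]}
#   TF_ROLE = "worker", TF_INDEX = 0
#   => TF_CONFIG = {"cluster": {"ps": ["ps0:2222"], "chief": ["w0:2223"],
#                               "worker": ["w1:2224"]},
#                   "task": {"type": "chief", "index": 0}}

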
def parse_argument():
  if FLAGS.job_name is None or FLAGS.job_name == "":
    raise ValueError("Must specify an explicit `job_name`")
  if FLAGS.task_index is None or FLAGS.task_index == "":
    raise ValueError("Must specify an explicit `task_index`")

  print("job name = %s" % FLAGS.job_name)
  print("task index = %d" % FLAGS.task_index)
  os.environ["TF_ROLE"] = FLAGS.job_name
  os.environ["TF_INDEX"] = str(FLAGS.task_index)

  # Construct the cluster spec and hand it to set_tfconfig_environ() via env vars.
  ps_spec = FLAGS.ps_hosts.split(",")
  worker_spec = FLAGS.worker_hosts.split(",")
  cluster = {"worker": worker_spec, "ps": ps_spec}
  os.environ["TF_CLUSTER_DEF"] = json.dumps(cluster)


def create_feature_columns():
  # user behavior features
  bids = fc.categorical_column_with_hash_bucket("behaviorBids", 10240, dtype=tf.int64)
  c1ids = fc.categorical_column_with_hash_bucket("behaviorC1ids", 100, dtype=tf.int64)
  cids = fc.categorical_column_with_hash_bucket("behaviorCids", 10240, dtype=tf.int64)
  sids = fc.categorical_column_with_hash_bucket("behaviorSids", 10240, dtype=tf.int64)
  pids = fc.categorical_column_with_hash_bucket("behaviorPids", 1000000, dtype=tf.int64)
  bids_weighted = fc.weighted_categorical_column(bids, "bidWeights")
  c1ids_weighted = fc.weighted_categorical_column(c1ids, "c1idWeights")
  cids_weighted = fc.weighted_categorical_column(cids, "cidWeights")
  sids_weighted = fc.weighted_categorical_column(sids, "sidWeights")
  pids_weighted = fc.weighted_categorical_column(pids, "pidWeights")

  # item features
  pid = fc.categorical_column_with_hash_bucket("productId", 1000000, dtype=tf.int64)
  sid = fc.categorical_column_with_hash_bucket("sellerId", 10240, dtype=tf.int64)
  bid = fc.categorical_column_with_hash_bucket("brandId", 10240, dtype=tf.int64)
  c1id = fc.categorical_column_with_hash_bucket("cate1Id", 100, dtype=tf.int64)
  cid = fc.categorical_column_with_hash_bucket("cateId", 10240, dtype=tf.int64)

  # context features
  matchScore = fc.numeric_column("matchScore", default_value=0.0)
  popScore = fc.numeric_column("popScore", default_value=0.0)
  brandPrefer = fc.numeric_column("brandPrefer", default_value=0.0)
  cate2Prefer = fc.numeric_column("cate2Prefer", default_value=0.0)
  catePrefer = fc.numeric_column("catePrefer", default_value=0.0)
  sellerPrefer = fc.numeric_column("sellerPrefer", default_value=0.0)
  matchType = fc.indicator_column(fc.categorical_column_with_identity("matchType", 9, default_value=0))
  position = fc.indicator_column(fc.categorical_column_with_identity("position", 201, default_value=200))
  triggerNum = fc.indicator_column(fc.categorical_column_with_identity("triggerNum", 51, default_value=50))
  triggerRank = fc.indicator_column(fc.categorical_column_with_identity("triggerRank", 51, default_value=50))
  sceneType = fc.indicator_column(fc.categorical_column_with_identity("type", 2, default_value=0))
  hour = fc.indicator_column(fc.categorical_column_with_identity("hour", 24, default_value=0))
  phoneBrand = fc.indicator_column(fc.categorical_column_with_hash_bucket("phoneBrand", 1000))
  phoneResolution = fc.indicator_column(fc.categorical_column_with_hash_bucket("phoneResolution", 500))
  phoneOs = fc.indicator_column(
      fc.categorical_column_with_vocabulary_list("phoneOs", ["android", "ios"], default_value=0))
  tab = fc.indicator_column(fc.categorical_column_with_vocabulary_list(
      "tab", ["ALL", "TongZhuang", "XieBao", "MuYing", "NvZhuang", "MeiZhuang", "JuJia", "MeiShi"], default_value=0))

  # Behavior sequences and the candidate item share one embedding table per id type.
  pid_embed = fc.shared_embedding_columns([pids_weighted, pid], 64, combiner='sum', shared_embedding_collection_name="pid")
  bid_embed = fc.shared_embedding_columns([bids_weighted, bid], 32, combiner='sum', shared_embedding_collection_name="bid")
  cid_embed = fc.shared_embedding_columns([cids_weighted, cid], 32, combiner='sum', shared_embedding_collection_name="cid")
  c1id_embed = fc.shared_embedding_columns([c1ids_weighted, c1id], 10, combiner='sum', shared_embedding_collection_name="c1id")
  sid_embed = fc.shared_embedding_columns([sids_weighted, sid], 32, combiner='sum', shared_embedding_collection_name="sid")
  global my_feature_columns
  my_feature_columns = [matchScore, matchType, position, triggerNum, triggerRank, sceneType, hour, phoneBrand,
                        phoneResolution, phoneOs, tab, popScore, sellerPrefer, brandPrefer, cate2Prefer, catePrefer]
  my_feature_columns += pid_embed
  my_feature_columns += sid_embed
  my_feature_columns += bid_embed
  my_feature_columns += cid_embed
  my_feature_columns += c1id_embed
  print("feature columns:", my_feature_columns)
  return my_feature_columns


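# A sketch of how one example's weighted behavior features line up in
# create_feature_columns() above (the ids and weights are made up):
#   behaviorPids = [1001, 1002], pidWeights = [0.8, 0.3]
# feeds pids_weighted, and shared_embedding_columns ties it to "productId",
# so the user's behavior sequence and the candidate item are embedded in the
# same vector space and pooled with a weighted sum.

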
def parse_exmp(serial_exmp):
  click = fc.numeric_column("click", default_value=0, dtype=tf.int64)
  pay = fc.numeric_column("pay", default_value=0, dtype=tf.int64)
  fea_columns = [click, pay]
  fea_columns += my_feature_columns
  feature_spec = tf.feature_column.make_parse_example_spec(fea_columns)
  feats = tf.parse_single_example(serial_exmp, features=feature_spec)
  click = feats.pop('click')
  pay = feats.pop('pay')
  # Labels: ctr = click, cvr = pay (post-click conversion), both as floats.
  return feats, {'ctr': tf.to_float(click), 'cvr': tf.to_float(pay)}


def train_input_fn(filenames, batch_size, shuffle_buffer_size):
  #dataset = tf.data.TFRecordDataset(filenames)
  files = tf.data.Dataset.list_files(filenames)
  dataset = files.apply(tf.contrib.data.parallel_interleave(
      tf.data.TFRecordDataset, cycle_length=FLAGS.num_parallel_readers))
  # Shuffle, repeat, and batch the examples.
  if shuffle_buffer_size > 0:
    dataset = dataset.shuffle(shuffle_buffer_size)
  #dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func=parse_exmp, batch_size=batch_size))
  #dataset = dataset.repeat().prefetch(1)
  dataset = dataset.map(parse_exmp, num_parallel_calls=8)
  dataset = dataset.repeat().batch(batch_size).prefetch(1)
  print(dataset.output_types)
  print(dataset.output_shapes)
  # Return the read end of the pipeline.
  return dataset


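# Note: tf.contrib.data.parallel_interleave used in train_input_fn above is
# the TF 1.x contrib API; on later 1.x releases the equivalent transform is
# tf.data.experimental.parallel_interleave.

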
def eval_input_fn(filename, batch_size):
  dataset = tf.data.TFRecordDataset(filename)
  dataset = dataset.map(parse_exmp, num_parallel_calls=8)
  # Batch the examples; no shuffling or repeating for evaluation.
  dataset = dataset.batch(batch_size)
  # Return the read end of the pipeline.
  return dataset


def build_model(features, mode, params):
  net = fc.input_layer(features, params['feature_columns'])
  # Build the hidden layers, sized according to the 'hidden_units' param.
  for units in params['hidden_units']:
    net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
    if 'dropout_rate' in params and params['dropout_rate'] > 0.0:
      net = tf.layers.dropout(net, params['dropout_rate'], training=(mode == tf.estimator.ModeKeys.TRAIN))
  # Compute the logits (one per tower).
  logits = tf.layers.dense(net, 1, activation=None)
  return logits


def my_model(features, labels, mode, params):
  # Two towers over the same input features: one predicts CTR, one CVR.
  with tf.variable_scope('ctr_model'):
    ctr_logits = build_model(features, mode, params)
  with tf.variable_scope('cvr_model'):
    cvr_logits = build_model(features, mode, params)

  # ESMM: pCTCVR = pCTR * pCVR, so the CVR tower is trained over the full
  # exposure sample space rather than only on clicked samples.
  ctr_predictions = tf.sigmoid(ctr_logits)
  prop = tf.multiply(ctr_predictions, tf.sigmoid(cvr_logits))
  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        'probabilities': prop,
        'ctr_probabilities': ctr_predictions
    }
    export_outputs = {
        'prediction': tf.estimator.export.PredictOutput(predictions)
    }
    return tf.estimator.EstimatorSpec(mode, predictions=predictions, export_outputs=export_outputs)

  y = labels['cvr']
  cvr_loss = tf.reduce_sum(tf.keras.backend.binary_crossentropy(y, prop), name="cvr_loss")
  ctr_loss = tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(labels=labels['ctr'], logits=ctr_logits), name="ctr_loss")
  loss = tf.add(ctr_loss, cvr_loss, name="ctcvr_loss")

  ctr_accuracy = tf.metrics.accuracy(labels=labels['ctr'], predictions=tf.to_float(tf.greater_equal(ctr_predictions, 0.5)))
  cvr_accuracy = tf.metrics.accuracy(labels=y, predictions=tf.to_float(tf.greater_equal(prop, 0.5)))
  ctr_auc = tf.metrics.auc(labels['ctr'], ctr_predictions)
  cvr_auc = tf.metrics.auc(y, prop)
  metrics = {'cvr_accuracy': cvr_accuracy, 'ctr_accuracy': ctr_accuracy, 'ctr_auc': ctr_auc, 'cvr_auc': cvr_auc}
  tf.summary.scalar('ctr_accuracy', ctr_accuracy[1])
  tf.summary.scalar('cvr_accuracy', cvr_accuracy[1])
  tf.summary.scalar('ctr_auc', ctr_auc[1])
  tf.summary.scalar('cvr_auc', cvr_auc[1])
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)

  # Create training op.
  assert mode == tf.estimator.ModeKeys.TRAIN
  optimizer = tf.train.AdagradOptimizer(learning_rate=params['learning_rate'])
  train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
  return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)


def main(unused_argv):
  set_tfconfig_environ()
  create_feature_columns()
  classifier = tf.estimator.Estimator(
      model_fn=my_model,
      params={
          'feature_columns': my_feature_columns,
          'hidden_units': [int(n) for n in FLAGS.hidden_units.split(',')],
          'learning_rate': FLAGS.learning_rate,
          'dropout_rate': FLAGS.dropout_rate
      },
      config=tf.estimator.RunConfig(model_dir=FLAGS.model_dir, save_checkpoints_steps=FLAGS.save_checkpoints_steps)
  )
  batch_size = FLAGS.batch_size
  print("train steps:", FLAGS.train_steps, "batch_size:", batch_size)
  # A directory flag expands to the list of files it contains; anything else
  # (a single file or a glob pattern) is passed through unchanged.
  if isinstance(FLAGS.train_data, str) and os.path.isdir(FLAGS.train_data):
    train_files = [os.path.join(FLAGS.train_data, x) for x in os.listdir(FLAGS.train_data)]
  else:
    train_files = FLAGS.train_data
  if isinstance(FLAGS.eval_data, str) and os.path.isdir(FLAGS.eval_data):
    eval_files = [os.path.join(FLAGS.eval_data, x) for x in os.listdir(FLAGS.eval_data)]
  else:
    eval_files = FLAGS.eval_data
  shuffle_buffer_size = FLAGS.shuffle_buffer_size
  print("train_data:", train_files)
  print("eval_data:", eval_files)
  print("shuffle_buffer_size:", shuffle_buffer_size)

  train_spec = tf.estimator.TrainSpec(
      input_fn=lambda: train_input_fn(train_files, batch_size, shuffle_buffer_size),
      max_steps=FLAGS.train_steps
  )
  input_fn_for_eval = lambda: eval_input_fn(eval_files, batch_size)
  eval_spec = tf.estimator.EvalSpec(input_fn=input_fn_for_eval, throttle_secs=600)

  print("before train and evaluate")
  tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec)
  print("after train and evaluate")

  # Evaluate accuracy.
  results = classifier.evaluate(input_fn=input_fn_for_eval)
  for key in sorted(results):
    print('%s: %s' % (key, results[key]))
  print("after evaluate")

  # Only the chief worker exports the serving model.
  if FLAGS.job_name == "worker" and FLAGS.task_index == 0:
    print("exporting model ...")
    feature_spec = tf.feature_column.make_parse_example_spec(my_feature_columns)
    print(feature_spec)
    serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
    classifier.export_savedmodel(FLAGS.output_model, serving_input_receiver_fn)
  print("quit main")


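# The export in main() writes a timestamped SavedModel under
# FLAGS.output_model; it can be inspected with the standard CLI (the
# timestamp directory below is hypothetical):
#   saved_model_cli show --dir ./model_output/1540000000 \
#       --tag_set serve --signature_def prediction

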
if __name__ == "__main__":
  if "CUDA_VISIBLE_DEVICES" in os.environ:
    print("CUDA_VISIBLE_DEVICES:", os.environ["CUDA_VISIBLE_DEVICES"])
  if FLAGS.run_on_cluster:
    parse_argument()
  tf.logging.set_verbosity(tf.logging.INFO)
  tf.app.run(main=main)