
Commit 498cd77

Author: xudong.yang
Commit message: add new model
Parent: 63968ad

2 files changed: +338 −0 lines


esmm/esmm.py

Lines changed: 310 additions & 0 deletions
@@ -0,0 +1,310 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import json
import tensorflow as tf
from tensorflow import feature_column as fc
# for python 2.x
#import sys
#reload(sys)
#sys.setdefaultencoding("utf-8")

flags = tf.app.flags
flags.DEFINE_string("model_dir", "./model_dir", "Base directory for the model.")
flags.DEFINE_string("output_model", "./model_output", "Directory to export the trained model for serving.")
flags.DEFINE_string("train_data", "data/samples", "Path to the training data.")
flags.DEFINE_string("eval_data", "data/eval", "Path to the evaluation data.")
flags.DEFINE_string("hidden_units", "512,256,128", "Comma-separated list of number of units in each hidden layer of the NN")
flags.DEFINE_integer("train_steps", 10000,
                     "Number of (global) training steps to perform")
flags.DEFINE_integer("batch_size", 512, "Training batch size")
flags.DEFINE_integer("shuffle_buffer_size", 10000, "Dataset shuffle buffer size")
flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
flags.DEFINE_float("dropout_rate", 0.25, "Dropout rate")
flags.DEFINE_integer("num_parallel_readers", 5, "Number of parallel readers for training data")
flags.DEFINE_integer("save_checkpoints_steps", 5000, "Save checkpoints every this many steps")
flags.DEFINE_string("ps_hosts", "s-xiasha-10-2-176-43.hx:2222",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "s-xiasha-10-2-176-42.hx:2223,s-xiasha-10-2-176-44.hx:2224",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("job_name", None, "Job name: worker or ps")
flags.DEFINE_integer("task_index", None,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the master worker task that performs the variable "
                     "initialization")
flags.DEFINE_boolean("run_on_cluster", False, "Whether the cluster info needs to be passed in as input")

FLAGS = flags.FLAGS
my_feature_columns = []

def set_tfconfig_environ():
  if "TF_CLUSTER_DEF" in os.environ:
    cluster = json.loads(os.environ["TF_CLUSTER_DEF"])
    task_index = int(os.environ["TF_INDEX"])
    task_type = os.environ["TF_ROLE"]

    tf_config = dict()
    worker_num = len(cluster["worker"])
    if task_type == "ps":
      tf_config["task"] = {"index": task_index, "type": task_type}
      FLAGS.job_name = "ps"
      FLAGS.task_index = task_index
    else:
      if task_index == 0:
        tf_config["task"] = {"index": 0, "type": "chief"}
      else:
        tf_config["task"] = {"index": task_index - 1, "type": task_type}
      FLAGS.job_name = "worker"
      FLAGS.task_index = task_index

    if worker_num == 1:
      cluster["chief"] = cluster["worker"]
      del cluster["worker"]
    else:
      cluster["chief"] = [cluster["worker"][0]]
      del cluster["worker"][0]

    tf_config["cluster"] = cluster
    os.environ["TF_CONFIG"] = json.dumps(tf_config)
    print("TF_CONFIG", json.loads(os.environ["TF_CONFIG"]))

  if "INPUT_FILE_LIST" in os.environ:
    INPUT_PATH = json.loads(os.environ["INPUT_FILE_LIST"])
    if INPUT_PATH:
      print("input path:", INPUT_PATH)
      FLAGS.train_data = INPUT_PATH.get(FLAGS.train_data)
      FLAGS.eval_data = INPUT_PATH.get(FLAGS.eval_data)
    else:  # for ps
      print("load input path failed.")
      FLAGS.train_data = None
      FLAGS.eval_data = None


def parse_argument():
  if FLAGS.job_name is None or FLAGS.job_name == "":
    raise ValueError("Must specify an explicit `job_name`")
  if FLAGS.task_index is None or FLAGS.task_index == "":
    raise ValueError("Must specify an explicit `task_index`")

  print("job name = %s" % FLAGS.job_name)
  print("task index = %d" % FLAGS.task_index)
  os.environ["TF_ROLE"] = FLAGS.job_name
  os.environ["TF_INDEX"] = str(FLAGS.task_index)

  # Construct the cluster and start the server
  ps_spec = FLAGS.ps_hosts.split(",")
  worker_spec = FLAGS.worker_hosts.split(",")
  cluster = {"worker": worker_spec, "ps": ps_spec}
  os.environ["TF_CLUSTER_DEF"] = json.dumps(cluster)

def create_feature_columns():
  # user feature
  bids = fc.categorical_column_with_hash_bucket("behaviorBids", 10240, dtype=tf.int64)
  c1ids = fc.categorical_column_with_hash_bucket("behaviorC1ids", 100, dtype=tf.int64)
  cids = fc.categorical_column_with_hash_bucket("behaviorCids", 10240, dtype=tf.int64)
  sids = fc.categorical_column_with_hash_bucket("behaviorSids", 10240, dtype=tf.int64)
  pids = fc.categorical_column_with_hash_bucket("behaviorPids", 1000000, dtype=tf.int64)
  bids_weighted = fc.weighted_categorical_column(bids, "bidWeights")
  c1ids_weighted = fc.weighted_categorical_column(c1ids, "c1idWeights")
  cids_weighted = fc.weighted_categorical_column(cids, "cidWeights")
  sids_weighted = fc.weighted_categorical_column(sids, "sidWeights")
  pids_weighted = fc.weighted_categorical_column(pids, "pidWeights")

  # item feature
  pid = fc.categorical_column_with_hash_bucket("productId", 1000000, dtype=tf.int64)
  sid = fc.categorical_column_with_hash_bucket("sellerId", 10240, dtype=tf.int64)
  bid = fc.categorical_column_with_hash_bucket("brandId", 10240, dtype=tf.int64)
  c1id = fc.categorical_column_with_hash_bucket("cate1Id", 100, dtype=tf.int64)
  cid = fc.categorical_column_with_hash_bucket("cateId", 10240, dtype=tf.int64)

  # context feature
  matchScore = fc.numeric_column("matchScore", default_value=0.0)
  popScore = fc.numeric_column("popScore", default_value=0.0)
  brandPrefer = fc.numeric_column("brandPrefer", default_value=0.0)
  cate2Prefer = fc.numeric_column("cate2Prefer", default_value=0.0)
  catePrefer = fc.numeric_column("catePrefer", default_value=0.0)
  sellerPrefer = fc.numeric_column("sellerPrefer", default_value=0.0)
  matchType = fc.indicator_column(fc.categorical_column_with_identity("matchType", 9, default_value=0))
  position = fc.indicator_column(fc.categorical_column_with_identity("position", 201, default_value=200))
  triggerNum = fc.indicator_column(fc.categorical_column_with_identity("triggerNum", 51, default_value=50))
  triggerRank = fc.indicator_column(fc.categorical_column_with_identity("triggerRank", 51, default_value=50))
  sceneType = fc.indicator_column(fc.categorical_column_with_identity("type", 2, default_value=0))
  hour = fc.indicator_column(fc.categorical_column_with_identity("hour", 24, default_value=0))
  phoneBrand = fc.indicator_column(fc.categorical_column_with_hash_bucket("phoneBrand", 1000))
  phoneResolution = fc.indicator_column(fc.categorical_column_with_hash_bucket("phoneResolution", 500))
  phoneOs = fc.indicator_column(
      fc.categorical_column_with_vocabulary_list("phoneOs", ["android", "ios"], default_value=0))
  tab = fc.indicator_column(fc.categorical_column_with_vocabulary_list("tab",
      ["ALL", "TongZhuang", "XieBao", "MuYing", "NvZhuang", "MeiZhuang", "JuJia", "MeiShi"], default_value=0))

  # share embeddings between the user behavior sequences and the corresponding item-side ids
  pid_embed = fc.shared_embedding_columns([pids_weighted, pid], 64, combiner='sum', shared_embedding_collection_name="pid")
  bid_embed = fc.shared_embedding_columns([bids_weighted, bid], 32, combiner='sum', shared_embedding_collection_name="bid")
  cid_embed = fc.shared_embedding_columns([cids_weighted, cid], 32, combiner='sum', shared_embedding_collection_name="cid")
  c1id_embed = fc.shared_embedding_columns([c1ids_weighted, c1id], 10, combiner='sum', shared_embedding_collection_name="c1id")
  sid_embed = fc.shared_embedding_columns([sids_weighted, sid], 32, combiner='sum', shared_embedding_collection_name="sid")
  global my_feature_columns
  my_feature_columns = [matchScore, matchType, position, triggerNum, triggerRank, sceneType, hour, phoneBrand, phoneResolution,
                        phoneOs, tab, popScore, sellerPrefer, brandPrefer, cate2Prefer, catePrefer]
  my_feature_columns += pid_embed
  my_feature_columns += sid_embed
  my_feature_columns += bid_embed
  my_feature_columns += cid_embed
  my_feature_columns += c1id_embed
  print("feature columns:", my_feature_columns)
  return my_feature_columns

def parse_exmp(serial_exmp):
  click = fc.numeric_column("click", default_value=0, dtype=tf.int64)
  pay = fc.numeric_column("pay", default_value=0, dtype=tf.int64)
  fea_columns = [click, pay]
  fea_columns += my_feature_columns
  feature_spec = tf.feature_column.make_parse_example_spec(fea_columns)
  feats = tf.parse_single_example(serial_exmp, features=feature_spec)
  click = feats.pop('click')
  pay = feats.pop('pay')
  return feats, {'ctr': tf.to_float(click), 'cvr': tf.to_float(pay)}


def train_input_fn(filenames, batch_size, shuffle_buffer_size):
  #dataset = tf.data.TFRecordDataset(filenames)
  files = tf.data.Dataset.list_files(filenames)
  dataset = files.apply(tf.contrib.data.parallel_interleave(tf.data.TFRecordDataset, cycle_length=FLAGS.num_parallel_readers))
  # Shuffle, repeat, and batch the examples.
  if shuffle_buffer_size > 0:
    dataset = dataset.shuffle(shuffle_buffer_size)
  #dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func=parse_exmp, batch_size=batch_size))
  #dataset = dataset.repeat().prefetch(1)
  dataset = dataset.map(parse_exmp, num_parallel_calls=8)
  dataset = dataset.repeat().batch(batch_size).prefetch(1)
  print(dataset.output_types)
  print(dataset.output_shapes)
  # Return the read end of the pipeline.
  return dataset


def eval_input_fn(filename, batch_size):
  dataset = tf.data.TFRecordDataset(filename)
  dataset = dataset.map(parse_exmp, num_parallel_calls=8)
  # Batch the examples (no shuffling or repeating for evaluation).
  dataset = dataset.batch(batch_size)
  # Return the read end of the pipeline.
  return dataset

def build_model(features, mode, params):
  net = fc.input_layer(features, params['feature_columns'])
  # Build the hidden layers, sized according to the 'hidden_units' param.
  for units in params['hidden_units']:
    net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
    if 'dropout_rate' in params and params['dropout_rate'] > 0.0:
      net = tf.layers.dropout(net, params['dropout_rate'], training=(mode == tf.estimator.ModeKeys.TRAIN))
  # Compute logits
  logits = tf.layers.dense(net, 1, activation=None)
  return logits


def my_model(features, labels, mode, params):
  # Two towers over the same input features: one for CTR, one for CVR.
  with tf.variable_scope('ctr_model'):
    ctr_logits = build_model(features, mode, params)
  with tf.variable_scope('cvr_model'):
    cvr_logits = build_model(features, mode, params)

  ctr_predictions = tf.sigmoid(ctr_logits)
  prop = tf.multiply(ctr_predictions, tf.sigmoid(cvr_logits))  # pCTCVR = pCTR * pCVR
  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        'probabilities': prop,
        'ctr_probabilities': ctr_predictions
    }
    export_outputs = {
        'prediction': tf.estimator.export.PredictOutput(predictions)
    }
    return tf.estimator.EstimatorSpec(mode, predictions=predictions, export_outputs=export_outputs)

  # ESMM losses: CTR loss on the click label, CTCVR loss on the pay label vs. pCTR * pCVR.
  y = labels['cvr']
  cvr_loss = tf.reduce_sum(tf.keras.backend.binary_crossentropy(y, prop), name="cvr_loss")
  ctr_loss = tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(labels=labels['ctr'], logits=ctr_logits), name="ctr_loss")
  loss = tf.add(ctr_loss, cvr_loss, name="ctcvr_loss")

  ctr_accuracy = tf.metrics.accuracy(labels=labels['ctr'], predictions=tf.to_float(tf.greater_equal(ctr_predictions, 0.5)))
  cvr_accuracy = tf.metrics.accuracy(labels=y, predictions=tf.to_float(tf.greater_equal(prop, 0.5)))
  ctr_auc = tf.metrics.auc(labels['ctr'], ctr_predictions)
  cvr_auc = tf.metrics.auc(y, prop)
  metrics = {'cvr_accuracy': cvr_accuracy, 'ctr_accuracy': ctr_accuracy, 'ctr_auc': ctr_auc, 'cvr_auc': cvr_auc}
  tf.summary.scalar('ctr_accuracy', ctr_accuracy[1])
  tf.summary.scalar('cvr_accuracy', cvr_accuracy[1])
  tf.summary.scalar('ctr_auc', ctr_auc[1])
  tf.summary.scalar('cvr_auc', cvr_auc[1])
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)

  # Create training op.
  assert mode == tf.estimator.ModeKeys.TRAIN
  optimizer = tf.train.AdagradOptimizer(learning_rate=params['learning_rate'])
  train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
  return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

def main(unused_argv):
  set_tfconfig_environ()
  create_feature_columns()
  classifier = tf.estimator.Estimator(
      model_fn=my_model,
      params={
          'feature_columns': my_feature_columns,
          # dense layers need integer unit counts, so parse the comma-separated flag
          'hidden_units': [int(n) for n in FLAGS.hidden_units.split(',')],
          'learning_rate': FLAGS.learning_rate,
          'dropout_rate': FLAGS.dropout_rate
      },
      config=tf.estimator.RunConfig(model_dir=FLAGS.model_dir, save_checkpoints_steps=FLAGS.save_checkpoints_steps)
  )
  batch_size = FLAGS.batch_size
  print("train steps:", FLAGS.train_steps, "batch_size:", batch_size)
  if isinstance(FLAGS.train_data, str) and os.path.isdir(FLAGS.train_data):
    train_files = [FLAGS.train_data + '/' + x for x in os.listdir(FLAGS.train_data)]
  else:
    train_files = FLAGS.train_data
  if isinstance(FLAGS.eval_data, str) and os.path.isdir(FLAGS.eval_data):
    eval_files = [FLAGS.eval_data + '/' + x for x in os.listdir(FLAGS.eval_data)]
  else:
    eval_files = FLAGS.eval_data
  shuffle_buffer_size = FLAGS.shuffle_buffer_size
  print("train_data:", train_files)
  print("eval_data:", eval_files)
  print("shuffle_buffer_size:", shuffle_buffer_size)

  train_spec = tf.estimator.TrainSpec(
      input_fn=lambda: train_input_fn(train_files, batch_size, shuffle_buffer_size),
      max_steps=FLAGS.train_steps
  )
  input_fn_for_eval = lambda: eval_input_fn(eval_files, batch_size)
  eval_spec = tf.estimator.EvalSpec(input_fn=input_fn_for_eval, throttle_secs=600)

  print("before train and evaluate")
  tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec)
  print("after train and evaluate")

  # Evaluate accuracy.
  results = classifier.evaluate(input_fn=input_fn_for_eval)
  for key in sorted(results): print('%s: %s' % (key, results[key]))
  print("after evaluate")

  if FLAGS.job_name == "worker" and FLAGS.task_index == 0:
    print("exporting model ...")
    feature_spec = tf.feature_column.make_parse_example_spec(my_feature_columns)
    print(feature_spec)
    serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
    classifier.export_savedmodel(FLAGS.output_model, serving_input_receiver_fn)
  print("quit main")


if __name__ == "__main__":
  if "CUDA_VISIBLE_DEVICES" in os.environ:
    print("CUDA_VISIBLE_DEVICES:", os.environ["CUDA_VISIBLE_DEVICES"])
  if FLAGS.run_on_cluster:
    parse_argument()
  tf.logging.set_verbosity(tf.logging.INFO)
  tf.app.run(main=main)
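
A note on the loss in `my_model` above: despite its name, `cvr_loss` is really the CTCVR term — it compares the `pay` label with pCTR × pCVR over every impression, which is how ESMM supervises the CVR tower over the full exposure space rather than only clicked samples. A minimal standalone sketch of that computation on toy values (TF 1.x assumed; the numbers are purely illustrative):

```python
import tensorflow as tf

# Toy logits and labels for a batch of 3 impressions (illustrative values only).
ctr_logits = tf.constant([[2.0], [-1.0], [0.5]])
cvr_logits = tf.constant([[0.2], [1.0], [-2.0]])
click = tf.constant([[1.0], [0.0], [1.0]])  # CTR label: was the item clicked?
pay = tf.constant([[1.0], [0.0], [0.0]])    # CTCVR label: clicked AND converted?

p_ctr = tf.sigmoid(ctr_logits)
p_ctcvr = p_ctr * tf.sigmoid(cvr_logits)    # pCTCVR = pCTR * pCVR

# Same two loss terms as in my_model: CTR loss on logits, CTCVR loss on probabilities.
ctr_loss = tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(labels=click, logits=ctr_logits))
ctcvr_loss = tf.reduce_sum(tf.keras.backend.binary_crossentropy(pay, p_ctcvr))
loss = ctr_loss + ctcvr_loss

with tf.Session() as sess:
    print(sess.run([ctr_loss, ctcvr_loss, loss]))
```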

esmm/readme.md

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
This directory contains the complete code for the third article in the series "基于Tensorflow高阶API构建大规模分布式深度学习模型" (Building Large-Scale Distributed Deep Learning Models with TensorFlow's High-Level APIs). Using a text-classification task as its example, the article walks through how to build a TensorFlow model on top of the Estimator interface and how to run the full training and evaluation loop with `tf.estimator.train_and_evaluate`.
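
For orientation, the whole series (including `esmm/esmm.py` in this commit) follows the same Estimator-plus-`tf.estimator.train_and_evaluate` pattern. Below is a minimal, self-contained sketch of that pattern on toy data — TF 1.x assumed; the feature name `"x"`, the random data, and all hyperparameters are illustrative and not part of this repository:

```python
import numpy as np
import tensorflow as tf

def input_fn():
    # Toy dataset: one numeric feature "x" and a binary label derived from it.
    x = np.random.rand(256, 1).astype(np.float32)
    y = (x[:, 0] > 0.5).astype(np.float32)
    ds = tf.data.Dataset.from_tensor_slices(({"x": x}, y))
    return ds.repeat().batch(32)

def model_fn(features, labels, mode, params):
    # Turn feature columns into a dense input tensor, then a single logistic unit.
    net = tf.feature_column.input_layer(features, params["feature_columns"])
    logits = tf.layers.dense(net, 1)
    loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
        labels=tf.expand_dims(labels, -1), logits=logits))
    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss)
    train_op = tf.train.AdagradOptimizer(0.1).minimize(
        loss, global_step=tf.train.get_global_step())
    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf.estimator.Estimator(
    model_fn=model_fn,
    params={"feature_columns": [tf.feature_column.numeric_column("x")]})

train_spec = tf.estimator.TrainSpec(input_fn=input_fn, max_steps=200)
eval_spec = tf.estimator.EvalSpec(input_fn=input_fn, steps=10, throttle_secs=1)
# Runs training with periodic evaluation; on a cluster it also handles coordination.
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
```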

## Usage

The training and test data are packaged in `dbpedia_csv.tar.gz`. Download the full code directory, extract `dbpedia_csv.tar.gz`, and then run the data-preprocessing step:

```
python build_vocab.py
```

You can then train and evaluate the model:

```
python word_cnn.py --train_steps=20000
```
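
The ESMM model added in this commit (`esmm/esmm.py`) is launched the same way. Assuming serialized `tf.Example` TFRecord files sit under the default `data/samples` and `data/eval` directories (the flag defaults in the script), a single-machine run might look like this; adjust the flags to your own data:

```
python esmm.py --model_dir=./model_dir --train_steps=10000 --batch_size=512
```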

## Recommended reading

1. [Building Large-Scale Distributed Deep Learning Models with TensorFlow's High-Level APIs: Introduction](https://zhuanlan.zhihu.com/p/38470806)
2. [Building Large-Scale Distributed Deep Learning Models with TensorFlow's High-Level APIs: Input Pipelines with the Dataset API](https://zhuanlan.zhihu.com/p/38421397)
3. [Building Large-Scale Distributed Deep Learning Models with TensorFlow's High-Level APIs: Custom Estimators (a Text-Classification CNN Example)](https://zhuanlan.zhihu.com/p/41473323)

## Postscript

- Follow my Zhihu column: [算法工程师的自我修养](https://zhuanlan.zhihu.com/yangxudong)
- Bookmark my personal blog, updated from time to time: [https://yangxudong.github.io](https://yangxudong.github.io), or the mirror hosted in mainland China: [https://xudongyang.coding.me](https://xudongyang.coding.me)
