diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d646e64 --- /dev/null +++ b/.gitignore @@ -0,0 +1,19 @@ +venv/ +build/ +dist/ + +.eggs/ +*.egg-info/ +.pytest_cache +__pycache__/ + +.idea/ + +*.swp +*.vim +*.pyc +*.log + +.DS_Store + +.vscode \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..f91a3d9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,27 @@ +dist: xenial +cache: pip +branches: + only: + - master + - develop + - "/^v\\d+\\.\\d+(\\.\\d+)?(-\\S*)?$/" +language: python +python: +- 3.6 +- 3.7 +service: +- docker +install: +- python -m pip install --upgrade pip +- python -m pip install --upgrade setuptools>=41.0.0 +- python setup.py install +script: +- python setup.py -q test + +jobs: + include: + - stage: ElasticDLTest + script: + - cd base_image && docker build -t sqlflow/modelzoo_base . && cd .. + - cd sqlflow_models && docker build -t sqlflow/sqlflow_models . && cd .. + - bash scripts/elasticdl_travis_test_job.sh diff --git a/LICENSE b/LICENSE new file mode 100755 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100755 index 0000000..64ad321 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include README.md LICENSE diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1ad5ae0 --- /dev/null +++ b/Makefile @@ -0,0 +1,22 @@ +SHELL := /bin/bash + +setup: ## Setup virtual environment for local development + python3 -m venv venv + source venv/bin/activate && \ + pip install -U pip && \ + $(MAKE) install-requirements + +install-requirements: + pip install -U -e . 
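+# `pip install -U -e .` installs sqlflow_models in editable (develop) mode, so local
+# changes to the package take effect without reinstalling.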
+
+test: ## Run tests
+	python3 setup.py test
+
+clean: ## Clean up temporary folders
+	rm -rf build dist .eggs *.egg-info .pytest_cache sqlflow/proto
+
+help:
+	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
+
+.PHONY: help
+.DEFAULT_GOAL := help
diff --git a/README.md b/README.md
index 394cc71..53611bb 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,25 @@
-# models
-Premade Models for SQLFlow
+# SQLFlow Models
+
+[![Build Status](https://travis-ci.com/sql-machine-learning/models.svg?branch=develop)](https://travis-ci.com/sql-machine-learning/models) [![PyPI Package](https://img.shields.io/pypi/v/sqlflow_models.svg)](https://pypi.python.org/pypi/sqlflow_models)
+
+Premade Models for [SQLFlow](https://github.com/sql-machine-learning/sqlflow).
+
+## Installation
+
+This package is available on PyPI as `sqlflow_models`, so you can install it by running the following command:
+
+  ```bash
+  pip install sqlflow_models
+  ```
+
+## Development
+
+### Prerequisite
+Python 3 is required:
+`brew install python`
+
+### Setup Environment
+`make setup`
+
+### Test
+`make test`
diff --git a/base_image/Dockerfile b/base_image/Dockerfile
new file mode 100644
index 0000000..21ef106
--- /dev/null
+++ b/base_image/Dockerfile
@@ -0,0 +1,17 @@
+FROM python:3.7
+
+# install PAI python support
+RUN pip install pypai
+
+# install Go, which is needed to install ElasticDL
+ENV GOPATH /root/go
+ENV PATH /usr/local/go/bin:$GOPATH/bin:$PATH
+RUN curl --silent https://dl.google.com/go/go1.13.4.linux-amd64.tar.gz | tar -C /usr/local -xzf -
+
+# install ElasticDL to manage ElasticDL jobs
+RUN git clone https://github.com/sql-machine-learning/elasticdl.git && \
+cd elasticdl && \
+git checkout 62b255a918df5b6594c888b19aebbcc74bbce6e4 && \
+pip install -r elasticdl/requirements.txt && \
+python setup.py install && \
+cd .. && rm -rf elasticdl
diff --git a/doc/contribute_models.md b/doc/contribute_models.md
new file mode 100644
index 0000000..e7d00f2
--- /dev/null
+++ b/doc/contribute_models.md
@@ -0,0 +1,91 @@
+# How to Contribute SQLFlow Models
+
+This guide introduces how to contribute SQLFlow models. You can find the design doc at [Define SQLFlow Models](/doc/customized+model.md); feel free to check it out.
+
+## Develop an SQLFlow Model
+
+1. Open the [SQLFlow models repo](https://github.com/sql-machine-learning/models) in your web browser, and fork the official repo to your account.
+
+1. Clone the forked repo to your host:
+
+    ``` bash
+    > git clone https://github.com/<your-github-id>/models.git
+    ```
+
+1. Set up your local Python environment by `make setup && source venv/bin/activate`. If you are using [PyCharm](https://www.jetbrains.com/pycharm/), you can simply `make setup` and then import the `models` folder as a new project.
+
+1. You can add a new model definition Python script under the folder [sqlflow_models](/sqlflow_models). For example, add a new Python script `mydnnclassifier.py`:
+
+    ``` text
+    `-sqlflow_models
+      |- dnnclassifier.py
+      `- mydnnclassifier.py
+    ```
+
+1. You can choose whatever name you like for your model. Your model definition should be a [keras subclass model](https://keras.io/models/about-keras-models/#model-subclassing):
+
+    ``` python
+    import tensorflow as tf
+
+    class MyDNNClassifier(tf.keras.Model):
+        def __init__(self, feature_columns, hidden_units=[10,10], n_classes=2):
+            ...
+        ...
+    ```
+
+1. Import `MyDNNClassifier` in [sqlflow_models/\_\_init__.py](/sqlflow_models/__init__.py):
+
+    ``` python
+    ...
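+    # expose the new model at the package top level so SQLFlow can load it as sqlflow_models.MyDNNClassifier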
+    from .mydnnclassifier import MyDNNClassifier
+    ```
+
+1. You can test your `MyDNNClassifier` by adding a new Python unit test script `tests/test_mydnnclassifier.py` and running the test as `python tests/test_mydnnclassifier.py`:
+
+    ``` python
+    from sqlflow_models import MyDNNClassifier
+    from tests.base import BaseTestCases
+
+    import tensorflow as tf
+    import unittest
+
+
+    class TestMyDNNClassifier(BaseTestCases.BaseTest):
+        def setUp(self):
+            self.features = {...}
+            self.label = [...]
+            feature_columns = [...]
+            self.model = MyDNNClassifier(feature_columns=feature_columns)
+
+    if __name__ == '__main__':
+        unittest.main()
+    ```
+
+## Test and Debug Your Model With SQLFlow
+
+If you have developed a new model, please perform an integration test with the SQLFlow gRPC server to make sure it works well with SQLFlow.
+
+1. Launch an SQLFlow all-in-one Docker container:
+
+    ``` bash
+    > cd ./models
+    > docker run --rm -it -v $PWD:/models -e PYTHONPATH=/models -p 8888:8888 sqlflow/sqlflow
+    ```
+
+1. Open a web browser and go to `localhost:8888` to access the Jupyter Notebook. Use your custom model by modifying the `TRAIN` clause of the SQLFlow extended SQL to `TRAIN sqlflow_models.MyDNNClassifier`:
+
+    ``` sql
+    SELECT * from iris.train
+    TRAIN sqlflow_models.MyDNNClassifier
+    WITH n_classes = 3, hidden_units = [10, 20]
+    COLUMN sepal_length, sepal_width, petal_length, petal_width
+    LABEL class
+    INTO sqlflow_models.my_dnn_model;
+    ```
+
+1. When you need to update the model and test again, just modify the model Python file on your host and then run the SQL statement in the notebook one more time.
+
+## Publish your model in the SQLFlow all-in-one Docker image
+
+If you have already tested your code, please create a pull request and invite other developers to review it. Once one of the developers **approves** your pull request, you can merge it into the develop branch.
+Travis CI builds the SQLFlow all-in-one Docker image with the latest models code every night and pushes it to Docker Hub with the tag `sqlflow/sqlflow:nightly`; you can find the latest models in it the next day.
diff --git a/doc/customized+model.md b/doc/customized+model.md
new file mode 100644
index 0000000..40f05ec
--- /dev/null
+++ b/doc/customized+model.md
@@ -0,0 +1,127 @@
+# Design Doc: Define Models for SQLFlow
+
+SQLFlow enables SQL programs to call deep learning models defined in Python. This document is about how to define models for SQLFlow.
+
+## Keras vs. Estimator
+
+Many deep learning practitioners define models using the Keras API or as an Estimator-derived class.
+We prefer [Keras](https://keras.io/) over [Estimator](https://www.tensorflow.org/guide/estimators) for the following reasons:
+
+1. [TensorFlow Dev Summit 2019](https://www.youtube.com/watch?v=k5c-vg4rjBw) announced that TensorFlow 2.x will closely integrate with Keras.
+
+2. We found more documents about Keras than Estimator.
+
+3. We found more models defined using Keras than Estimator.
+
+## Keras APIs
+
+Keras provides three approaches to define models.
+
+### 1. 
Subclassing `tf.keras.Model`
+
+   ```python
+   class DNNClassifier(tf.keras.Model):
+       def __init__(self, feature_columns, hidden_units, n_classes):
+           super(DNNClassifier, self).__init__()
+           self.feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
+           self.hidden_layers = []
+           for hidden_unit in hidden_units:
+               self.hidden_layers.append(tf.keras.layers.Dense(hidden_unit))
+           self.prediction_layer = tf.keras.layers.Dense(n_classes, activation='softmax')
+
+       def call(self, inputs):
+           x = self.feature_layer(inputs)
+           for hidden_layer in self.hidden_layers:
+               x = hidden_layer(x)
+           return self.prediction_layer(x)
+
+   model = DNNClassifier(feature_columns, hidden_units, n_classes)
+   ```
+
+   Please be aware that `tf.keras.Model` has methods `save_weights` and `load_weights`, which save/load model parameters but not the topology, as explained in [this guidance](https://stackoverflow.com/questions/51806852/cant-save-custom-subclassed-model) and [this example list](https://stackoverflow.com/questions/52826134/keras-model-subclassing-examples).
+
+### 2. Functional API
+
+   ```python
+   x = tf.feature_column.input_layer(shape=(5,))
+   for n in hidden_units:
+       x = tf.keras.layers.Dense(n, activation='relu')(x)
+   pred = tf.keras.layers.Dense(n_classes, activation='softmax')(x)
+   model = tf.keras.models.Model(inputs=feature_columns, outputs=pred)
+   ```
+
+   Please be aware that the functional API doesn't work with the feature column API, as reported [here](https://github.com/tensorflow/tensorflow/issues/27416). However, the approach of deriving classes from `keras.Model` works with the feature column API.
+
+### 3. `keras.Sequential`
+
+   ```python
+   model = tf.keras.Sequential()
+   model.add(tf.keras.layers.DenseFeatures(feature_columns))
+   for n in hidden_units:
+       model.add(tf.keras.layers.Dense(n, activation='relu'))
+   model.add(tf.keras.layers.Dense(n_classes, activation='softmax'))
+   ```
+
+   Please be aware that `tf.keras.Sequential()` only covers a small variety of models. It doesn't cover many well-known models, including ResNet, Transformer, and Wide & Deep.
+
+### The Choice
+
+We chose the approach of subclassing `tf.keras.Model` according to the following table.
+
+| Keras APIs         | Works with feature column API | Save/load models          | Model coverage |
+| ------------------ | ----------------------------- | ------------------------- | -------------- |
+| `tf.keras.Model`   | ☑️                            | weights-only, no topology | High           |
+| Functional API     | ❌                            | ☑️                        | High           |
+| Sequential Model   | ☑️                            | ☑️                        | Low            |
+
+
+## An Example
+
+Here is an example `DNNClassifier` with multiple hidden layers, written as a Python class derived from `tf.keras.Model`. To run it, please use TensorFlow 2.0 alpha or a newer version.
+
+```python
+class DNNClassifier(tf.keras.Model):
+    def __init__(self, feature_columns, hidden_units, n_classes):
+        """DNNClassifier
+        :param feature_columns: Feature columns.
+        :type feature_columns: list[tf.feature_column].
+        :param hidden_units: List of hidden units per layer.
+        :type hidden_units: list[int].
+        :param n_classes: Number of label classes.
+        :type n_classes: int.
+        
+ """ + super(DNNClassifier, self).__init__() + + # combines all the data as a dense tensor + self.feature_layer = tf.keras.layers.DenseFeatures(feature_columns) + self.hidden_layers = [] + for hidden_unit in hidden_units: + self.hidden_layers.append(tf.keras.layers.Dense(hidden_unit)) + self.prediction_layer = tf.keras.layers.Dense(n_classes, activation='softmax') + + def call(self, inputs): + x = self.feature_layer(inputs) + for hidden_layer in self.hidden_layers: + x = hidden_layer(x) + return self.prediction_layer(x) + + def default_optimizer(self): + """Default optimizer name. Used in model.compile.""" + return 'adam' + + def default_loss(self): + """Default loss function. Used in model.compile.""" + return 'categorical_crossentropy' + + def default_training_epochs(self): + """Default training epochs. Used in model.fit.""" + return 5 + + def prepare_prediction_column(self, prediction): + """Return the class label of highest probability.""" + return prediction.argmax(axis=-1) +``` + +## Further Reading + +We read the following Keras source code files: [model.py](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/keras/models.py), [network.py](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/keras/engine/network.py), and [training.py](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/keras/engine/training.py). diff --git a/runnables/Dockerfile b/runnables/Dockerfile new file mode 100644 index 0000000..2c96426 --- /dev/null +++ b/runnables/Dockerfile @@ -0,0 +1,10 @@ +FROM sqlflow/sqlflow:step + +RUN apt-get clean && apt-get update && \ + apt-get -qq install libmysqld-dev libmysqlclient-dev + +ADD ./requirements.txt / +RUN pip3 install --no-cache-dir -r /requirements.txt && rm -rf /requirements.txt + +ADD . 
/opt/sqlflow/run +ENV PYTHONPATH "${PYTHONPATH}:/opt/sqlflow/run" diff --git a/runnables/bin/__init__.py b/runnables/bin/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/runnables/bin/binning_calculator.py b/runnables/bin/binning_calculator.py new file mode 100644 index 0000000..adaa641 --- /dev/null +++ b/runnables/bin/binning_calculator.py @@ -0,0 +1,147 @@ +import mars.dataframe as md +import mars.tensor as mt +import numpy as np +import pandas as pd + + +class BinningMethod(object): + BUCKET = "bucket" + QUANTILE = "quantile" + LOG_BUCKET = "log_bucket" + + +def binning( + in_md, + col_name, + bin_method, + bins, + boundaries): + if boundaries: + bin_o, bins = md.cut(in_md[col_name], bins=boundaries, labels=False, retbins=True) + bins_np = bins.to_numpy() + else: + if bin_method.lower() == BinningMethod.BUCKET.lower(): + bin_o, bins = md.cut(in_md[col_name], bins=bins, labels=False, retbins=True) + bins_np = bins.to_numpy() + elif bin_method.lower() == BinningMethod.LOG_BUCKET.lower(): + bin_o, bins = md.cut(mt.log(in_md[col_name]), bins=bins, labels=False, retbins=True) + bins_np = np.exp(bins.to_numpy()) + else: + raise ValueError("Unsupport binning method: {}".format(bin_method)) + + return bin_o, bins_np + + +def cumsum(arr, reverse): + if type(arr) == np.ndarray: + sum_arr = arr + elif type(arr) == pd.DataFrame: + sum_arr = arr.to_numpy() + else: + raise ValueError("Invalid input type: {}".format(type(arr))) + + for i in range(np.ndim(arr)): + sum_arr = np.flip(np.cumsum(np.flip(sum_arr, i), i), i) if reverse else np.cumsum(sum_arr, i) + + if type(arr) == np.ndarray: + return sum_arr + elif type(arr) == pd.DataFrame: + return pd.DataFrame(sum_arr) + else: + raise ValueError("Invalid input type: {}".format(type(arr))) + + +def calc_binning_stats( + in_md, + sel_cols, + bin_methods, + bin_nums, + cols_bin_boundaries, + reverse_cumsum): + cols_bin_stats = [] + for i in range(len(sel_cols)): + sel_col = sel_cols[i] + bin_o, bins = binning(in_md, sel_col, bin_methods[i], bin_nums[i], cols_bin_boundaries.get(sel_col, None)) + bin_num = len(bins) - 1 + bin_prob_df = bin_o.value_counts(normalize=True).to_pandas().to_frame() + bin_prob_df = bin_prob_df.reindex(range(bin_num), fill_value=0) + bin_cumsum_prob_df = cumsum(bin_prob_df, reverse_cumsum) + + cols_bin_stats.append( + { + "name": sel_col, + "bin_boundaries": ','.join(bins.astype(str)), + "bin_prob": ','.join(bin_prob_df[bin_prob_df.columns[0]].to_numpy().astype(str)), + "bin_cumsum_prob": ','.join(bin_cumsum_prob_df[bin_cumsum_prob_df.columns[0]].to_numpy().astype(str)) + } + ) + + return pd.DataFrame(cols_bin_stats) + + +def calc_basic_stats( + in_md, + sel_cols): + stats_data = [ + { + "name": sel_col, + "min": mt.min(in_md[sel_col]).to_numpy(), + "max": mt.max(in_md[sel_col]).to_numpy(), + "mean": mt.mean(in_md[sel_col]).to_numpy(), + "median": mt.median(in_md[sel_col]).to_numpy(), + "std": mt.std(in_md[sel_col]).to_numpy(), + } for sel_col in sel_cols + ] + + return pd.DataFrame(stats_data) + + +def calc_stats( + in_md, + sel_cols, + bin_methods, + bin_nums, + cols_bin_boundaries, + reverse_cumsum): + basic_stats_df = calc_basic_stats(in_md, sel_cols) + cols_bin_stats_df = calc_binning_stats(in_md, sel_cols, bin_methods, bin_nums, cols_bin_boundaries, reverse_cumsum) + + stats_df = pd.merge(basic_stats_df, cols_bin_stats_df, how='inner', on='name') + + return stats_df + + +def calc_two_dim_binning_stats( + in_md, + sel_col_1, + sel_col_2, + bin_method_1, + bin_method_2, + bin_num_1, + bin_num_2, + 
bin_boundaries_1, + bin_boundaries_2, + reverse_cumsum): + bin_o1, bins_1 = binning(in_md, sel_col_1, bin_method_1, bin_num_1, bin_boundaries_1) + bin_o2, bins_2 = binning(in_md, sel_col_2, bin_method_2, bin_num_2, bin_boundaries_2) + + bin_num_1 = len(bins_1) - 1 + bin_num_2 = len(bins_2) - 1 + + bin_o = bin_o1 * bin_num_2 + bin_o2 + bin_prob_df = bin_o.value_counts(normalize=True).to_pandas().to_frame() + bin_prob_df = bin_prob_df.reindex(range(bin_num_1 * bin_num_2), fill_value=0) + two_dim_bin_prob_np = bin_prob_df.to_numpy().reshape((bin_num_1, bin_num_2)) + two_dim_bin_cumsum_prob_np = cumsum(two_dim_bin_prob_np, reverse_cumsum) + + return pd.DataFrame(two_dim_bin_prob_np), pd.DataFrame(two_dim_bin_cumsum_prob_np) + + +def get_cols_bin_boundaries(stats_df): + col_boundaries = {} + for _, row in stats_df.iterrows(): + col_name = row['name'] + boundaries = [float(item) for item in row['bin_boundaries'].split(',')] + col_boundaries[col_name] = boundaries + + return col_boundaries diff --git a/runnables/bin/psi_calculator.py b/runnables/bin/psi_calculator.py new file mode 100644 index 0000000..90aa43b --- /dev/null +++ b/runnables/bin/psi_calculator.py @@ -0,0 +1,36 @@ +import numpy as np +import pandas as pd + + +def calc_psi_per_bin( + expected_prob, + actual_prob): + FALLBACK_VALUE = 0.001 + expected_prob = FALLBACK_VALUE if expected_prob == 0.0 else expected_prob + actual_prob = FALLBACK_VALUE if actual_prob == 0.0 else actual_prob + + return (expected_prob - actual_prob) * np.log(expected_prob * 1.0 / actual_prob) + + +def calc_psi( + expected_bin_probs, + actual_bin_probs): + assert(len(expected_bin_probs) == len(actual_bin_probs)) + + result = 0.0 + for i in range(len(expected_bin_probs)): + result += calc_psi_per_bin(expected_bin_probs[i], actual_bin_probs[i]) + + return result + + +def get_cols_bin_probs( + stats_df, + bin_prob_column_name): + col_bin_probs = {} + for _, row in stats_df.iterrows(): + col_name = row['name'] + bin_probs = [float(item) for item in row[bin_prob_column_name].split(',')] + col_bin_probs[col_name] = bin_probs + + return col_bin_probs diff --git a/runnables/binning.py b/runnables/binning.py new file mode 100644 index 0000000..f7e1191 --- /dev/null +++ b/runnables/binning.py @@ -0,0 +1,84 @@ +import argparse +import mars.dataframe as md +import os +import pandas as pd +from bin.binning_calculator import calc_stats, calc_two_dim_binning_stats, get_cols_bin_boundaries +from run_io.db_adapter import convertDSNToRfc1738 +from sqlalchemy import create_engine + + +def build_argument_parser(): + parser = argparse.ArgumentParser(allow_abbrev=False) + parser.add_argument("--dbname", type=str, required=True) + parser.add_argument("--columns", type=str, required=True) + parser.add_argument("--bin_method", type=str, required=False) + parser.add_argument("--bin_num", type=str, required=False) + parser.add_argument("--bin_input_table", type=str, required=False) + parser.add_argument("--reverse_cumsum", type=bool, default=False) + parser.add_argument("--two_dim_bin_cols", type=str, required=False) + + return parser + + +if __name__ == "__main__": + parser = build_argument_parser() + args, _ = parser.parse_known_args() + columns = args.columns.split(',') + bin_method_array = args.bin_method.split(',') if args.bin_method else None + bin_num_array = [int(item) for item in args.bin_num.split(',')] if args.bin_num else None + + select_input = os.getenv("SQLFLOW_TO_RUN_SELECT") + output = os.getenv("SQLFLOW_TO_RUN_INTO") + output_tables = output.split(',') + datasource = 
os.getenv("SQLFLOW_DATASOURCE") + + assert len(output_tables) == 1, "The output tables shouldn't be null and can contain only one." + + url = convertDSNToRfc1738(datasource, args.dbname) + engine = create_engine(url) + input_md = md.read_sql( + sql=select_input, + con=engine) + input_md.execute() + + cols_bin_boundaries = {} + if args.bin_input_table: + print("Get provided bin boundaries from table {}".format(args.bin_input_table)) + bin_input_df = pd.read_sql_table( + table_name=args.bin_input_table, + con=engine) + cols_bin_boundaries = get_cols_bin_boundaries(bin_input_df) + + if set(columns) > cols_bin_boundaries.keys(): + raise ValueError("The provided bin boundaries contains keys: {}. But they cannot cover all the \ + input columns: {}".format(cols_bin_boundaries.keys(), columns)) + + print("Ignore the bin_num and bin_method arguments") + bin_num_array = [None] * len(columns) + bin_method_array = [None] * len(columns) + else: + if len(bin_num_array) == 1: + bin_num_array = bin_num_array * len(columns) + else: + assert(len(bin_num_array) == len(columns)) + + if len(bin_method_array) == 1: + bin_method_array = bin_method_array * len(columns) + else: + assert(len(bin_method_array) == len(columns)) + + print("Calculate the statistics result for columns: {}".format(columns)) + stats_df = calc_stats( + input_md, + columns, + bin_method_array, + bin_num_array, + cols_bin_boundaries, + args.reverse_cumsum) + + print("Persist the statistics result into the table {}".format(output_tables[0])) + stats_df.to_sql( + name=output_tables[0], + con=engine, + index=False + ) diff --git a/runnables/extract_ts_features.py b/runnables/extract_ts_features.py new file mode 100644 index 0000000..e168a8c --- /dev/null +++ b/runnables/extract_ts_features.py @@ -0,0 +1,58 @@ +import argparse +import os +import pandas as pd +from run_io.db_adapter import convertDSNToRfc1738 +from sqlalchemy import create_engine +from time_series_processing.ts_feature_extractor import add_features_extracted_from_ts_data, add_lag_columns + + +def build_argument_parser(): + parser = argparse.ArgumentParser(allow_abbrev=False) + parser.add_argument("--dbname", type=str, required=True) + parser.add_argument("--column_id", type=str, required=True) + parser.add_argument("--column_time", type=str, required=True) + parser.add_argument("--columns_value", type=str, required=True) + parser.add_argument("--lag_num", type=int, default=1) + parser.add_argument("--windows", type=str, required=True) + parser.add_argument("--min_window", type=str, default=0) + parser.add_argument("--extract_setting", type=str, default="minimal", choices=["minimal", "efficient", "comprehensive"]) + + return parser + + +if __name__ == "__main__": + parser = build_argument_parser() + args, _ = parser.parse_known_args() + columns_value = args.columns_value.split(',') + windows = [int(item) for item in args.windows.split(',')] + + select_input = os.getenv("SQLFLOW_TO_RUN_SELECT") + output = os.getenv("SQLFLOW_TO_RUN_INTO") + datasource = os.getenv("SQLFLOW_DATASOURCE") + + url = convertDSNToRfc1738(datasource, args.dbname) + engine = create_engine(url) + input = pd.read_sql( + sql=select_input, + con=engine) + + df_with_lag_columns, lag_column_names = add_lag_columns(input, columns_value, args.lag_num) + + print("Start extracting the features from the time series data.") + df_with_extracted_features = add_features_extracted_from_ts_data( + df_with_lag_columns, + column_id=args.column_id, + column_time=args.column_time, + columns_value=lag_column_names, + 
windows=windows,
+        min_window=args.min_window,
+        extract_setting=args.extract_setting)
+    print("Completed the feature extraction.")
+
+    df_with_extracted_features = df_with_extracted_features.drop(columns=lag_column_names)
+
+    df_with_extracted_features.to_sql(
+        name=output,
+        con=engine,
+        index=False)
+    print("Finished saving the result data into table {}.".format(output))
diff --git a/runnables/psi.py b/runnables/psi.py
new file mode 100644
index 0000000..a0ef50a
--- /dev/null
+++ b/runnables/psi.py
@@ -0,0 +1,60 @@
+import argparse
+import os
+import pandas as pd
+from bin.psi_calculator import calc_psi, get_cols_bin_probs
+from run_io.db_adapter import convertDSNToRfc1738
+from sqlalchemy import create_engine
+
+
+def build_argument_parser():
+    parser = argparse.ArgumentParser(allow_abbrev=False)
+    parser.add_argument("--dbname", type=str, required=True)
+    parser.add_argument("--refer_stats_table", type=str, required=True)
+    parser.add_argument("--bin_prob_column", type=str, default="bin_prob")
+
+    return parser
+
+
+if __name__ == "__main__":
+    parser = build_argument_parser()
+    args, _ = parser.parse_known_args()
+
+    select_input = os.getenv("SQLFLOW_TO_RUN_SELECT")
+    output = os.getenv("SQLFLOW_TO_RUN_INTO")
+    datasource = os.getenv("SQLFLOW_DATASOURCE")
+
+    url = convertDSNToRfc1738(datasource, args.dbname)
+    engine = create_engine(url)
+
+    input_df = pd.read_sql(
+        sql=select_input,
+        con=engine)
+    refer_stats_df = pd.read_sql_table(
+        table_name=args.refer_stats_table,
+        con=engine)
+
+    # actual comes from the input query; expected comes from the reference stats table
+    actual_cols_bin_probs = get_cols_bin_probs(input_df, args.bin_prob_column)
+    expected_cols_bin_probs = get_cols_bin_probs(refer_stats_df, args.bin_prob_column)
+
+    common_column_names = set.intersection(
+        set(actual_cols_bin_probs.keys()),
+        set(expected_cols_bin_probs.keys()))
+
+    print("Calculate the PSI value for {} fields.".format(len(common_column_names)))
+    cols_psi_data = []
+    for column_name in common_column_names:
+        psi_value = calc_psi(actual_cols_bin_probs[column_name], expected_cols_bin_probs[column_name])
+        cols_psi_data.append(
+            {
+                "name": column_name,
+                "psi": psi_value
+            }
+        )
+    cols_psi_df = pd.DataFrame(cols_psi_data)
+
+    print("Persist the PSI result into the table {}".format(output))
+    cols_psi_df.to_sql(
+        name=output,
+        con=engine,
+        index=False
+    )
diff --git a/runnables/requirements.txt b/runnables/requirements.txt
new file mode 100644
index 0000000..366ffee
--- /dev/null
+++ b/runnables/requirements.txt
@@ -0,0 +1,5 @@
+tsfresh==0.16.0
+sqlalchemy==1.3.19
+mysql==0.0.2
+pymars==0.5.1
+pandas>=1.0.0
\ No newline at end of file
diff --git a/runnables/run_io/__init__.py b/runnables/run_io/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/runnables/run_io/db_adapter.py b/runnables/run_io/db_adapter.py
new file mode 100644
index 0000000..4f779c4
--- /dev/null
+++ b/runnables/run_io/db_adapter.py
@@ -0,0 +1,26 @@
+import re
+
+def parseMySQLDSN(dsn):
+    # [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
+    pattern = "^(\w*):(\w*)@tcp\(([.a-zA-Z0-9\-]*):([0-9]*)\)/(\w*)(\?.*)?$" # noqa: W605, E501
+    found_result = re.findall(pattern, dsn)
+    user, passwd, host, port, database, config_str = found_result[0]
+    config = {}
+    if len(config_str) > 1:
+        for c in config_str[1:].split("&"):
+            k, v = c.split("=")
+            config[k] = v
+    return user, passwd, host, port, database, config
+
+# TODO(brightcoder01): Should we put this kind of common method
+# in sqlflow runtime? While writing the runnable code, users can
+# import the runtime library.
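+# Example (hypothetical values): converting a Go-style MySQL DSN into an
+# RFC 1738 URL that SQLAlchemy accepts:
+#   convertDSNToRfc1738("mysql://root:root@tcp(127.0.0.1:3306)/?maxAllowedPacket=0", "iris")
+#   returns "mysql://root:root@127.0.0.1:3306/iris"; the database name falls back
+#   to defaultDbName when the DSN omits it.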
+def convertDSNToRfc1738(driver_dsn, defaultDbName): + driver, dsn = driver_dsn.split("://") + user, passwd, host, port, database, config = parseMySQLDSN(dsn) + + if not database: + database = defaultDbName + + # mysql://root:root@127.0.0.1:3306/dbname + return "{}://{}:{}@{}:{}/{}".format(driver, user, passwd, host, port, database) diff --git a/runnables/time_series_processing/__init__.py b/runnables/time_series_processing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/runnables/time_series_processing/ts_feature_extractor.py b/runnables/time_series_processing/ts_feature_extractor.py new file mode 100644 index 0000000..7c12bd0 --- /dev/null +++ b/runnables/time_series_processing/ts_feature_extractor.py @@ -0,0 +1,144 @@ +import pandas as pd +from functools import reduce +from tsfresh import extract_features +from tsfresh.feature_extraction.settings import MinimalFCParameters, ComprehensiveFCParameters, EfficientFCParameters +from tsfresh.utilities.dataframe_functions import roll_time_series + + +EXTRACT_SETTING_NAME_TO_CLASS_DICT = { + "minimal": MinimalFCParameters, + "efficient": EfficientFCParameters, + "comprehensive": ComprehensiveFCParameters +} + +ROLLED_TS_ID_COLUMN_NAME = "id" +ORIGIN_JOIN_ID_COLUMN_NAME = "join_id" +ROLLED_TS_ID_FORMAT = "id={},timeshift={}" + + +def _roll_ts_and_extract_features( + input, + column_id, + column_time, + columns_value, + max_window, + min_window, + extract_setting): + rolled_ts = roll_time_series( + input, + column_id=column_id, + column_kind=None, + column_sort=column_time, + rolling_direction=1, + max_timeshift=max_window, + min_timeshift=min_window, + n_jobs=0) + + rename_columns = { + value_column: "{}_w_{}".format(value_column, max_window) + for value_column in columns_value + } + rolled_ts = rolled_ts.rename(columns=rename_columns) + rolled_ts = rolled_ts.drop(columns=[column_id]) + + extract_setting_clz = EXTRACT_SETTING_NAME_TO_CLASS_DICT.get(extract_setting, MinimalFCParameters) + extracted_features = extract_features( + rolled_ts, + column_id=ROLLED_TS_ID_COLUMN_NAME, + column_sort=column_time, + n_jobs=0, + default_fc_parameters=extract_setting_clz()) + + return extracted_features + + +def add_lag_columns( + input, + columns_value, + lag_num): + lag_column_names = [] + for column_value in columns_value: + lag_column_name = "{}_lag_{}".format(column_value, lag_num) + input[lag_column_name] = input[column_value].shift(lag_num) + lag_column_names.append(lag_column_name) + + return input[lag_num:], lag_column_names + + +def add_features_extracted_from_ts_data( + input, + column_id, + column_time, + columns_value, + windows, + min_window=0, + extract_setting="minimal"): + """Extract features from the time series data and append them to the + original data. + + Build the rolled time series data with various window sizes, extract + the features using TSFresh and then append the derived features to + the original data. + + Args: + input: A pandas DataFrame for the input data. + column_id: The name of the id column to group by the time series data. + The input data can contain the time series for various entities. + For example, the UV for different websites. + column_time: The name of the time column. + columns_value: Array. The names of the columns for the time series data. + windows: Array of window sizes. The time series data will be rolled with + each window size. + min_window: The extract forecast windows smaller or equal than this will + be throwed away. + extract_setting: minimal | efficient | comprehensive. 
Control which features + will be extracted. The order of feature numbers is: + minimal < efficient < comprehensive + + Returns: + A pandas DataFrame containing the original input data and extracted features. + """ + + input_with_join_id = pd.DataFrame() + input_with_join_id[ORIGIN_JOIN_ID_COLUMN_NAME] = input.apply( + lambda row: ROLLED_TS_ID_FORMAT.format(row[column_id], row[column_time]), + axis=1) + + input_with_join_id = pd.concat( + [input, input_with_join_id], + axis=1) + + input = input[[column_id, column_time] + columns_value] + input.sort_values(by=[column_id, column_time]) + + extracted_features_multi_windows = [ + _roll_ts_and_extract_features( + input=input, + column_id=column_id, + column_time=column_time, + columns_value=columns_value, + max_window=window, + min_window=min_window, + extract_setting=extract_setting + ) for window in windows + ] + + extracted_features_multi_windows = reduce(lambda left, right: pd.merge( + left=left, + right=right, + how="left", + on=ROLLED_TS_ID_COLUMN_NAME + ), extracted_features_multi_windows) + + original_data_with_extracted_features = pd.merge( + input_with_join_id, + extracted_features_multi_windows, + how='inner', + left_on=ORIGIN_JOIN_ID_COLUMN_NAME, + right_on=ROLLED_TS_ID_COLUMN_NAME + ) + + original_data_with_extracted_features.sort_values(by=[column_id, column_time]) + original_data_with_extracted_features = original_data_with_extracted_features.drop(columns=[ORIGIN_JOIN_ID_COLUMN_NAME]) + + return original_data_with_extracted_features diff --git a/runnables/two_dim_binning.py b/runnables/two_dim_binning.py new file mode 100644 index 0000000..b74baf0 --- /dev/null +++ b/runnables/two_dim_binning.py @@ -0,0 +1,111 @@ +import argparse +import mars.dataframe as md +import os +import pandas as pd +from bin.binning_calculator import calc_stats, calc_two_dim_binning_stats, get_cols_bin_boundaries +from run_io.db_adapter import convertDSNToRfc1738 +from sqlalchemy import create_engine + + +def build_argument_parser(): + parser = argparse.ArgumentParser(allow_abbrev=False) + parser.add_argument("--dbname", type=str, required=True) + parser.add_argument("--columns", type=str, required=True) + parser.add_argument("--bin_method", type=str, required=False) + parser.add_argument("--bin_num", type=str, required=False) + parser.add_argument("--bin_input_table", type=str, required=False) + parser.add_argument("--reverse_cumsum", type=bool, default=False) + + return parser + + +if __name__ == "__main__": + parser = build_argument_parser() + args, _ = parser.parse_known_args() + columns = args.columns.split(',') + bin_method_array = args.bin_method.split(',') if args.bin_method else None + bin_num_array = [int(item) for item in args.bin_num.split(',')] if args.bin_num else None + + select_input = os.getenv("SQLFLOW_TO_RUN_SELECT") + output = os.getenv("SQLFLOW_TO_RUN_INTO") + output_tables = output.split(',') + datasource = os.getenv("SQLFLOW_DATASOURCE") + + # Check arguments + assert len(columns) == 2, "The column number should only be 2" + assert len(output_tables) == 3, "The output table number should only be 3" + + url = convertDSNToRfc1738(datasource, args.dbname) + engine = create_engine(url) + input_md = md.read_sql( + sql=select_input, + con=engine) + input_md.execute() + + cols_bin_boundaries = {} + if args.bin_input_table: + print("Get provided bin boundaries from table {}".format(args.bin_input_table)) + bin_input_df = pd.read_sql_table( + table_name=args.bin_input_table, + con=engine) + cols_bin_boundaries = 
get_cols_bin_boundaries(bin_input_df) + + if set(columns) > cols_bin_boundaries.keys(): + raise ValueError("The provided bin boundaries contains keys: {}. But they cannot cover all the \ + input columns: {}".format(cols_bin_boundaries.keys(), columns)) + + print("Ignore the bin_num and bin_method arguments") + bin_num_array = [None] * len(columns) + bin_method_array = [None] * len(columns) + else: + if len(bin_num_array) == 1: + bin_num_array = bin_num_array * len(columns) + else: + assert(len(bin_num_array) == len(columns)) + + if len(bin_method_array) == 1: + bin_method_array = bin_method_array * len(columns) + else: + assert(len(bin_method_array) == len(columns)) + + print("Calculate the statistics result for columns: {}".format(columns)) + stats_df = calc_stats( + input_md, + columns, + bin_method_array, + bin_num_array, + cols_bin_boundaries, + args.reverse_cumsum) + + print("Persist the statistics result into the table {}".format(output_tables[0])) + stats_df.to_sql( + name=output_tables[0], + con=engine, + index=False + ) + + print("Calculate two dimension binning result for columns: {}".format(columns)) + bin_prob_df, bin_cumsum_prob_df = calc_two_dim_binning_stats( + input_md, + columns[0], + columns[1], + bin_method_array[0], + bin_method_array[1], + bin_num_array[0], + bin_num_array[1], + cols_bin_boundaries.get(columns[0], None), + cols_bin_boundaries.get(columns[1], None), + args.reverse_cumsum) + + print("Persist the binning probabilities into table {}".format(output_tables[1])) + bin_prob_df.to_sql( + name=output_tables[1], + con=engine, + index=False + ) + print("Persist the binning accumulated probabilities into table {}".format(output_tables[2])) + bin_cumsum_prob_df.to_sql( + name=output_tables[2], + con=engine, + index=False + ) diff --git a/scripts/data/iris.recordio b/scripts/data/iris.recordio new file mode 100644 index 0000000..83bccd4 Binary files /dev/null and b/scripts/data/iris.recordio differ diff --git a/scripts/elasticdl_travis_test_job.sh b/scripts/elasticdl_travis_test_job.sh new file mode 100644 index 0000000..583cb9c --- /dev/null +++ b/scripts/elasticdl_travis_test_job.sh @@ -0,0 +1,22 @@ +if [ "$SQLFLOW_TEST_DB_MAXCOMPUTE_AK" = "" ] || [ "$SQLFLOW_TEST_DB_MAXCOMPUTE_SK" == "" ]; then + echo "skip maxcompute test because the env SQLFLOW_TEST_DB_MAXCOMPUTE_AK or SQLFLOW_TEST_DB_MAXCOMPUTE_SK is empty" + exit 0 +fi + +curl -s https://raw.githubusercontent.com/sql-machine-learning/elasticdl/4a995fe7eaf91bc5a9d50181e9aaaa14d15c8a09/scripts/setup_k8s_env.sh | bash +kubectl apply -f https://raw.githubusercontent.com/sql-machine-learning/elasticdl/develop/elasticdl/manifests/examples/elasticdl-rbac.yaml + +docker run --rm -it --net=host \ + -v $HOME/.kube:/root/.kube \ + -v /home/$USER/.minikube/:/home/$USER/.minikube/ \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v $PWD:/workspace \ + -e ODPS_ACCESS_ID=$MAXCOMPUTE_AK \ + -e ODPS_ACCESS_KEY=$MAXCOMPUTE_SK \ + sqlflow/sqlflow_models bash /workspace/scripts/test_elasticdl_submit.sh + +docker run --rm -it --net=host \ + -v $HOME/.kube:/root/.kube \ + -v /home/$USER/.minikube/:/home/$USER/.minikube/ \ + sqlflow/sqlflow_models \ + bash -c "curl -s https://raw.githubusercontent.com/sql-machine-learning/elasticdl/62b255a918df5b6594c888b19aebbcc74bbce6e4/scripts/validate_job_status.py | python - odps 1 2" diff --git a/scripts/test_elasticdl_submit.sh b/scripts/test_elasticdl_submit.sh new file mode 100644 index 0000000..09413e9 --- /dev/null +++ b/scripts/test_elasticdl_submit.sh @@ -0,0 +1,23 @@ +#!/bin/bash + 
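+# Submit a test ElasticDL training job for the DNNClassifier model in this model zoo.
+# Assumes the `elasticdl` CLI is installed in the image and that ODPS_ACCESS_ID and
+# ODPS_ACCESS_KEY are provided by the caller (see scripts/elasticdl_travis_test_job.sh).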
+elasticdl train --image_base=sqlflow/sqlflow_models \ +--model_def=dnnclassifier.DNNClassifier \ +--training_data=sqlflow_test_iris_train \ +--data_reader_params='columns=["sepal_length", "sepal_width", "petal_length", "petal_width", "class"];label_col="class"' \ +--envs="ODPS_PROJECT_NAME=gomaxcompute_driver_w7u,ODPS_ACCESS_ID=$ODPS_ACCESS_ID,ODPS_ACCESS_KEY=$ODPS_ACCESS_KEY" \ +--minibatch_size=32 \ +--num_epochs=2 \ +--model_zoo=/sqlflow_models \ +--job_name=test-odps \ +--num_minibatches_per_task=2 \ +--image_pull_policy=Never \ +--num_workers=2 \ +--num_ps_pods=1 \ +--master_resource_request="cpu=200m,memory=128Mi" \ +--master_resource_limit="cpu=1,memory=2048Mi" \ +--worker_resource_request="cpu=200m,memory=128Mi" \ +--worker_resource_limit="cpu=1,memory=3072Mi" \ +--ps_resource_request="cpu=200m,memory=128Mi" \ +--ps_resource_limit="cpu=1,memory=2048Mi" \ +--grads_to_wait=2 \ +--output=model_output diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..9abcdc0 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[aliases] +test=pytest + +[tool:pytest] +rootdir=tests diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..3dea1c9 --- /dev/null +++ b/setup.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Note: To use the 'upload' functionality of this file, you must: +# $ pip install twine + +import io +import os +import sys +from shutil import rmtree + +from setuptools import find_packages, setup, Command + +# Package meta-data. +NAME = 'sqlflow_models' +DESCRIPTION = 'Premade Models for SQLFlow.' +URL = 'https://github.com/sql-machine-learning/models' +EMAIL = 'yzhdoudou@gmail.com' +AUTHOR = 'Yang Yang' +REQUIRES_PYTHON = '>=3.6.0' +VERSION = None + +# What packages are required for this module to be executed? +REQUIRED = [ + 'protobuf==3.7.1', + 'tensorflow==2.0.1', + 'scikit-learn==0.21.0', + 'numpy==1.16.2', + 'pandas>=0.25.1', + 'adanet==0.8.0', + "tensorflow-datasets==3.0.0", + "statsmodels==0.11.1", + "scipy==1.4.1", + "tensorflow-metadata<0.23.0", +] + +SETUP_REQUIRED = [ + 'pytest-runner' +] +TEST_REQUIRED = [ + 'pytest', +] + +# What packages are optional? +EXTRAS = { +} + +# The rest you shouldn't have to touch too much :) +# ------------------------------------------------ +# Except, perhaps the License and Trove Classifiers! +# If you do change the License, remember to change the Trove Classifier for that! + +here = os.path.abspath(os.path.dirname(__file__)) + +# Import the README and use it as the long-description. +# Note: this will only work if 'README.md' is present in your MANIFEST.in file! +try: + with io.open(os.path.join(here, 'README.md'), encoding='utf-8') as f: + long_description = '\n' + f.read() +except FileNotFoundError: + long_description = DESCRIPTION + +# Load the package's __version__.py module as a dictionary. +about = {} +if not VERSION: + with open(os.path.join(here, NAME, '_version.py')) as f: + exec(f.read(), about) +else: + about['__version__'] = VERSION + + +class UploadCommand(Command): + """Support setup.py upload.""" + + description = 'Build and publish the package.' 
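+    # Run with `python setup.py upload`; it requires twine (`pip install twine`),
+    # as noted at the top of this file.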
+ user_options = [] + + @staticmethod + def status(s): + """Prints things in bold.""" + print('\033[1m{0}\033[0m'.format(s)) + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + try: + self.status('Removing previous builds…') + rmtree(os.path.join(here, 'dist')) + except OSError: + pass + + self.status('Building Source and Wheel (universal) distribution…') + os.system('{0} setup.py sdist bdist_wheel --universal'.format(sys.executable)) + + self.status('Uploading the package to PyPI via Twine…') + os.system('twine upload dist/*') + + self.status('Pushing git tags…') + os.system('git tag v{0}'.format(about['__version__'])) + os.system('git push --tags') + + sys.exit() + + +# Where the magic happens: +setup( + name=NAME, + version=about['__version__'], + description=DESCRIPTION, + long_description=long_description, + long_description_content_type='text/markdown', + author=AUTHOR, + author_email=EMAIL, + python_requires=REQUIRES_PYTHON, + url=URL, + packages=find_packages(exclude=('tests',)), + # If your package is a single module, use this instead of 'packages': + # py_modules=['mypackage'], + + # entry_points={ + # 'console_scripts': ['mycli=mymodule:cli'], + # }, + install_requires=REQUIRED, + setup_requires=SETUP_REQUIRED, + tests_require=TEST_REQUIRED, + extras_require=EXTRAS, + license='Apache License 2.0', + classifiers=[ + # Trove classifiers + # Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers + 'License :: OSI Approved :: Apache Software License', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: Implementation :: CPython', + 'Programming Language :: Python :: Implementation :: PyPy' + ], + # $ setup.py publish support. 
+    cmdclass={
+        'upload': UploadCommand,
+    },
+    zip_safe=False,
+)
diff --git a/sqlflow_models/Dockerfile b/sqlflow_models/Dockerfile
new file mode 100644
index 0000000..b2afc36
--- /dev/null
+++ b/sqlflow_models/Dockerfile
@@ -0,0 +1,4 @@
+FROM sqlflow/modelzoo_base
+
+RUN pip install tensorflow==2.0.0 scikit-learn==0.21.0 numpy==1.16.2 pandas==0.25.1
+ADD *.py /sqlflow_models/
diff --git a/sqlflow_models/__init__.py b/sqlflow_models/__init__.py
new file mode 100755
index 0000000..8666158
--- /dev/null
+++ b/sqlflow_models/__init__.py
@@ -0,0 +1,20 @@
+import traceback
+from ._version import __version__
+from .dnnclassifier import DNNClassifier
+from .dnnregressor import DNNRegressor
+from .rnnclassifier import StackedRNNClassifier
+from .deep_embedding_cluster import DeepEmbeddingClusterModel
+from .dnnclassifier_functional_api_example import dnnclassifier_functional_model
+from .rnn_based_time_series import RNNBasedTimeSeriesModel
+from .auto_estimator import AutoClassifier, AutoRegressor
+from .score_card import ScoreCard
+from .native_keras import RawDNNClassifier
+from .custom_model_example import CustomClassifier
+from .gcn import GCN
+from .one_class_svm import OneClassSVM
+try:
+    # NOTE: statsmodels has a version conflict on PAI
+    from .arima_with_stl_decomposition import ARIMAWithSTLDecomposition
+except:
+    print("model ARIMAWithSTLDecomposition is not imported")
+    traceback.print_exc()
diff --git a/sqlflow_models/_version.py b/sqlflow_models/_version.py
new file mode 100755
index 0000000..7341562
--- /dev/null
+++ b/sqlflow_models/_version.py
@@ -0,0 +1,3 @@
+VERSION = (0, 1, 0)
+
+__version__ = '.'.join(map(str, VERSION))
diff --git a/sqlflow_models/arima_with_stl_decomposition.py b/sqlflow_models/arima_with_stl_decomposition.py
new file mode 100644
index 0000000..a4bd602
--- /dev/null
+++ b/sqlflow_models/arima_with_stl_decomposition.py
@@ -0,0 +1,124 @@
+import numpy as np
+import six
+from statsmodels.tsa.arima_model import ARIMA
+from statsmodels.tsa.seasonal import STL
+from datetime import datetime
+import tensorflow as tf
+import pandas as pd
+
+class ARIMAWithSTLDecomposition(tf.keras.Model):
+    def __init__(self,
+                 order,
+                 period,
+                 date_format,
+                 forecast_start,
+                 forecast_end,
+                 **kwargs):
+        super(ARIMAWithSTLDecomposition, self).__init__()
+
+        self.order = order
+        if not isinstance(period, (list, tuple)):
+            # wrap a scalar period so the STL loop below can iterate over it
+            period = [period]
+        self.period = period
+        self.date_format = date_format
+        self.forecast_start = self._str2date(forecast_start)
+        self.forecast_end = self._str2date(forecast_end)
+        self.seasonal = []
+        self.kwargs = kwargs
+
+    def _str2date(self, date_str):
+        if isinstance(date_str, bytes):
+            date_str = date_str.decode('utf-8')
+        return datetime.strptime(str(date_str), self.date_format)
+
+    def _read_all_data(self, dataset):
+        data = None
+        for batch_idx, items in enumerate(dataset):
+            if data is None:
+                data = [[] for _ in six.moves.range(len(items))]
+
+            for i, item in enumerate(items):
+                if isinstance(item, dict):
+                    assert len(item) == 1
+                    dict_values = list(item.values())
+                    item = dict_values[0]
+
+                if isinstance(item, tf.Tensor):
+                    item = item.numpy()
+
+                item = np.reshape(item, [-1]).tolist()
+                data[i].extend(item)
+
+        dates, values = data
+        sorted_dates_index = sorted(range(len(dates)), key=lambda k: dates[k])
+        dates = np.array([self._str2date(dates[i]) for i in sorted_dates_index])
+        values = np.array([values[i] for i in sorted_dates_index]).astype('float32')
+
+        return dates, values
+
+    def _stl_decompose(self, values):
+        left_values = values
+        
self.seasonal = [] + for p in self.period: + stl_model = STL(left_values, period=p).fit() + seasonal = np.array(stl_model.seasonal) + self.seasonal.append(seasonal) + left_values -= seasonal + + return left_values + + def _addup_seasonal(self, dates, values): + time_interval = dates[1] - dates[0] + start_interval = self.forecast_start - dates[0] + start_index = int(start_interval.total_seconds() / time_interval.total_seconds()) + + length = len(values) + + for p, seasonal in six.moves.zip(self.period, self.seasonal): + if length % p == 0: + offset = length + else: + offset = (int(length / p) + 1) * p + + idx = start_index - offset + values += seasonal[idx:idx+length] + + return values + + def _normalize(self, values): + min_value = np.min(values) + max_value = np.max(values) + values = (values - min_value) / (max_value - min_value) + return values, min_value, max_value + + def print_prediction_result(self, prediction, interval): + t_strs = [] + for i, p in enumerate(prediction): + t = self.forecast_start + i * interval + t_str = datetime.strftime(t, self.date_format) + t_strs.append(t_str) + + df = pd.DataFrame(data={'time': t_strs, 'prediction': prediction}) + with pd.option_context('display.max_columns', None): + print(df) + + def sqlflow_train_loop(self, dataset): + dates, values = self._read_all_data(dataset) + + left_values = self._stl_decompose(values) + left_values, min_value, max_value = self._normalize(left_values) + + model = ARIMA(left_values, order=self.order, dates=dates).fit(disp=-1) + + prediction = model.predict(start=self.forecast_start, end=self.forecast_end, typ='levels') + + prediction = prediction * (max_value - min_value) + min_value + prediction = self._addup_seasonal(dates, prediction) + self.print_prediction_result(prediction, interval=dates[1] - dates[0]) + return prediction + +def loss(*args, **kwargs): + return None + +def optimizer(*args, **kwargs): + return None diff --git a/sqlflow_models/auto_estimator.py b/sqlflow_models/auto_estimator.py new file mode 100644 index 0000000..41d96b0 --- /dev/null +++ b/sqlflow_models/auto_estimator.py @@ -0,0 +1,116 @@ +from __future__ import absolute_import, division, print_function, unicode_literals +from collections import defaultdict + +import absl +import logging +import tensorflow as tf +import warnings + +absl.logging.set_verbosity(absl.logging.ERROR) +tf.get_logger().setLevel(logging.ERROR) +tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) +warnings.warn = lambda *args, **kargs:None +import adanet + +from tensorflow import keras +from tensorflow_estimator.python.estimator.canned import optimizers +from .simple_dnn_generator import SimpleDNNGenerator + + +LEARN_MIXTURE_WEIGHTS=True +RANDOM_SEED = 42 + +class AutoClassifier(adanet.Estimator): + def __init__(self, feature_columns, layer_size=50, optimizer='Adagrad', linear_optimizer='Ftrl', + model_dir=None, n_classes=2, activation_fn=tf.nn.relu, complexity_penalty=0.01, + search_every_n_steps=1000, max_iterations=10, config=None): + """AutoClassifier + :param feature_columns: Feature columns. + :type feature_columns: list[tf.feature_column]. + :param layer_size: Number of hidden_units in each layers. + :type layer_size: int. + :param n_classes: Number of label classes. Defaults to 2, namely binary classification. + :type n_classes: int. + :param optimizer: Optimizer for the the neural multi-layer parts of the generated network. + :type optimizer: str. + :param linear_optimizer: Optimizer for the linear part of the generated network. 
+ :type linear_optimizer: str. + :param model_dir: Directory to save or restore model checkpoints. + :type model_dir: str. + :param activation_fn: Activation function. + :type activation_fn: function. + :param complexity_penalty: Regularization of the complexity of the network. + :type complexity_penalty: float. + :param search_every_n_steps: Search new architecture every n steps. + :type search_every_n_steps: int. + :param max_iterations: Max times of architecture searching. + :type max_iterations: int. + :param config: Estimator configuration. + :type config: dict. + """ + if n_classes == 2: + head = tf.estimator.BinaryClassHead() + else: + head = tf.estimator.MultiClassHead(n_classes=n_classes) + + opts= defaultdict(lambda: optimizers.get_optimizer_instance(optimizer, 0.001)) + opts[0] = optimizers.get_optimizer_instance(linear_optimizer, 0.1) + # Define the generator, which defines the search space of subnetworks + # to train as candidates to add to the final AdaNet model. + subnetwork_generator = SimpleDNNGenerator( + feature_columns=feature_columns, + layer_size=layer_size, + optimizers=opts, + learn_mixture_weights=LEARN_MIXTURE_WEIGHTS, + seed=RANDOM_SEED) + super(AutoClassifier, self).__init__(head=head, + model_dir=model_dir, + adanet_lambda=complexity_penalty, + subnetwork_generator=subnetwork_generator, + max_iteration_steps=search_every_n_steps, + max_iterations=max_iterations) + +class AutoRegressor(adanet.Estimator): + def __init__(self, feature_columns, layer_size=50, optimizer='Adagrad', linear_optimizer='Ftrl', + model_dir=None, activation_fn=tf.nn.relu, complexity_penalty=0.01, + search_every_n_steps=1000, max_iterations=10, config=None): + """AutoRegressor + :param feature_columns: Feature columns. + :type feature_columns: list[tf.feature_column]. + :param layer_size: Number of hidden_units in each layers. + :type layer_size: int. + :param optimizer: Optimizer for the the neural multi-layer parts of the generated network. + :type optimizer: str. + :param linear_optimizer: Optimizer for the linear part of the generated network. + :type linear_optimizer: str. + :param model_dir: Directory to save or restore model checkpoints. + :type model_dir: str. + :param activation_fn: Activation function. + :type activation_fn: function. + :param complexity_penalty: Regularization of the complexity of the network. + :type complexity_penalty: float. + :param search_every_n_steps: Search new architecture every n steps. + :type search_every_n_steps: int. + :param max_iterations: Max times of architecture searching. + :type max_iterations: int. + :param config: Estimator configuration. + :type config: dict. + """ + head = tf.estimator.RegressionHead() + + opts= defaultdict(lambda: optimizers.get_optimizer_instance(optimizer, 0.001)) + opts[0] = optimizers.get_optimizer_instance(linear_optimizer, 0.1) + # Define the generator, which defines the search space of subnetworks + # to train as candidates to add to the final AdaNet model. 
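+        # At each iteration the generator proposes two candidates: a DNN with the same
+        # depth as the most recently added subnetwork and one that is one layer deeper.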
+        subnetwork_generator = SimpleDNNGenerator(
+            feature_columns=feature_columns,
+            layer_size=layer_size,
+            optimizers=opts,
+            learn_mixture_weights=LEARN_MIXTURE_WEIGHTS,
+            seed=RANDOM_SEED)
+        super(AutoRegressor, self).__init__(head=head,
+                                            model_dir=model_dir,
+                                            adanet_lambda=complexity_penalty,
+                                            subnetwork_generator=subnetwork_generator,
+                                            max_iteration_steps=search_every_n_steps,
+                                            max_iterations=max_iterations)
diff --git a/sqlflow_models/custom_model_example.py b/sqlflow_models/custom_model_example.py
new file mode 100644
index 0000000..fe962cf
--- /dev/null
+++ b/sqlflow_models/custom_model_example.py
@@ -0,0 +1,40 @@
+import tensorflow as tf
+import random
+import numpy as np
+
+class CustomClassifier(tf.keras.Model):
+    def __init__(self, feature_columns=None):
+        """The model init function. You can define any model parameter in the function's argument list.
+        You can also add custom training routines together with a Keras
+        model (see deep_embedding_cluster.py), or define a model without Keras layers
+        (e.g. using only sklearn or numpy).
+        """
+        pass
+
+    def sqlflow_train_loop(self, x):
+        """The custom model training loop; the input x is a tf.data.Dataset object that generates training data.
+        """
+        pass
+
+    def sqlflow_predict_one(self, sample):
+        """Run prediction on one sample and return the prediction result. The result must be a
+        list of numpy arrays. SQLFlow determines the output type as follows:
+        - if the array has only one element, the model must be a regression model.
+        - if the array has multiple elements:
+            - if the elements sum to (approximately) 1, it is likely a classification model.
+            - otherwise the model is a regression model with multiple outputs.
+        """
+        pos = random.random()
+        neg = 1 - pos
+        array = np.array([pos, neg])
+        return [array]
+
+    def sqlflow_evaluate_loop(self, x, metric_names):
+        """Run evaluation on the validation dataset and return a list of metrics.
+        NOTE: the first returned metric is always the loss. If no loss is defined, append 0.
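+        For example, if metric_names is ["accuracy"], a valid (hypothetical) return value
+        is [0.0, 0.93]: the loss first, followed by one value per requested metric.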
+ """ + metric_len = len(metric_names) + result = [] + for i in range(metric_len+1): + result.append(random.random()) + return result diff --git a/sqlflow_models/deep_embedding_cluster.py b/sqlflow_models/deep_embedding_cluster.py new file mode 100644 index 0000000..5f9e5d5 --- /dev/null +++ b/sqlflow_models/deep_embedding_cluster.py @@ -0,0 +1,360 @@ +#!usr/bin/env python +# -*- coding:utf-8 _*- + +""" +__author__ : chenxiang +__email__ : alfredchenxiang@didichuxing.com +__file_name__ : deep_embedding_cluster.py +__create_time__ : 2019/09/03 +""" +from datetime import datetime +import tensorflow as tf +from tensorflow import keras +from tensorflow.python.data import make_one_shot_iterator +from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau +from tensorflow.keras.layers import Dense, Layer, DenseFeatures, InputSpec +from tensorflow.keras import backend +import numpy as np +from sklearn.cluster import KMeans +from tensorflow.keras.losses import kld +from tensorflow.keras.optimizers import SGD +import tensorflow_datasets as tfds +import pandas as pd + +_train_lr = 0.01 +_default_loss = kld + +class DeepEmbeddingClusterModel(keras.Model): + + def __init__(self, + feature_columns, + n_clusters=10, + kmeans_init=20, + run_pretrain=True, + existed_pretrain_model=None, + pretrain_dims=[100, 100, 10], + pretrain_activation_func='relu', + pretrain_use_callbacks=False, + pretrain_cbearlystop_patience=30, + pretrain_cbearlystop_mindelta=0.0001, + pretrain_cbreduce_patience=10, + pretrain_cbreduce_factor=0.1, + pretrain_epochs=30, + pretrain_initializer='glorot_uniform', + pretrain_lr=1, + train_lr=0.01, + train_max_iters=8000, + update_interval=100, + train_use_tol=True, + tol=0.0001, + loss=kld): + + """ + Implement cluster model mostly based on DEC. + :param feature_columns: a list of tf.feature_column + :param n_clusters: Number of clusters. + :param kmeans_init: Number of running K-Means to get best choice of centroids. + :param run_pretrain: Run pre-train process or not. + :param existed_pretrain_model: Path of existed pre-train model. Not used now. + :param pretrain_dims: Dims of layers which is used for build autoencoder. + :param pretrain_activation_func: Active function of autoencoder layers. + :param pretrain_use_callbacks: Use callbacks when pre-train or not. + :param pretrain_cbearlystop_patience: Patience value of EarlyStopping when use callbacks. + :param pretrain_cbearlystop_mindelta: Min_delta value of EarlyStopping when use callbacks. + :param pretrain_cbreduce_patience: Patience value of ReduceLROnPlateau when use callbacks. + :param pretrain_cbreduce_factor: Factor value of ReduceLROnPlateau when use callbacks. + :param pretrain_epochs: Number of epochs when pre-train. + :param pretrain_initializer: Initialize function for autoencoder layers. + :param pretrain_lr: learning rate to train the auto encoder. + :param train_lr: learning rate to train the cluster network. + :param train_max_iters: Number of iterations when train. + :param update_interval: Interval between updating target distribution. + :param train_use_tol: Use tolerance during clusteringlayer or not. + :param tol: Tolerance of earlystopping when train during clusteringlayer. + :param loss: Default 'kld' when init. 
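+        Example (hypothetical feature columns, shown for illustration only):
+            model = DeepEmbeddingClusterModel(feature_columns=columns,
+                                              n_clusters=10,
+                                              pretrain_epochs=30)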
+ """ + global _train_lr + global _default_loss + super(DeepEmbeddingClusterModel, self).__init__(name='DECModel') + + # Common + self._feature_columns = feature_columns + self._feature_columns_dims = len(self._feature_columns) + self._n_clusters = n_clusters + _default_loss = loss + self._train_max_iters = train_max_iters + self._update_interval = update_interval + self._current_interval = 0 + self._train_use_tol = train_use_tol + self._tol = tol + + # Pre-train + self._run_pretrain = run_pretrain + self._existed_pretrain_model = existed_pretrain_model + self._pretrain_activation_func = pretrain_activation_func + self._pretrain_dims = pretrain_dims + self._pretrain_epochs = pretrain_epochs + self._pretrain_initializer = pretrain_initializer + self._pretrain_lr = pretrain_lr + self._pretrain_optimizer = SGD(lr=self._pretrain_lr, momentum=0.9) + + # Pre-train-callbacks + self._pretrain_use_callbacks = pretrain_use_callbacks + self._pretrain_cbearlystop_patience = pretrain_cbearlystop_patience + self._pretrain_cbearlystop_mindelta = pretrain_cbearlystop_mindelta + self._pretrain_cbreduce_patience = pretrain_cbreduce_patience + self._pretrain_cbreduce_factor = pretrain_cbreduce_factor + + # K-Means + self._kmeans_init = kmeans_init + + # Cluster + _train_lr = train_lr + self._cluster_optimizer = SGD(lr=_train_lr, momentum=0.9) + + # Build model + self._n_stacks = len(self._pretrain_dims) + self.input_layer = DenseFeatures(feature_columns) + + # Layers - encoder + self.encoder_layers = [] + for i in range(self._n_stacks): + self.encoder_layers.append(Dense(units=self._pretrain_dims[i], + activation=self._pretrain_activation_func, + name='encoder_%d' % i)) + + self.clustering_layer = ClusteringLayer(name='clustering', n_clusters=self._n_clusters) + + @staticmethod + def target_distribution(q): + """ + Calculate auxiliary softer target distributions by raising q to the second power and + then normalizing by frequency. + :param q: Original distributions. + :return: Auxiliary softer target distributions + """ + weight = q ** 2 / q.sum(0) + return (weight.T / weight.sum(1)).T + + def pre_train(self, x): + """ + Used for preparing encoder part by loading ready-to-go model or training one. 
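+        The autoencoder (encoder + decoder) is trained end-to-end on the cached data;
+        only the encoder weights are kept and reused by the clustering network.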
+ :param x: + :return: + """ + print('{} Start pre_train.'.format(datetime.now())) + + print('{} Start preparing training dataset to save into memory.'.format(datetime.now())) + # Concatenate input feature to meet requirement of keras.Model.fit() + def _concate_generate(dataset_element): + concate_y = tf.stack([dataset_element[feature.key] for feature in self._feature_columns], axis=1) + return (dataset_element, concate_y) + + y = x.cache().map(map_func=_concate_generate) + y.prefetch(1) + + self.input_x = dict() + self.input_y = None + for np_sample in tfds.as_numpy(y): + sample_dict = np_sample[0] + label = np_sample[1] + if self.input_y is None: + self.input_y = label + else: + self.input_y = np.concatenate([self.input_y, label]) + if len(self.input_x) == 0: + self.input_x = sample_dict + else: + for k in self.input_x: + self.input_x[k] = np.concatenate([self.input_x[k], sample_dict[k]]) + print('{} Done preparing training dataset.'.format(datetime.now())) + + # Layers - decoder + self.decoder_layers = [] + for i in range(self._n_stacks - 2, -1, -1): + self.decoder_layers.append(Dense(units=self._pretrain_dims[i], + activation=self._pretrain_activation_func, + kernel_initializer=self._pretrain_initializer, + name='decoder_%d' % (i + 1))) + + self.decoder_layers.append(Dense(units=self._feature_columns_dims, + kernel_initializer=self._pretrain_initializer, + name='decoder_0')) + # Pretrain - autoencoder, encoder + # autoencoder + self._autoencoder = keras.Sequential(layers=[self.input_layer] + self.encoder_layers + self.decoder_layers, + name='autoencoder') + self._autoencoder.compile(optimizer=self._pretrain_optimizer, loss='mse') + # encoder + self._encoder = keras.Sequential(layers=[self.input_layer] + self.encoder_layers, name='encoder') + self._encoder.compile(optimizer=self._pretrain_optimizer, loss='mse') + + # pretrain_callbacks + print('{} Training auto-encoder.'.format(datetime.now())) + if self._pretrain_use_callbacks: + callbacks = [ + EarlyStopping(monitor='loss', + patience=self._pretrain_cbearlystop_patience, min_delta=self._pretrain_cbearlystop_mindelta), + ReduceLROnPlateau(monitor='loss', + factor=self._pretrain_cbreduce_factor, patience=self._pretrain_cbreduce_patience) + ] + self._autoencoder.fit(self.input_x, self.input_y, + epochs=self._pretrain_epochs, callbacks=callbacks, verbose=1) + else: + self._autoencoder.fit(self.input_x, self.input_y, + epochs=self._pretrain_epochs, verbose=1) + # encoded_input + # type : numpy.ndarray shape : (num_of_all_records,num_of_cluster) (70000,10) if mnist + print('{} Calculating encoded_input.'.format(datetime.now())) + self.encoded_input = self._encoder.predict(x) + + del self._autoencoder + del self._encoder + del self.decoder_layers + print('{} Done pre-train.'.format(datetime.now())) + + def call(self, inputs, training=None, mask=None): + x = self.input_layer(inputs) + for encoder_layer in self.encoder_layers: + x = encoder_layer(x) + return self.clustering_layer(x) + + def init_centroids(self): + """ + Training K-means `_kmeans_init` times on the output of encoder to get best initial centroids. + :return: + """ + self.kmeans = KMeans(n_clusters=self._n_clusters, n_init=self._kmeans_init) + self.y_pred_last = self.kmeans.fit_predict(self.encoded_input) + print('{} Done init centroids by k-means.'.format(datetime.now())) + + def sqlflow_train_loop(self, x, epochs=1, verbose=0): + """ Parameter `epochs` and `verbose` will not be used in this function. 
""" + # There is a bug which will cause build failed when using `DenseFeatures` with `keras.Model` + # https://github.com/tensorflow/tensorflow/issues/28111 + # Using 'predict' to solve this problem here. + # Preparation + for features in x.take(1): + self.predict(x=features) + + # Get train.batch_size from sqlflow + for feature_name, feature_series in features.items(): + self._train_batch_size = feature_series.shape[0] + break + + # Pre-train autoencoder to prepare weights of encoder layers. + self.pre_train(x) + + # Initialize centroids for clustering. + self.init_centroids() + + # Setting cluster layer. + self.get_layer(name='clustering').set_weights([self.kmeans.cluster_centers_]) + + # Train + # flatten y to shape (num_samples, flattened_features) + record_num = self.input_y.shape[0] + feature_dims = self.input_y.shape[1:] + feature_dim_total = 1 + for d in feature_dims: + feature_dim_total = feature_dim_total * d + y_reshaped = self.input_y.reshape([record_num, feature_dim_total]) + print('{} Done preparing training dataset.'.format(datetime.now())) + + index_array = np.arange(record_num) + index, loss, p = 0, 0., None + + for ite in range(self._train_max_iters): + if ite % self._update_interval == 0: + q = self.predict(self.input_x) # numpy.ndarray shape(record_num,n_clusters) + p = self.target_distribution(q) # update the auxiliary target distribution p + + if self._train_use_tol: + y_pred = q.argmax(1) + # delta_percentage means the percentage of changed predictions in this train stage. + delta_percentage = np.sum(y_pred != self.y_pred_last).astype(np.float32) / y_pred.shape[0] + print('{} Updating at iter: {} -> delta_percentage: {}.'.format(datetime.now(), ite, delta_percentage)) + self.y_pred_last = np.copy(y_pred) + if ite > 0 and delta_percentage < self._tol: + print('Early stopping since delta_table {} has reached tol {}'.format(delta_percentage, self._tol)) + break + idx = index_array[index * self._train_batch_size: min((index + 1) * self._train_batch_size, record_num)] + + loss = self.train_on_batch(x=list(y_reshaped[idx].T), y=p[idx]) + if ite % 100 == 0: + print('{} Training at iter:{} -> loss:{}.'.format(datetime.now(), ite, loss)) + index = index + 1 if (index + 1) * self._train_batch_size <= record_num else 0 # Update index + + def display_model_info(self, verbose=0): + if verbose >= 0: + print('Summary : ') + print(self.summary()) + if verbose >= 1: + print('Layer\'s Shape : ') + for layer in self.encoder_layers: + print(layer.name + ' : ') + for i in layer.get_weights(): + print(i.shape) + print(self.clustering_layer.name + ' : ') + for i in self.clustering_layer.get_weights(): + print(i.shape) + if verbose >= 2: + print('Layer\'s Info : ') + for layer in self.encoder_layers: + print(layer.name + ' : ') + print(layer.get_weights()) + # Cluster + print(self.clustering_layer.name + ' : ') + print(self.clustering_layer.get_weights()) + + +def optimizer(): + global _train_lr + return SGD(lr=_train_lr, momentum=0.9) + +def loss(labels, output): + global _default_loss + return _default_loss(labels, output) + +def prepare_prediction_column(prediction): + """ Return the cluster label of the highest probability. """ + return prediction.argmax(axis=-1) + +class ClusteringLayer(Layer): + def __init__(self, n_clusters, alpha=1.0, **kwargs): + """ + Using clustering layer to refine the cluster centroids by learning from current high confidence assignment + using auxiliary target distribution. + + :param n_clusters: Number of clusters. + :param weights: Initial cluster centroids. 
+        :param alpha: Degrees of freedom parameter in Student's t-distribution. Defaults to 1.0 for all experiments.
+        :param kwargs:
+        """
+        self.n_clusters = n_clusters
+        self.alpha = alpha
+        self.input_spec = InputSpec(ndim=2)
+        super(ClusteringLayer, self).__init__(**kwargs)
+
+    def build(self, input_shape):
+        input_dim = input_shape[1]
+        self.input_spec = InputSpec(dtype=backend.floatx(), shape=(None, input_dim))
+        shape = tf.TensorShape(dims=(self.n_clusters, input_dim))
+        self.kernel = self.add_weight(name='kernel', shape=shape, initializer='glorot_uniform', trainable=True)
+        super(ClusteringLayer, self).build(shape)
+
+    def call(self, inputs, **kwargs):
+        q = 1.0 / (1.0 + (backend.sum(backend.square(backend.expand_dims(inputs, axis=1) - self.kernel),
+                                      axis=2) / self.alpha))
+        q **= (self.alpha + 1.0) / 2.0
+        q = backend.transpose(backend.transpose(q) / backend.sum(q, axis=1))
+        return q
+
+    def compute_output_shape(self, input_shape):
+        assert input_shape and len(input_shape) == 2
+        return input_shape[0], self.n_clusters
+
+    def get_config(self):
+        config = {'n_clusters': self.n_clusters}
+        base_config = super(ClusteringLayer, self).get_config()
+        return dict(list(base_config.items()) + list(config.items()))
diff --git a/sqlflow_models/dnnclassifier.py b/sqlflow_models/dnnclassifier.py
new file mode 100644
index 0000000..30518ba
--- /dev/null
+++ b/sqlflow_models/dnnclassifier.py
@@ -0,0 +1,65 @@
+import tensorflow as tf
+
+class DNNClassifier(tf.keras.Model):
+    def __init__(self, feature_columns=None, hidden_units=[100,100], n_classes=3):
+        """DNNClassifier
+        :param feature_columns: feature columns.
+        :type feature_columns: list[tf.feature_column].
+        :param hidden_units: List of hidden units per layer.
+        :type hidden_units: list[int].
+        :param n_classes: Number of label classes.
+        :type n_classes: int.
+        """
+        global _loss
+        super(DNNClassifier, self).__init__()
+        self.feature_layer = None
+        self.n_classes = n_classes
+        if feature_columns is not None:
+            # combines all the data as a dense tensor
+            self.feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
+        self.hidden_layers = []
+        for hidden_unit in hidden_units:
+            self.hidden_layers.append(tf.keras.layers.Dense(hidden_unit, activation='relu'))
+        if self.n_classes == 2:
+            # special setup for binary classification
+            pred_act = 'sigmoid'
+            _loss = 'binary_crossentropy'
+            n_out = 1
+        else:
+            pred_act = 'softmax'
+            _loss = 'categorical_crossentropy'
+            n_out = self.n_classes
+        self.prediction_layer = tf.keras.layers.Dense(n_out, activation=pred_act)
+
+    def call(self, inputs, training=True):
+        if self.feature_layer is not None:
+            x = self.feature_layer(inputs)
+        else:
+            x = tf.keras.layers.Flatten()(inputs)
+        for hidden_layer in self.hidden_layers:
+            x = hidden_layer(x)
+        return self.prediction_layer(x)
+
+def optimizer(learning_rate=0.001):
+    """Default optimizer name. Used in model.compile."""
+    return tf.keras.optimizers.Adagrad(lr=learning_rate)
+
+def loss(labels, output):
+    """Default loss function.
Used in model.compile.""" + global _loss + if _loss == "binary_crossentropy": + return tf.reduce_mean(tf.keras.losses.binary_crossentropy(labels, output)) + elif _loss == "categorical_crossentropy": + return tf.reduce_mean(tf.keras.losses.sparse_categorical_crossentropy(labels, output)) + +def prepare_prediction_column(prediction): + """Return the class label of highest probability.""" + return prediction.argmax(axis=-1) + +def eval_metrics_fn(): + return { + "accuracy": lambda labels, predictions: tf.equal( + tf.argmax(predictions, 1, output_type=tf.int32), + tf.cast(tf.reshape(labels, [-1]), tf.int32), + ) + } diff --git a/sqlflow_models/dnnclassifier_functional_api_example.py b/sqlflow_models/dnnclassifier_functional_api_example.py new file mode 100644 index 0000000..12522f3 --- /dev/null +++ b/sqlflow_models/dnnclassifier_functional_api_example.py @@ -0,0 +1,40 @@ +import tensorflow as tf + +global _loss + +def dnnclassifier_functional_model(feature_columns, field_metas, n_classes=2, learning_rate=0.001): + feature_layer_inputs = dict() + for fmkey in field_metas: + fm = field_metas[fmkey] + feature_layer_inputs[fm["feature_name"]] = tf.keras.Input(shape=(fm["shape"]), name=fm["feature_name"], dtype=fm["dtype"]) + feature_layer = tf.keras.layers.DenseFeatures(feature_columns) + feature_layer_outputs = feature_layer(feature_layer_inputs) + global _loss + if n_classes == 2: + # special setup for binary classification + pred_act = 'sigmoid' + _loss = 'binary_crossentropy' + else: + pred_act = 'softmax' + _loss = 'categorical_crossentropy' + x = tf.keras.layers.Dense(128, activation='relu')(feature_layer_outputs) + x = tf.keras.layers.Dense(64, activation='relu')(x) + pred = tf.keras.layers.Dense(n_classes, activation=pred_act)(x) + return tf.keras.Model(inputs=[v for v in feature_layer_inputs.values()], outputs=pred) + +def loss(labels, output): + global _loss + if _loss == "binary_crossentropy": + return tf.reduce_mean(tf.keras.losses.binary_crossentropy(labels, output)) + elif _loss == "categorical_crossentropy": + return tf.reduce_mean(tf.keras.losses.sparse_categorical_crossentropy(labels, output)) + +def epochs(): + return 1 + +def optimizer(lr=0.1): + return tf.keras.optimizers.Adagrad(lr=lr) + +def prepare_prediction_column(self, prediction): + """Return the class label of highest probability.""" + return prediction.argmax(axis=-1) diff --git a/sqlflow_models/dnnregressor.py b/sqlflow_models/dnnregressor.py new file mode 100644 index 0000000..c5eca66 --- /dev/null +++ b/sqlflow_models/dnnregressor.py @@ -0,0 +1,48 @@ +import tensorflow as tf + +class DNNRegressor(tf.keras.Model): + def __init__(self, feature_columns=None, hidden_units=[100,100]): + """DNNRegressor + :param feature_columns: feature columns. + :type feature_columns: list[tf.feature_column]. + :param hidden_units: number of hidden units. + :type hidden_units: list[int]. 
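+        Example (hypothetical feature columns, shown for illustration only):
+            model = DNNRegressor(feature_columns=columns, hidden_units=[64, 32])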
+ """ + super(DNNRegressor, self).__init__() + self.feature_layer = None + if feature_columns is not None: + # combines all the data as a dense tensor + self.feature_layer = tf.keras.layers.DenseFeatures(feature_columns) + self.hidden_layers = [] + for hidden_unit in hidden_units: + self.hidden_layers.append(tf.keras.layers.Dense(hidden_unit, activation='relu')) + self.prediction_layer = tf.keras.layers.Dense(1) + + def call(self, inputs, training=True): + if self.feature_layer is not None: + x = self.feature_layer(inputs) + else: + x = tf.keras.layers.Flatten()(inputs) + for hidden_layer in self.hidden_layers: + x = hidden_layer(x) + return self.prediction_layer(x) + +def optimizer(learning_rate=0.001): + """Default optimizer name. Used in model.compile.""" + return tf.keras.optimizers.Adagrad(lr=learning_rate) + +def loss(labels, output): + """Default loss function. Used in model.compile.""" + return tf.keras.losses.MSE(labels, output) + +def prepare_prediction_column(prediction): + """Return the prediction directly.""" + return prediction[0] + +def eval_metrics_fn(): + return { + "mse": lambda labels, predictions: tf.reduce_mean( + tf.pow( + tf.cast(predictions, tf.float64) - tf.cast(labels, tf.float64), 2) + ) + } diff --git a/sqlflow_models/gcn.py b/sqlflow_models/gcn.py new file mode 100644 index 0000000..c9f6d75 --- /dev/null +++ b/sqlflow_models/gcn.py @@ -0,0 +1,404 @@ +# Based on the code from: https://github.com/tkipf/keras-gcn +import tensorflow as tf +from tensorflow.keras import activations, initializers, constraints +from tensorflow.keras import regularizers +import tensorflow.keras.backend as K +import scipy.sparse as sp +import numpy as np +import pickle, copy + + +class GCN(tf.keras.Model): + def __init__(self, nhid, nclass, epochs, train_ratio, eval_ratio, + sparse_input=True, early_stopping=True, dropout=0.5, nlayer=2, feature_columns=None, + id_col='id', feature_col='features', from_node_col='from_node_id', to_node_col='to_node_id'): + """ + Implementation of GCN in this paper: https://arxiv.org/pdf/1609.02907.pdf. The original tensorflow implementation + is accessible here: https://github.com/tkipf/gcn, and one can find more information about GCN through: + http://tkipf.github.io/graph-convolutional-networks/. + :param nhid: Number of hidden units for GCN. + type nhid: int. + :param nclass: Number of classes in total which will be the output dimension. + type nclass: int. + :param epochs: Number of epochs for the model to be trained. + type epochs: int. + :param train_ratio: Percentage of data points to be used for training. + type train_ratio: float. + :param eval_ratio: Percentage of data points to be used for evaluating. + type eval_ratio: float. + :param early_stopping: Whether to use early stopping trick during the training phase. + type early_stopping: bool. + :param dropout: The rate for dropout. + type dropout: float. + :param nlayer: Number of GCNLayer to be used in the model. + type nlayer: int. + :param feature_columns: a list of tf.feature_column. (Not used in this model) + type feature_columns: list. + :param id_col: Name for the column in database to be used as the id of each node. + type id_col: string. + :param feature_col: Name for the column in database to be used as the features of each node. + type feature_col: string. + :param from_node_col: Name for the column in database to be used as the from_node id of each edge. + type from_node_col: string. + :param to_node_col: Name for the column in database to be used as the to_node id of each edge. 
+ type to_node_col: string. + """ + super(GCN, self).__init__() + + assert dropout < 1 and dropout > 0, "Please make sure dropout rate is a float between 0 and 1." + assert train_ratio < 1 and train_ratio > 0, "Please make sure train_ratio is a float between 0 and 1." + assert eval_ratio < 1 and eval_ratio > 0, "Please make sure eval_ratio is a float between 0 and 1." + self.gc_layers = list() + self.gc_layers.append(GCNLayer(nhid, kernel_regularizer=tf.keras.regularizers.l2(5e-4), sparse_input=sparse_input)) + for i in range(nlayer-1): + self.gc_layers.append(GCNLayer(nhid, kernel_regularizer=tf.keras.regularizers.l2(5e-4))) + self.gc_layers.append(GCNLayer(nclass)) + self.keep_prob = 1 - dropout + self.dropout = tf.keras.layers.Dropout(dropout) + self.nshape = None + self.train_ratio = train_ratio + self.eval_ratio = eval_ratio + self.nlayer = nlayer + self.epochs = epochs + self.early_stopping = early_stopping + self.sparse_input = sparse_input + self.id_col = id_col + self.feature_col = feature_col + self.from_node_col = from_node_col + self.to_node_col = to_node_col + # try to load the result file + try: + with open('./results.pkl', 'rb') as f: + self.results = pickle.load(f) + except (FileNotFoundError, IOError): + self.results = None + + def call(self, data): + x, adj = data + assert self.nshape is not None, "Should calculate the shape of input by preprocessing the data with model.preprocess(data)." + if self.sparse_input: + x = GCN.sparse_dropout(x, self.keep_prob, self.nshape) + else: + x = self.dropout(x) + for i in range(self.nlayer-1): + x = tf.keras.activations.relu(self.gc_layers[i](x, adj)) + x = self.dropout(x) + x = self.gc_layers[-1](x, adj) + + return tf.keras.activations.softmax(x) + + def evaluate(self, data, y, sample_weight): + """Function to evaluate the model.""" + return self.test(sample_weight, return_loss=True) + + def predict(self, data): + """Function to predict labels with the model.""" + x, adj = data + for i in range(self.nlayer-1): + x = tf.keras.activations.relu(self.gc_layers[i](x, adj)) + x = self.gc_layers[-1](x, adj) + return tf.keras.activations.softmax(x) + + @staticmethod + def sparse_dropout(x, keep_prob, noise_shape): + """Dropout for sparse tensors.""" + random_tensor = keep_prob + random_tensor += tf.random.uniform(noise_shape) + dropout_mask = tf.cast(tf.floor(random_tensor), dtype=tf.bool) + pre_out = tf.sparse.retain(x, dropout_mask) + return pre_out * (1./keep_prob) + + @staticmethod + def encode_onehot(labels): + classes = set(labels) + classes_dict = {c: np.identity(len(classes))[i, :] for i, c in enumerate(classes)} + labels_onehot = np.array(list(map(classes_dict.get, labels)), dtype=np.int32) + return labels_onehot + + @staticmethod + def normalize_adj(adjacency, symmetric=True): + """ + Function to normalize the adjacency matrix (get the laplacian matrix). + :param adjacency: Adjacency matrix of the dataset. + type adjacency: Scipy COO_Matrix. + :param symmetric: Boolean variable to determine whether to use symmetric laplacian. + type symmetric: bool. 
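+        With symmetric=True this computes L = D^-0.5 * (A+I) * D^-0.5; otherwise L = D^-1 * (A+I).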
+ """ + adjacency += sp.eye(adjacency.shape[0]) + if symmetric: + """L=D^-0.5 * (A+I) * D^-0.5""" + d = sp.diags(np.power(np.array(adjacency.sum(1)), -0.5).flatten(), 0) + a_norm = adjacency.dot(d).transpose().dot(d).tocoo() + else: + """L=D^-1 * (A+I)""" + d = sp.diags(np.power(np.array(adjacency.sum(1)), -1).flatten(), 0) + a_norm = d.dot(adjacency).tocoo() + + return a_norm + + @staticmethod + def normalize_feature(features, sparse_input): + """Function to row-normalize the features input.""" + rowsum = np.array(features.sum(1)) + r_inv = np.power(rowsum, -1).flatten() + r_inv[np.isinf(r_inv)] = 0. + r_mat_inv = sp.diags(r_inv) + features = r_mat_inv.dot(features) + if sparse_input: + return sp.csr_matrix(features).tocoo() + else: + return features + + def preprocess(self, ids, features, labels, edges): + """Function to preprocess the node features and adjacency matrix.""" + if len(features.shape) > 2: + features = np.squeeze(features) + if len(edges.shape) > 2: + edges = np.squeeze(edges) + # sort the data in the correct order + idx = np.argsort(np.array(ids)) + features = features[idx] + labels = labels[idx] + # preprocess + features = GCN.normalize_feature(features, self.sparse_input) + labels = GCN.encode_onehot(labels) + adjacency = sp.coo_matrix((np.ones(len(edges)), + (edges[:, 0], edges[:, 1])), + shape=(features.shape[0], features.shape[0]), dtype="float32") + + adjacency = adjacency + adjacency.T.multiply(adjacency.T > adjacency) - adjacency.multiply(adjacency.T > adjacency) + adjacency = GCN.normalize_adj(adjacency, symmetric=True) + + nf_shape = features.data.shape + na_shape = adjacency.data.shape + if self.sparse_input: + features = tf.SparseTensor( + indices=np.array(list(zip(features.row, features.col)), dtype=np.int64), + values=tf.cast(features.data, tf.float32), + dense_shape=features.shape) + features = tf.sparse.reorder(features) + adjacency = tf.SparseTensor( + indices=np.array(list(zip(adjacency.row, adjacency.col)), dtype=np.int64), + values=tf.cast(adjacency.data, tf.float32), + dense_shape=adjacency.shape) + adjacency = tf.sparse.reorder(adjacency) + + total_num = features.shape[0] + train_num = round(total_num*self.train_ratio) + eval_num = round(total_num*self.eval_ratio) + train_index = np.arange(train_num) + val_index = np.arange(train_num, train_num+eval_num) + test_index = np.arange(train_num+eval_num, total_num) + + self.train_mask = np.zeros(total_num, dtype = np.bool) + self.val_mask = np.zeros(total_num, dtype = np.bool) + self.test_mask = np.zeros(total_num, dtype = np.bool) + self.train_mask[train_index] = True + self.val_mask[val_index] = True + self.test_mask[test_index] = True + + print('Dataset has {} nodes, {} edges, {} features.'.format(features.shape[0], edges.shape[0], features.shape[1])) + + return features, labels, adjacency, nf_shape, na_shape + + def loss_func(self, model, x, y, train_mask, training=True): + '''Customed loss function for the model.''' + + y_ = model(x, training=training) + + test_mask_logits = tf.gather_nd(y_, tf.where(train_mask)) + masked_labels = tf.gather_nd(y, tf.where(train_mask)) + + return loss(labels=masked_labels, output=test_mask_logits) + + def grad(self, model, inputs, targets, train_mask): + '''Calculate the gradients of the parameters.''' + with tf.GradientTape() as tape: + loss_value = self.loss_func(model, inputs, targets, train_mask) + + return loss_value, tape.gradient(loss_value, model.trainable_variables) + + def test(self, mask, return_loss=False): + '''Test the results on the model. 
Return accuracy''' + logits = self.predict(data=[self.features, self.adjacency]) + + test_mask_logits = tf.gather_nd(logits, tf.where(mask)) + masked_labels = tf.gather_nd(self.labels, tf.where(mask)) + + ll = tf.equal(tf.argmax(masked_labels, -1), tf.argmax(test_mask_logits, -1)) + accuracy = tf.reduce_mean(tf.cast(ll, dtype=tf.float32)) + + if return_loss: + loss_value = loss(labels=masked_labels, output=test_mask_logits) + return [loss_value, accuracy] + + return accuracy + + def sqlflow_train_loop(self, x): + """Customized training function.""" + # load data + ids, ids_check, features, labels, edges, edge_check = list(), dict(), list(), list(), list(), dict() + from_node = 0 + for inputs, label in x: + id = inputs[self.id_col].numpy().astype(np.int32) + feature = inputs[self.feature_col].numpy().astype(np.float32) + from_node = inputs[self.from_node_col].numpy().astype(np.int32) + to_node = inputs[self.to_node_col].numpy().astype(np.int32) + if int(id) not in ids_check: + ids.append(int(id)) + features.append(feature) + labels.append(label.numpy()[0]) + ids_check[int(id)] = 0 + if tuple([int(from_node), int(to_node)]) not in edge_check: + edge_check[tuple([int(from_node), int(to_node)])] = 0 + edges.append([from_node, to_node]) + features = np.stack(features) + labels = np.stack(labels) + edges = np.stack(edges) + + self.features, self.labels, self.adjacency, self.nshape, na_shape = self.preprocess(ids, features, labels, edges) + # training the model + wait = 0 + best_acc = -9999999 + PATIENCE = 10 + for epoch in range(self.epochs): + # calculate the gradients and take the step + loss_value, grads = self.grad(self, [self.features, self.adjacency], self.labels, self.train_mask) + optimizer().apply_gradients(zip(grads, self.trainable_variables)) + # Test on train and evaluate dataset + train_acc = self.test(self.train_mask) + val_acc = self.test(self.val_mask) + print("Epoch {} loss={:6f} accuracy={:6f} val_acc={:6f}".format(epoch, loss_value, train_acc, val_acc)) + # early stopping + if epoch > 50 and self.early_stopping: + if float(val_acc.numpy()) > best_acc: + best_acc = float(val_acc.numpy()) + wait = 0 + else: + if wait >= PATIENCE: + print('Epoch {}: early stopping'.format(epoch)) + break + wait += 1 + # evaluate the model + result = self.evaluate(data=[self.features, self.adjacency], y=self.labels, sample_weight=self.val_mask) + # get all the results + predicted = self.predict([self.features, self.adjacency]) + # store the results in a pickled file + with open('./results.pkl', 'wb') as f: + results = dict() + for i in range(len(ids)): + results[str(ids[i])] = predicted[i] + results['evaluation'] = result + pickle.dump(results, f) + self.results = results + + def sqlflow_evaluate_loop(self, x, metric_names): + """Customed evaluation, can only support calculating the accuracy.""" + assert self.results is not None, "Please make sure to train the model first." + eval_result = self.results['evaluation'] + return eval_result + + def sqlflow_predict_one(self, sample): + """Customed prediction, sample must be the node id.""" + assert self.results is not None, "Please make sure to train the model first." + prediction = self.results[str(int(sample))] + return [prediction] + +def optimizer(): + """Default optimizer name. 
Used in model.compile.""" + return tf.keras.optimizers.Adam(lr=0.01) + +def loss(labels, output): + """Default loss function for classification task.""" + criterion = tf.keras.losses.CategoricalCrossentropy(from_logits=False) + return criterion(y_true=labels, y_pred=output) + +# Graph Convolutional Layer +class GCNLayer(tf.keras.layers.Layer): + + def __init__(self, units, use_bias=True, sparse_input=False, + kernel_initializer='glorot_uniform', + bias_initializer='zeros', + kernel_regularizer=None, + bias_regularizer=None, + kernel_constraint=None, + bias_constraint=None, + **kwargs): + """GCNLayer + Graph Convolutional Networks Layer from paper: https://arxiv.org/pdf/1609.02907.pdf. This is used in the GCN model for + classification task on graph-structured data. + :param units: Number of hidden units for the layer. + type units: int. + :param use_bias: Boolean variable to determine whether to use bias. + type use_bias: bool. + :param sparse_input: Boolean variable to check if input tensor is sparse. + type sparse_input: bool. + :param kernel_initializer: Weight initializer for the GCN kernel. + :param bias_initializer: Weight initializer for the bias. + :param kernel_regularizer: Weight regularizer for the GCN kernel. + :param bias_regularizer: Weight regularizer for the bias. + :param kernel_constraint: Weight value constraint for the GCN kernel. + :param bias_constraint: Weight value constraint for the bias. + :param kwargs: + """ + if 'input_shape' not in kwargs and 'input_dim' in kwargs: + kwargs['input_shape'] = (kwargs.pop('input_dim'),) + super(GCNLayer, self).__init__(**kwargs) + self.units = units + self.use_bias = use_bias + self.sparse_input = sparse_input + self.kernel_initializer = initializers.get(kernel_initializer) + self.bias_initializer = initializers.get(bias_initializer) + self.kernel_regularizer = regularizers.get(kernel_regularizer) + self.bias_regularizer = regularizers.get(bias_regularizer) + self.kernel_constraint = constraints.get(kernel_constraint) + self.bias_constraint = constraints.get(bias_constraint) + + def build(self, input_shape): + self.kernel = self.add_weight(shape=(input_shape[-1], self.units), + initializer=self.kernel_initializer, + name='kernel', + regularizer=self.kernel_regularizer, + constraint=self.kernel_constraint, + trainable=True) + if self.use_bias: + self.bias = self.add_weight(shape=(self.units,), + initializer=self.bias_initializer, + name='bias', + regularizer=self.bias_regularizer, + constraint=self.bias_constraint, + trainable=True) + self.built = True + + def call(self, inputs, adj, **kwargs): + assert isinstance(adj, tf.SparseTensor), "Adjacency matrix should be a SparseTensor" + if self.sparse_input: + assert isinstance(inputs, tf.SparseTensor), "Input matrix should be a SparseTensor" + support = tf.sparse.sparse_dense_matmul(inputs, self.kernel) + else: + support = tf.matmul(inputs, self.kernel) + output = tf.sparse.sparse_dense_matmul(adj, support) + if self.use_bias: + output = output + self.bias + else: + output = output + return output + + def get_config(self): + config = {'units': self.units, + 'use_bias': self.use_bias, + 'sparse_input': self.sparse_input, + 'kernel_initializer': initializers.serialize( + self.kernel_initializer), + 'bias_initializer': initializers.serialize( + self.bias_initializer), + 'kernel_regularizer': regularizers.serialize( + self.kernel_regularizer), + 'bias_regularizer': regularizers.serialize( + self.bias_regularizer), + 'kernel_constraint': constraints.serialize( + self.kernel_constraint), + 
'bias_constraint': constraints.serialize(self.bias_constraint) + } + base_config = super(GCNLayer, self).get_config() + return dict(list(base_config.items()) + list(config.items())) \ No newline at end of file diff --git a/sqlflow_models/native_keras.py b/sqlflow_models/native_keras.py new file mode 100644 index 0000000..2756619 --- /dev/null +++ b/sqlflow_models/native_keras.py @@ -0,0 +1,26 @@ +import tensorflow as tf + +class RawDNNClassifier(tf.keras.Model): + def __init__(self, hidden_units=[100,100], n_classes=3): + super(RawDNNClassifier, self).__init__() + self.feature_layer = None + self.n_classes = n_classes + self.hidden_layers = [] + for hidden_unit in hidden_units: + self.hidden_layers.append(tf.keras.layers.Dense(hidden_unit, activation='relu')) + if self.n_classes == 2: + pred_act = 'sigmoid' + n_out = 1 + else: + pred_act = 'softmax' + n_out = self.n_classes + self.prediction_layer = tf.keras.layers.Dense(n_out, activation=pred_act) + + def call(self, inputs, training=True): + if self.feature_layer is not None: + x = self.feature_layer(inputs) + else: + x = tf.keras.layers.Flatten()(inputs) + for hidden_layer in self.hidden_layers: + x = hidden_layer(x) + return self.prediction_layer(x) diff --git a/sqlflow_models/one_class_svm.py b/sqlflow_models/one_class_svm.py new file mode 100644 index 0000000..d58739d --- /dev/null +++ b/sqlflow_models/one_class_svm.py @@ -0,0 +1,107 @@ +# Copyright 2020 The SQLFlow Authors. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
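+# OneClassSVM below wraps scikit-learn's sklearn.svm.OneClassSVM so that SQLFlow can
+# train and run an unsupervised anomaly-detection model through the Keras model interface.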
+ +import os +import pickle + +import numpy as np +import tensorflow as tf +from sklearn.svm import OneClassSVM as SklearnOneClassSVM + +MODEL_DIR = "model_save" +MODEL_PATH = MODEL_DIR + "/one_class_svm_model" + +ENABLE_EAGER_EXECUTION = False + +try: + tf.enable_eager_execution() + ENABLE_EAGER_EXECUTION = True +except Exception: + try: + tf.compat.v1.enable_eager_execution() + ENABLE_EAGER_EXECUTION = True + except Exception: + ENABLE_EAGER_EXECUTION = False + + +def dataset_reader(dataset): + if ENABLE_EAGER_EXECUTION: + for features in dataset: + yield features + else: + iter = dataset.make_one_shot_iterator() + one_element = iter.get_next() + with tf.Session() as sess: + try: + while True: + yield sess.run(one_element) + except tf.errors.OutOfRangeError: + pass + + +class OneClassSVM(tf.keras.Model): + def __init__(self, + feature_columns=None, + kernel='rbf', + degree=3, + gamma='scale', + coef0=0.0, + tol=0.001, + nu=0.5, + shrinking=True, + cache_size=200, + verbose=False, + max_iter=-1): + if os.path.exists(MODEL_PATH): + with open(MODEL_PATH, "rb") as f: + self.svm = pickle.load(f) + else: + self.svm = SklearnOneClassSVM(kernel=kernel, + degree=degree, + gamma=gamma, + coef0=coef0, + tol=tol, + nu=nu, + shrinking=shrinking, + cache_size=cache_size, + verbose=verbose, + max_iter=max_iter) + + def concat_features(self, features): + assert isinstance(features, dict) + each_feature = [] + for k, v in features.items(): + if ENABLE_EAGER_EXECUTION: + v = v.numpy() + each_feature.append(v) + return np.concatenate(each_feature, axis=1) + + def sqlflow_train_loop(self, dataset): + X = [] + for features in dataset_reader(dataset): + X.append(self.concat_features(features)) + X = np.concatenate(X) + + self.svm.fit(X) + + if not os.path.exists(MODEL_DIR): + os.mkdir(MODEL_DIR) + + with open(MODEL_PATH, "wb") as f: + pickle.dump(self.svm, f, protocol=2) + + def sqlflow_predict_one(self, features): + features = self.concat_features(features) + pred = self.svm.predict(features) + score = self.svm.decision_function(features) + return pred, score diff --git a/sqlflow_models/rnn_based_time_series.py b/sqlflow_models/rnn_based_time_series.py new file mode 100644 index 0000000..4f1e809 --- /dev/null +++ b/sqlflow_models/rnn_based_time_series.py @@ -0,0 +1,68 @@ +import tensorflow as tf + +class RNNBasedTimeSeriesModel(tf.keras.Model): + + def __init__(self, + feature_columns=None, + stack_units=[500, 500], + n_in=7, + n_out=1, + n_features=1, + model_type='rnn'): + """RNNBasedTimeSeriesModel + :param feature_columns: All columns must be embedding of sequence column with same sequence_length. + type feature_columns: list[tf.feature_column.numeric_column]. + :param stack_units: Units for RNN layer. + type stack_units: vector of ints. + :param n_in: Size of time window. + type n_in: int. + :param n_out: Number of predicted labels. + type n_out: int. + :param n_features: number of features in every time window. + type n_features: int. + :param model_type: Specific RNN model to be used, which can be chose from: ('rnn', 'lstm' and 'gru'). + type model_type: string. 
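+        Example (hypothetical numeric feature columns, shown for illustration only):
+            model = RNNBasedTimeSeriesModel(feature_columns=columns, stack_units=[64, 64],
+                                            n_in=7, n_out=1, n_features=1, model_type='lstm')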
+ """ + super(RNNBasedTimeSeriesModel, self).__init__(name='RNN_TS_Model') + # Common + self.feature_columns = feature_columns + self.loss = loss + self.n_out = n_out + self.n_in = n_in + self.n_features = n_features + self.stack_units = stack_units + self.models = {'rnn':tf.keras.layers.SimpleRNN, 'lstm':tf.keras.layers.LSTM, 'gru':tf.keras.layers.GRU} + # combines all the data as a dense tensor + self.feature_layer = None + if feature_columns is not None: + self.feature_layer = tf.keras.layers.DenseFeatures(feature_columns) + self.stack_layers = [] + for unit in self.stack_units[:-1]: + self.stack_layers.append(self.models[model_type.lower()](unit, input_shape=(self.n_in, self.n_features), return_sequences=True)) + self.rnn = self.models[model_type.lower()](self.stack_units[-1], input_shape=(self.n_in, self.n_features)) + self.dropout = tf.keras.layers.Dropout(0.2) + self.prediction_layer = tf.keras.layers.Dense(self.n_out) + + def call(self, inputs): + if self.feature_layer: + x = self.feature_layer(inputs) + else: + x = inputs + x = tf.reshape(x, (-1, self.n_in, self.n_features)) + for i in range(len(self.stack_units) - 1): + x = self.stack_layers[i](x) + x = self.rnn(x) + x = self.dropout(x) + return self.prediction_layer(x) + +def optimizer(learning_rate=0.001): + """Default optimizer name. Used in model.compile.""" + return tf.keras.optimizers.Adam(lr=learning_rate) + +def prepare_prediction_column(prediction): + """Return the prediction directly.""" + return prediction + +def loss(labels, output): + return tf.reduce_mean(tf.keras.losses.MSE(labels, output)) + diff --git a/sqlflow_models/rnnclassifier.py b/sqlflow_models/rnnclassifier.py new file mode 100644 index 0000000..49f86b6 --- /dev/null +++ b/sqlflow_models/rnnclassifier.py @@ -0,0 +1,83 @@ +import tensorflow as tf + +_loss = '' + +class StackedRNNClassifier(tf.keras.Model): + def __init__(self, feature_columns=None, stack_units=[32], hidden_size=64, n_classes=2, model_type='rnn', bidirectional=False): + """StackedRNNClassifier + :param feature_columns: All columns must be embedding of sequence column with same sequence_length. + :type feature_columns: list[tf.embedding_column]. + :param stack_units: Units for RNN layer. + :type stack_units: vector of ints. + :param n_classes: Target number of classes. + :type n_classes: int. + :param model_type: Specific RNN model to be used, which can be chose from: ('rnn', 'lstm' and 'gru'). + :type model_type: string. + :param bidirectional: Whether to use bidirectional or not. + :type bidirectional: bool. 
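+        Example (hypothetical sequence embedding columns, shown for illustration only):
+            model = StackedRNNClassifier(feature_columns=seq_columns, stack_units=[32],
+                                         hidden_size=64, n_classes=2, model_type='lstm',
+                                         bidirectional=True)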
+ """ + global _loss + super(StackedRNNClassifier, self).__init__() + + self.models = {'rnn':tf.keras.layers.SimpleRNN, 'lstm':tf.keras.layers.LSTM, 'gru':tf.keras.layers.GRU} + self.bidirectionals = {True: tf.keras.layers.Bidirectional, False: lambda x: x} + self.feature_layer = None + if feature_columns is not None: + self.feature_layer = tf.keras.experimental.SequenceFeatures(feature_columns) + self.stack_rnn = [] + self.stack_size = len(stack_units) + self.stack_units = stack_units + self.n_classes = n_classes + if self.stack_size > 1: + for i in range(self.stack_size - 1): + self.stack_rnn.append( + self.bidirectionals[bidirectional](self.models[model_type.lower()](self.stack_units[i], return_sequences=True)) + ) + self.rnn = self.bidirectionals[bidirectional](self.models[model_type.lower()](self.stack_units[-1])) + self.hidden = tf.keras.layers.Dense(hidden_size, activation='relu') + if self.n_classes == 2: + # special setup for binary classification + pred_act = 'sigmoid' + _loss = 'binary_crossentropy' + n_out = 1 + else: + pred_act = 'softmax' + _loss = 'categorical_crossentropy' + n_out = self.n_classes + self.pred = tf.keras.layers.Dense(n_out, activation=pred_act) + + def call(self, inputs): + if self.feature_layer: + x, seq_len = self.feature_layer(inputs) + else: + x, seq_len = inputs + seq_mask = tf.sequence_mask(seq_len) + if self.stack_size > 1: + for i in range(self.stack_size - 1): + x = self.stack_rnn[i](x, mask=seq_mask) + x = self.rnn(x, mask=seq_mask) + x = self.hidden(x) + return self.pred(x) + +def optimizer(): + """Default optimizer name. Used in model.compile.""" + return 'adam' + +def loss(labels, output): + global _loss + if _loss == "binary_crossentropy": + return tf.reduce_mean(tf.keras.losses.binary_crossentropy(labels, output)) + elif _loss == "categorical_crossentropy": + return tf.reduce_mean(tf.keras.losses.categorical_crossentropy(labels, output)) + +def prepare_prediction_column(prediction): + """Return the class label of highest probability.""" + return prediction.argmax(axis=-1) + +def eval_metrics_fn(): + return { + "accuracy": lambda labels, predictions: tf.equal( + tf.argmax(predictions, 1, output_type=tf.int32), + tf.cast(tf.reshape(labels, [-1]), tf.int32), + ) + } diff --git a/sqlflow_models/score_card.py b/sqlflow_models/score_card.py new file mode 100644 index 0000000..4acec56 --- /dev/null +++ b/sqlflow_models/score_card.py @@ -0,0 +1,106 @@ +#!/bin/env python + +import tensorflow as tf +from tensorflow import keras +from tensorflow.python.data import make_one_shot_iterator +from tensorflow.keras.losses import kld +from tensorflow.keras.optimizers import SGD +import numpy as np +import pandas as pd +import scipy.stats.stats as stats +import sklearn +from sklearn.linear_model import LogisticRegression +from sklearn.model_selection import train_test_split +from sklearn.metrics import roc_auc_score, auc +import pickle + + +def optimizer(): + return None + + +def loss(): + return None + + +class ScoreCard(keras.Model): + + def __init__(self, feature_columns=None, pf_bin_size=5): + super(ScoreCard, self).__init__(name='ScoreCard') + + self._target_score = 600 + self._factor = 20/np.log(2) + self._offset = 600 - 20*np.log(20) / np.log(2) + self._bins = dict() + self._pf_bin_size = pf_bin_size + + def _pf_bin(self, y, x): + # population frequency bucket + bad_num = y.sum() + good_num = y.count() - y.sum() + d1 = pd.DataFrame({'x': x,'y': y,'bucket': pd.qcut(x, self._pf_bin_size, duplicates='drop')}) + d2 = d1.groupby('bucket',as_index=True) + d3 = 
pd.DataFrame(d2.x.min(),columns=['min_bin']) + + d3["min"] = d2.min().x + d3["max"] = d2.max().x + d3["badcostum"] = d2.sum().y + d3["goodcostum"] = d2.count().y - d2.sum().y + d3["total"] = d2.count().y + d3["bad_rate"] = d2.sum().y/d2.count().y + d3["woe"] = np.log(d3["badcostum"]/d3["goodcostum"]*good_num/bad_num) + iv = ((d3["badcostum"]/bad_num-d3["goodcostum"]/good_num)*d3["woe"]) + d3["iv"] = iv + woe = list(d3["woe"].round(6)) + cut = list(d3["max"].round(6)) + cut.insert(0, float("-inf")) + cut[-1] = float("inf") + return d3, cut, woe, iv + + def _to_dataframe(self, dataset): + x_df = pd.DataFrame() + y_df = pd.DataFrame() + for _, minibatch in enumerate(dataset): + data, label = minibatch + dx = {} + dy = {} + for name, value in data.items(): + dx[name] = value.numpy()[0][0] + x_df = x_df.append(dx, ignore_index=True) + dy['label'] = label.numpy()[0] + y_df = y_df.append(dy, ignore_index=True) + return x_df, y_df + + def _replace_woe(self, x, cut, woe): + return pd.cut(x, cut, labels=pd.Categorical(woe)) + + def _woe_encoder(self, x, y): + x_train_dict = {} + for col in x.columns: + dfx, cut, woe, iv = self._pf_bin(y, x[col]) + self._bins[col] = (dfx, cut, woe, iv) + # replacing by the WOE encode + x_train_dict[col] = self._replace_woe(x[col], cut, woe) + return pd.DataFrame.from_dict(x_train_dict) + + def sqlflow_train_loop(self, dataset, epochs=1, verbose=0): + x_df, y_df = self._to_dataframe(dataset) + x = self._woe_encoder(x_df, y_df['label']) + x.to_csv("/tmp/train_woe.csv") + lr = LogisticRegression() + + x_train, x_test, y_train, y_test = train_test_split(x, y_df['label']) + lr.fit(x_train, y_train) + prob = lr.predict_proba(x_test)[:, 1] + auc_score = roc_auc_score(y_test, prob) + print("AUC: {}\n".format(auc_score)) + + # print the score card + print("TARGET SCORE: %d" % self._target_score) + coe = lr.coef_ + for i, col_name in enumerate(x_df.columns): + bin_cols = self._bins[col_name][0].index.to_list() + for j, w in enumerate(self._bins[col_name][2]): + print(col_name, bin_cols[j], + round(coe[0][i] * w * self._factor + + self._offset/self._pf_bin_size, 0)) diff --git a/sqlflow_models/simple_dnn_generator.py b/sqlflow_models/simple_dnn_generator.py new file mode 100644 index 0000000..e63ef71 --- /dev/null +++ b/sqlflow_models/simple_dnn_generator.py @@ -0,0 +1,150 @@ +# This file is based on the AdaNet example +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import functools +import adanet +import tensorflow as tf + +_NUM_LAYERS_KEY = "num_layers" + + +class _SimpleDNNBuilder(adanet.subnetwork.Builder): + """Builds a DNN subnetwork for AdaNet.""" + + def __init__(self, feature_columns, optimizer, layer_size, num_layers, learn_mixture_weights, + seed): + """Initializes a `_DNNBuilder`. + + Args: + optimizer: An `Optimizer` instance for training both the subnetwork and + the mixture weights. + layer_size: The number of nodes to output at each hidden layer. + num_layers: The number of hidden layers. + learn_mixture_weights: Whether to solve a learning problem to find the + best mixture weights, or use their default value according to the + mixture weight type. When `False`, the subnetworks will return a no_op + for the mixture weight train op. + seed: A random seed. + + Returns: + An instance of `_SimpleDNNBuilder`. 
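+    Note: `feature_columns` (not listed in Args above) supplies the tf.feature_column list
+    used to build the subnetwork's input layer.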
+ """ + + self._optimizer = optimizer + self._layer_size = layer_size + self._num_layers = num_layers + self._learn_mixture_weights = learn_mixture_weights + self._feature_columns = feature_columns + self._seed = seed + + def build_subnetwork(self, + features, + logits_dimension, + training, + iteration_step, + summary, + previous_ensemble=None): + """See `adanet.subnetwork.Builder`.""" + + input_layer = tf.compat.v1.feature_column.input_layer(features, self._feature_columns) + kernel_initializer = tf.compat.v1.glorot_uniform_initializer(seed=self._seed) + last_layer = input_layer + for _ in range(self._num_layers): + last_layer = tf.compat.v1.layers.dense( + last_layer, + units=self._layer_size, + activation=tf.nn.relu, + kernel_initializer=kernel_initializer) + logits = tf.compat.v1.layers.dense( + last_layer, + units=logits_dimension, + kernel_initializer=kernel_initializer) + + persisted_tensors = {_NUM_LAYERS_KEY: tf.constant(self._num_layers)} + return adanet.Subnetwork( + last_layer=last_layer, + logits=logits, + complexity=self._measure_complexity(), + persisted_tensors=persisted_tensors) + + def _measure_complexity(self): + """Approximates Rademacher complexity as the square-root of the depth.""" + return tf.sqrt(tf.cast(self._num_layers, tf.float32)) + + def build_subnetwork_train_op(self, subnetwork, loss, var_list, labels, + iteration_step, summary, previous_ensemble): + """See `adanet.subnetwork.Builder`.""" + return self._optimizer.minimize(loss=loss, var_list=var_list) + + def build_mixture_weights_train_op(self, loss, var_list, logits, labels, + iteration_step, summary): + """See `adanet.subnetwork.Builder`.""" + + if not self._learn_mixture_weights: + return tf.no_op() + return self._optimizer.minimize(loss=loss, var_list=var_list) + + @property + def name(self): + """See `adanet.subnetwork.Builder`.""" + + if self._num_layers == 0: + # A DNN with no hidden layers is a linear model. + return "linear" + return "{}_layer_dnn".format(self._num_layers) + + +class SimpleDNNGenerator(adanet.subnetwork.Generator): + """Generates a two DNN subnetworks at each iteration. + + The first DNN has an identical shape to the most recently added subnetwork + in `previous_ensemble`. The second has the same shape plus one more dense + layer on top. This is similar to the adaptive network presented in Figure 2 of + [Cortes et al. ICML 2017](https://arxiv.org/abs/1607.01097), without the + connections to hidden layers of networks from previous iterations. + """ + + def __init__(self, optimizers, feature_columns, layer_size, learn_mixture_weights, seed): + """Initializes a DNN `Generator`. + + Args: + optimizers: A defaultdict of string for training both the subnetwork and + the mixture weights. + layer_size: Number of nodes in each hidden layer of the subnetwork + candidates. Note that this parameter is ignored in a DNN with no hidden + layers. + learn_mixture_weights: Whether to solve a learning problem to find the + best mixture weights, or use their default value according to the + mixture weight type. When `False`, the subnetworks will return a no_op + for the mixture weight train op. + seed: A random seed. + + Returns: + An instance of `Generator`. 
+ """ + + self._seed = seed + self._optimizers = optimizers + self._dnn_builder_fn = functools.partial( + _SimpleDNNBuilder, + layer_size=layer_size, + feature_columns=feature_columns, + learn_mixture_weights=learn_mixture_weights) + + def generate_candidates(self, previous_ensemble, iteration_number, + previous_ensemble_reports, all_reports): + """See `adanet.subnetwork.Generator`.""" + + num_layers = 0 + seed = self._seed + if previous_ensemble: + num_layers = tf.get_static_value( + previous_ensemble.weighted_subnetworks[ + -1].subnetwork.persisted_tensors[_NUM_LAYERS_KEY]) + if seed is not None: + seed += iteration_number + optimizer = self._optimizers[num_layers + 0] + return [self._dnn_builder_fn(num_layers=num_layers, optimizer=optimizer, seed=seed), + self._dnn_builder_fn(num_layers=num_layers + 1, optimizer=optimizer, seed=seed)] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..a76f56e --- /dev/null +++ b/tests/base.py @@ -0,0 +1,50 @@ +import tensorflow as tf +import unittest +import sys + +def train_input_fn(features, labels, batch_size=32): + dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels)) + dataset = dataset.shuffle(1000).repeat().batch(batch_size) + return dataset + + +def eval_input_fn(features, labels, batch_size=32): + dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels)) + dataset = dataset.batch(batch_size) + return dataset + +class BaseTestCases: + class BaseTest(unittest.TestCase): + def setUp(self): + self.model, self.features, self.label = None, {}, None + + def test_train_and_predict(self): + self.setUp() + model_pkg = sys.modules[self.model_class.__module__] + self.model.compile(optimizer=model_pkg.optimizer(), + loss=model_pkg.loss, + metrics=["accuracy"]) + self.history = self.model.fit(train_input_fn(self.features, self.label), + epochs=10, + steps_per_epoch=200, + verbose=1) + self.historyloss = self.history.history['loss'] + loss_decline_rate = (self.historyloss[0] - self.historyloss[-1]) \ + / self.historyloss[0] + print('historyloss is {}, and the loss_decline_rate is {}'.\ + format(self.historyloss, loss_decline_rate)) + assert(loss_decline_rate > 0.3) + + class BaseEstimatorTest(BaseTest): + def test_train_and_predict(self): + self.setUp() + input_fn = lambda: train_input_fn(self.features, self.label) + train_spec = tf.estimator.TrainSpec(input_fn=input_fn, max_steps=1) + eval_spec = tf.estimator.EvalSpec(input_fn=lambda: eval_input_fn(self.features, self.label)) + baseline = tf.estimator.train_and_evaluate(self.model, train_spec, eval_spec)[0] + train_spec = tf.estimator.TrainSpec(input_fn=input_fn, max_steps=2000) + result = tf.estimator.train_and_evaluate(self.model, train_spec, eval_spec)[0] + loss_decline_rate = 1- result["loss"] / baseline["loss"] + print('historyloss is {}, and the loss_decline_rate is {}'.\ + format(baseline["loss"], loss_decline_rate)) + assert(loss_decline_rate > 0.3) diff --git a/tests/test_arima_with_stl_decomposition.py b/tests/test_arima_with_stl_decomposition.py new file mode 100644 index 0000000..1694cad --- /dev/null +++ b/tests/test_arima_with_stl_decomposition.py @@ -0,0 +1,61 @@ +from sqlflow_models import ARIMAWithSTLDecomposition +import unittest +import tensorflow as tf +from datetime import datetime, timedelta +import numpy as np + +class TestARIMAWithSTLDecompose(unittest.TestCase): + def setUp(self): + self.order = [7, 0, 2] + self.period = [7, 30] + 
self.date_format = '%Y-%m-%d' + self.train_start = '2014-04-01' + self.train_end = '2014-08-31' + self.forecast_start = '2014-09-01' + self.forecast_end = '2014-09-30' + + def str2datetime(self, date_str): + if isinstance(date_str, bytes): + date_str = date_str.decode('utf-8') + return datetime.strptime(str(date_str), self.date_format) + + def datetime2str(self, date): + return datetime.strftime(date, self.date_format) + + def create_dataset(self): + def generator(): + start_date = self.str2datetime(self.train_start) + end_date = self.str2datetime(self.train_end) + delta = timedelta(days=1) + while start_date <= end_date: + date_str = np.array(self.datetime2str(start_date)) + label = np.random.random(size=[1]) * 1e8 + yield date_str, label + start_date += delta + + def dict_mapper(date_str, label): + return {'time': date_str}, label + + dataset = tf.data.Dataset.from_generator( + generator, output_types=(tf.dtypes.string, tf.dtypes.float32) + ) + dataset = dataset.map(dict_mapper) + return dataset + + def prediction_days(self): + pred_start = self.str2datetime(self.forecast_start) + pred_end = self.str2datetime(self.forecast_end) + return (pred_end - pred_start).days + 1 + + def test_main(self): + model = ARIMAWithSTLDecomposition(order=[7, 0, 2], + period=[7, 30], + date_format=self.date_format, + forecast_start=self.forecast_start, + forecast_end=self.forecast_end) + prediction = model.sqlflow_train_loop(self.create_dataset()) + self.assertEqual(len(prediction), self.prediction_days()) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_auto_estimator.py b/tests/test_auto_estimator.py new file mode 100644 index 0000000..7fca1c7 --- /dev/null +++ b/tests/test_auto_estimator.py @@ -0,0 +1,52 @@ +import sqlflow_models +from tests.base import BaseTestCases, train_input_fn, eval_input_fn + +import sys +import tensorflow as tf +import unittest +import numpy as np +from sklearn.datasets import load_iris, load_boston + +class TestAutoClassifier(BaseTestCases.BaseEstimatorTest): + def setUp(self): + x, y = load_iris(return_X_y=True) + feature_column_names = ['col_{}'.format(d) for d in range(x.shape[1])] + self.features = {} + for feature_name, feature_values in zip(feature_column_names, list(x.T)): + self.features[feature_name] = feature_values + self.label = y + feature_columns = [tf.feature_column.numeric_column(key) for key in self.features] + + self.model_class = sqlflow_models.AutoClassifier + self.model = sqlflow_models.AutoClassifier(feature_columns=feature_columns, n_classes=3) + +class TestAutoBinaryClassifier(BaseTestCases.BaseEstimatorTest): + def setUp(self): + x, y = load_iris(return_X_y=True) + x = np.array([x[i] for i, v in enumerate(y) if v != 2]) + y = np.array([y[i] for i, v in enumerate(y) if v != 2]) + feature_column_names = ['col_{}'.format(d) for d in range(x.shape[1])] + self.features = {} + for feature_name, feature_values in zip(feature_column_names, list(x.T)): + self.features[feature_name] = feature_values + self.label = y + feature_columns = [tf.feature_column.numeric_column(key) for key in self.features] + + self.model_class = sqlflow_models.AutoClassifier + self.model = sqlflow_models.AutoClassifier(feature_columns=feature_columns) + +class TestAutoRegressor(BaseTestCases.BaseEstimatorTest): + def setUp(self): + x, y = load_boston(return_X_y=True) + feature_column_names = ['col_{}'.format(d) for d in range(x.shape[1])] + self.features = {} + for feature_name, feature_values in zip(feature_column_names, list(x.T)): + self.features[feature_name] 
= feature_values + self.label = y + feature_columns = [tf.feature_column.numeric_column(key) for key in self.features] + self.model_class = sqlflow_models.AutoRegressor + self.model = sqlflow_models.AutoRegressor(feature_columns=feature_columns) + +if __name__ == '__main__': + unittest.main() + diff --git a/tests/test_deep_embedding_cluster.py b/tests/test_deep_embedding_cluster.py new file mode 100644 index 0000000..20e9310 --- /dev/null +++ b/tests/test_deep_embedding_cluster.py @@ -0,0 +1,105 @@ +from tensorflow.python.keras.losses import kld + +import sqlflow_models +from tests.base import BaseTestCases, eval_input_fn + +import tensorflow as tf +import unittest +from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score +from sklearn.utils.linear_assignment_ import linear_assignment +import numpy as np +from tensorflow.python import keras +import sys + + +def train_input_fn(features, batch_size=32): + dataset = tf.data.Dataset.from_tensor_slices(dict(features)) + dataset = dataset.shuffle(1000).repeat(1).batch(batch_size) + return dataset + +ari = adjusted_rand_score +nmi = normalized_mutual_info_score + + +def acc(y_true, y_pred): + """ + Calculate clustering accuracy. + Using the Hungarian algorithm to solve linear assignment problem. + """ + y_true = y_true.astype(np.int64) + assert y_pred.size == y_true.size + dims = max(y_pred.max(), y_true.max()) + 1 + w = np.zeros((dims, dims), dtype=np.int64) + for i in range(y_pred.size): + w[y_pred[i], y_true[i]] += 1 + + ind = linear_assignment(w.max() - w) + return sum([w[i, j] for i, j in ind]) * 1.0 / y_pred.size + + +def evaluate(x, y, model): + metric = dict() + q = model.predict(x) + y_pred = q.argmax(1) + metric['acc'] = np.round(acc(y, y_pred), 5) + metric['nmi'] = np.round(nmi(y, y_pred), 5) + metric['ari'] = np.round(ari(y, y_pred), 5) + return metric + + +class TestDeepEmbeddingCluster(BaseTestCases.BaseTest): + def setUp(self): + (train_data, train_labels), (test_data, test_labels) = keras.datasets.mnist.load_data() + x = np.concatenate((train_data, test_data)) + y = np.concatenate((train_labels, test_labels)) + x = x.reshape((x.shape[0], -1)) + x = np.divide(x, 255.) 
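+        # At this point x holds all 70,000 MNIST images flattened to
+        # 784-dimensional float vectors scaled into [0, 1]; the slice below
+        # keeps only the first 100 samples so the unit test stays fast.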
+ # Sample + x = x[:100] + y = y[:100] + # Generate Data + feature_num = x.shape[1] + feature_column_names = ['col_{}'.format(d) for d in range(feature_num)] + + self.features = {} + for feature_name, feature_values in zip(feature_column_names, list(x.T)): + self.features[feature_name] = feature_values + + self.label = y + feature_columns = [tf.feature_column.numeric_column(key) for key in self.features] + pretrain_dims = [500, 500, 2000, 10] + # Init model + self.model = sqlflow_models.DeepEmbeddingClusterModel(feature_columns=feature_columns, + n_clusters=10, + kmeans_init=20, + run_pretrain=True, + existed_pretrain_model=None, + pretrain_dims=pretrain_dims, + pretrain_activation_func='relu', + pretrain_use_callbacks=True, + pretrain_cbearlystop_patience=10, + pretrain_cbearlystop_mindelta=0.0001, + pretrain_cbreduce_patience=5, + pretrain_cbreduce_factor=0.2, + pretrain_epochs=20, + pretrain_initializer='glorot_uniform', + train_max_iters=500, + update_interval=100, + train_use_tol=True, + tol=0.0001, + loss=kld) + self.model_class = sqlflow_models.DeepEmbeddingClusterModel + + def test_train_and_predict(self): + self.setUp() + model_pkg = sys.modules[self.model_class.__module__] + self.model.compile(optimizer=model_pkg.optimizer(), + loss=model_pkg.loss) + self.model.sqlflow_train_loop(train_input_fn(self.features)) + metric = evaluate(x=eval_input_fn(self.features, self.label), y=self.label, model=self.model) + print(metric) + assert (metric['acc'] > 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_dnnclassifier.py b/tests/test_dnnclassifier.py new file mode 100644 index 0000000..a8f03fa --- /dev/null +++ b/tests/test_dnnclassifier.py @@ -0,0 +1,41 @@ +import sqlflow_models +from tests.base import BaseTestCases + +import tensorflow as tf +import unittest +import numpy as np +from sklearn.datasets import load_iris + +class TestDNNClassifier(BaseTestCases.BaseTest): + def setUp(self): + x, y = load_iris(return_X_y=True) + feature_column_names = ['col_{}'.format(d) for d in range(x.shape[1])] + self.features = {} + for feature_name, feature_values in zip(feature_column_names, list(x.T)): + self.features[feature_name] = feature_values + self.label = y + feature_columns = [tf.feature_column.numeric_column(key) for key in self.features] + + self.model_class = sqlflow_models.DNNClassifier + self.model = sqlflow_models.DNNClassifier(feature_columns=feature_columns, n_classes=3) + +class TestDNNBinaryClassifier(BaseTestCases.BaseTest): + def setUp(self): + x, y = load_iris(return_X_y=True) + x = np.array([x[i] for i, v in enumerate(y) if v != 2]) + y = np.array([y[i] for i, v in enumerate(y) if v != 2]) + feature_column_names = ['col_{}'.format(d) for d in range(x.shape[1])] + self.features = {} + for feature_name, feature_values in zip(feature_column_names, list(x.T)): + self.features[feature_name] = feature_values + self.label = y + feature_columns = [tf.feature_column.numeric_column(key) for key in self.features] + + self.model_class = sqlflow_models.DNNClassifier + self.model = sqlflow_models.DNNClassifier(feature_columns=feature_columns, n_classes=2) + + + +if __name__ == '__main__': + unittest.main() + diff --git a/tests/test_dnnclassifier_functional_api_example.py b/tests/test_dnnclassifier_functional_api_example.py new file mode 100644 index 0000000..194fa66 --- /dev/null +++ b/tests/test_dnnclassifier_functional_api_example.py @@ -0,0 +1,41 @@ +import sqlflow_models +from tests.base import BaseTestCases + +import tensorflow as tf +import unittest + +from 
sklearn.datasets import load_iris + + +def train_input_fn(features, labels, batch_size=32): + dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels)) + dataset = dataset.shuffle(1000).repeat().batch(batch_size) + return dataset + + +def eval_input_fn(features, labels, batch_size=32): + dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels)) + dataset = dataset.batch(batch_size) + return dataset + +class TestDNNClassifier(BaseTestCases.BaseTest): + def setUp(self): + x, y = load_iris(return_X_y=True) + feature_column_names = ['col_{}'.format(d) for d in range(x.shape[1])] + self.features = {} + for feature_name, feature_values in zip(feature_column_names, list(x.T)): + self.features[feature_name] = feature_values + self.label = y + feature_columns = [tf.feature_column.numeric_column(key) for key in self.features] + fieldmetas = { + "col_0": {"feature_name": "col_0", "shape": [1], "dtype": tf.float32}, + "col_1": {"feature_name": "col_1", "shape": [1], "dtype": tf.float32}, + "col_2": {"feature_name": "col_2", "shape": [1], "dtype": tf.float32}, + "col_3": {"feature_name": "col_3", "shape": [1], "dtype": tf.float32}, + } + self.model = sqlflow_models.dnnclassifier_functional_model(feature_columns=feature_columns, field_metas=fieldmetas, n_classes=3) + self.model_class = sqlflow_models.dnnclassifier_functional_model + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_dnnregressor.py b/tests/test_dnnregressor.py new file mode 100644 index 0000000..95d5e07 --- /dev/null +++ b/tests/test_dnnregressor.py @@ -0,0 +1,24 @@ +import sqlflow_models +from tests.base import BaseTestCases + +import tensorflow as tf +import unittest +from sklearn.datasets import load_boston + + +class TestDNNRegressor(BaseTestCases.BaseTest): + def setUp(self): + x, y = load_boston(return_X_y=True) + feature_column_names = ['col_{}'.format(d) for d in range(x.shape[1])] + self.features = {} + for feature_name, feature_values in zip(feature_column_names, list(x.T)): + self.features[feature_name] = feature_values + self.label = y + feature_columns = [tf.feature_column.numeric_column(key) for key in self.features] + self.model_class = sqlflow_models.DNNRegressor + self.model = sqlflow_models.DNNRegressor(feature_columns=feature_columns) + + +if __name__ == '__main__': + unittest.main() + diff --git a/tests/test_gcn.py b/tests/test_gcn.py new file mode 100644 index 0000000..dac9850 --- /dev/null +++ b/tests/test_gcn.py @@ -0,0 +1,88 @@ +import sqlflow_models +from tests.base import BaseTestCases + +import tensorflow as tf +import numpy as np +import unittest +import random + + +def build_karate_club_graph(): + # All 78 edges are stored in two numpy arrays. One for source endpoints + # while the other for destination endpoints. 
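+    # Each (src, dst) pair is mirrored into (dst, src) below, so the returned
+    # array contains both directions of every edge and has shape (156, 2),
+    # i.e. an undirected graph over the 34 club members.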
+ # Credit to: https://docs.dgl.ai/tutorials/basics/1_first.html + src = np.array([1, 2, 2, 3, 3, 3, 4, 5, 6, 6, 6, 7, 7, 7, 7, 8, 8, 9, 10, 10, + 10, 11, 12, 12, 13, 13, 13, 13, 16, 16, 17, 17, 19, 19, 21, 21, + 25, 25, 27, 27, 27, 28, 29, 29, 30, 30, 31, 31, 31, 31, 32, 32, + 32, 32, 32, 32, 32, 32, 32, 32, 32, 33, 33, 33, 33, 33, 33, 33, + 33, 33, 33, 33, 33, 33, 33, 33, 33, 33]) + dst = np.array([0, 0, 1, 0, 1, 2, 0, 0, 0, 4, 5, 0, 1, 2, 3, 0, 2, 2, 0, 4, + 5, 0, 0, 3, 0, 1, 2, 3, 5, 6, 0, 1, 0, 1, 0, 1, 23, 24, 2, 23, + 24, 2, 23, 26, 1, 8, 0, 24, 25, 28, 2, 8, 14, 15, 18, 20, 22, 23, + 29, 30, 31, 8, 9, 13, 14, 15, 18, 19, 20, 22, 23, 26, 27, 28, 29, 30, + 31, 32]) + u = np.concatenate([src, dst]) + v = np.concatenate([dst, src]) + u = np.expand_dims(u, axis=1) + v = np.expand_dims(v, axis=1) + return np.concatenate([u,v], 1) + +def acc(y, label): + '''Function to calculate the accuracy.''' + ll = tf.equal(tf.argmax(label, -1), tf.argmax(y, -1)) + accuarcy = tf.reduce_mean(tf.cast(ll, dtype=tf.float32)) + return accuarcy + +def evaluate(x, y, model): + '''Function to evaluate the performance of model.''' + metric = dict() + y_pred = model.predict(x) + metric['acc'] = np.round(acc(y, y_pred), 5) + return metric + +class TestGCN(BaseTestCases.BaseTest): + def setUp(self): + feature = [[0,1,2]+random.sample(range(3, 20), 8), + [0,1,2]+random.sample(range(18, 40),8), + [0,1,2]+random.sample(range(38, 60),8), + [0,1,2]+random.sample(range(58, 80),8)] + label = ['Shotokan', 'Gōjū-ryū', 'Wadō-ryū', 'Shitō-ryū'] + nodes = np.array(list(range(34))) + edges = build_karate_club_graph() + features, labels = list(), list() + for i in range(34): + idx = random.randint(0,3) + features.append(np.eye(81)[feature[idx]].sum(0)) + labels.append(label[idx]) + self.inputs = [dict() for i in range(len(edges)*2)] + self.labels = list() + for i in range(len(edges)): + self.inputs[i]['id'] = tf.convert_to_tensor(edges[i][0]) + self.inputs[i]['features'] = tf.convert_to_tensor(features[edges[i][0]]) + self.inputs[i]['from_node_id'] = tf.convert_to_tensor(edges[i][0]) + self.inputs[i]['to_node_id'] = tf.convert_to_tensor(edges[i][1]) + self.labels.append(tf.convert_to_tensor([labels[edges[i][0]]])) + for i in range(len(edges)): + self.inputs[i+len(edges)]['id'] = tf.convert_to_tensor(edges[i][1]) + self.inputs[i+len(edges)]['features'] = tf.convert_to_tensor(features[edges[i][1]]) + self.inputs[i+len(edges)]['from_node_id'] = tf.convert_to_tensor(edges[i][0]) + self.inputs[i+len(edges)]['to_node_id'] = tf.convert_to_tensor(edges[i][1]) + self.labels.append(tf.convert_to_tensor([labels[edges[i][1]]])) + self.model = sqlflow_models.GCN(nhid=16, nclass=4, epochs=20, train_ratio=0.2, eval_ratio=0.15) + self.model_class = sqlflow_models.GCN + + def test_train_and_predict(self): + self.setUp() + self.model.compile(optimizer=optimizer(), + loss='categorical_crossentropy') + self.model.sqlflow_train_loop(zip(self.inputs, self.labels)) + metric = evaluate([self.model.features, self.model.adjacency], self.model.labels, self.model) + assert (metric['acc'] > 0) + +def optimizer(): + return tf.keras.optimizers.Adam(lr=0.01) + +if __name__ == '__main__': + unittest.main() + + diff --git a/tests/test_one_class_svm.py b/tests/test_one_class_svm.py new file mode 100644 index 0000000..9baa3cf --- /dev/null +++ b/tests/test_one_class_svm.py @@ -0,0 +1,63 @@ +# Copyright 2020 The SQLFlow Authors. All rights reserved. 
+# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import shutil +import tempfile +import unittest + +import numpy as np +import tensorflow as tf +from sqlflow_models import OneClassSVM +from sqlflow_models.one_class_svm import dataset_reader + + +class TestOneClassSVM(unittest.TestCase): + def setUp(self): + self.tmp_dir = tempfile.mkdtemp() + self.old_cwd = os.getcwd() + os.chdir(self.tmp_dir) + + def tearDown(self): + os.chdir(self.old_cwd) + shutil.rmtree(self.tmp_dir) + + def create_dataset(self): + def generator(): + for _ in range(10): + x1 = np.random.random(size=[1, 1]) + x2 = np.random.random(size=[1, 1]) + yield x1, x2 + + def dict_mapper(x1, x2): + return {"x1": x1, "x2": x2} + + dataset = tf.data.Dataset.from_generator( + generator, output_types=(tf.dtypes.float32, tf.dtypes.float32)) + return dataset.map(dict_mapper) + + def test_main(self): + svm = OneClassSVM() + train_dataset = self.create_dataset() + svm.sqlflow_train_loop(train_dataset) + + predict_dataset = self.create_dataset() + for features in dataset_reader(predict_dataset): + pred = svm.sqlflow_predict_one(features)[0] + pred = np.array(pred) + self.assertEqual(pred.shape, (1,)) + self.assertTrue(pred[0] == 1 or pred[0] == -1) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_rnn.py b/tests/test_rnn.py new file mode 100644 index 0000000..856b1dd --- /dev/null +++ b/tests/test_rnn.py @@ -0,0 +1,108 @@ +import sqlflow_models +from tests.base import BaseTestCases + +import tensorflow as tf +import numpy as np +import unittest + + +class TestStackedRNNClassifier(BaseTestCases.BaseTest): + def setUp(self): + self.features = {"c1": np.array([int(x) for x in range(800)]).reshape(100, 8)} + self.label = [0 for _ in range(50)] + [1 for _ in range(50)] + fea = tf.feature_column.sequence_categorical_column_with_identity( + key="c1", + num_buckets=800 + ) + + emb = tf.feature_column.embedding_column( + fea, + dimension=32) + feature_columns = [emb] + self.model = sqlflow_models.StackedRNNClassifier(feature_columns=feature_columns, stack_units=[64, 32], model_type='rnn') + self.model_class = sqlflow_models.StackedRNNClassifier + +class TestStackedBiRNNClassifier(BaseTestCases.BaseTest): + def setUp(self): + self.features = {"c1": np.array([int(x) for x in range(800)]).reshape(100, 8)} + self.label = [0 for _ in range(50)] + [1 for _ in range(50)] + fea = tf.feature_column.sequence_categorical_column_with_identity( + key="c1", + num_buckets=800 + ) + + emb = tf.feature_column.embedding_column( + fea, + dimension=32) + feature_columns = [emb] + self.model = sqlflow_models.StackedRNNClassifier(feature_columns=feature_columns, stack_units=[64, 32], model_type='rnn', bidirectional=True) + self.model_class = sqlflow_models.StackedRNNClassifier + +class TestStackedLSTMClassifier(BaseTestCases.BaseTest): + def setUp(self): + self.features = {"c1": np.array([int(x) for x in range(800)]).reshape(100, 8)} + self.label = [0 for _ in range(50)] + [1 for _ in range(50)] + fea = 
tf.feature_column.sequence_categorical_column_with_identity( + key="c1", + num_buckets=800 + ) + + emb = tf.feature_column.embedding_column( + fea, + dimension=32) + feature_columns = [emb] + self.model = sqlflow_models.StackedRNNClassifier(feature_columns=feature_columns, stack_units=[64, 32], model_type='lstm') + self.model_class = sqlflow_models.StackedRNNClassifier + +class TestStackedBiLSTMClassifier(BaseTestCases.BaseTest): + def setUp(self): + self.features = {"c1": np.array([int(x) for x in range(800)]).reshape(100, 8)} + self.label = [0 for _ in range(50)] + [1 for _ in range(50)] + fea = tf.feature_column.sequence_categorical_column_with_identity( + key="c1", + num_buckets=800 + ) + + emb = tf.feature_column.embedding_column( + fea, + dimension=32) + feature_columns = [emb] + self.model = sqlflow_models.StackedRNNClassifier(feature_columns=feature_columns, stack_units=[64, 32], model_type='lstm', bidirectional=True) + self.model_class = sqlflow_models.StackedRNNClassifier + +class TestStackedGRUClassifier(BaseTestCases.BaseTest): + def setUp(self): + self.features = {"c1": np.array([int(x) for x in range(800)]).reshape(100, 8)} + self.label = [0 for _ in range(50)] + [1 for _ in range(50)] + fea = tf.feature_column.sequence_categorical_column_with_identity( + key="c1", + num_buckets=800 + ) + + emb = tf.feature_column.embedding_column( + fea, + dimension=32) + feature_columns = [emb] + self.model = sqlflow_models.StackedRNNClassifier(feature_columns=feature_columns, stack_units=[64, 32], model_type='gru') + self.model_class = sqlflow_models.StackedRNNClassifier + +class TestStackedBiGRUClassifier(BaseTestCases.BaseTest): + def setUp(self): + self.features = {"c1": np.array([int(x) for x in range(800)]).reshape(100, 8)} + self.label = [0 for _ in range(50)] + [1 for _ in range(50)] + fea = tf.feature_column.sequence_categorical_column_with_identity( + key="c1", + num_buckets=800 + ) + + emb = tf.feature_column.embedding_column( + fea, + dimension=32) + feature_columns = [emb] + self.model = sqlflow_models.StackedRNNClassifier(feature_columns=feature_columns, stack_units=[64, 32], model_type='gru', bidirectional=True) + self.model_class = sqlflow_models.StackedRNNClassifier + +if __name__ == '__main__': + unittest.main() + + diff --git a/tests/test_rnnts.py b/tests/test_rnnts.py new file mode 100644 index 0000000..035b80e --- /dev/null +++ b/tests/test_rnnts.py @@ -0,0 +1,73 @@ +import sqlflow_models +from tests.base import BaseTestCases + +import tensorflow as tf +import numpy as np +np.random.seed(22) +import unittest + + +class TestRNNBasedTimeSeriesModel(BaseTestCases.BaseTest): + def setUp(self): + # We use sin data plus perturbation to simulate time series data + time_series_data = np.sin(np.arange(56)) + np.random.normal(0, 0.01, 56) + x = np.array(time_series_data).reshape(8, 7) + y = np.array(np.arange(8).reshape(8, 1)) + self.features = {"col1": x} + self.label = y + self.n_in = 7 + self.n_out = 1 + # time_window=n_in, num_features=n_out + feature_columns = [tf.feature_column.numeric_column(key, shape=(self.n_in, self.n_out)) for key in self.features] + self.model = sqlflow_models.RNNBasedTimeSeriesModel( + feature_columns=feature_columns, + stack_units=[50, 50], + n_in=self.n_in, + n_out=self.n_out, + model_type='rnn') + self.model_class = sqlflow_models.RNNBasedTimeSeriesModel + +class TestLSTMBasedTimeSeriesModel(BaseTestCases.BaseTest): + def setUp(self): + # We use sin data plus perturbation to simulate time series data + time_series_data = np.sin(np.arange(56)) + 
np.random.normal(0, 0.01, 56) + x = np.array(time_series_data).reshape(8, 7) + y = np.array(np.arange(8).reshape(8, 1)) + self.features = {"col1": x} + self.label = y + self.n_in = 7 + self.n_out = 1 + # time_window=n_in, num_features=n_out + feature_columns = [tf.feature_column.numeric_column(key, shape=(self.n_in, self.n_out)) for key in self.features] + self.model = sqlflow_models.RNNBasedTimeSeriesModel( + feature_columns=feature_columns, + stack_units=[50, 50], + n_in=self.n_in, + n_out=self.n_out, + model_type='lstm') + self.model_class = sqlflow_models.RNNBasedTimeSeriesModel + +class TestGRUBasedTimeSeriesModel(BaseTestCases.BaseTest): + def setUp(self): + # We use sin data plus perturbation to simulate time series data + time_series_data = np.sin(np.arange(56)) + np.random.normal(0, 0.01, 56) + x = np.array(time_series_data).reshape(8, 7) + y = np.array(np.arange(8).reshape(8, 1)) + self.features = {"col1": x} + self.label = y + self.n_in = 7 + self.n_out = 1 + # time_window=n_in, num_features=n_out + feature_columns = [tf.feature_column.numeric_column(key, shape=(self.n_in, self.n_out)) for key in self.features] + self.model = sqlflow_models.RNNBasedTimeSeriesModel( + feature_columns=feature_columns, + stack_units=[50, 50], + n_in=self.n_in, + n_out=self.n_out, + model_type='gru') + self.model_class = sqlflow_models.RNNBasedTimeSeriesModel + + +if __name__ == '__main__': + unittest.main() + diff --git a/tests/test_score_card.py b/tests/test_score_card.py new file mode 100644 index 0000000..7c2c194 --- /dev/null +++ b/tests/test_score_card.py @@ -0,0 +1,34 @@ +from sqlflow_models import ScoreCard +import unittest +import tensorflow as tf +from datetime import datetime, timedelta +import numpy as np + + +class TestScoreCard(unittest.TestCase): + def create_dataset(self): + samples = 20 + f = [np.random.randint(20, size=1) for i in range(samples)] + label = [np.random.randint(2, size=1) for i in range(samples)] + + def generator(): + for i, item in enumerate(f): + yield [f[i]], label[i] + + def dict_mapper(feature, label): + return {'f1': feature}, label + + dataset = tf.data.Dataset.from_generator( + generator, output_types=(tf.dtypes.float32, tf.dtypes.float32) + ) + dataset = dataset.map(dict_mapper) + return dataset + + def test_train(self): + dataset = self.create_dataset() + m = ScoreCard(pf_bin_size=2) + m.sqlflow_train_loop(dataset) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_version.py b/tests/test_version.py new file mode 100644 index 0000000..b412edb --- /dev/null +++ b/tests/test_version.py @@ -0,0 +1,5 @@ +import sqlflow_models + + +def test_answer(): + assert sqlflow_models.__version__ == sqlflow_models._version.__version__