diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml
index ea06d395ea..9602d54059 100644
--- a/.github/.OwlBot.lock.yaml
+++ b/.github/.OwlBot.lock.yaml
@@ -1,3 +1,3 @@
docker:
image: gcr.io/repo-automation-bots/owlbot-python:latest
- digest: sha256:58c7342b0bccf85028100adaa3d856cb4a871c22ca9c01960d996e66c40548ce
+ digest: sha256:b8c131c558606d3cea6e18f8e87befbd448c1482319b0db3c5d5388fa6ea72e3
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 4f00c7cffc..62eb5a77d9 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -16,7 +16,7 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
- rev: v3.4.0
+ rev: v4.0.1
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f6437eba3d..0bbc465238 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,27 @@
# Changelog
+### [1.1.1](https://www.github.com/googleapis/python-aiplatform/compare/v1.1.0...v1.1.1) (2021-06-22)
+
+
+### Features
+
+* add cancel method to pipeline client ([#488](https://www.github.com/googleapis/python-aiplatform/issues/488)) ([3b19fff](https://www.github.com/googleapis/python-aiplatform/commit/3b19fff399b85c92e661eb83a48a4c6636423518))
+
+
+### Bug Fixes
+
+* check if training_task_metadata is populated before logging backingCustomJob ([#494](https://www.github.com/googleapis/python-aiplatform/issues/494)) ([2e627f8](https://www.github.com/googleapis/python-aiplatform/commit/2e627f876e1d7dd03e5d6bd2e81e6234e361a9df))
+
+
+### Documentation
+
+* omit mention of Python 2.7 in 'CONTRIBUTING.rst' ([#1127](https://www.github.com/googleapis/python-aiplatform/issues/1127)) ([#489](https://www.github.com/googleapis/python-aiplatform/issues/489)) ([cbc47f8](https://www.github.com/googleapis/python-aiplatform/commit/cbc47f862f291b00b85718498571e0c737cb26a6))
+
+
+### Miscellaneous Chores
+
+* release 1.1.1 ([1a38ce2](https://www.github.com/googleapis/python-aiplatform/commit/1a38ce2f9879e1c42c0c6b25b72bd4836e3a1f73))
+
## [1.1.0](https://www.github.com/googleapis/python-aiplatform/compare/v1.0.1...v1.1.0) (2021-06-17)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index f865e3769d..4fc605b7ef 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -69,7 +69,6 @@ We use `nox `__ to instrument our tests.
- To test your changes, run unit tests with ``nox``::
- $ nox -s unit-2.7
$ nox -s unit-3.8
$ ...
@@ -144,7 +143,6 @@ Running System Tests
# Run all system tests
$ nox -s system-3.8
- $ nox -s system-2.7
# Run a single system test
$ nox -s system-3.8 -- -k
@@ -152,9 +150,8 @@ Running System Tests
.. note::
- System tests are only configured to run under Python 2.7 and
- Python 3.8. For expediency, we do not run them in older versions
- of Python 3.
+ System tests are only configured to run under Python 3.8.
+ For expediency, we do not run them in older versions of Python 3.
This alone will not run the tests. You'll need to change some local
auth settings and change some configuration in your project to
diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py
index 8bf06ccb49..84b39c2ae8 100644
--- a/google/cloud/aiplatform/pipeline_jobs.py
+++ b/google/cloud/aiplatform/pipeline_jobs.py
@@ -264,7 +264,7 @@ def state(self) -> Optional[gca_pipeline_state_v1beta1.PipelineState]:
@property
def _has_run(self) -> bool:
"""Helper property to check if this pipeline job has been run."""
- return bool(self._gca_resource.name)
+ return bool(self._gca_resource.create_time)
@property
def has_failed(self) -> bool:
@@ -310,3 +310,19 @@ def _block_until_complete(self):
log_wait = min(log_wait * multiplier, max_wait)
previous_time = current_time
time.sleep(wait)
+
+ def cancel(self) -> None:
+ """Starts asynchronous cancellation on the PipelineJob. The server
+ makes a best effort to cancel the job, but success is not guaranteed.
+ On successful cancellation, the PipelineJob is not deleted; instead it
+ becomes a job with state set to `CANCELLED`.
+
+ Raises:
+ RuntimeError: If this PipelineJob has not started running.
+ """
+ if not self._has_run:
+ raise RuntimeError(
+ "This PipelineJob has not been launched, use the `run()` method "
+ "to start. `cancel()` can only be called on a job that is running."
+ )
+ self.api_client.cancel_pipeline_job(name=self.resource_name)
diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py
index 99f4f088a5..0b66c74fc1 100644
--- a/google/cloud/aiplatform/training_jobs.py
+++ b/google/cloud/aiplatform/training_jobs.py
@@ -1243,7 +1243,8 @@ def _prepare_training_task_inputs_and_output_dir(
def _wait_callback(self):
if (
- self._gca_resource.training_task_metadata.get("backingCustomJob")
+ self._gca_resource.training_task_metadata
+ and self._gca_resource.training_task_metadata.get("backingCustomJob")
and not self._has_logged_custom_job
):
_LOGGER.info(f"View backing custom job:\n{self._custom_job_console_uri()}")
diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt
index 37822f739b..c68998555f 100644
--- a/samples/snippets/requirements.txt
+++ b/samples/snippets/requirements.txt
@@ -1,3 +1,3 @@
pytest==6.2.4
google-cloud-storage>=1.26.0, <2.0.0dev
-google-cloud-aiplatform==1.0.1
+google-cloud-aiplatform==1.1.0
diff --git a/setup.py b/setup.py
index f627999563..6657023335 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@
import setuptools # type: ignore
name = "google-cloud-aiplatform"
-version = "1.1.0"
+version = "1.1.1"
description = "Cloud AI Platform API client library"
package_root = os.path.abspath(os.path.dirname(__file__))
diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py
index e156163348..18dc692d38 100644
--- a/tests/unit/aiplatform/test_pipeline_jobs.py
+++ b/tests/unit/aiplatform/test_pipeline_jobs.py
@@ -21,12 +21,11 @@
from unittest import mock
from importlib import reload
from unittest.mock import patch
+from datetime import datetime
from google.auth import credentials as auth_credentials
-
from google.cloud import aiplatform
from google.cloud import storage
-
from google.cloud.aiplatform import pipeline_jobs
from google.cloud.aiplatform import initializer
from google.protobuf import json_format
@@ -72,6 +71,7 @@
_TEST_PIPELINE_RESOURCE_NAME = (
f"{_TEST_PARENT}/fakePipelineJobs/{_TEST_PIPELINE_JOB_ID}"
)
+_TEST_PIPELINE_CREATE_TIME = datetime.now()
@pytest.fixture
@@ -82,13 +82,16 @@ def mock_pipeline_service_create():
mock_create_pipeline_job.return_value = gca_pipeline_job_v1beta1.PipelineJob(
name=_TEST_PIPELINE_JOB_NAME,
state=gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED,
+ create_time=_TEST_PIPELINE_CREATE_TIME,
)
yield mock_create_pipeline_job
def make_pipeline_job(state):
return gca_pipeline_job_v1beta1.PipelineJob(
- name=_TEST_PIPELINE_JOB_NAME, state=state,
+ name=_TEST_PIPELINE_JOB_NAME,
+ state=state,
+ create_time=_TEST_PIPELINE_CREATE_TIME,
)
@@ -130,6 +133,14 @@ def mock_pipeline_service_get():
yield mock_get_pipeline_job
+@pytest.fixture
+def mock_pipeline_service_cancel():
+ with mock.patch.object(
+ pipeline_service_client_v1beta1.PipelineServiceClient, "cancel_pipeline_job"
+ ) as mock_cancel_pipeline_job:
+ yield mock_cancel_pipeline_job
+
+
@pytest.fixture
def mock_load_json():
with patch.object(storage.Blob, "download_as_bytes") as mock_load_json:
@@ -155,13 +166,10 @@ def setup_method(self):
def teardown_method(self):
initializer.global_pool.shutdown(wait=True)
+ @pytest.mark.usefixtures("mock_load_json")
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_service_create(
- self,
- mock_pipeline_service_create,
- mock_pipeline_service_get,
- mock_load_json,
- sync,
+ self, mock_pipeline_service_create, mock_pipeline_service_get, sync,
):
aiplatform.init(
project=_TEST_PROJECT,
@@ -213,3 +221,51 @@ def test_run_call_pipeline_service_create(
assert job._gca_resource == make_pipeline_job(
gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
)
+
+ @pytest.mark.usefixtures(
+ "mock_pipeline_service_create", "mock_pipeline_service_get", "mock_load_json",
+ )
+ def test_cancel_pipeline_job(
+ self, mock_pipeline_service_cancel,
+ ):
+ aiplatform.init(
+ project=_TEST_PROJECT,
+ staging_bucket=_TEST_GCS_BUCKET_NAME,
+ credentials=_TEST_CREDENTIALS,
+ )
+
+ job = pipeline_jobs.PipelineJob(
+ display_name=_TEST_PIPELINE_JOB_ID,
+ template_path=_TEST_TEMPLATE_PATH,
+ job_id=_TEST_PIPELINE_JOB_ID,
+ )
+
+ job.run()
+ job.cancel()
+
+ mock_pipeline_service_cancel.assert_called_once_with(
+ name=_TEST_PIPELINE_JOB_NAME
+ )
+
+ @pytest.mark.usefixtures(
+ "mock_pipeline_service_create", "mock_pipeline_service_get", "mock_load_json",
+ )
+ def test_cancel_pipeline_job_without_running(
+ self, mock_pipeline_service_cancel,
+ ):
+ aiplatform.init(
+ project=_TEST_PROJECT,
+ staging_bucket=_TEST_GCS_BUCKET_NAME,
+ credentials=_TEST_CREDENTIALS,
+ )
+
+ job = pipeline_jobs.PipelineJob(
+ display_name=_TEST_PIPELINE_JOB_ID,
+ template_path=_TEST_TEMPLATE_PATH,
+ job_id=_TEST_PIPELINE_JOB_ID,
+ )
+
+ with pytest.raises(RuntimeError) as e:
+ job.cancel()
+
+ assert e.match(regexp=r"PipelineJob has not been launched")
diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py
index b160204d7d..0995e0cb95 100644
--- a/tests/unit/aiplatform/test_training_jobs.py
+++ b/tests/unit/aiplatform/test_training_jobs.py
@@ -443,13 +443,15 @@ def mock_pipeline_service_create():
yield mock_create_training_pipeline
-def make_training_pipeline(state):
+def make_training_pipeline(state, add_training_task_metadata=True):
return gca_training_pipeline.TrainingPipeline(
name=_TEST_PIPELINE_RESOURCE_NAME,
state=state,
model_to_upload=gca_model.Model(name=_TEST_MODEL_NAME),
training_task_inputs={"tensorboard": _TEST_TENSORBOARD_RESOURCE_NAME},
- training_task_metadata={"backingCustomJob": _TEST_CUSTOM_JOB_RESOURCE_NAME},
+ training_task_metadata={"backingCustomJob": _TEST_CUSTOM_JOB_RESOURCE_NAME}
+ if add_training_task_metadata
+ else None,
)
@@ -460,7 +462,11 @@ def mock_pipeline_service_get():
) as mock_get_training_pipeline:
mock_get_training_pipeline.side_effect = [
make_training_pipeline(
- gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING
+ gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING,
+ add_training_task_metadata=False,
+ ),
+ make_training_pipeline(
+ gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING,
),
make_training_pipeline(
gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED