From 476a1e857792d3426238f865159c7ed5b6dbea45 Mon Sep 17 00:00:00 2001 From: Charles Engelke Date: Wed, 30 Jan 2019 17:02:32 -0800 Subject: [PATCH 1/6] Update to use new subscribe() syntax --- dlp/jobs_test.py | 5 +- dlp/quickstart.py | 2 +- dlp/quickstart_test.py | 2 +- dlp/requirements.txt | 10 +- dlp/risk.py | 433 +++++++++++++++++++---------------------- 5 files changed, 208 insertions(+), 244 deletions(-) diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py index 87c39d4c3cc..8f47fb4d428 100644 --- a/dlp/jobs_test.py +++ b/dlp/jobs_test.py @@ -26,7 +26,7 @@ @pytest.fixture(scope='session') -def create_test_job(): +def test_job_name(): import google.cloud.dlp dlp = google.cloud.dlp.DlpServiceClient() @@ -76,6 +76,5 @@ def test_list_dlp_jobs_with_job_type(capsys): assert 'Job: projects/' in out -def test_delete_dlp_job(capsys): - test_job_name = create_test_job() +def test_delete_dlp_job(test_job_name, capsys): jobs.delete_dlp_job(GCLOUD_PROJECT, test_job_name) diff --git a/dlp/quickstart.py b/dlp/quickstart.py index f905fbe1aaf..736d59ddd8f 100644 --- a/dlp/quickstart.py +++ b/dlp/quickstart.py @@ -59,7 +59,7 @@ def quickstart(project_id): } # Convert the project id into a full resource id. - parent = dlp_client.project_path(project) + parent = dlp_client.project_path(project_id) # Call the API. response = dlp_client.inspect_content(parent, inspect_config, item) diff --git a/dlp/quickstart_test.py b/dlp/quickstart_test.py index 924e7141c70..19c215fdbb0 100644 --- a/dlp/quickstart_test.py +++ b/dlp/quickstart_test.py @@ -29,7 +29,7 @@ def test_quickstart(capsys): google.cloud.dlp.DlpServiceClient, 'project_path', return_value='projects/{}'.format(GCLOUD_PROJECT)): - quickstart.quickstart() + quickstart.quickstart(GCLOUD_PROJECT) out, _ = capsys.readouterr() assert 'FIRST_NAME' in out diff --git a/dlp/requirements.txt b/dlp/requirements.txt index 404764b7743..7e812cb15f0 100644 --- a/dlp/requirements.txt +++ b/dlp/requirements.txt @@ -1,5 +1,5 @@ -google-cloud-dlp==0.9.0 -google-cloud-storage==1.13.0 -google-cloud-pubsub==0.38.0 -google-cloud-datastore==1.7.1 -google-cloud-bigquery==1.7.0 +google-cloud-dlp==0.10.0 +google-cloud-storage==1.13.2 +google-cloud-pubsub==0.39.1 +google-cloud-datastore==1.7.3 +google-cloud-bigquery==1.8.1 diff --git a/dlp/risk.py b/dlp/risk.py index 8512f054bce..9539aa71bc6 100644 --- a/dlp/risk.py +++ b/dlp/risk.py @@ -49,8 +49,33 @@ def numerical_risk_analysis(project, table_project_id, dataset_id, table_id, # potentially long-running operations. import google.cloud.pubsub - # This sample also uses threading.Event() to wait for the job to finish. - import threading + def callback(message): + try: + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + results = job.risk_details.numerical_stats_result + print('Value Range: [{}, {}]'.format( + results.min_value.integer_value, + results.max_value.integer_value)) + prev_value = None + for percent, result in enumerate(results.quantile_values): + value = result.integer_value + if prev_value != value: + print('Value at {}% quantile: {}'.format( + percent, value)) + prev_value = value + else: + # This is not the message we're looking for. + message.drop() + except Exception as e: + # Because this is executing in a thread, an exception won't be + # noted unless we print it manually. + print(e) + raise # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient() @@ -92,48 +117,16 @@ def numerical_risk_analysis(project, table_project_id, dataset_id, table_id, subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_id) - subscription = subscriber.subscribe(subscription_path) - - # Set up a callback to acknowledge a message. This closes around an event - # so that it can signal that it is done and the main thread can continue. - job_done = threading.Event() - - def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - results = job.risk_details.numerical_stats_result - print('Value Range: [{}, {}]'.format( - results.min_value.integer_value, - results.max_value.integer_value)) - prev_value = None - for percent, result in enumerate(results.quantile_values): - value = result.integer_value - if prev_value != value: - print('Value at {}% quantile: {}'.format( - percent, value)) - prev_value = value - # Signal to the main thread that we can exit. - job_done.set() - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise + subscription = subscriber.subscribe(subscription_path, callback) - # Register the callback and wait on the event. - subscription.open(callback) - finished = job_done.wait(timeout=timeout) - if not finished: + try: + subscription.result(timeout=timeout) + except TimeoutError: print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') + subscription.close() + + subscription.cancel() # [END dlp_numerical_stats] @@ -167,8 +160,37 @@ def categorical_risk_analysis(project, table_project_id, dataset_id, table_id, # potentially long-running operations. import google.cloud.pubsub - # This sample also uses threading.Event() to wait for the job to finish. - import threading + def callback(message): + try: + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = (job.risk_details + .categorical_stats_result + .value_frequency_histogram_buckets) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print('Bucket {}:'.format(i)) + print(' Most common value occurs {} time(s)'.format( + bucket.value_frequency_upper_bound)) + print(' Least common value occurs {} time(s)'.format( + bucket.value_frequency_lower_bound)) + print(' {} unique values total.'.format( + bucket.bucket_size)) + for value in bucket.bucket_values: + print(' Value {} occurs {} time(s)'.format( + value.value.integer_value, value.count)) + else: + # This is not the message we're looking for. + message.drop() + except Exception as e: + # Because this is executing in a thread, an exception won't be + # noted unless we print it manually. + print(e) + raise # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient() @@ -210,52 +232,16 @@ def categorical_risk_analysis(project, table_project_id, dataset_id, table_id, subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_id) - subscription = subscriber.subscribe(subscription_path) - - # Set up a callback to acknowledge a message. This closes around an event - # so that it can signal that it is done and the main thread can continue. - job_done = threading.Event() - - def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = (job.risk_details - .categorical_stats_result - .value_frequency_histogram_buckets) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print('Bucket {}:'.format(i)) - print(' Most common value occurs {} time(s)'.format( - bucket.value_frequency_upper_bound)) - print(' Least common value occurs {} time(s)'.format( - bucket.value_frequency_lower_bound)) - print(' {} unique values total.'.format( - bucket.bucket_size)) - for value in bucket.bucket_values: - print(' Value {} occurs {} time(s)'.format( - value.value.integer_value, value.count)) - # Signal to the main thread that we can exit. - job_done.set() - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise + subscription = subscriber.subscribe(subscription_path, callback) - # Register the callback and wait on the event. - subscription.open(callback) - finished = job_done.wait(timeout=timeout) - if not finished: + try: + subscription.result(timeout=timeout) + except TimeoutError: print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') + subscription.close() + + subscription.cancel() # [END dlp_categorical_stats] @@ -288,8 +274,42 @@ def k_anonymity_analysis(project, table_project_id, dataset_id, table_id, # potentially long-running operations. import google.cloud.pubsub - # This sample also uses threading.Event() to wait for the job to finish. - import threading + # Create helper function for unpacking values + def get_values(obj): + return int(obj.integer_value) + + def callback(message): + try: + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = (job.risk_details + .k_anonymity_result + .equivalence_class_histogram_buckets) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print('Bucket {}:'.format(i)) + if bucket.equivalence_class_size_lower_bound: + print(' Bucket size range: [{}, {}]'.format( + bucket.equivalence_class_size_lower_bound, + bucket.equivalence_class_size_upper_bound)) + for value_bucket in bucket.bucket_values: + print(' Quasi-ID values: {}'.format( + map(get_values, value_bucket.quasi_ids_values) + )) + print(' Class size: {}'.format( + value_bucket.equivalence_class_size)) + else: + # This is not the message we're looking for. + message.drop() + except Exception as e: + # Because this is executing in a thread, an exception won't be + # noted unless we print it manually. + print(e) + raise # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient() @@ -334,57 +354,16 @@ def map_fields(field): subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_id) - subscription = subscriber.subscribe(subscription_path) - - # Set up a callback to acknowledge a message. This closes around an event - # so that it can signal that it is done and the main thread can continue. - job_done = threading.Event() - - # Create helper function for unpacking values - def get_values(obj): - return int(obj.integer_value) - - def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = (job.risk_details - .k_anonymity_result - .equivalence_class_histogram_buckets) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print('Bucket {}:'.format(i)) - if bucket.equivalence_class_size_lower_bound: - print(' Bucket size range: [{}, {}]'.format( - bucket.equivalence_class_size_lower_bound, - bucket.equivalence_class_size_upper_bound)) - for value_bucket in bucket.bucket_values: - print(' Quasi-ID values: {}'.format( - map(get_values, value_bucket.quasi_ids_values) - )) - print(' Class size: {}'.format( - value_bucket.equivalence_class_size)) - # Signal to the main thread that we can exit. - job_done.set() - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise + subscription = subscriber.subscribe(subscription_path, callback) - # Register the callback and wait on the event. - subscription.open(callback) - finished = job_done.wait(timeout=timeout) - if not finished: + try: + subscription.result(timeout=timeout) + except TimeoutError: print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') + subscription.close() + + subscription.cancel() # [END dlp_k_anonymity] @@ -419,8 +398,44 @@ def l_diversity_analysis(project, table_project_id, dataset_id, table_id, # potentially long-running operations. import google.cloud.pubsub - # This sample also uses threading.Event() to wait for the job to finish. - import threading + # Create helper function for unpacking values + def get_values(obj): + return int(obj.integer_value) + + def callback(message): + try: + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = ( + job.risk_details + .l_diversity_result + .sensitive_value_frequency_histogram_buckets) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print('Bucket {}:'.format(i)) + print(' Bucket size range: [{}, {}]'.format( + bucket.sensitive_value_frequency_lower_bound, + bucket.sensitive_value_frequency_upper_bound)) + for value_bucket in bucket.bucket_values: + print(' Quasi-ID values: {}'.format( + map(get_values, value_bucket.quasi_ids_values))) + print(' Class size: {}'.format( + value_bucket.equivalence_class_size)) + for value in value_bucket.top_sensitive_values: + print((' Sensitive value {} occurs {} time(s)' + .format(value.value, value.count))) + else: + # This is not the message we're looking for. + message.drop() + except Exception as e: + # Because this is executing in a thread, an exception won't be + # noted unless we print it manually. + print(e) + raise # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient() @@ -471,57 +486,14 @@ def map_fields(field): project, subscription_id) subscription = subscriber.subscribe(subscription_path) - # Set up a callback to acknowledge a message. This closes around an event - # so that it can signal that it is done and the main thread can continue. - job_done = threading.Event() - - # Create helper function for unpacking values - def get_values(obj): - return int(obj.integer_value) - - def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = ( - job.risk_details - .l_diversity_result - .sensitive_value_frequency_histogram_buckets) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print('Bucket {}:'.format(i)) - print(' Bucket size range: [{}, {}]'.format( - bucket.sensitive_value_frequency_lower_bound, - bucket.sensitive_value_frequency_upper_bound)) - for value_bucket in bucket.bucket_values: - print(' Quasi-ID values: {}'.format( - map(get_values, value_bucket.quasi_ids_values))) - print(' Class size: {}'.format( - value_bucket.equivalence_class_size)) - for value in value_bucket.top_sensitive_values: - print((' Sensitive value {} occurs {} time(s)' - .format(value.value, value.count))) - # Signal to the main thread that we can exit. - job_done.set() - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise - - # Register the callback and wait on the event. - subscription.open(callback) - finished = job_done.wait(timeout=timeout) - if not finished: + try: + subscription.result(timeout=timeout) + except TimeoutError: print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') + subscription.close() + + subscription.cancel() # [END dlp_l_diversity] @@ -562,8 +534,40 @@ def k_map_estimate_analysis(project, table_project_id, dataset_id, table_id, # potentially long-running operations. import google.cloud.pubsub - # This sample also uses threading.Event() to wait for the job to finish. - import threading + # Create helper function for unpacking values + def get_values(obj): + return int(obj.integer_value) + + def callback(message): + try: + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = (job.risk_details + .k_map_estimation_result + .k_map_estimation_histogram) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print('Bucket {}:'.format(i)) + print(' Anonymity range: [{}, {}]'.format( + bucket.min_anonymity, bucket.max_anonymity)) + print(' Size: {}'.format(bucket.bucket_size)) + for value_bucket in bucket.bucket_values: + print(' Values: {}'.format( + map(get_values, value_bucket.quasi_ids_values))) + print(' Estimated k-map anonymity: {}'.format( + value_bucket.estimated_anonymity)) + else: + # This is not the message we're looking for. + message.drop() + except Exception as e: + # Because this is executing in a thread, an exception won't be + # noted unless we print it manually. + print(e) + raise # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient() @@ -617,53 +621,14 @@ def map_fields(quasi_id, info_type): project, subscription_id) subscription = subscriber.subscribe(subscription_path) - # Set up a callback to acknowledge a message. This closes around an event - # so that it can signal that it is done and the main thread can continue. - job_done = threading.Event() - - # Create helper function for unpacking values - def get_values(obj): - return int(obj.integer_value) - - def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = (job.risk_details - .k_map_estimation_result - .k_map_estimation_histogram) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print('Bucket {}:'.format(i)) - print(' Anonymity range: [{}, {}]'.format( - bucket.min_anonymity, bucket.max_anonymity)) - print(' Size: {}'.format(bucket.bucket_size)) - for value_bucket in bucket.bucket_values: - print(' Values: {}'.format( - map(get_values, value_bucket.quasi_ids_values))) - print(' Estimated k-map anonymity: {}'.format( - value_bucket.estimated_anonymity)) - # Signal to the main thread that we can exit. - job_done.set() - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise - - # Register the callback and wait on the event. - subscription.open(callback) - finished = job_done.wait(timeout=timeout) - if not finished: + try: + subscription.result(timeout=timeout) + except TimeoutError: print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') + subscription.close() + + subscription.cancel() # [END dlp_k_map] From 7138a549e46a45cdef6c970404aa5ee72c2ba491 Mon Sep 17 00:00:00 2001 From: Charles Engelke Date: Fri, 1 Feb 2019 14:39:42 -0800 Subject: [PATCH 2/6] Missed two subscribe() call changes before --- dlp/risk.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dlp/risk.py b/dlp/risk.py index 9539aa71bc6..bb702cfb957 100644 --- a/dlp/risk.py +++ b/dlp/risk.py @@ -484,7 +484,7 @@ def map_fields(field): subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_id) - subscription = subscriber.subscribe(subscription_path) + subscription = subscriber.subscribe(subscription_path, callback) try: subscription.result(timeout=timeout) @@ -619,7 +619,7 @@ def map_fields(quasi_id, info_type): subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_id) - subscription = subscriber.subscribe(subscription_path) + subscription = subscriber.subscribe(subscription_path, callback) try: subscription.result(timeout=timeout) From 96f9d1fd1b2a6c9fbac5fdc7d5b65cd1eb315015 Mon Sep 17 00:00:00 2001 From: Charles Engelke Date: Mon, 4 Feb 2019 13:07:23 -0800 Subject: [PATCH 3/6] Cancel subscription when processed --- dlp/risk.py | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/dlp/risk.py b/dlp/risk.py index 9539aa71bc6..c13cab64711 100644 --- a/dlp/risk.py +++ b/dlp/risk.py @@ -68,6 +68,7 @@ def callback(message): print('Value at {}% quantile: {}'.format( percent, value)) prev_value = value + subscription.cancel() else: # This is not the message we're looking for. message.drop() @@ -109,9 +110,6 @@ def callback(message): 'actions': actions } - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() @@ -119,6 +117,9 @@ def callback(message): project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + try: subscription.result(timeout=timeout) except TimeoutError: @@ -183,6 +184,7 @@ def callback(message): for value in bucket.bucket_values: print(' Value {} occurs {} time(s)'.format( value.value.integer_value, value.count)) + subscription.cancel() else: # This is not the message we're looking for. message.drop() @@ -224,9 +226,6 @@ def callback(message): 'actions': actions } - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() @@ -234,6 +233,9 @@ def callback(message): project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + try: subscription.result(timeout=timeout) except TimeoutError: @@ -302,6 +304,7 @@ def callback(message): )) print(' Class size: {}'.format( value_bucket.equivalence_class_size)) + subscription.cancel() else: # This is not the message we're looking for. message.drop() @@ -346,8 +349,6 @@ def map_fields(field): 'source_table': source_table, 'actions': actions } - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. @@ -356,6 +357,9 @@ def map_fields(field): project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + try: subscription.result(timeout=timeout) except TimeoutError: @@ -428,6 +432,7 @@ def callback(message): for value in value_bucket.top_sensitive_values: print((' Sensitive value {} occurs {} time(s)' .format(value.value, value.count))) + subscription.cancel() else: # This is not the message we're looking for. message.drop() @@ -476,15 +481,15 @@ def map_fields(field): 'actions': actions } - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_id) - subscription = subscriber.subscribe(subscription_path) + subscription = subscriber.subscribe(subscription_path, callback) + + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) try: subscription.result(timeout=timeout) @@ -560,6 +565,7 @@ def callback(message): map(get_values, value_bucket.quasi_ids_values))) print(' Estimated k-map anonymity: {}'.format( value_bucket.estimated_anonymity)) + subscription.cancel() else: # This is not the message we're looking for. message.drop() @@ -611,15 +617,15 @@ def map_fields(quasi_id, info_type): 'actions': actions } - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_id) - subscription = subscriber.subscribe(subscription_path) + subscription = subscriber.subscribe(subscription_path, callback) + + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) try: subscription.result(timeout=timeout) From 762149293ce3826c53c8d1bf0a721cda5bee1b78 Mon Sep 17 00:00:00 2001 From: Charles Engelke Date: Mon, 4 Feb 2019 13:23:54 -0800 Subject: [PATCH 4/6] Update risk.py --- dlp/risk.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dlp/risk.py b/dlp/risk.py index f6e5d4801b7..c13cab64711 100644 --- a/dlp/risk.py +++ b/dlp/risk.py @@ -487,12 +487,9 @@ def map_fields(field): subscription_path = subscriber.subscription_path( project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) -<<<<<<< HEAD # Call API to start risk analysis job operation = dlp.create_dlp_job(parent, risk_job=risk_job) -======= ->>>>>>> 7138a549e46a45cdef6c970404aa5ee72c2ba491 try: subscription.result(timeout=timeout) @@ -626,12 +623,9 @@ def map_fields(quasi_id, info_type): subscription_path = subscriber.subscription_path( project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) -<<<<<<< HEAD # Call API to start risk analysis job operation = dlp.create_dlp_job(parent, risk_job=risk_job) -======= ->>>>>>> 7138a549e46a45cdef6c970404aa5ee72c2ba491 try: subscription.result(timeout=timeout) From e1fc466246ca4a43be857ba3b85e7e97e2650fdb Mon Sep 17 00:00:00 2001 From: Charles Engelke Date: Mon, 4 Feb 2019 15:23:04 -0800 Subject: [PATCH 5/6] Fix waiting for message --- dlp/risk.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/dlp/risk.py b/dlp/risk.py index c13cab64711..c0583635c1a 100644 --- a/dlp/risk.py +++ b/dlp/risk.py @@ -68,7 +68,7 @@ def callback(message): print('Value at {}% quantile: {}'.format( percent, value)) prev_value = value - subscription.cancel() + subscription.set_result(None) else: # This is not the message we're looking for. message.drop() @@ -126,8 +126,6 @@ def callback(message): print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') subscription.close() - - subscription.cancel() # [END dlp_numerical_stats] @@ -184,7 +182,7 @@ def callback(message): for value in bucket.bucket_values: print(' Value {} occurs {} time(s)'.format( value.value.integer_value, value.count)) - subscription.cancel() + subscription.set_result(None) else: # This is not the message we're looking for. message.drop() @@ -242,8 +240,6 @@ def callback(message): print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') subscription.close() - - subscription.cancel() # [END dlp_categorical_stats] @@ -304,7 +300,7 @@ def callback(message): )) print(' Class size: {}'.format( value_bucket.equivalence_class_size)) - subscription.cancel() + subscription.set_result(None) else: # This is not the message we're looking for. message.drop() @@ -366,8 +362,6 @@ def map_fields(field): print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') subscription.close() - - subscription.cancel() # [END dlp_k_anonymity] @@ -432,7 +426,7 @@ def callback(message): for value in value_bucket.top_sensitive_values: print((' Sensitive value {} occurs {} time(s)' .format(value.value, value.count))) - subscription.cancel() + subscription.set_result(None) else: # This is not the message we're looking for. message.drop() @@ -497,8 +491,6 @@ def map_fields(field): print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') subscription.close() - - subscription.cancel() # [END dlp_l_diversity] @@ -565,7 +557,7 @@ def callback(message): map(get_values, value_bucket.quasi_ids_values))) print(' Estimated k-map anonymity: {}'.format( value_bucket.estimated_anonymity)) - subscription.cancel() + subscription.set_result(None) else: # This is not the message we're looking for. message.drop() @@ -633,8 +625,6 @@ def map_fields(quasi_id, info_type): print('No event received before the timeout. Please verify that the ' 'subscription provided is subscribed to the topic provided.') subscription.close() - - subscription.cancel() # [END dlp_k_map] From 9e1cdd2141db3383747f1ac3b4a310bd7f47c7e2 Mon Sep 17 00:00:00 2001 From: Charles Engelke Date: Mon, 4 Feb 2019 16:29:00 -0800 Subject: [PATCH 6/6] Unneeded try/except removed --- dlp/risk.py | 278 +++++++++++++++++++++++----------------------------- 1 file changed, 124 insertions(+), 154 deletions(-) diff --git a/dlp/risk.py b/dlp/risk.py index c0583635c1a..273cfd1548d 100644 --- a/dlp/risk.py +++ b/dlp/risk.py @@ -50,33 +50,27 @@ def numerical_risk_analysis(project, table_project_id, dataset_id, table_id, import google.cloud.pubsub def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - results = job.risk_details.numerical_stats_result - print('Value Range: [{}, {}]'.format( - results.min_value.integer_value, - results.max_value.integer_value)) - prev_value = None - for percent, result in enumerate(results.quantile_values): - value = result.integer_value - if prev_value != value: - print('Value at {}% quantile: {}'.format( - percent, value)) - prev_value = value - subscription.set_result(None) - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + results = job.risk_details.numerical_stats_result + print('Value Range: [{}, {}]'.format( + results.min_value.integer_value, + results.max_value.integer_value)) + prev_value = None + for percent, result in enumerate(results.quantile_values): + value = result.integer_value + if prev_value != value: + print('Value at {}% quantile: {}'.format( + percent, value)) + prev_value = value + subscription.set_result(None) + else: + # This is not the message we're looking for. + message.drop() # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient() @@ -160,37 +154,31 @@ def categorical_risk_analysis(project, table_project_id, dataset_id, table_id, import google.cloud.pubsub def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = (job.risk_details - .categorical_stats_result - .value_frequency_histogram_buckets) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print('Bucket {}:'.format(i)) - print(' Most common value occurs {} time(s)'.format( - bucket.value_frequency_upper_bound)) - print(' Least common value occurs {} time(s)'.format( - bucket.value_frequency_lower_bound)) - print(' {} unique values total.'.format( - bucket.bucket_size)) - for value in bucket.bucket_values: - print(' Value {} occurs {} time(s)'.format( - value.value.integer_value, value.count)) - subscription.set_result(None) - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = (job.risk_details + .categorical_stats_result + .value_frequency_histogram_buckets) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print('Bucket {}:'.format(i)) + print(' Most common value occurs {} time(s)'.format( + bucket.value_frequency_upper_bound)) + print(' Least common value occurs {} time(s)'.format( + bucket.value_frequency_lower_bound)) + print(' {} unique values total.'.format( + bucket.bucket_size)) + for value in bucket.bucket_values: + print(' Value {} occurs {} time(s)'.format( + value.value.integer_value, value.count)) + subscription.set_result(None) + else: + # This is not the message we're looking for. + message.drop() # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient() @@ -277,38 +265,32 @@ def get_values(obj): return int(obj.integer_value) def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = (job.risk_details - .k_anonymity_result - .equivalence_class_histogram_buckets) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print('Bucket {}:'.format(i)) - if bucket.equivalence_class_size_lower_bound: - print(' Bucket size range: [{}, {}]'.format( - bucket.equivalence_class_size_lower_bound, - bucket.equivalence_class_size_upper_bound)) - for value_bucket in bucket.bucket_values: - print(' Quasi-ID values: {}'.format( - map(get_values, value_bucket.quasi_ids_values) - )) - print(' Class size: {}'.format( - value_bucket.equivalence_class_size)) - subscription.set_result(None) - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = (job.risk_details + .k_anonymity_result + .equivalence_class_histogram_buckets) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print('Bucket {}:'.format(i)) + if bucket.equivalence_class_size_lower_bound: + print(' Bucket size range: [{}, {}]'.format( + bucket.equivalence_class_size_lower_bound, + bucket.equivalence_class_size_upper_bound)) + for value_bucket in bucket.bucket_values: + print(' Quasi-ID values: {}'.format( + map(get_values, value_bucket.quasi_ids_values) + )) + print(' Class size: {}'.format( + value_bucket.equivalence_class_size)) + subscription.set_result(None) + else: + # This is not the message we're looking for. + message.drop() # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient() @@ -401,40 +383,34 @@ def get_values(obj): return int(obj.integer_value) def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = ( - job.risk_details - .l_diversity_result - .sensitive_value_frequency_histogram_buckets) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print('Bucket {}:'.format(i)) - print(' Bucket size range: [{}, {}]'.format( - bucket.sensitive_value_frequency_lower_bound, - bucket.sensitive_value_frequency_upper_bound)) - for value_bucket in bucket.bucket_values: - print(' Quasi-ID values: {}'.format( - map(get_values, value_bucket.quasi_ids_values))) - print(' Class size: {}'.format( - value_bucket.equivalence_class_size)) - for value in value_bucket.top_sensitive_values: - print((' Sensitive value {} occurs {} time(s)' - .format(value.value, value.count))) - subscription.set_result(None) - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = ( + job.risk_details + .l_diversity_result + .sensitive_value_frequency_histogram_buckets) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print('Bucket {}:'.format(i)) + print(' Bucket size range: [{}, {}]'.format( + bucket.sensitive_value_frequency_lower_bound, + bucket.sensitive_value_frequency_upper_bound)) + for value_bucket in bucket.bucket_values: + print(' Quasi-ID values: {}'.format( + map(get_values, value_bucket.quasi_ids_values))) + print(' Class size: {}'.format( + value_bucket.equivalence_class_size)) + for value in value_bucket.top_sensitive_values: + print((' Sensitive value {} occurs {} time(s)' + .format(value.value, value.count))) + subscription.set_result(None) + else: + # This is not the message we're looking for. + message.drop() # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient() @@ -536,36 +512,30 @@ def get_values(obj): return int(obj.integer_value) def callback(message): - try: - if (message.attributes['DlpJobName'] == operation.name): - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = (job.risk_details - .k_map_estimation_result - .k_map_estimation_histogram) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print('Bucket {}:'.format(i)) - print(' Anonymity range: [{}, {}]'.format( - bucket.min_anonymity, bucket.max_anonymity)) - print(' Size: {}'.format(bucket.bucket_size)) - for value_bucket in bucket.bucket_values: - print(' Values: {}'.format( - map(get_values, value_bucket.quasi_ids_values))) - print(' Estimated k-map anonymity: {}'.format( - value_bucket.estimated_anonymity)) - subscription.set_result(None) - else: - # This is not the message we're looking for. - message.drop() - except Exception as e: - # Because this is executing in a thread, an exception won't be - # noted unless we print it manually. - print(e) - raise + if (message.attributes['DlpJobName'] == operation.name): + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = (job.risk_details + .k_map_estimation_result + .k_map_estimation_histogram) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print('Bucket {}:'.format(i)) + print(' Anonymity range: [{}, {}]'.format( + bucket.min_anonymity, bucket.max_anonymity)) + print(' Size: {}'.format(bucket.bucket_size)) + for value_bucket in bucket.bucket_values: + print(' Values: {}'.format( + map(get_values, value_bucket.quasi_ids_values))) + print(' Estimated k-map anonymity: {}'.format( + value_bucket.estimated_anonymity)) + subscription.set_result(None) + else: + # This is not the message we're looking for. + message.drop() # Instantiate a client. dlp = google.cloud.dlp.DlpServiceClient()