// Set up a callback to acknowledge a message. This closes around an event
// so that it can signal that it is done and the main thread can continue.
job_done = threading.Event()
def callback(message):
try:
if (message.attributes["DlpJobName"] == operation.name):
// This is the message we"re looking for, so acknowledge it.
message.ack()
// Now that the job is done, fetch the results and print them.
job = dlp.get_dlp_job(operation.name)
results = job.risk_details.numerical_stats_result
print("Value Range: [{}, {}]".format(
results.min_value.integer_value,
results.max_value.integer_value))
prev_value = None
for percent, result in enumerate(results.quantile_values):
value = result.integer_value
if prev_value != value:
print("Value at {}% quantile: {}".format(
percent, value))
prev_value = value
// Signal to the main thread that we can exit.
job_done.set()
else:
// This is not the message we"re looking for.
message.drop()
except Exception as e:
// Because this is executing in a thread, an exception won"t be
// noted unless we print it manually.
print(e)
raise
// Register the callback and wait on the event.
subscription.open(callback)
finished = job_done.wait(timeout=timeout)if not finished:
print("No event received before the timeout. Please verify that the "
"subscription provided is subscribed to the topic provided.")
// [END dlp_numerical_stats]
// [START dlp_categorical_stats]
def categorical_risk_analysis(project, table_project_id, dataset_id, table_id,
column_name, topic_id, subscription_id,
timeout=300):
Uses the Data Loss Prevention API to compute risk metrics of a column