Preliminary effort at integrating Vertex AI with WIT #170

Open · wants to merge 4 commits into base: master
53 changes: 30 additions & 23 deletions DEVELOPMENT.md
@@ -6,66 +6,73 @@ Later versions of bazel are currently untested and may fail to build the package.
## First-time Setup

1. Install [Bazel](https://docs.bazel.build/versions/master/install.html)
(for building OSS code) and [npm](https://www.npmjs.com/get-npm).
1. Install pip and virtualenv
`sudo apt-get install python-pip python3-pip virtualenv`
1. Create a python 3 virtualenv for WIT development
`virtualenv -p python3 ~/tf` (or wherever you want to save this environment)
1. Create a fork of the official What-If Tool github repo through the GitHub UI
1. Clone your fork to your computer
`cd ~/github && git clone https://github.com/[yourGitHubUsername]/what-if-tool.git`
1. Install TensorFlow through pip `pip install tensorflow` to get TensorFlow and
TensorBoard.
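
Once these steps complete, a quick sanity check (not part of the original setup steps; it only confirms the virtualenv sees TensorFlow) is:

```
# Run inside the activated virtualenv to confirm TensorFlow is importable.
import tensorflow as tf
print(tf.__version__)
```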

### Additional setup for testing WIT in TensorBoard

To test WIT in TensorBoard, you need to use Docker to host TensorFlow models
using [TensorFlow Serving](https://github.com/tensorflow/serving).

1. Install [Docker](https://docs.docker.com/install/).
1. Install TensorFlow Serving through docker:
`docker pull tensorflow/serving`
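
Once a model is being served on port 8500 (as in the workflow below), you can confirm it loaded by querying the serving container's model status over gRPC. This is a minimal sketch, assuming the `tensorflow-serving-api` pip package is installed and the model is named `my_model`:

```
import grpc
from tensorflow_serving.apis import get_model_status_pb2
from tensorflow_serving.apis import model_service_pb2_grpc

# Query the ModelService of the serving container mapped to localhost:8500.
channel = grpc.insecure_channel('localhost:8500')
stub = model_service_pb2_grpc.ModelServiceStub(channel)
request = get_model_status_pb2.GetModelStatusRequest()
request.model_spec.name = 'my_model'
print(stub.GetModelStatus(request))
```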


## Development Workflow
These steps have been tested when using the bash shell and may not work in other shells. The build steps for local development mostly mirror production builds. To speed this up, you can add the [`compilation_level="BUNDLE"` flag](https://github.com/PAIR-code/what-if-tool/issues/89) to the relevant `tf_tensorboard_html_binary` build tasks.


1. Enter your development virtualenv
`source ~/tf/bin/activate`
1. Run TensorBoard, WIT notebooks, and/or WIT demos
`cd ~/github/what-if-tool`
- For WIT demos, follow the directions in the [README](./README.md#i-dont-want-to-read-this-document-can-i-just-play-with-a-demo).
1. `bazel run wit_dashboard/demo:<demoRule>`
1. Navigate to `http://localhost:6006/wit-dashboard/<demoName>.html`
- For use in notebook mode, build the witwidget pip package locally and use it in a notebook.
1. `rm -rf /tmp/wit-pip` (if it already exists)
1. `bazel run witwidget/pip_package:build_pip_package`
1. Install the package
- For use in Jupyter notebooks, install and enable the locally-built pip package per instructions in the [README](./README.md#how-do-i-enable-it-for-use-in-a-jupyter-notebook), but instead use `pip install <pathToBuiltPipPackageWhlFile>`, then launch the jupyter notebook kernel.
- For use in Colab notebooks, upload the package to the notebook and install it from there
1. In a notebook cell, to upload a file from local disk, run
```
from google.colab import files
uploaded = files.upload()
```
1. In a notebook cell, to install the uploaded pip package, run `!pip install <nameOfPackage.whl>`.
If witwidget was previously installed, uninstall it first.<br>
- For TensorBoard use, build and install the tensorboard_plugin_wit package, then run tensorboard with any logdir (e.g. ./), as WIT does not rely on logdir.<br>
1. Build the tensorboard_plugin_wit pip package as per instructions in the
[tensorboard_plugin_wit release instructions](tensorboard_plugin_wit/pip_package/RELEASE.md).
1. Install the locally-built tensorboard_plugin_wit pip package with `pip install /tmp/wit-pip/release/dist/<packageName>`
1. WIT needs a served model to query, so serve your trained model through the TF Serving docker container.<br>
`sudo docker run -p 8500:8500 --mount type=bind,source=<pathToSavedModel>,target=/models/my_model/ -e MODEL_NAME=my_model -t tensorflow/serving`
- When developing model comparison, serve multiple models at once using the proper config as seen in the appendix.<br>
`sudo docker run -p 8500:8500 --mount type=bind,source=<pathToSavedModel1>,target=/models/my_model_1 -e MODEL_NAME=my_model_1 --mount type=bind,source=<pathToSavedModel2>,target=/models/my_model_2 -e MODEL_NAME=my_model_2 --mount type=bind,source=<pathToConfigFile>,target=/models/models.config -t tensorflow/serving --model_config_file="/models/models.config"`
1. Run TensorBoard `tensorboard --logdir /tmp`
1. Navigate to the WIT tab in TensorBoard and set up WIT (`http://localhost:6006/#whatif&inferenceAddress=localhost%3A8500&modelName=my_model`).<br>
The inferenceAddress and modelName settings point to the model you served in the previous step. Set all other appropriate options and click “accept”.
1. When you want to shut down the served model, find the container ID and stop the container.
```
sudo docker container ls
sudo docker stop <containerIdFromLsOutput>
```
1. The python code has unit tests
```
bazel test ...
```
1. Add/commit your code changes on a branch in your fork and push it to github.
1. In the github UI for the master what-if-tool repo, create a pull request from your pushed branch.

For notebook users to see new changes to the code, we need to push out a new version of the witwidget pip packages.
Instructions for that can be found in the [witwidget release instructions](witwidget/pip_package/RELEASE.md).
229 changes: 228 additions & 1 deletion witwidget/notebook/base.py
@@ -23,16 +23,26 @@
from google.protobuf import json_format
from numbers import Number
from oauth2client.client import GoogleCredentials
import google.auth
from google.oauth2 import service_account
from six import ensure_str
from six import integer_types
from utils import inference_utils
from typing import Dict

# Vertex AI clients. ClientInfo is used to attach the custom user agent, and
# aiplatform_v1beta1 is needed for the explain (XAI) requests.
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import aiplatform
from google.cloud import aiplatform_v1beta1
from google.protobuf.struct_pb2 import Value

# Constants used in mutant inference generation.
NUM_MUTANTS_TO_GENERATE = 10
NUM_EXAMPLES_FOR_MUTANT_ANALYSIS = 50

# Custom user agents for tracking number of calls to Cloud AI Platform
# and Vertex AI.
USER_AGENT_FOR_CAIP_TRACKING = 'WhatIfTool'
USER_AGENT_FOR_VERTEX_AI_TRACKING = 'WhatIfTool'

try:
POOL_SIZE = max(multiprocessing.cpu_count() - 1, 1)
@@ -611,3 +621,220 @@ def extract_values(feat):

self.set_examples(filtered_examples)
return handle_selection

# service_region is required here
def _predict_vertex_ai_model(self, examples):
return self._predict_vertex_ai_impl(
examples,
self.config.get('inference_address'),
self.config.get('model_name'),
self.config.get('model_signature'),
self.config.get('force_json_input'),
self.adjust_example_fn,
self.adjust_prediction_fn,
self.adjust_attribution_fn,
self.config.get('aip_service_region'),
self.config.get('aip_service_name'),
self.config.get('aip_service_version'),
self.config.get('get_explanations'),
self.config.get('aip_batch_size'),
self.config.get('aip_api_key'))


def _predict_vertex_ai_compare_model(self, examples):
return self._predict_vertex_ai_impl(
examples,
self.config.get('inference_address_2'),
self.config.get('model_name_2'),
self.config.get('model_signature_2'),
self.config.get('compare_force_json_input'),
self.compare_adjust_example_fn,
self.compare_adjust_prediction_fn,
self.compare_adjust_attribution_fn,
self.config.get('compare_aip_service_region'), # Does this exist yet?
self.config.get('compare_aip_service_name'),
self.config.get('compare_aip_service_version'),
self.config.get('compare_get_explanations'),
self.config.get('compare_aip_batch_size'),
self.config.get('compare_aip_api_key'))


def _predict_vertex_ai_impl(self, examples, project, model, endpoint,
force_json, adjust_example, adjust_prediction,
adjust_attribution, service_region, service_name,
service_version, get_explanations, batch_size, api_key):
"""Custom prediction function for running inference through Vertex AI."""

# Set up environment for GCP call for specified project.
os.environ['GOOGLE_CLOUD_PROJECT'] = project

should_explain = get_explanations and not self.running_mutant_infer

# Regional endpoint for prediction
# For example, "us-central1-prediction-aiplatform.googleapis.com"
api_endpoint = (
('%s-prediction-aiplatform.googleapis.com')
% (service_region))

def predict_vertex(exs):
error_during_prediction = False
# Build the prediction client. Note: the gapic client does not accept a
# raw API key, so when one is provided we still fall back to application
# default credentials here (assumption; revisit if key-based auth is needed).
if api_key is not None:
credentials, _ = google.auth.default()
else:
credentials = None
client_options = {"api_endpoint": api_endpoint}
# Attach the custom user agent through client_info, since the gapic client
# does not expose per-request headers (assumption: this is sufficient for
# usage tracking).
client_info = ClientInfo(user_agent=USER_AGENT_FOR_VERTEX_AI_TRACKING)
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.gapic.PredictionServiceClient(
credentials=credentials, client_options=client_options,
client_info=client_info)

# Preprocessing prediction examples
if self.config.get('uses_json_input') or force_json:
examples_for_predict = self._json_from_tf_examples(exs)
else:
examples_for_predict = [{'b64': base64.b64encode(
example.SerializeToString()).decode('utf-8') }
for example in exs]

# If there is a user-specified input example adjustment to make, make it.
if adjust_example:
examples_for_predict = [
adjust_example(ex) for ex in examples_for_predict]

# Build the endpoint path and convert the examples to protobuf Values, as
# the gapic predict/explain methods expect.
endpoint_path = client.endpoint_path(
project=project, location=service_region, endpoint=endpoint
)
instances = [json_format.ParseDict(ex, Value())
for ex in examples_for_predict]
parameters = json_format.ParseDict({}, Value())
try:
# The gapic client returns the response directly; there is no
# request-builder/execute step as with the discovery-based client.
predict_response = client.predict(
endpoint=endpoint_path, instances=instances, parameters=parameters
)
# Wrap the predictions in a dict so the parsing code below can treat all
# responses uniformly. Depending on the client version, each prediction
# may still need conversion from a protobuf Struct value to a plain dict
# before the key lookups below.
response = {'predictions': list(predict_response.predictions)}
except Exception as e:
error_during_prediction = True
response = {'error': str(e)}

if should_explain and not error_during_prediction:
try:
# The explain request is still on v1beta1.
explain_client = aiplatform_v1beta1.PredictionServiceClient(
client_options=client_options, client_info=client_info)
explain_response = explain_client.explain(
endpoint=endpoint_path, instances=instances, parameters=parameters
)
explanations = explain_response.explanations
# Get a list of all the feature attributions from the explain response.
attributions = [
explanation.attributions for explanation in explanations]
# Baseline scores come from the attributions' baseline_output_value
# (assumption: the first attribution per explanation is the one WIT
# should display).
baseline_scores = [
explanation.attributions[0].baseline_output_value
for explanation in explanations]
response.update(
{'explanations': attributions, 'baseline_scores': baseline_scores})
except Exception as e:
# Explanations are best-effort; fall back to plain predictions.
pass
return response

def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i:i + n]

# Run prediction in batches in threads.
if batch_size is None:
batch_size = len(examples)
batched_examples = list(chunks(examples, batch_size))

pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE)
responses = pool.map(predict_vertex, batched_examples)
pool.close()
pool.join()

for response in responses:
if 'error' in response:
raise RuntimeError(response['error'])

# Parse the results from the responses and return them.
all_predictions = []
all_baseline_scores = []
all_attributions = []

for response in responses:
if 'explanations' in response:
# If an attribution adjustment function was provided, use it to adjust
# the attributions.
if adjust_attribution is not None:
all_attributions.extend([
adjust_attribution(attr) for attr in response['explanations']])
else:
all_attributions.extend(response['explanations'])

if 'baseline_scores' in response:
all_baseline_scores.extend(response['baseline_scores'])

# Use the specified key if one is provided.
key_to_use = self.config.get('predict_output_tensor')

for pred in response['predictions']:
# If the prediction contains a key to fetch the prediction, use it.
if isinstance(pred, dict):
if key_to_use is None:
# If the dictionary only contains one key, use it.
returned_keys = list(pred.keys())
if len(returned_keys) == 1:
key_to_use = returned_keys[0]
# Use a default key if necessary.
elif self.config.get('model_type') == 'classification':
key_to_use = 'probabilities'
else:
key_to_use = 'outputs'

if key_to_use not in pred:
raise KeyError(
'"%s" not found in model predictions dictionary' % key_to_use)

pred = pred[key_to_use]

# If the model is regression and the response is a list, extract the
# score by taking the first element.
if (self.config.get('model_type') == 'regression' and
isinstance(pred, list)):
pred = pred[0]

# If a prediction adjustment function was provided, use it to adjust
# the prediction.
if adjust_prediction:
pred = adjust_prediction(pred)

# If the model is classification and the response is a single number,
# treat that as the positive class score for a binary classification
# and convert it into a list of those two class scores. WIT only
# accepts lists of class scores as results from classification models.
if (self.config.get('model_type') == 'classification'):
if not isinstance(pred, list):
pred = [pred]
if len(pred) == 1:
pred = [1 - pred[0], pred[0]]

all_predictions.append(pred)

results = {'predictions': all_predictions}
if all_attributions:
results.update({'attributions': all_attributions})
if all_baseline_scores:
results.update({'baseline_score': all_baseline_scores})
return results
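
# For reference only (not part of this PR): a hypothetical sketch of the WIT
# config entries the Vertex AI path above reads. The key names are taken from
# the self.config.get(...) calls in _predict_vertex_ai_model; the values are
# placeholders.
example_vertex_config = {
    'inference_address': 'my-gcp-project',  # passed as `project`
    'model_name': 'my-model',               # passed as `model`
    'model_signature': '1234567890',        # passed as `endpoint` (endpoint ID)
    'force_json_input': False,
    'aip_service_region': 'us-central1',
    'aip_service_name': 'aiplatform',
    'aip_service_version': 'v1',
    'get_explanations': True,
    'aip_batch_size': 16,
    'aip_api_key': None,
}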