Skip to content

Commit

Permalink
Last notebook adjustment to utility
Browse files Browse the repository at this point in the history
  • Loading branch information
cetagostini committed Nov 21, 2024
1 parent c7d834d commit 4d34c39
Show file tree
Hide file tree
Showing 7 changed files with 844 additions and 437 deletions.
297 changes: 193 additions & 104 deletions docs/source/notebooks/mmm/mmm_allocation_assessment.ipynb

Large diffs are not rendered by default.

175 changes: 103 additions & 72 deletions docs/source/notebooks/mmm/mmm_budget_allocation_example.ipynb

Large diffs are not rendered by default.

69 changes: 41 additions & 28 deletions pymc_marketing/mmm/budget_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from pymc_marketing.mmm.components.adstock import AdstockTransformation
from pymc_marketing.mmm.components.saturation import SaturationTransformation
from pymc_marketing.mmm.risk_assessment import ObjectiveFunction, average_response
from pymc_marketing.mmm.utility import UtilityFunction, average_response


class MinimizeException(Exception):
Expand Down Expand Up @@ -57,11 +57,15 @@ class BudgetOptimizer(BaseModel):
The number of time units.
parameters : dict
A dictionary of parameters for each channel.
scales : np.ndarray
The scale parameter for each channel variable.
response_scaler : float, optional
The scaling factor for the target response variable. Default is 1.
adstock_first : bool, optional
Whether to apply adstock transformation first or saturation transformation first.
Default is True.
objective_function : Callable[[np.ndarray], float], optional
The objective function to maximize. Default is the mean of the response distribution.
utility_function : UtilityFunction, optional
The utility function to maximize. Default is the mean of the response distribution.
"""

Expand All @@ -82,20 +86,27 @@ class BudgetOptimizer(BaseModel):
scales: np.ndarray = Field(
..., description="The scale parameter for each channel variable"
)
response_scaler: float = Field(
default=1.0,
description="Scaling factor for the target response variable. Defaults to 1.",
)
adstock_first: bool = Field(
True,
description="Whether to apply adstock transformation first or saturation transformation first.",
)
model_config = ConfigDict(arbitrary_types_allowed=True)

objective_function: ObjectiveFunction = Field(
default=average_response,
description="Objective function to maximize.",
arbitrary_types_allowed=True,
response_scaler_sym: pt.TensorVariable = Field(
default=None,
exclude=True,
repr=False,
description="Response scaler tensor variable.",
)

scales_tensor: pt.TensorVariable = Field(
default=None, exclude=True, repr=False, description="Scales tensor variable."
utility_function: UtilityFunction = Field(
default=average_response,
description="Utility function to maximize.",
arbitrary_types_allowed=True,
)

DEFAULT_MINIMIZE_KWARGS: ClassVar[dict] = {
Expand All @@ -105,8 +116,7 @@ class BudgetOptimizer(BaseModel):

def __init__(self, **data):
super().__init__(**data)
# Convert scales to a PyTensor tensor
object.__setattr__(self, "scales_tensor", pt.as_tensor_variable(self.scales))
self.response_scaler_sym = pt.as_tensor_variable(self.response_scaler)
self._compiled_functions = {}
self._compile_objective_and_grad()

Expand All @@ -118,32 +128,32 @@ def _compile_objective_and_grad(self):

response_distribution = _response_distribution.sum(axis=(2, 3)).flatten()

objective_value = -self.objective_function(
objective_value = -self.utility_function(
samples=response_distribution, budgets=budgets_sym
)

# Compute gradient symbolically
grad_obj = pt.grad(objective_value, budgets_sym)

# Compile the functions
objective_func = function([budgets_sym], objective_value)
utility_func = function([budgets_sym], objective_value)
grad_func = function([budgets_sym], grad_obj)

# Cache the compiled functions
self._compiled_functions[self.objective_function] = {
"objective": objective_func,
self._compiled_functions[self.utility_function] = {
"objective": utility_func,
"gradient": grad_func,
}

def objective(self, budgets: pt.TensorVariable) -> float:
def _objective(self, budgets: pt.TensorVariable) -> float:
"""Objective function for the budget optimization."""
return self._compiled_functions[self.objective_function]["objective"](
return self._compiled_functions[self.utility_function]["objective"](
budgets
).item()

def gradient(self, budgets: pt.TensorVariable) -> pt.TensorVariable:
def _gradient(self, budgets: pt.TensorVariable) -> pt.TensorVariable:
"""Gradient of the objective function."""
return self._compiled_functions[self.objective_function]["gradient"](budgets)
return self._compiled_functions[self.utility_function]["gradient"](budgets)

def _estimate_response(self, budgets: list[float]) -> np.ndarray:
"""Calculate the total response during a period of time given the budgets.
Expand All @@ -157,8 +167,8 @@ def _estimate_response(self, budgets: list[float]) -> np.ndarray:
Returns
-------
float
The negative total response value.
np.ndarray
The estimated response distribution.
"""
first_transform, second_transform = (
Expand All @@ -167,9 +177,8 @@ def _estimate_response(self, budgets: list[float]) -> np.ndarray:
else (self.saturation, self.adstock)
)

# Ensure scales are tensor variables
scales_sym = pt.as_tensor_variable(self.scales)
budget = budgets / scales_sym
# Convert scales to a tensor variable when needed
budget = budgets / pt.as_tensor_variable(self.scales)

# Convert parameters to tensor variables if necessary
def convert_params(params):
Expand Down Expand Up @@ -201,7 +210,11 @@ def convert_params(params):
param_value = param_value.dimshuffle(0, 1, "x", 2)
second_params[param_name] = param_value

return second_transform.function(x=_response, **second_params)
# Multiply by the response_scaler_sym
return (
second_transform.function(x=_response, **second_params)
* self.response_scaler_sym
)

def allocate_budget(
self,
Expand Down Expand Up @@ -236,7 +249,7 @@ def allocate_budget(
Returns
-------
tuple[dict[str, float], float]
The optimal budgets for each channel and the negative total response value.
The optimal budgets for each channel and the optimization result object.
Raises
------
Expand Down Expand Up @@ -283,11 +296,11 @@ def allocate_budget(
minimize_kwargs = self.DEFAULT_MINIMIZE_KWARGS

result = minimize(
fun=self.objective,
fun=self._objective,
x0=initial_guess,
bounds=bounds,
constraints=constraints,
jac=self.gradient,
jac=self._gradient,
**minimize_kwargs,
)

Expand Down
118 changes: 110 additions & 8 deletions pymc_marketing/mmm/mmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import pymc as pm
import pytensor.tensor as pt
import seaborn as sns
import xarray as xr
from pydantic import Field, InstanceOf, validate_call
from xarray import DataArray, Dataset

Expand All @@ -46,8 +47,8 @@
scale_lift_measurements,
)
from pymc_marketing.mmm.preprocessing import MaxAbsScaleChannels, MaxAbsScaleTarget
from pymc_marketing.mmm.risk_assessment import ObjectiveFunction, average_response
from pymc_marketing.mmm.tvp import create_time_varying_gp_multiplier, infer_time_index
from pymc_marketing.mmm.utility import UtilityFunction, average_response
from pymc_marketing.mmm.utils import (
apply_sklearn_transformer_across_dim,
create_new_spend_data,
Expand Down Expand Up @@ -2165,7 +2166,7 @@ def _create_synth_dataset(

return pd.DataFrame(new_rows)

def _sample_posterior_predictive_based_on_allocation(
def sample_response_distribution(
self,
allocation_strategy: dict[str, float],
time_granularity: str,
Expand Down Expand Up @@ -2203,13 +2204,103 @@ def _sample_posterior_predictive_based_on_allocation(
noise_level=noise_level,
)

constant_data = xr.Dataset(
data_vars={
"allocation": (["channel"], list(allocation_strategy.values())),
},
coords={
"channel": list(allocation_strategy.keys()),
},
)

return self.sample_posterior_predictive(
X_pred=synth_dataset,
extend_idata=False,
include_last_observations=True,
original_scale=False,
var_names=["y", "channel_contributions"],
progressbar=False,
).merge(constant_data)

def optimize_budget(
self,
budget: float | int,
num_periods: int,
budget_bounds: dict[str, tuple[float, float]] | None = None,
custom_constraints: dict[str, float] | None = None,
noise_level: float = 0.01,
response_scaler: float = 1.0,
utility_function: UtilityFunction = average_response,
**minimize_kwargs,
) -> az.InferenceData:
"""Optimize the given budget based on the specified utility function over a specified time period.
This function optimizes the allocation of a given budget across different channels
to maximize the response, considering adstock and saturation effects. It scales the
budget and budget bounds, performs the optimization, and generates a synthetic dataset
for posterior predictive sampling.
The function first scales the budget and budget bounds using the maximum scale
of the channel transformer. It then uses the `BudgetOptimizer` to allocate the
budget, and creates a synthetic dataset based on the optimal allocation. Finally,
it performs posterior predictive sampling on the synthetic dataset.
**Important**: When generating the posterior predicive distribution for the target with the optimized budget,
we are setting the control variables to zero! This is done because in many situations we do not have all the
control variables in the future (e.g. outlier control, special events).
Parameters
----------
budget : float or int
The total budget to be allocated.
num_periods : float
The number of time units over which the budget is to be allocated.
budget_bounds : dict[str, list[Any]], optional
A dictionary specifying the lower and upper bounds for the budget allocation
for each channel. If None, no bounds are applied.
custom_constraints : dict[str, float], optional
Custom constraints for the optimization. If None, no custom constraints are applied.
noise_level : float, optional
The level of noise added to the allocation strategy (by default 1%).
utility_function : UtilityFunction, optional
The utility function to maximize. Default is the mean of the response distribution.
**minimize_kwargs
Additional arguments to pass to the `BudgetOptimizer`.
Returns
-------
az.InferenceData
The posterior predictive samples generated from the synthetic dataset.
Raises
------
ValueError
If the time granularity is not supported.
ValueError
If the noise level is not a float.
"""
if not isinstance(noise_level, float):
raise ValueError("noise_level must be a float")

_parameters = self._format_parameters_for_budget_allocator()

allocator = BudgetOptimizer(
adstock=self.adstock,
saturation=self.saturation,
parameters=_parameters,
adstock_first=self.adstock_first,
num_periods=num_periods,
scales=self.channel_transformer["scaler"].scale_,
response_scaler=response_scaler,
utility_function=utility_function,
)

return allocator.allocate_budget(
total_budget=budget,
budget_bounds=budget_bounds,
custom_constraints=custom_constraints,
**minimize_kwargs,
)

def allocate_budget_to_maximize_response(
Expand All @@ -2220,11 +2311,15 @@ def allocate_budget_to_maximize_response(
budget_bounds: dict[str, tuple[float, float]] | None = None,
custom_constraints: dict[str, float] | None = None,
noise_level: float = 0.01,
objective_function: ObjectiveFunction = average_response,
utility_function: UtilityFunction = average_response,
**minimize_kwargs,
) -> az.InferenceData:
"""Allocate the given budget to maximize the response over a specified time period.
.. deprecated:: 0.1.0
This method is deprecated and will be removed in a future version.
Use :meth:`optimize_budget` instead.
This function optimizes the allocation of a given budget across different channels
to maximize the response, considering adstock and saturation effects. It scales the
budget and budget bounds, performs the optimization, and generates a synthetic dataset
Expand Down Expand Up @@ -2254,8 +2349,8 @@ def allocate_budget_to_maximize_response(
Custom constraints for the optimization. If None, no custom constraints are applied.
noise_level : float, optional
The level of noise added to the allocation strategy (by default 1%).
objective_function : ObjectiveFunction, optional
The objective function to maximize. Default is the mean of the response distribution.
utility_function : UtilityFunction, optional
The utility function to maximize. Default is the mean of the response distribution.
**minimize_kwargs
Additional arguments to pass to the `BudgetOptimizer`.
Expand All @@ -2272,6 +2367,13 @@ def allocate_budget_to_maximize_response(
ValueError
If the noise level is not a float.
"""
warnings.warn(
"This method is deprecated and will be removed in a future version. "
"Use optimize_budget() instead.",
DeprecationWarning,
stacklevel=2,
)

if not isinstance(noise_level, float):
raise ValueError("noise_level must be a float")

Expand All @@ -2284,7 +2386,7 @@ def allocate_budget_to_maximize_response(
adstock_first=self.adstock_first,
num_periods=num_periods,
scales=self.channel_transformer["scaler"].scale_,
objective_function=objective_function,
utility_function=utility_function,
)

self.optimal_allocation_dict, _ = allocator.allocate_budget(
Expand All @@ -2294,7 +2396,7 @@ def allocate_budget_to_maximize_response(
**minimize_kwargs,
)

return self._sample_posterior_predictive_based_on_allocation(
return self.sample_response_distribution(
allocation_strategy=self.optimal_allocation_dict,
time_granularity=time_granularity,
num_periods=num_periods,
Expand Down Expand Up @@ -2335,7 +2437,7 @@ def plot_budget_allocation(
if original_scale:
channel_contributions *= self.get_target_transformer()["scaler"].scale_

allocated_spend = np.array(list(self.optimal_allocation_dict.values()))
allocated_spend = samples.allocation.to_numpy()

if ax is None:
fig, ax = plt.subplots(figsize=figsize)
Expand Down
Loading

0 comments on commit 4d34c39

Please sign in to comment.