feat(markov): implement higher order markov models #50

Open · wants to merge 5 commits into master
Changes from 3 commits
1 change: 1 addition & 0 deletions .gitignore
@@ -7,3 +7,4 @@ build/*
.vscode/*
.pre-commit-config.yaml
venv/
local_nbs/
270 changes: 149 additions & 121 deletions marketing_attribution_models/MAM.py
@@ -1213,152 +1213,185 @@ def attribution_time_decay(

def attribution_markov(
self,
order=1,
start_state_name="(inicio)",
conversion_state_name="(conversion)",
null_state_name="(null)",
transition_to_same_state=False,
group_by_channels_models=True,
conversion_value_as_frequency=True,
):
"""Attribution using Markov."""
"""Attribution using Markov.

Parameters:
order =
The order of the Markov model, i.e. how many previous states we model
the transition to the next one to depend on.
start_state_name, conversion_state_name, null_state_name =
Names for some of the auxiliary states we create for the Markov model.
transition_to_same_state = False by default.
If False, will ignore same state transitions when building the
transition matrix.
group_by_channels_models = True by default.
Outputs the weighted conversion totals using the Markov model and
saves in the internal object.
conversion_value_as_frequency = True by default.
Uses the internal MAM conversion_value as a frequency count for each
state transition. If false, for each path we assume the frequency to
be 1.
"""
model_name = "attribution_markov"
model_type = "_algorithmic"
if transition_to_same_state:
model_name = model_name + "_same_state" + model_type
else:
model_name = model_name + model_type

def normalize_rows(matrix):
size = matrix.shape[0]
mean = matrix.sum(axis=1).reshape((size, 1))
mean = np.where(mean == 0, 1, mean)
return matrix / mean
# Maybe we should get the channels from an internal MAM variable instead
channels = list({channel for path in self.channels for channel in path})

def get_ngram_frequencies():
def list_to_ngram_series(l, n):
# pylint: disable=invalid-name
if start_state_name:
l = (n - 1) * [start_state_name] + l
return pd.Series(tuple(l[i : i + n]) for i in range(len(l) - n + 1))
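
# Illustrative example (hypothetical channels): for order=2 the call
# list_to_ngram_series(["A", "B", "(conversion)"], 3) pads the path to
# ["(inicio)", "(inicio)", "A", "B", "(conversion)"] and yields the 3-grams
# ((inicio), (inicio), A), ((inicio), A, B) and (A, B, (conversion)).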

extended_paths_frequencies = pd.DataFrame(
{
"paths": self.channels
+ self.journey_with_conv.apply(
lambda x: [conversion_state_name if x else null_state_name]
),
"count": (
self.conversion_value
if conversion_value_as_frequency
else self.conversion_value.apply(lambda x: 1)
),
}
)
ngram_frequencies = (
extended_paths_frequencies.apply(
lambda row: pd.Series(
[list_to_ngram_series(row["paths"], order + 1), row["count"]],
index=["ngrams", "count"],
),
axis=1,
)
.explode("ngrams")
.groupby(by="ngrams", as_index=False)
.sum()
)
return ngram_frequencies
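
# Illustrative grouped output (hypothetical data, order=1): journeys
# A > B (converted, frequency 2) and A (null, frequency 1) yield
# ((inicio), A): 3, (A, B): 2, (B, (conversion)): 2, (A, (null)): 1.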

def calc_total_conversion(matrix):
def get_transition_matrix_from_ngram_frequencies(ngram_frequencies):
# pylint: disable=invalid-name
# https://en.wikipedia.org/wiki/Absorbing_Markov_chain#Absorbing_probabilities
matrix = normalize_rows(matrix)

# Those indices follow from the construction, where we have the conversion
# and non-conversion states as the last 2
Q = matrix[:-2, :-2]
R = matrix[:-2, -2:]
N = np.linalg.inv(np.identity(len(Q)) - Q)

# We also assume the first row represents the starting state
return (N @ R)[0, 1]

def removal_effect(matrix):
size = matrix.shape[0]
conversions = np.zeros(size)
for column in range(1, size - 2):
temp = matrix.copy()
temp[:, -2] = temp[:, -2] + temp[:, column]
temp[:, column] = 0
conversions[column] = calc_total_conversion(temp)
conversion_orig = calc_total_conversion(matrix)
return 1 - (conversions / conversion_orig)

def path_to_matrix(paths):
channel_max = int(paths[:, 0:2].max()) + 1
matrix = np.zeros((channel_max, channel_max), dtype="float")
for x, y, val in paths:
matrix[int(x), int(y)] = val
matrix[-1, -1] = 1
matrix[-2, -2] = 1
def sum_of_powers(base, n):
return int((base - base ** (n + 1)) / (1 - base)) + 1
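
# sum_of_powers(c, n) computes 1 + c + c**2 + ... + c**n: the closed form
# (c - c**(n + 1)) / (1 - c) equals c + ... + c**n, and the trailing + 1
# accounts for the all-start-states tuple. E.g. 3 channels with order=2
# give 1 + 3 + 9 = 13 transient states (a 15 x 15 matrix once the two
# absorbing states are appended).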

# The number of transient states is the sum of powers of the channel-set
# size, from exponent 0 up to the specified order.
size = sum_of_powers(len(channels), order)
matrix = pd.DataFrame(np.zeros((size + 2, size + 2), dtype="float"))
# Each state is a tuple of `order` channels, left-padded with the start state.
matrix.index = matrix.columns = pd.MultiIndex.from_tuples(
[
(start_state_name,) * (order - len(S)) + S
for i in range(order + 1)
for S in itertools.product(channels, repeat=i)
]
+ [(conversion_state_name,) * order, (null_state_name,) * order]
)
# A ngram of form (s_1, ..., s_{n+1}) represents a transition from the state
# S1 = (s_1, ..., s_{n}) to S2 = (s_2, ..., s_{n+1}) where n is the order of
# the chain
for _, (ngram, freq) in ngram_frequencies.iterrows():
S1, S2 = ngram[:-1], ngram[1:]
if not transition_to_same_state and S1 == S2:
continue
if conversion_state_name in S2:
S2 = (conversion_state_name,) * order
if null_state_name in S2:
S2 = (null_state_name,) * order
matrix.loc[S1, S2] += freq
matrix.iloc[-1, -1] = 1
matrix.iloc[-2, -2] = 1
matrix = matrix / matrix.values.sum(axis=1, keepdims=True)
# States with no outgoing transitions are routed straight to the null state
matrix[matrix.isnull().any(axis=1)] = np.eye(size + 2)[size + 1]
return matrix
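
# Illustrative layout (hypothetical channels A, B with order=1): the
# resulting 5 x 5 matrix is indexed by the states ((inicio),), (A,), (B,),
# ((conversion),) and ((null),); each row holds the normalized transition
# frequencies out of that state, and the two absorbing rows are identity.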

temp = self.channels.apply(
lambda x: ["(inicio)"] + x
) + self.journey_with_conv.apply(lambda x: ["(conversion)" if x else "(null)"])

orig = []
dest = []
journey_length = []

def save_orig_dest(arr):
orig.extend(arr[:-1])
dest.extend(arr[1:])
journey_length.append(len(arr))

temp.apply(save_orig_dest)

# copying conversion_quantity to each new row
if type(self.conversion_value) in (int, float):
# we do not have a frequency column yet, so we are using
# self.conversion_value.apply(lambda x: 1) to count each line
conversion_quantity = self.conversion_value.apply(lambda x: 1)

else:
if conversion_value_as_frequency:
freq_values = self.conversion_value
else:
freq_values = self.conversion_value.apply(lambda x: 1)

conversion_quantity = []

for a, b in zip(freq_values, journey_length):
conversion_quantity.extend([a] * (b - 1))

temp = pd.DataFrame({"orig": orig, "dest": dest, "count": conversion_quantity})
temp = temp.groupby(["orig", "dest"], as_index=False).sum()
self._print(temp)

if not transition_to_same_state:
temp = temp[temp.orig != temp.dest]

# Converting channel names to indices and passing a numpy array forward
channels_names = (
["(inicio)"]
+ list(
(set(temp.orig) - set(["(inicio)"]))
| (set(temp.dest) - set(["(conversion)", "(null)"]))
def get_removal_effect_df(matrix):
def calc_modelled_conversion_rate(submatrix):
# pylint: disable=invalid-name
# https://en.wikipedia.org/wiki/Absorbing_Markov_chain#Absorbing_probabilities
# Those indices follow from the construction, where we have the conversion
# and non-conversion states as the last 2
Q = submatrix[:-2, :-2]
R = submatrix[:-2, -2:]
N = np.linalg.inv(np.identity(len(Q)) - Q)
# We also assume the first row represents the starting state
return (N @ R)[0, 0]
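
# Standard absorbing-chain identity: with transient block Q and absorbing
# block R, the fundamental matrix N = (I - Q)^-1 gives the absorption
# probabilities B = N @ R, so B[0, 0] is the probability that a journey
# started in the all-start state is eventually absorbed in the conversion
# state (conversion is column -2 and null column -1, hence R's column 0).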

removal_effect_df = pd.DataFrame(index=channels + ["Baseline"])
for channel in channels:
temp = matrix.copy()
channel_cols = [channel in state for state in temp.columns.tolist()]
temp.iloc[:, -1] = temp.iloc[:, -1] + temp.loc[:, channel_cols].sum(
axis=1
)
temp.loc[:, channel_cols] = 0
removal_effect_df.loc[
channel, "conv_rate"
] = calc_modelled_conversion_rate(temp.values)
removal_effect_df.loc[
"Baseline", "conv_rate"
] = calc_modelled_conversion_rate(matrix.values)
removal_effect_df["removal_effect"] = 1 - (
removal_effect_df["conv_rate"]
/ removal_effect_df.loc["Baseline", "conv_rate"]
)
+ ["(null)", "(conversion)"]
)
temp["orig"] = temp.orig.apply(channels_names.index)
temp["dest"] = temp.dest.apply(channels_names.index)
matrix = path_to_matrix(temp[["orig", "dest", "count"]].values)
removal_effect_result = removal_effect(matrix)[1:-2]
results = removal_effect_result / removal_effect_result.sum(axis=0)

# Channels weights
frame = pd.DataFrame({"value": results}, index=channels_names[1:-2])
removal_effect_result = pd.DataFrame(
{"removal_effect": removal_effect_result}, index=channels_names[1:-2]
)

# Transition matrix
matrix = normalize_rows(matrix)
matrix = pd.DataFrame(matrix, columns=channels_names, index=channels_names)

# Apply weights back to each journey
chmap = {a: b[0] for a, b in zip(frame.index.values, frame.values)}
channels_value = self.channels.apply(lambda y: [chmap[x] for x in y])
removal_effect_df.loc["Baseline", "removal_effect"] = np.nan
return removal_effect_df
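
# Illustrative numbers (hypothetical): with a baseline conv_rate of 0.20
# and a conv_rate of 0.15 after zeroing every state containing channel A,
# A's removal effect is 1 - 0.15 / 0.20 = 0.25.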

ngram_frequencies = get_ngram_frequencies()
matrix = get_transition_matrix_from_ngram_frequencies(ngram_frequencies)
re_df = get_removal_effect_df(matrix)
normalized_re = re_df["removal_effect"] / re_df["removal_effect"].sum()

# Apply weights back to each journey and add the results back into self.DataFrame
# Analyzing Markov attribution results on a per-journey basis is very
# debatable with this method, since all it does is assign the conversion
# credit proportionally to the removal effect of every channel in the journey.
channels_value = self.channels.apply(lambda y: [normalized_re[x] for x in y])
channels_value = channels_value.apply(lambda x: list(np.array(x) / sum(x)))
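
# Illustrative split (hypothetical removal effects): for a journey [A, B]
# with normalized removal effects {A: 0.6, B: 0.4} the credit split is
# [0.6, 0.4]; for [A, A, B] it is [0.6, 0.6, 0.4] / 1.6 = [0.375, 0.375, 0.25].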

# Adding the results to self.DataFrame
self.as_pd_dataframe()
self.data_frame[model_name] = channels_value.apply(
lambda x: self.sep.join([str(value) for value in x])
)

# Grouping the attributed values for each channel
total_conv_value = self.journey_with_conv * self.conversion_value
if group_by_channels_models:
total_conv_value = (self.journey_with_conv * self.conversion_value).sum()
attributed_conversions = pd.DataFrame(
{
"channels": normalized_re.index,
model_name: normalized_re * total_conv_value,
}
).reset_index(drop=True)
if isinstance(self.group_by_channels_models, pd.DataFrame):
frame = frame.reset_index()
frame.columns = ["channels", model_name]
frame[model_name] = frame[model_name] * total_conv_value.sum()
self.group_by_channels_models = pd.merge(
self.group_by_channels_models, frame, how="outer", on=["channels"]
self.group_by_channels_models,
attributed_conversions,
how="outer",
on=["channels"],
).fillna(0)
else:
frame = frame.reset_index()
frame.columns = ["channels", model_name]
frame[model_name] = frame[model_name] * total_conv_value.sum()
self.group_by_channels_models = frame
self.group_by_channels_models = attributed_conversions
else:
frame = "group_by_channels_models = False"
attributed_conversions = "group_by_channels_models = False"

return (channels_value, frame, matrix, removal_effect_result)
return (channels_value, attributed_conversions, matrix, re_df)
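
# Example call (illustrative; assumes a fitted MAM instance `attributions`):
#   values, grouped, matrix, re_df = attributions.attribution_markov(order=2)
# returns the per-journey credit splits, the per-channel conversion totals,
# the transition matrix and the removal-effect table.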

def journey_conversion_table(self, order=False, size=None):
"""Transforms journey channels in boolean columns, count the number of
@@ -1367,7 +1400,6 @@ def journey_conversion_table(self, order=False, size=None):
"""
# Creating Channels DF
df_temp = self.journey_id.copy()

if order:
df_temp["combinations"] = self.channels.apply(
lambda channels: sorted(list(set(channels)), key=channels.index)
@@ -1376,18 +1408,15 @@
df_temp["combinations"] = self.channels.apply(
lambda channels: sorted(list(set(channels)))
).copy()

if size is not None:
df_temp["combinations"] = df_temp["combinations"].apply(
lambda channels: self.sep.join(channels[size * -1 :])
)
else:
df_temp["combinations"] = df_temp["combinations"].apply(self.sep.join)

# Adding journey_with_conv column
df_temp["journey_with_conv"] = self.journey_with_conv.apply(int)
df_temp["conversion_value"] = self.conversion_value

# Grouping journey_with_conv
conv_val = (
df_temp.groupby(["combinations"])["conversion_value"]
@@ -1402,7 +1431,6 @@
df_temp["conversion_value"] = conv_val
# Calculating the conversion rate
df_temp["conv_rate"] = df_temp["conversions"] / df_temp["total_sequences"]

return df_temp

def coalitions(self, size=4, unique_channels=None, order=False):