From a945d899a346231cb11d55fe2cd1846e23e3538b Mon Sep 17 00:00:00 2001 From: taksqth Date: Tue, 23 Nov 2021 13:10:32 -0200 Subject: [PATCH 1/2] feat(markov): implement higher order markov models --- .gitignore | 1 + marketing_attribution_models/MAM.py | 270 +++++++++++++++------------- 2 files changed, 150 insertions(+), 121 deletions(-) diff --git a/.gitignore b/.gitignore index d1ee04e..b2a829f 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ build/* .vscode/* .pre-commit-config.yaml venv/ +local_nbs/ \ No newline at end of file diff --git a/marketing_attribution_models/MAM.py b/marketing_attribution_models/MAM.py index d904b69..2da1bc3 100644 --- a/marketing_attribution_models/MAM.py +++ b/marketing_attribution_models/MAM.py @@ -1213,11 +1213,33 @@ def attribution_time_decay( def attribution_markov( self, + order=1, + start_state_name='(inicio)', + conversion_state_name='(conversion)', + null_state_name='(null)', transition_to_same_state=False, group_by_channels_models=True, conversion_value_as_frequency=True, ): - """Attribution using Markov.""" + """Attribution using Markov. + + Parameters: + order = + The order of the Markov model, i.e. how many previous states we model + the transition to the next one to depend on. + start_state_name, conversion_state_name, null_state_name = + Names for some of the auxiliary states we create for the Markov model. + transition_to_same_state = False by default. + If False, will ignore same state transitions when building the + transition matrix. + group_by_channels_models = True by default. + Outputs the weighted conversion totals using the Markov model and + saves in the internal object. + conversion_value_as_frequency = True by default. + Uses the internal MAM conversion_value as a frequency count for each + state transition. If false, for each path we assume the frequency to + be 1. + """ model_name = "attribution_markov" model_type = "_algorithmic" if transition_to_same_state: @@ -1225,140 +1247,151 @@ def attribution_markov( else: model_name = model_name + model_type - def normalize_rows(matrix): - size = matrix.shape[0] - mean = matrix.sum(axis=1).reshape((size, 1)) - mean = np.where(mean == 0, 1, mean) - return matrix / mean + # Maybe we should get the channels from an internal MAM variable instead + channels = list({channel for path in self.channels for channel in path}) + + def get_ngram_frequencies(): + def list_to_ngram_series(l, n): + # pylint: disable=invalid-name + if start_state_name: + l = (n - 1) * [start_state_name] + l + return pd.Series(tuple(l[i : i + n]) for i in range(len(l) - n + 1)) + + extended_paths_frequencies = pd.DataFrame( + { + "paths": self.channels + + self.journey_with_conv.apply( + lambda x: [conversion_state_name if x else null_state_name] + ), + "count": ( + self.conversion_value + if conversion_value_as_frequency + else self.conversion_value.apply(lambda x: 1) + ), + } + ) + ngram_frequencies = ( + extended_paths_frequencies.apply( + lambda row: pd.Series( + [list_to_ngram_series(row["paths"], order + 1), row["count"]], + index=["ngrams", "count"], + ), + axis=1, + ) + .explode("ngrams") + .groupby(by="ngrams", as_index=False) + .sum() + ) + return ngram_frequencies - def calc_total_conversion(matrix): + def get_transition_matrix_from_ngram_frequencies(ngram_frequencies): # pylint: disable=invalid-name - # https://en.wikipedia.org/wiki/Absorbing_Markov_chain#Absorbing_probabilities - matrix = normalize_rows(matrix) - - # Those indices follow from the construction, where we have the conversion - # and non-conversion states as the last 2 - Q = matrix[:-2, :-2] - R = matrix[:-2, -2:] - N = np.linalg.inv(np.identity(len(Q)) - Q) - - # We also assume the first row represents the starting state - return (N @ R)[0, 1] - - def removal_effect(matrix): - size = matrix.shape[0] - conversions = np.zeros(size) - for column in range(1, size - 2): - temp = matrix.copy() - temp[:, -2] = temp[:, -2] + temp[:, column] - temp[:, column] = 0 - conversions[column] = calc_total_conversion(temp) - conversion_orig = calc_total_conversion(matrix) - return 1 - (conversions / conversion_orig) - - def path_to_matrix(paths): - channel_max = int(paths[:, 0:2].max()) + 1 - matrix = np.zeros((channel_max, channel_max), dtype="float") - for x, y, val in paths: - matrix[int(x), int(y)] = val - matrix[-1, -1] = 1 - matrix[-2, -2] = 1 + def sum_of_powers(base, n): + return int((base - base ** (n + 1)) / (1 - base)) + 1 + + # The size of the matrix is a sum of powers from the channels set size until + # the specified order. + size = sum_of_powers(len(channels), order) + matrix = pd.DataFrame(np.zeros((size + 2, size + 2), dtype="float")) + # A ngram of form (s_1, ..., s_{n+1} + matrix.index = matrix.columns = pd.MultiIndex.from_tuples( + [ + (start_state_name,) * (order - len(S)) + S + for i in range(order + 1) + for S in itertools.product(channels, repeat=i) + ] + + [(conversion_state_name,) * order, (null_state_name,) * order] + ) + # A ngram of form (s_1, ..., s_{n+1}) represents a transition from the state + # S1 = (s_1, ..., s_{n}) to S2 = (s_2, ..., s_{n+1}) where n is the order of + # the chain + for _, (ngram, freq) in ngram_frequencies.iterrows(): + S1, S2 = ngram[:-1], ngram[1:] + if not transition_to_same_state and S1 == S2: + continue + if conversion_state_name in S2: + S2 = (conversion_state_name,) * order + if null_state_name in S2: + S2 = (null_state_name,) * order + matrix.loc[S1, S2] += freq + matrix.iloc[-1, -1] = 1 + matrix.iloc[-2, -2] = 1 + matrix = matrix / matrix.values.sum(axis=1, keepdims=True) + # For states without transitions we just plug them to the null state + matrix[matrix.isnull().any(axis=1)] = np.eye(size + 2)[size + 1] return matrix - temp = self.channels.apply( - lambda x: ["(inicio)"] + x - ) + self.journey_with_conv.apply(lambda x: ["(conversion)" if x else "(null)"]) - - orig = [] - dest = [] - journey_length = [] - - def save_orig_dest(arr): - orig.extend(arr[:-1]) - dest.extend(arr[1:]) - journey_length.append(len(arr)) - - temp.apply(save_orig_dest) - - # copying conversion_quantity to each new row - if type(self.conversion_value) in (int, float): - # we do not hava a frequency column yet so we are using - # self.conversion_value.apply(lambda x: 1) to count each line - conversion_quantity = self.conversion_value.apply(lambda x: 1) - - else: - if conversion_value_as_frequency: - freq_values = self.conversion_value - else: - freq_values = self.conversion_value.apply(lambda x: 1) - - conversion_quantity = [] - - for a, b in zip(freq_values, journey_length): - conversion_quantity.extend([a] * (b - 1)) - - temp = pd.DataFrame({"orig": orig, "dest": dest, "count": conversion_quantity}) - temp = temp.groupby(["orig", "dest"], as_index=False).sum() - self._print(temp) - - if not transition_to_same_state: - temp = temp[temp.orig != temp.dest] - - # Converting channels_names to index and pass a numpy array foward - channels_names = ( - ["(inicio)"] - + list( - (set(temp.orig) - set(["(inicio)"])) - | (set(temp.dest) - set(["(conversion)", "(null)"])) + def get_removal_effect_df(matrix): + def calc_modelled_conversion_rate(submatrix): + # pylint: disable=invalid-name + # https://en.wikipedia.org/wiki/Absorbing_Markov_chain#Absorbing_probabilities + # Those indices follow from the construction, where we have the conversion + # and non-conversion states as the last 2 + Q = submatrix[:-2, :-2] + R = submatrix[:-2, -2:] + N = np.linalg.inv(np.identity(len(Q)) - Q) + # We also assume the first row represents the starting state + return (N @ R)[0, 0] + + removal_effect_df = pd.DataFrame(index=channels + ["Baseline"]) + for channel in channels: + temp = matrix.copy() + channel_cols = [channel in state for state in temp.columns.tolist()] + temp.iloc[:, -1] = temp.iloc[:, -1] + temp.loc[:, channel_cols].sum( + axis=1 + ) + temp.loc[:, channel_cols] = 0 + removal_effect_df.loc[ + channel, "conv_rate" + ] = calc_modelled_conversion_rate(temp.values) + removal_effect_df.loc[ + "Baseline", "conv_rate" + ] = calc_modelled_conversion_rate(matrix.values) + removal_effect_df["removal_effect"] = 1 - ( + removal_effect_df["conv_rate"] + / removal_effect_df.loc["Baseline", "conv_rate"] ) - + ["(null)", "(conversion)"] - ) - temp["orig"] = temp.orig.apply(channels_names.index) - temp["dest"] = temp.dest.apply(channels_names.index) - matrix = path_to_matrix(temp[["orig", "dest", "count"]].values) - removal_effect_result = removal_effect(matrix)[1:-2] - results = removal_effect_result / removal_effect_result.sum(axis=0) - - # Channels weights - frame = pd.DataFrame({"value": results}, index=channels_names[1:-2]) - removal_effect_result = pd.DataFrame( - {"removal_effect": removal_effect_result}, index=channels_names[1:-2] - ) - - # Transition matrix - matrix = normalize_rows(matrix) - matrix = pd.DataFrame(matrix, columns=channels_names, index=channels_names) - - # Apply weights back to each journey - chmap = {a: b[0] for a, b in zip(frame.index.values, frame.values)} - channels_value = self.channels.apply(lambda y: [chmap[x] for x in y]) + removal_effect_df.loc["Baseline", "removal_effect"] = np.nan + return removal_effect_df + + ngram_frequencies = get_ngram_frequencies() + matrix = get_transition_matrix_from_ngram_frequencies(ngram_frequencies) + re_df = get_removal_effect_df(matrix) + normalized_re = re_df["removal_effect"] / re_df["removal_effect"].sum() + + # Apply weights back to each journey and add the results back into self.DataFrame + # Analyzing Markov Attribution results on a per journey basis is very debatable + # using this method, since all it does is assigning the conversion credit + # proportionally to the removal effect for every channel in the journey. + channels_value = self.channels.apply(lambda y: [normalized_re[x] for x in y]) channels_value = channels_value.apply(lambda x: list(np.array(x) / sum(x))) - - # Adding the results to self.DataFrame self.as_pd_dataframe() self.data_frame[model_name] = channels_value.apply( lambda x: self.sep.join([str(value) for value in x]) ) # Grouping the attributed values for each channel - total_conv_value = self.journey_with_conv * self.conversion_value if group_by_channels_models: + total_conv_value = (self.journey_with_conv * self.conversion_value).sum() + attributed_conversions = pd.DataFrame( + { + "channels": normalized_re.index, + model_name: normalized_re * total_conv_value, + } + ).reset_index(drop=True) if isinstance(self.group_by_channels_models, pd.DataFrame): - frame = frame.reset_index() - frame.columns = ["channels", model_name] - frame[model_name] = frame[model_name] * total_conv_value.sum() self.group_by_channels_models = pd.merge( - self.group_by_channels_models, frame, how="outer", on=["channels"] + self.group_by_channels_models, + attributed_conversions, + how="outer", + on=["channels"], ).fillna(0) else: - frame = frame.reset_index() - frame.columns = ["channels", model_name] - frame[model_name] = frame[model_name] * total_conv_value.sum() - self.group_by_channels_models = frame + self.group_by_channels_models = attributed_conversions else: - frame = "group_by_channels_models = False" + attributed_conversions = "group_by_channels_models = False" - return (channels_value, frame, matrix, removal_effect_result) + return (channels_value, attributed_conversions, matrix, re_df) def journey_conversion_table(self, order=False, size=None): """Transforms journey channels in boolean columns, count the number of @@ -1367,7 +1400,6 @@ def journey_conversion_table(self, order=False, size=None): """ # Creating Channels DF df_temp = self.journey_id.copy() - if order: df_temp["combinations"] = self.channels.apply( lambda channels: sorted(list(set(channels)), key=channels.index) @@ -1376,18 +1408,15 @@ def journey_conversion_table(self, order=False, size=None): df_temp["combinations"] = self.channels.apply( lambda channels: sorted(list(set(channels))) ).copy() - if size is not None: df_temp["combinations"] = df_temp["combinations"].apply( lambda channels: self.sep.join(channels[size * -1 :]) ) else: df_temp["combinations"] = df_temp["combinations"].apply(self.sep.join) - # Adding journey_with_conv column df_temp["journey_with_conv"] = self.journey_with_conv.apply(int) df_temp["conversion_value"] = self.conversion_value - # Grouping journey_with_conv conv_val = ( df_temp.groupby(["combinations"])["conversion_value"] @@ -1402,7 +1431,6 @@ def journey_conversion_table(self, order=False, size=None): df_temp["conversion_value"] = conv_val # Calculating the conversion rate df_temp["conv_rate"] = df_temp["conversions"] / df_temp["total_sequences"] - return df_temp def coalitions(self, size=4, unique_channels=None, order=False): From a22c03c5fb21feceac73db8b696fd11dd44baf12 Mon Sep 17 00:00:00 2001 From: taksqth Date: Tue, 23 Nov 2021 13:15:44 -0200 Subject: [PATCH 2/2] style: forgot to run 'black' --- marketing_attribution_models/MAM.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/marketing_attribution_models/MAM.py b/marketing_attribution_models/MAM.py index 2da1bc3..86a1cd0 100644 --- a/marketing_attribution_models/MAM.py +++ b/marketing_attribution_models/MAM.py @@ -1214,9 +1214,9 @@ def attribution_time_decay( def attribution_markov( self, order=1, - start_state_name='(inicio)', - conversion_state_name='(conversion)', - null_state_name='(null)', + start_state_name="(inicio)", + conversion_state_name="(conversion)", + null_state_name="(null)", transition_to_same_state=False, group_by_channels_models=True, conversion_value_as_frequency=True,