Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(taps): add argument for nan handling strategy to _flatten_record #2222

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion singer_sdk/helpers/_flattening.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class FlatteningOptions(t.NamedTuple):
max_level: int
flattening_enabled: bool = True
separator: str = DEFAULT_FLATTENING_SEPARATOR
nan_strategy: t.Literal["fail", "allow"] = "fail"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to support msgspec for serialization in the near future (#1784), and it defaults to making the values null. Do you think there's a way to make these options future-compatible with msgspec?

https://jcristharif.com/msgspec/supported-types.html#float

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you specify a little more details? I am not very familiar with msgspec lib. In simplejson which is currently used there is ignore_nan argument which could be used. I am thinking about adding one more nan_strategy "convert_null" which will set ignore_nan to True. WDYT?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking about adding one more nan_strategy "convert_null" which will set ignore_nan to True. WDYT?

@dgawlowsky that makes sense to me!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edgarrmondragon
Added this! sorry for the delay!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dgawlowsky! I'll review this soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @edgarrmondragon! Can you share the status of the review? We are using meltano in one of our projects and will be deploying do production soon, hence it will be easier to pull library from official distribution. Thanks in advance!



def get_flattening_options(
Expand Down Expand Up @@ -81,13 +82,15 @@ def flatten_schema(
schema: dict,
max_level: int,
separator: str = "__",
nan_strategy: t.Literal["fail", "allow"] = "fail",
) -> dict:
"""Flatten the provided schema up to a depth of max_level.

Args:
schema: The schema definition to flatten.
separator: The string to use when concatenating key names.
max_level: The max recursion level (zero-based, exclusive).
nan_strategy: Strategy for handling NaNs in json.

Returns:
A flattened version of the provided schema definition.
Expand Down Expand Up @@ -274,6 +277,7 @@ def flatten_schema(
schema_node=new_schema,
max_level=max_level,
separator=separator,
nan_strategy=nan_strategy,
)
return new_schema

Expand All @@ -284,6 +288,7 @@ def _flatten_schema( # noqa: C901, PLR0912
separator: str = "__",
level: int = 0,
max_level: int = 0,
nan_strategy: t.Literal["fail", "allow"] = "fail",
) -> dict:
"""Flatten the provided schema node, recursively up to depth of `max_level`.

Expand All @@ -293,6 +298,7 @@ def _flatten_schema( # noqa: C901, PLR0912
separator: The string to use when concatenating key names.
level: The current recursion level (zero-based).
max_level: The max recursion level (zero-based, exclusive).
nan_strategy: Strategy for handling NaNs in json.

Returns:
A flattened version of the provided node.
Expand All @@ -319,6 +325,7 @@ def _flatten_schema( # noqa: C901, PLR0912
separator=separator,
level=level + 1,
max_level=max_level,
nan_strategy=nan_strategy,
).items(),
)
elif (
Expand Down Expand Up @@ -364,6 +371,7 @@ def flatten_record(
flattened_schema: dict,
max_level: int,
separator: str = "__",
nan_strategy: t.Literal["fail", "allow"] = "fail",
) -> dict:
"""Flatten a record up to max_level.

Expand All @@ -372,6 +380,7 @@ def flatten_record(
flattened_schema: The already flattened schema.
separator: The string used to separate concatenated key names. Defaults to "__".
max_level: The maximum depth of keys to flatten recursively.
nan_strategy: Strategy for handling NaNs in json.

Returns:
A flattened version of the record.
Expand All @@ -381,6 +390,7 @@ def flatten_record(
flattened_schema=flattened_schema,
separator=separator,
max_level=max_level,
nan_strategy=nan_strategy,
)


Expand All @@ -392,6 +402,7 @@ def _flatten_record(
separator: str = "__",
level: int = 0,
max_level: int = 0,
nan_strategy: t.Literal["fail", "allow"] = "fail",
) -> dict:
"""This recursive function flattens the record node.

Expand All @@ -405,6 +416,7 @@ def _flatten_record(
separator: The string to use when concatenating key names.
level: The current recursion level (zero-based).
max_level: The max recursion level (zero-based, exclusive).
nan_strategy: Strategy for handling NaNs in json.

Returns:
A flattened version of the provided node.
Expand All @@ -424,13 +436,23 @@ def _flatten_record(
separator=separator,
level=level + 1,
max_level=max_level,
nan_strategy=nan_strategy,
).items(),
)
else:
nan_strategies = {
"fail": False,
"allow": True,
}
items.append(
(
new_key,
json.dumps(v, use_decimal=True, default=str)
json.dumps(
v,
use_decimal=True,
default=str,
allow_nan=nan_strategies[nan_strategy],
)
if _should_jsondump_value(k, v, flattened_schema)
else v,
),
Expand Down
1 change: 1 addition & 0 deletions singer_sdk/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def flatten_record(self, record: dict) -> dict:
flattened_schema=self.transformed_schema,
max_level=self.flattening_options.max_level,
separator=self.flattening_options.separator,
nan_strategy=self.flattening_options.nan_strategy,
)

def flatten_schema(self, raw_schema: dict) -> dict:
Expand Down
Loading