Merge pull request #36 from alexmaras/main
Add Jsonl formatter, add ability to optionally use raw stream name
crowemi authored Mar 19, 2024
2 parents 24d451a + bef8f8f commit c6d00e2
Showing 5 changed files with 52 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -35,6 +35,7 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com).
   "include_process_date": true|false,
   "append_date_to_prefix": true|false,
   "partition_name_enabled": true|false,
+  "use_raw_stream_name": true|false,
   "append_date_to_prefix_grain": "day",
   "append_date_to_filename": true|false,
   "append_date_to_filename_grain": "microsecond",
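For reference, a sketch of a config dict built only from the keys visible in this README excerpt, with the new flag switched on (values are illustrative, not defaults):

    # Illustrative config; every key comes from the README excerpt above.
    config = {
        "include_process_date": False,
        "append_date_to_prefix": False,
        "partition_name_enabled": False,
        "use_raw_stream_name": True,   # new in this PR: name objects after the raw stream
        "append_date_to_prefix_grain": "day",
        "append_date_to_filename": False,
        "append_date_to_filename_grain": "microsecond",
    }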
11 changes: 8 additions & 3 deletions target_s3/formats/format_base.py
@@ -67,9 +67,14 @@ def __init__(self, config: dict, context: dict, extension: str) -> None:
             endpoint_url=aws_config.get("aws_endpoint_override", None),
         )

+        steam_name: str = self.context["stream_name"]
         self.prefix = config.get("prefix", None)
         self.logger = context["logger"]
-        self.fully_qualified_key = self.create_key()
+        self.fully_qualified_key = (
+            f"{self.bucket}/{self.prefix}/{steam_name}"
+            if config.get("use_raw_stream_name")
+            else self.create_key()
+        )
         self.logger.info(f"key: {self.fully_qualified_key}")

     @abstractmethod
@@ -78,7 +83,7 @@ def _write(self, contents: str = None) -> None:
         # TODO: create dynamic cloud
         # TODO: is there a better way to handle write contents ?
         with open(
-            f"s3://{self.fully_qualified_key}.{self.extension}.{self.compression}",
+            f"s3://{self.fully_qualified_key}",
            "w",
            transport_params={"client": self.client},
        ) as f:
@@ -120,7 +125,7 @@ def create_key(self) -> str:
             grain = DATE_GRAIN[self.config["append_date_to_filename_grain"].lower()]
             file_name += f"{self.create_file_structure(batch_start, grain)}"

-        return f"{folder_path}{file_name}"
+        return f"{folder_path}{file_name}.{self.extension}.{self.compression}"

     def create_folder_structure(
         self, batch_start: datetime, grain: int, partition_name_enabled: bool
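Net effect of the three hunks above: the .extension.compression suffix moves from _write into create_key, so a key built from the raw stream name carries no suffix at all. A rough sketch of the two key shapes; the bucket, prefix, and stream names are placeholders, and the dated layout is illustrative rather than the exact output of create_folder_structure:

    # Placeholder names for illustration only.
    bucket, prefix, stream_name = "my-bucket", "raw", "users"

    # use_raw_stream_name=True: the key is exactly bucket/prefix/stream,
    # with no date components and no extension/compression suffix.
    raw_key = f"{bucket}/{prefix}/{stream_name}"
    # -> "my-bucket/raw/users"

    # use_raw_stream_name=False: create_key() builds a dated path and,
    # after this change, appends the suffix itself (layout illustrative).
    generated_key = f"{bucket}/{prefix}/{stream_name}/2024/03/19/{stream_name}-1710849600.jsonl.gz"

    # _write() now opens s3://<key> as-is, since any suffix is already
    # part of fully_qualified_key.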
33 changes: 33 additions & 0 deletions target_s3/formats/format_jsonl.py
@@ -0,0 +1,33 @@
+from datetime import datetime
+
+from bson import ObjectId
+from simplejson import JSONEncoder, dumps
+
+from target_s3.formats.format_base import FormatBase
+
+
+class JsonSerialize(JSONEncoder):
+    def default(self, obj: any) -> any:
+        if isinstance(obj, ObjectId):
+            return str(obj)
+        if isinstance(obj, datetime):
+            return obj.isoformat()
+        else:
+            raise TypeError(f"Type {type(obj)} not serializable")
+
+
+class FormatJsonl(FormatBase):
+    def __init__(self, config, context) -> None:
+        super().__init__(config, context, "jsonl")
+        pass
+
+    def _prepare_records(self):
+        # use default behavior, no additional prep needed
+        return super()._prepare_records()
+
+    def _write(self) -> None:
+        return super()._write('\n'.join(map(dumps, self.records)))
+
+    def run(self) -> None:
+        # use default behavior, no additional run steps needed
+        return super().run(self.context["records"])
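For intuition, a standalone sketch of what JsonSerialize does to a record containing the two special types it handles (run outside the target; bson and simplejson must be installed). Note that _write above maps plain dumps over the records; this sketch wires the encoder in explicitly via cls= to show its behavior:

    from datetime import datetime

    from bson import ObjectId
    from simplejson import dumps

    from target_s3.formats.format_jsonl import JsonSerialize

    # A record mixing plain values with the two types the encoder handles.
    record = {
        "_id": ObjectId("65f9a1b2c3d4e5f6a7b8c9d0"),  # placeholder ObjectId
        "updated_at": datetime(2024, 3, 19, 12, 0, 0),
        "name": "example",
    }

    # ObjectId and datetime become strings; unknown types raise TypeError.
    print(dumps(record, cls=JsonSerialize))
    # {"_id": "65f9a1b2c3d4e5f6a7b8c9d0", "updated_at": "2024-03-19T12:00:00", "name": "example"}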
3 changes: 2 additions & 1 deletion target_s3/sinks.py
@@ -9,10 +9,11 @@
 from target_s3.formats.format_parquet import FormatParquet
 from target_s3.formats.format_csv import FormatCsv
 from target_s3.formats.format_json import FormatJson
+from target_s3.formats.format_jsonl import FormatJsonl


 LOGGER = logging.getLogger("target-s3")
-FORMAT_TYPE = {"parquet": FormatParquet, "csv": FormatCsv, "json": FormatJson}
+FORMAT_TYPE = {"parquet": FormatParquet, "csv": FormatCsv, "json": FormatJson, "jsonl": FormatJsonl}


 class s3Sink(BatchSink):
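A sketch of how a registry dict like FORMAT_TYPE is typically consumed; the real lookup lives inside s3Sink, which this diff does not show, so the "format_type" config key and this flow are assumptions for illustration:

    def write_batch(config: dict, context: dict) -> None:
        # Assumed dispatch: resolve the configured format to a formatter class.
        format_type = config.get("format", {}).get("format_type", "jsonl")
        formatter_cls = FORMAT_TYPE.get(format_type)
        if formatter_cls is None:
            raise ValueError(f"unsupported format type: {format_type}")
        # Each formatter takes (config, context) and exposes run().
        formatter_cls(config, context).run()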
9 changes: 8 additions & 1 deletion target_s3/target.py
@@ -29,6 +29,7 @@ class Targets3(Target):
         allowed_values=[
             "parquet",
             "json",
+            "jsonl",
         ], # TODO: configure this from class
     ),
     th.Property(
@@ -137,6 +138,12 @@ class Targets3(Target):
         description="A flag indicating whether to append _process_date to record.",
         default=False,
     ),
+    th.Property(
+        "use_raw_stream_name",
+        th.BooleanType,
+        description="A flag to force the filename to be identical to the stream name.",
+        default=False,
+    ),
     th.Property(
         "append_date_to_prefix",
         th.BooleanType,
@@ -215,4 +222,4 @@ def deserialize_json(self, line: str) -> dict:


 if __name__ == "__main__":
-    Targets3.cli()
\ No newline at end of file
+    Targets3.cli()
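Because the new property defaults to False and format_base reads it with config.get("use_raw_stream_name"), configs that predate this change keep the generated, dated keys; raw stream naming is strictly opt-in. A tiny sketch of that check, with illustrative configs:

    old_config = {"prefix": "raw"}                               # pre-existing config
    new_config = {"prefix": "raw", "use_raw_stream_name": True}  # explicit opt-in

    assert not old_config.get("use_raw_stream_name")  # missing key is falsy -> create_key()
    assert new_config.get("use_raw_stream_name")      # flag set -> bucket/prefix/stream key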
