From dc3105f1ebbb76da56a483268de0f19f07330738 Mon Sep 17 00:00:00 2001 From: Alex Maras Date: Fri, 16 Feb 2024 15:06:03 +0800 Subject: [PATCH 1/7] feat: allow raw stream name to be used instead of one calculated by plugin --- target_s3/formats/format_base.py | 10 +++++++--- target_s3/target.py | 6 ++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/target_s3/formats/format_base.py b/target_s3/formats/format_base.py index df9caf3..9ffdf03 100644 --- a/target_s3/formats/format_base.py +++ b/target_s3/formats/format_base.py @@ -69,7 +69,11 @@ def __init__(self, config: dict, context: dict, extension: str) -> None: self.prefix = config.get("prefix", None) self.logger = context["logger"] - self.fully_qualified_key = self.create_key() + self.fully_qualified_key = ( + self.context["stream_name"] + if config.get("use_raw_stream_name") + else self.create_key() + ) self.logger.info(f"key: {self.fully_qualified_key}") @abstractmethod @@ -78,7 +82,7 @@ def _write(self, contents: str = None) -> None: # TODO: create dynamic cloud # TODO: is there a better way to handle write contents ? 
with open( - f"s3://{self.fully_qualified_key}.{self.extension}.{self.compression}", + f"s3://{self.fully_qualified_key}", "w", transport_params={"client": self.client}, ) as f: @@ -120,7 +124,7 @@ def create_key(self) -> str: grain = DATE_GRAIN[self.config["append_date_to_filename_grain"].lower()] file_name += f"{self.create_file_structure(batch_start, grain)}" - return f"{folder_path}{file_name}" + return f"{folder_path}{file_name}.{self.extension}.{self.compression}" def create_folder_structure( self, batch_start: datetime, grain: int, partition_name_enabled: bool diff --git a/target_s3/target.py b/target_s3/target.py index dc19623..164268b 100644 --- a/target_s3/target.py +++ b/target_s3/target.py @@ -137,6 +137,12 @@ class Targets3(Target): description="A flag indicating whether to append _process_date to record.", default=False, ), + th.Property( + "use_raw_stream_name", + th.BooleanType, + description="A flag to force the filename to be identical to the stream name.", + default=False, + ), th.Property( "append_date_to_prefix", th.BooleanType, From 96425cbf3bdff565ee4f3e0ecda0dc0868f7ec1d Mon Sep 17 00:00:00 2001 From: Alex Maras Date: Fri, 16 Feb 2024 15:12:41 +0800 Subject: [PATCH 2/7] fix: keep bucket and prefix in fully qualified name --- target_s3/formats/format_base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/target_s3/formats/format_base.py b/target_s3/formats/format_base.py index 9ffdf03..163fc1c 100644 --- a/target_s3/formats/format_base.py +++ b/target_s3/formats/format_base.py @@ -67,10 +67,11 @@ def __init__(self, config: dict, context: dict, extension: str) -> None: endpoint_url=aws_config.get("aws_endpoint_override", None), ) + stream_name: str = self.context["stream_name"] self.prefix = config.get("prefix", None) self.logger = context["logger"] self.fully_qualified_key = ( - self.context["stream_name"] + f"{self.bucket}/{self.prefix}/{stream_name}" if config.get("use_raw_stream_name") else self.create_key() ) From 
ffc453ff67de107be468e0e6b5801c90fd0c7b9b Mon Sep 17 00:00:00 2001 From: Alex Maras Date: Fri, 16 Feb 2024 15:40:10 +0800 Subject: [PATCH 3/7] feat: add plaintext format option to not serialize output --- target_s3/formats/format_plaintext.py | 22 ++++++++++++++++++++++ target_s3/sinks.py | 3 ++- target_s3/target.py | 1 + 3 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 target_s3/formats/format_plaintext.py diff --git a/target_s3/formats/format_plaintext.py b/target_s3/formats/format_plaintext.py new file mode 100644 index 0000000..54d0ce4 --- /dev/null +++ b/target_s3/formats/format_plaintext.py @@ -0,0 +1,22 @@ +from datetime import datetime + +from bson import ObjectId + +from target_s3.formats.format_base import FormatBase + + +class FormatPlaintext(FormatBase): + def __init__(self, config, context) -> None: + super().__init__(config, context, "plaintext") + pass + + def _prepare_records(self): + # use default behavior, no additional prep needed + return super()._prepare_records() + + def _write(self) -> None: + return super()._write(self.records) + + def run(self) -> None: + # use default behavior, no additional run steps needed + return super().run(self.context["records"]) diff --git a/target_s3/sinks.py b/target_s3/sinks.py index f84ecb4..0064033 100644 --- a/target_s3/sinks.py +++ b/target_s3/sinks.py @@ -9,10 +9,11 @@ from target_s3.formats.format_parquet import FormatParquet from target_s3.formats.format_csv import FormatCsv from target_s3.formats.format_json import FormatJson +from target_s3.formats.format_plaintext import FormatPlaintext LOGGER = logging.getLogger("target-s3") -FORMAT_TYPE = {"parquet": FormatParquet, "csv": FormatCsv, "json": FormatJson} +FORMAT_TYPE = {"parquet": FormatParquet, "csv": FormatCsv, "json": FormatJson, "plaintext": FormatPlaintext} class s3Sink(BatchSink): diff --git a/target_s3/target.py b/target_s3/target.py index 164268b..362d2e7 100644 --- a/target_s3/target.py +++ b/target_s3/target.py @@ -29,6 
+29,7 @@ class Targets3(Target): allowed_values=[ "parquet", "json", + "plaintext", ], # TODO: configure this from class ), th.Property( From 1ad99dcc04ecc404ee7fb1425e8ba6f8f171000d Mon Sep 17 00:00:00 2001 From: Alex Maras Date: Fri, 16 Feb 2024 15:44:44 +0800 Subject: [PATCH 4/7] fix: concatenate all records into a single newline-separated string for plaintext --- target_s3/formats/format_plaintext.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/target_s3/formats/format_plaintext.py b/target_s3/formats/format_plaintext.py index 54d0ce4..40b48a3 100644 --- a/target_s3/formats/format_plaintext.py +++ b/target_s3/formats/format_plaintext.py @@ -15,7 +15,7 @@ def _prepare_records(self): return super()._prepare_records() def _write(self) -> None: - return super()._write(self.records) + return super()._write("\n".join(self.records)) def run(self) -> None: # use default behavior, no additional run steps needed From 6d589cb3d3763a8cc2df934fc1b2fd4ff3056920 Mon Sep 17 00:00:00 2001 From: Alex Maras Date: Fri, 16 Feb 2024 15:57:20 +0800 Subject: [PATCH 5/7] Revert "fix: concatenate all records into a single newline-separated string for plaintext" This reverts commit 1ad99dcc04ecc404ee7fb1425e8ba6f8f171000d. 
--- target_s3/formats/format_plaintext.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/target_s3/formats/format_plaintext.py b/target_s3/formats/format_plaintext.py index 40b48a3..54d0ce4 100644 --- a/target_s3/formats/format_plaintext.py +++ b/target_s3/formats/format_plaintext.py @@ -15,7 +15,7 @@ def _prepare_records(self): return super()._prepare_records() def _write(self) -> None: - return super()._write("\n".join(self.records)) + return super()._write(self.records) def run(self) -> None: # use default behavior, no additional run steps needed From 526dd616132f398dd9ad23162bcacbdb4a299ee0 Mon Sep 17 00:00:00 2001 From: Alex Maras Date: Fri, 16 Feb 2024 15:58:11 +0800 Subject: [PATCH 6/7] fix: replace plaintext formatter with jsonl --- target_s3/formats/format_jsonl.py | 33 +++++++++++++++++++++++++++ target_s3/formats/format_plaintext.py | 22 ------------------ target_s3/sinks.py | 4 ++-- target_s3/target.py | 4 ++-- 4 files changed, 37 insertions(+), 26 deletions(-) create mode 100644 target_s3/formats/format_jsonl.py delete mode 100644 target_s3/formats/format_plaintext.py diff --git a/target_s3/formats/format_jsonl.py b/target_s3/formats/format_jsonl.py new file mode 100644 index 0000000..d324a21 --- /dev/null +++ b/target_s3/formats/format_jsonl.py @@ -0,0 +1,33 @@ +from datetime import datetime + +from bson import ObjectId +from simplejson import JSONEncoder, dumps + +from target_s3.formats.format_base import FormatBase + + +class JsonSerialize(JSONEncoder): + def default(self, obj: any) -> any: + if isinstance(obj, ObjectId): + return str(obj) + if isinstance(obj, datetime): + return obj.isoformat() + else: + raise TypeError(f"Type {type(obj)} not serializable") + + +class FormatJsonl(FormatBase): + def __init__(self, config, context) -> None: + super().__init__(config, context, "jsonl") + pass + + def _prepare_records(self): + # use default behavior, no additional prep needed + return super()._prepare_records() + + def _write(self) 
-> None: + return super()._write('\n'.join(map(dumps, self.records))) + + def run(self) -> None: + # use default behavior, no additional run steps needed + return super().run(self.context["records"]) diff --git a/target_s3/formats/format_plaintext.py b/target_s3/formats/format_plaintext.py deleted file mode 100644 index 54d0ce4..0000000 --- a/target_s3/formats/format_plaintext.py +++ /dev/null @@ -1,22 +0,0 @@ -from datetime import datetime - -from bson import ObjectId - -from target_s3.formats.format_base import FormatBase - - -class FormatPlaintext(FormatBase): - def __init__(self, config, context) -> None: - super().__init__(config, context, "plaintext") - pass - - def _prepare_records(self): - # use default behavior, no additional prep needed - return super()._prepare_records() - - def _write(self) -> None: - return super()._write(self.records) - - def run(self) -> None: - # use default behavior, no additional run steps needed - return super().run(self.context["records"]) diff --git a/target_s3/sinks.py b/target_s3/sinks.py index 0064033..c1fd7c3 100644 --- a/target_s3/sinks.py +++ b/target_s3/sinks.py @@ -9,11 +9,11 @@ from target_s3.formats.format_parquet import FormatParquet from target_s3.formats.format_csv import FormatCsv from target_s3.formats.format_json import FormatJson -from target_s3.formats.format_plaintext import FormatPlaintext +from target_s3.formats.format_jsonl import FormatJsonl LOGGER = logging.getLogger("target-s3") -FORMAT_TYPE = {"parquet": FormatParquet, "csv": FormatCsv, "json": FormatJson, "plaintext": FormatPlaintext} +FORMAT_TYPE = {"parquet": FormatParquet, "csv": FormatCsv, "json": FormatJson, "jsonl": FormatJsonl} class s3Sink(BatchSink): diff --git a/target_s3/target.py b/target_s3/target.py index 362d2e7..66e6f0f 100644 --- a/target_s3/target.py +++ b/target_s3/target.py @@ -29,7 +29,7 @@ class Targets3(Target): allowed_values=[ "parquet", "json", - "plaintext", + "jsonl", ], # TODO: configure this from class ), th.Property( @@ 
-222,4 +222,4 @@ def deserialize_json(self, line: str) -> dict: if __name__ == "__main__": - Targets3.cli() \ No newline at end of file + Targets3.cli() From bef8f8fe6a8a3a0777e97ae9f804337c6ad97b98 Mon Sep 17 00:00:00 2001 From: Alex Maras Date: Tue, 19 Mar 2024 17:30:26 +0800 Subject: [PATCH 7/7] doc: add use_raw_stream_name to configuration --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 97cf0ae..4475ac6 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com). "include_process_date": true|false, "append_date_to_prefix": true|false, "partition_name_enabled": true|false, + "use_raw_stream_name": true|false, "append_date_to_prefix_grain": "day", "append_date_to_filename": true|false, "append_date_to_filename_grain": "microsecond",