
Commit

added yml config and better schema generation
Prakhar Srivastava authored and committed Oct 11, 2023
1 parent 8d1e413 commit 111ba6e
Showing 3 changed files with 89 additions and 46 deletions.
72 changes: 54 additions & 18 deletions meltano.yml
@@ -1,25 +1,61 @@
 version: 1
 send_anonymous_usage_stats: false
-project_id: "target-s3"
+project_id: target-s3
 default_environment: test
 environments:
 - name: test
 plugins:
   extractors: []
   loaders:
-  - name: "target-s3"
-    namespace: "target_s3"
-    pip_url: -e .
-    capabilities:
-    - about
-    - stream-maps
-    - record-flattening
-    config:
-      start_date: "2010-01-01T00:00:00Z"
-    settings:
-    # TODO: To configure using Meltano, declare settings and their types here:
-    - name: username
-    - name: password
-      kind: password
-    - name: start_date
-      value: "2010-01-01T00:00:00Z"
+  - name: target-s3
+    namespace: target_s3
+    pip_url: -e .
+    capabilities:
+    - about
+    - stream-maps
+    - record-flattening
+    settings:
+    - name: format.format_type
+    - name: format.format_parquet.validate
+      kind: boolean
+      value: false
+    - name: format.format_parquet.get_schema_from_tap
+      kind: boolean
+      value: false
+    - name: cloud_provider.cloud_provider_type
+      value: aws
+    - name: cloud_provider.aws.aws_access_key_id
+      kind: password
+    - name: cloud_provider.aws.aws_secret_access_key
+      kind: password
+    - name: cloud_provider.aws.aws_session_token
+      kind: password
+    - name: cloud_provider.aws.aws_region
+      kind: password
+    - name: cloud_provider.aws.aws_profile_name
+      kind: password
+    - name: cloud_provider.aws.aws_bucket
+      kind: password
+    - name: cloud_provider.aws.aws_endpoint_override
+    - name: prefix
+    - name: stream_name_path_override
+    - name: include_process_date
+      kind: boolean
+      value: false
+    - name: append_date_to_prefix
+      kind: boolean
+      value: true
+    - name: partition_name_enabled
+      kind: boolean
+      value: false
+    - name: append_date_to_prefix_grain
+      value: day
+    - name: append_date_to_filename
+      kind: boolean
+      value: true
+    - name: append_date_to_filename_grain
+      value: microsecond
+    - name: flatten_records
+      kind: boolean
+      value: false
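
Note: the dotted setting names above follow the Meltano convention for nested plugin config, so a setting such as format.format_parquet.get_schema_from_tap ends up under format -> format_parquet in the config dict the target receives. A minimal sketch of that lookup, with illustrative values that are not part of this commit:

# Illustrative only: the nested config produced from dotted Meltano settings,
# and how the target would read it (values assumed for the example).
config = {
    "format": {
        "format_type": "parquet",
        "format_parquet": {"validate": False, "get_schema_from_tap": True},
    },
    "cloud_provider": {
        "cloud_provider_type": "aws",
        "aws": {"aws_bucket": "my-bucket", "aws_region": "us-east-1"},
    },
}

format_parquet = config.get("format", {}).get("format_parquet", {})
use_tap_schema = format_parquet.get("get_schema_from_tap", False)  # True in this example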

61 changes: 34 additions & 27 deletions target_s3/formats/format_parquet.py
@@ -16,6 +16,8 @@ def __init__(self, config, context) -> None:
             cloud_provider_config_type,
             cloud_provider_config.get(cloud_provider_config_type, None),
         )
+        self.stream_schema = context.get("stream_schema", {})
+        self.parquet_schema = None

     def create_filesystem(
         self,
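
Note: stream_schema is the stream's JSON Schema as declared by the tap; the sinks.py change further down puts it on the batch context. An illustrative value (not taken from this commit) of what the formatter now holds:

# Illustrative Singer stream schema as it might arrive in context["stream_schema"];
# create_schema() below reads its "properties" to build an explicit pyarrow schema.
stream_schema = {
    "type": "object",
    "properties": {
        "id": {"type": ["integer"]},
        "updated_at": {"type": ["string", "null"], "format": "date-time"},
        "address": {
            "type": ["object", "null"],
            "properties": {
                "city": {"type": ["string", "null"]},
                "zip": {"type": ["string", "null"]},
            },
        },
    },
}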
@@ -160,7 +162,7 @@ def sanitize(self, value):
             return None
         return value

-    def create_batch_schema(self) -> pyarrow.schema:
+    def create_schema(self) -> pyarrow.schema:
         """Generates schema from the records schema present in the tap.
         This is effective way to declare schema instead of relying on pyarrow to
         detect schema type.
@@ -269,44 +271,49 @@ def get_schema_from_object(properties: dict, level: int = 0):
                 fields.append(pyarrow.field(key, pyarrow.struct(inner_fields)))
             return fields

-        properties = self.context["batch_schema"].get("properties")
-        schema = pyarrow.schema(get_schema_from_object(properties=properties))
-        return schema
+        properties = self.stream_schema.get("properties")
+        parquet_schema = pyarrow.schema(get_schema_from_object(properties=properties))
+        self.parquet_schema = parquet_schema
+        return parquet_schema
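
Note: get_schema_from_object (only its tail is visible in this hunk) walks the JSON Schema properties, maps scalar types to pyarrow types, and recurses into nested objects as struct fields. A simplified, self-contained sketch of that idea, using an assumed type mapping rather than the one in the file:

import pyarrow


def fields_from_properties(properties: dict) -> list:
    """Simplified sketch: map JSON Schema properties to pyarrow fields."""
    type_map = {
        "integer": pyarrow.int64(),
        "number": pyarrow.float64(),
        "boolean": pyarrow.bool_(),
        "string": pyarrow.string(),
    }
    fields = []
    for key, val in properties.items():
        json_types = val.get("type", ["string"])
        if isinstance(json_types, str):
            json_types = [json_types]
        if "object" in json_types:
            # Nested objects become struct fields, built recursively.
            inner = fields_from_properties(val.get("properties", {}))
            fields.append(pyarrow.field(key, pyarrow.struct(inner)))
        else:
            json_type = next((t for t in json_types if t != "null"), "string")
            fields.append(pyarrow.field(key, type_map.get(json_type, pyarrow.string())))
    return fields


# Example usage with a tiny schema fragment:
schema = pyarrow.schema(
    fields_from_properties({"id": {"type": ["integer"]}, "name": {"type": ["string", "null"]}})
)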

     def create_dataframe(self) -> Table:
         """Creates a pyarrow Table object from the record set."""
         try:
-            fields = set()
-            for d in self.records:
-                fields = fields.union(d.keys())
-
             format_parquet = self.format.get("format_parquet", None)
-            if format_parquet and format_parquet.get("validate", None) == True:
-                # NOTE: we may could use schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html
-                # and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it.
-                schema = dict()
-                input = {
-                    f: [
-                        self.validate(schema, self.sanitize(f), row.get(f))
-                        for row in self.records
-                    ]
-                    for f in fields
-                }
-            else:
-                input = {
-                    f: [self.sanitize(row.get(f)) for row in self.records]
-                    for f in fields
-                }
-
-            ret = Table.from_pydict(
-                mapping=input, schema=self.create_batch_schema()
-            )
+            if format_parquet and format_parquet.get("get_schema_from_tap", False):
+                parquet_schema = self.parquet_schema if self.parquet_schema else self.create_schema()
+                fields = set([property.name for property in parquet_schema])
+                input = {
+                    f: [self.sanitize(row.get(f)) for row in self.records]
+                    for f in fields
+                }
+
+                ret = Table.from_pydict(
+                    mapping=input, schema=parquet_schema
+                )
+            else:
+                fields = set()
+                for d in self.records:
+                    fields = fields.union(d.keys())
+                if format_parquet and format_parquet.get("validate", None) == True:
+                    # NOTE: we may could use schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html
+                    # and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it.
+                    schema = dict()
+                    input = {
+                        f: [
+                            self.validate(schema, self.sanitize(f), row.get(f))
+                            for row in self.records
+                        ]
+                        for f in fields
+                    }
+                else:
+                    input = {
+                        f: [self.sanitize(row.get(f)) for row in self.records]
+                        for f in fields
+                    }
+                ret = Table.from_pydict(mapping=input)

         except Exception as e:
             self.logger.info(self.records)
             self.logger.error("Failed to create parquet dataframe.")
             self.logger.error(e)
             raise e
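
Note: the practical difference the get_schema_from_tap path makes is that a column keeps the type the tap declared even when every value in a batch is null, whereas the inferred path lets pyarrow guess the type from the data. A small standalone illustration with made-up records (not code from this target):

import pyarrow
from pyarrow import Table

# Two records whose "note" column is entirely null in this batch.
records = [{"id": 1, "note": None}, {"id": 2, "note": None}]
columns = {"id": [r["id"] for r in records], "note": [r["note"] for r in records]}

# Without a schema, pyarrow infers the all-null column as type "null".
inferred = Table.from_pydict(mapping=columns)
print(inferred.schema.field("note").type)  # null

# With an explicit (tap-derived) schema, the declared string type is kept.
explicit_schema = pyarrow.schema(
    [pyarrow.field("id", pyarrow.int64()), pyarrow.field("note", pyarrow.string())]
)
explicit = Table.from_pydict(mapping=columns, schema=explicit_schema)
print(explicit.schema.field("note").type)  # string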
2 changes: 1 addition & 1 deletion target_s3/sinks.py
@@ -44,7 +44,7 @@ def process_batch(self, context: dict) -> None:
         # add stream name to context
         context["stream_name"] = self.stream_name
         context["logger"] = self.logger
-        context["batch_schema"] = self.schema
+        context["stream_schema"] = self.schema
         # creates new object for each batch
         format_type_client = format_type_factory(
             FORMAT_TYPE[self.format_type], self.config, context
