diff --git a/tap_linear/client.py b/tap_linear/client.py index 5a25c8b..5cc925d 100644 --- a/tap_linear/client.py +++ b/tap_linear/client.py @@ -8,6 +8,8 @@ from singer_sdk.helpers._classproperty import classproperty from singer_sdk.streams import GraphQLStream +from tap_linear.utils import flatten_node_lists + class LinearStream(GraphQLStream): """Linear stream class.""" @@ -59,3 +61,14 @@ def get_url_params( params["replicationKeyValue"] = replication_key_value return params + + def post_process( + self, + row: dict, + context: dict | None = None, # noqa: ARG002 + ) -> dict | None: + """Post-process row. + + Flatten nested nodes lists. + """ + return flatten_node_lists(row) diff --git a/tap_linear/streams.py b/tap_linear/streams.py index 5239b02..9f9c5a1 100644 --- a/tap_linear/streams.py +++ b/tap_linear/streams.py @@ -31,15 +31,10 @@ class CyclesStream(LinearStream): th.Property("updatedAt", th.DateTimeType), th.Property( "uncompletedIssuesUponClose", - th.ObjectType( - th.Property( - "nodes", - th.ArrayType( - th.ObjectType( - th.Property("id", th.StringType), - th.Property("identifier", th.StringType), - ), - ), + th.ArrayType( + th.ObjectType( + th.Property("id", th.StringType), + th.Property("identifier", th.StringType), ), ), ), @@ -52,7 +47,7 @@ class CyclesStream(LinearStream): cycles( first: 100 after: $next - filter: { updatedAt: {gt: $replicationKeyValue } } + filter: { updatedAt: { gt: $replicationKeyValue } } ) { nodes { id @@ -90,6 +85,172 @@ class CyclesStream(LinearStream): ) +class IssuesStream(LinearStream): + """Issues stream.""" + + name = "issues" + schema = th.PropertiesList( + th.Property("id", th.StringType), + th.Property("identifier", th.StringType), + th.Property("title", th.StringType), + th.Property("description", th.StringType), + th.Property("priority", th.NumberType), + th.Property("type", th.StringType), + th.Property( + "state", + th.ObjectType( + th.Property("id", th.StringType), + th.Property("name", th.StringType), + th.Property("type", th.StringType), + ), + ), + th.Property("estimate", th.NumberType), + th.Property("createdAt", th.DateTimeType), + th.Property("updatedAt", th.DateTimeType), + th.Property("completedAt", th.DateTimeType), + th.Property("archivedAt", th.DateTimeType), + th.Property("assignee", UserType), + th.Property("creator", UserType), + th.Property( + "team", + th.ObjectType( + th.Property("id", th.StringType), + th.Property("name", th.StringType), + th.Property("key", th.StringType), + ), + ), + th.Property( + "project", + th.ObjectType( + th.Property("id", th.StringType), + th.Property("name", th.StringType), + th.Property("key", th.StringType), + ), + ), + th.Property( + "parent", + th.ObjectType( + th.Property("id", th.StringType), + th.Property("title", th.StringType), + ), + ), + th.Property( + "labels", + th.ArrayType( + th.ObjectType( + th.Property("id", th.StringType), + th.Property("name", th.StringType), + th.Property("color", th.StringType), + ), + ), + ), + th.Property( + "history", + th.ArrayType( + th.ObjectType( + th.Property("id", th.StringType), + th.Property("createdAt", th.DateTimeType), + th.Property("updatedAt", th.DateTimeType), + th.Property( + "issue", + th.ObjectType( + th.Property("id", th.StringType), + ), + ), + th.Property("actor", UserType), + th.Property("fromCycleId", th.StringType), + th.Property("toCycleId", th.StringType), + th.Property("toAssigneeId", th.StringType), + th.Property("fromStateId", th.StringType), + th.Property("toStateId", th.StringType), + ), + ), + ), + ).to_dict() + + primary_keys: t.ClassVar[list[str]] = ["id"] + replication_key = "updatedAt" + query = """ + query Issues($next: String, $replicationKeyValue: DateTime) { + issues( + first: 35 + after: $next + filter: { updatedAt: { gt: $replicationKeyValue } } + ) { + nodes { + id + identifier + title + description + priority + state { + id + name + type + } + estimate + createdAt + updatedAt + completedAt + archivedAt + assignee { + id + name + email + } + creator { + id + name + email + } + team { + id + name + key + } + project { + id + name + } + parent { + id + title + } + labels { + nodes { + id + name + color + } + } + history { + nodes { + id + createdAt + updatedAt + issue { + id + } + actor { + id + } + fromCycleId + toCycleId + toAssigneeId + fromStateId + toStateId + } + } + } + pageInfo { + hasNextPage + endCursor + } + } + } + """ + + class CommentStream(LinearStream): """Comment stream.""" diff --git a/tap_linear/tap.py b/tap_linear/tap.py index efed286..bfe0675 100644 --- a/tap_linear/tap.py +++ b/tap_linear/tap.py @@ -44,6 +44,7 @@ def discover_streams(self) -> list[streams.LinearStream]: """ return [ streams.CyclesStream(self), + streams.IssuesStream(self), streams.CommentStream(self), ] diff --git a/tap_linear/utils.py b/tap_linear/utils.py new file mode 100644 index 0000000..8847bf1 --- /dev/null +++ b/tap_linear/utils.py @@ -0,0 +1,12 @@ +from __future__ import annotations + + +def flatten_node_lists(obj): # noqa: ANN001, ANN201 + """Flatten nodes lists.""" + if isinstance(obj, dict) and obj.keys() == {"nodes"}: + return [flatten_node_lists(item) for item in obj["nodes"]] + + if isinstance(obj, dict): + return {key: flatten_node_lists(value) for key, value in obj.items()} + + return obj diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..776d3bd --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from tap_linear.utils import flatten_node_lists + + +def test_flatten_node_lists(): + assert flatten_node_lists( + { + "things": { + "nodes": [{"id": 1}, {"id": 2}], + }, + "object": { + "id": 3, + }, + "scalar": "value", + "list": [1, 2, 3], + "nested": { + "things": { + "nodes": [{"id": 4}, {"id": 5}], + }, + }, + }, + ) == { + "things": [{"id": 1}, {"id": 2}], + "object": { + "id": 3, + }, + "scalar": "value", + "list": [1, 2, 3], + "nested": { + "things": [{"id": 4}, {"id": 5}], + }, + }