Skip to content

Commit

Permalink
chore: Add a file (CSV) tap for testing (WIP!) (#2668)
Browse files Browse the repository at this point in the history
* chore: Add a file (CSV) tap for testing

* Implement and test tap

* Add fixture files

* Incremental replication

* Formalize default stream name

* Update cache keys

* Enable more tests

* gitignore cache file

* More tests
  • Loading branch information
edgarrmondragon authored Sep 17, 2024
1 parent 83b3d49 commit 3d3ac7b
Show file tree
Hide file tree
Showing 9 changed files with 1,296 additions and 3 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ jobs:
nox --version
- uses: actions/cache@v4
if: always() && (matrix.session == 'tests')
if: matrix.session == 'tests'
with:
path: http_cache.sqlite
key: http_cache-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.session }}-${{ matrix.sqlalchemy }}
key: http_cache-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.sqlalchemy }}

- name: Run Nox
env:
Expand All @@ -106,7 +106,7 @@ jobs:
if: always() && (matrix.session == 'tests')
with:
include-hidden-files: true
name: coverage-data-nox_${{ matrix.session }}-${{ matrix.os }}-py${{ matrix.python-version }}_sqlalchemy_${{ matrix.sqlalchemy }}
name: coverage-data-nox_-${{ matrix.os }}-py${{ matrix.python-version }}_sqlalchemy_${{ matrix.sqlalchemy }}
path: ".coverage.*"

tests-external:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# HTTP cache
http_cache.sqlite

# Local Poetry configuration file

poetry.toml
Expand Down
1,001 changes: 1,001 additions & 0 deletions fixtures/csv/customers.csv

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions fixtures/csv/employees.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
id,first_name,last_name,email,ip_address
1,Tobye,Tallach,ttallach0@icq.com,159.80.54.64
2,Bret,Auchterlonie,bauchterlonie1@t-online.de,63.179.228.179
3,Chester,Leban,cleban2@arstechnica.com,109.23.123.220
4,Weston,Venny,wvenny3@about.com,80.78.0.69
5,Alejoa,Hassen,ahassen4@narod.ru,193.70.126.231
6,Otes,Ioselevich,oioselevich5@pen.io,55.238.240.160
7,Dolley,Mc Ilwrick,dmcilwrick6@vistaprint.com,225.224.151.67
8,Cliff,Druitt,cdruitt7@earthlink.net,216.35.85.142
9,Alfreda,Parysiak,aparysiak8@google.com.br,234.124.93.69
10,Alfonso,Wotherspoon,awotherspoon9@cafepress.com,34.94.1.132
11,Jemmy,Gavriel,jgavriela@wikispaces.com,69.13.142.245
12,Ezechiel,Binion,ebinionb@princeton.edu,85.203.127.191
13,Burk,Blowfelde,bblowfeldec@theglobeandmail.com,74.133.42.177
14,Danette,Brealey,dbrealeyd@tumblr.com,249.85.157.243
15,Brent,Collcutt,bcollcutte@twitpic.com,68.202.67.52
16,Filbert,Wane,fwanef@mozilla.org,51.190.146.189
17,Amory,Brewers,abrewersg@sciencedirect.com,147.155.225.194
18,Giraud,Reen,greenh@themeforest.net,134.254.177.66
19,Burtie,Siebert,bsieberti@reuters.com,47.194.48.217
20,Adam,Maddick,amaddickj@theguardian.com,165.16.248.228
21,Callean,Vernall,cvernallk@google.pl,243.145.198.197
22,Olympie,Itzakovitz,oitzakovitzl@addthis.com,40.55.240.15
23,Jacky,Emney,jemneym@cbslocal.com,216.72.80.81
24,Isidoro,Novello,inovellon@symantec.com,153.171.11.150
25,Kora,Liversedge,kliversedgeo@godaddy.com,131.126.97.242
26,Salaidh,McMenamie,smcmenamiep@yolasite.com,228.231.31.219
27,Corey,Dowdeswell,cdowdeswellq@a8.net,203.73.30.64
28,Brodie,Holwell,bholwellr@china.com.cn,64.200.225.25
29,Trudey,Ungerer,tungerers@friendfeed.com,122.82.88.41
30,Doralin,Maxted,dmaxtedt@photobucket.com,37.78.14.199
31,Maurie,Marklin,mmarklinu@gizmodo.com,22.181.178.6
32,Hermann,Voase,hvoasev@gnu.org,155.126.157.84
33,Fanchette,Callaway,fcallawayw@naver.com,121.161.80.246
34,Sara-ann,Birdall,sbirdallx@hexun.com,35.64.166.83
35,Harriot,Clipsham,hclipshamy@redcross.org,78.103.253.219
36,Bonita,Woolway,bwoolwayz@artisteer.com,70.114.50.135
37,Arleyne,MacComiskey,amaccomiskey10@oracle.com,80.22.221.216
38,Ethelbert,Covill,ecovill11@blogs.com,52.66.186.124
39,Irita,Knee,iknee12@skyrock.com,239.247.34.120
40,Naoma,Janca,njanca13@washington.edu,189.63.152.60
41,Kayne,Mizzen,kmizzen14@jalbum.net,84.133.236.10
42,Estell,Stuckford,estuckford15@nationalgeographic.com,246.168.153.22
43,Larine,Stack,lstack16@bravesites.com,197.176.195.68
44,Rikki,Newbold,rnewbold17@omniture.com,27.245.43.243
45,Romonda,Charer,rcharer18@ifeng.com,137.144.236.93
46,Letizia,Monksfield,lmonksfield19@ebay.com,209.47.5.147
47,Sinclare,McAreavey,smcareavey1a@cnn.com,251.250.216.206
48,Athene,Haysham,ahaysham1b@ifeng.com,81.227.231.240
49,Gale,Tracy,gtracy1c@squidoo.com,93.138.226.205
50,Dareen,O'Shields,doshields1d@nymag.com,27.226.127.240
Empty file.
5 changes: 5 additions & 0 deletions samples/sample_tap_csv/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV

SampleTapCSV.cli()
91 changes: 91 additions & 0 deletions samples/sample_tap_csv/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import annotations

import csv
import datetime
import os
import typing as t

from singer_sdk import Stream
from singer_sdk.streams.core import REPLICATION_INCREMENTAL

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context, Record
from singer_sdk.tap_base import Tap

SDC_META_FILEPATH = "_sdc_path"
SDC_META_MODIFIED_AT = "_sdc_modified_at"


def _to_datetime(value: float) -> str:
return datetime.datetime.fromtimestamp(value).astimezone()


class CSVStream(Stream):
"""CSV stream class."""

def __init__(
self,
tap: Tap,
name: str | None = None,
*,
partitions: list[str] | None = None,
) -> None:
# TODO(edgarmondragon): Build schema from CSV file.
schema = {
"type": ["object"],
"properties": {
SDC_META_FILEPATH: {"type": "string"},
SDC_META_MODIFIED_AT: {"type": "string", "format": "date-time"},
},
"required": [],
"additionalProperties": {"type": "string"},
}
super().__init__(tap, schema, name)

# TODO(edgarrmondragon): Make this None if the filesytem does not support it.
self.replication_key = SDC_META_MODIFIED_AT

self._partitions = partitions or []

@property
def partitions(self) -> list[Context]:
return self._partitions

def _read_file(self, path: str) -> t.Iterable[Record]: # noqa: PLR6301
# Make these configurable.
delimiter = ","
quotechar = '"'
escapechar = None
doublequote = True
lineterminator = "\r\n"

# TODO: Use filesytem-specific file open method.
with open(path, encoding="utf-8") as file: # noqa: PTH123
reader = csv.DictReader(
file,
delimiter=delimiter,
quotechar=quotechar,
escapechar=escapechar,
doublequote=doublequote,
lineterminator=lineterminator,
)
yield from reader

def get_records(
self,
context: Context | None,
) -> t.Iterable[Record | tuple[Record, Context | None]]:
path: str = context[SDC_META_FILEPATH]
mtime = os.path.getmtime(path) # noqa: PTH204

if (
self.replication_method is REPLICATION_INCREMENTAL
and (previous_bookmark := self.get_starting_timestamp(context))
and _to_datetime(mtime) < previous_bookmark
):
self.logger.info("File has not been modified since last read, skipping")
return

for record in self._read_file(path):
record[SDC_META_MODIFIED_AT] = _to_datetime(mtime)
yield record
106 changes: 106 additions & 0 deletions samples/sample_tap_csv/sample_tap_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Sample Tap for CSV files."""

from __future__ import annotations

import enum
import functools
import os

import singer_sdk.typing as th
from samples.sample_tap_csv.client import SDC_META_FILEPATH, CSVStream
from singer_sdk import Tap

DEFAULT_MERGE_STREAM_NAME = "files"


def file_path_to_stream_name(file_path: str) -> str:
"""Convert a file path to a stream name."""
return os.path.basename(file_path).replace(".csv", "").replace(os.sep, "__") # noqa: PTH119


class ReadMode(str, enum.Enum):
"""Sync mode for the tap."""

one_stream_per_file = "one_stream_per_file"
merge = "merge"


class SampleTapCSV(Tap):
"""Sample Tap for CSV files."""

name = "sample-tap-csv"

config_jsonschema = th.PropertiesList(
th.Property(
"path",
th.StringType,
required=True,
description="Path to CSV files.",
),
th.Property(
"read_mode",
th.StringType,
required=True,
description=(
"Use `one_stream_per_file` to read each file as a separate stream, or "
"`merge` to merge all files into a single stream."
),
allowed_values=[
ReadMode.one_stream_per_file,
ReadMode.merge,
],
),
th.Property(
"stream_name",
th.StringType,
required=True,
default=DEFAULT_MERGE_STREAM_NAME,
description="Name of the stream to use when `read_mode` is `merge`.",
),
# TODO(edgarmondragon): Other configuration options.
).to_dict()

@functools.cached_property
def read_mode(self) -> ReadMode:
return ReadMode(self.config["read_mode"])

def discover_streams(self) -> list:
# TODO(edgarmondragon): Implement stream discovery, based on the configured path
# and read mode.
path: str = self.config[
"path"
] # a directory for now, but could be a glob pattern

# One stream per file
if self.read_mode == ReadMode.one_stream_per_file:
if os.path.isdir(path): # noqa: PTH112
return [
CSVStream(
tap=self,
name=file_path_to_stream_name(member),
partitions=[{SDC_META_FILEPATH: os.path.join(path, member)}], # noqa: PTH118
)
for member in os.listdir(path)
if member.endswith(".csv")
]

msg = f"Path {path} is not a directory."
raise ValueError(msg)

# Merge
if os.path.isdir(path): # noqa: PTH112
contexts = [
{
SDC_META_FILEPATH: os.path.join(path, member), # noqa: PTH118
}
for member in os.listdir(path)
if member.endswith(".csv")
]
return [
CSVStream(
tap=self,
name=self.config["stream_name"],
partitions=contexts,
)
]
return []
36 changes: 36 additions & 0 deletions tests/samples/test_tap_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

import pytest

from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV
from singer_sdk.testing import get_tap_test_class

_TestCSVMerge = get_tap_test_class(
tap_class=SampleTapCSV,
config={
"path": "fixtures/csv",
"read_mode": "merge",
"stream_name": "people",
},
)


class TestCSVMerge(_TestCSVMerge):
@pytest.mark.xfail(reason="Schema generation not implemented", strict=True)
def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str):
super().test_tap_stream_record_schema_matches_transformed_catalog(stream)


TestCSVOneStreamPerFile = get_tap_test_class(
tap_class=SampleTapCSV,
config={
"path": "fixtures/csv",
"read_mode": "one_stream_per_file",
},
)


class TestCSVOneStreamPerFile(TestCSVOneStreamPerFile):
@pytest.mark.xfail(reason="Schema generation not implemented", strict=True)
def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str):
super().test_tap_stream_record_schema_matches_transformed_catalog(stream)

0 comments on commit 3d3ac7b

Please sign in to comment.