-
Notifications
You must be signed in to change notification settings - Fork 362
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
The PrefixFileSystem is a filesystem wrapper. It assumes every path it deals with is relative to the `prefix`. After performing the necessary path operations, it delegates everything to the wrapped filesystem. Resolves #395
- Loading branch information
Luca Moschella
committed
Sep 9, 2021
1 parent
c15b586
commit 7686547
Showing
2 changed files
with
270 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
import os | ||
from pathlib import Path | ||
from typing import Any, Iterable, Sequence, Union | ||
|
||
import fsspec | ||
from fsspec import AbstractFileSystem | ||
from fsspec.core import split_protocol | ||
from fsspec.utils import stringify_path | ||
|
||
|
||
class PrefixFileSystem(AbstractFileSystem):
    """Filesystem wrapper that treats every path as relative to ``prefix``.

    Each public method joins ``prefix`` onto the incoming path(s) and then
    delegates to the wrapped ``filesystem``. Methods that return entry names
    (``ls``, ``glob``, ``info``) strip the prefix again before returning.
    """

    def __init__(
        self,
        prefix: str,
        filesystem: fsspec.AbstractFileSystem,
        *args,
        **storage_options,
    ) -> None:
        """Store the prefix and the filesystem that calls are delegated to.

        Parameters
        ----------
        prefix:
            Path joined in front of every path handed to this wrapper.
        filesystem:
            The wrapped filesystem that performs the actual operations.
        *args, **storage_options:
            Forwarded unchanged to ``AbstractFileSystem.__init__``.
        """
        super().__init__(*args, **storage_options)
        self.prefix = prefix
        self.filesystem = filesystem

    def _add_fs_prefix(
        self, path: Union[str, Path, Iterable[Union[str, Path]]]
    ) -> Union[str, Sequence[str]]:
        """Return ``path`` (or every path in an iterable) joined onto the prefix.

        A leading ``protocol://`` is split off before joining and re-attached
        afterwards. Joining uses ``os.path.join``, so an absolute ``path``
        replaces the prefix instead of being appended to it.

        Raises
        ------
        TypeError
            If ``path`` is neither a ``str``/``Path`` nor an iterable of them.
        """
        if isinstance(path, (str, Path)):
            path = stringify_path(path)
            protocol, path = split_protocol(path)
            path = os.path.join(self.prefix, path)
            return protocol + "://" + path if protocol is not None else path
        if isinstance(path, Iterable):
            return [self._add_fs_prefix(x) for x in path]
        # Raise explicitly: an ``assert`` is stripped under ``python -O`` and
        # the method would then silently return ``None``.
        raise TypeError(f"Unsupported path type: {type(path).__name__}")

    def _remove_fs_prefix(
        self, path: Union[str, Path, Iterable[Union[str, Path]]]
    ) -> Union[str, Sequence[str]]:
        """Return ``path`` (or every path in an iterable) relative to the prefix.

        A leading ``protocol://`` is split off before the relative path is
        computed with ``os.path.relpath`` and re-attached afterwards.

        Raises
        ------
        TypeError
            If ``path`` is neither a ``str``/``Path`` nor an iterable of them.
        """
        if isinstance(path, (str, Path)):
            path = stringify_path(path)
            protocol, path = split_protocol(path)
            path = os.path.relpath(path, start=self.prefix)
            return protocol + "://" + path if protocol is not None else path
        if isinstance(path, Iterable):
            return [self._remove_fs_prefix(x) for x in path]
        raise TypeError(f"Unsupported path type: {type(path).__name__}")

    def _strip_info_name(self, info):
        """Return a copy of an info dict whose ``name`` has the prefix removed.

        A copy is returned so that dicts owned by the wrapped filesystem are
        never mutated. Anything that is not a dict with a ``name`` key is
        returned unchanged.
        """
        if isinstance(info, dict) and "name" in info:
            return {**info, "name": self._remove_fs_prefix(info["name"])}
        return info

    def mkdir(self, path: str, create_parents: bool = True, **kwargs) -> None:
        """Create the directory ``path`` (relative to the prefix)."""
        path = self._add_fs_prefix(path)
        return self.filesystem.mkdir(path=path, create_parents=create_parents, **kwargs)

    def makedirs(self, path: str, exist_ok: bool = False):
        """Recursively create the directory ``path`` (relative to the prefix)."""
        path = self._add_fs_prefix(path)
        # Call ``makedirs`` directly instead of going through the ``mkdirs`` alias.
        return self.filesystem.makedirs(path=path, exist_ok=exist_ok)

    def rmdir(self, path: str):
        """Remove the directory ``path`` (relative to the prefix)."""
        path = self._add_fs_prefix(path)
        return self.filesystem.rmdir(path=path)

    def ls(
        self,
        path: str,
        detail=False,
        **kwargs,
    ) -> Union[Sequence[str], Sequence[dict]]:
        """List ``path`` and return the entries with the prefix removed.

        With ``detail`` falsy a list of prefix-free names is returned; with
        ``detail`` truthy a list of info dicts whose ``name`` is prefix-free.
        """
        path = self._add_fs_prefix(path)
        ls_out = self.filesystem.ls(path=path, detail=detail, **kwargs)
        if detail:
            # Build new dicts rather than rewriting the wrapped filesystem's
            # own objects in place.
            return [self._strip_info_name(entry) for entry in ls_out]
        return self._remove_fs_prefix(ls_out)

    def glob(self, path: str, **kwargs):
        """Expand the glob pattern ``path`` and return prefix-free results.

        When the wrapped filesystem returns a mapping (as it does for
        ``detail=True``), a mapping of prefix-free names to info dicts is
        returned; otherwise a list of prefix-free names.
        """
        path = self._add_fs_prefix(path)
        glob_out = self.filesystem.glob(path=path, **kwargs)
        if isinstance(glob_out, dict):
            return {
                self._remove_fs_prefix(name): self._strip_info_name(info)
                for name, info in glob_out.items()
            }
        return [self._remove_fs_prefix(x) for x in glob_out]

    def info(self, path: str, **kwargs):
        """Return the info dict for ``path`` with a prefix-free ``name``.

        The name is stripped so that it agrees with what ``ls`` reports.
        """
        path = self._add_fs_prefix(path)
        return self._strip_info_name(self.filesystem.info(path=path, **kwargs))

    def cp_file(self, path1: str, path2: str, **kwargs):
        """Copy ``path1`` to ``path2``; both are taken relative to the prefix."""
        path1 = self._add_fs_prefix(path1)
        path2 = self._add_fs_prefix(path2)
        return self.filesystem.cp_file(path1, path2, **kwargs)

    def get_file(self, path1: str, path2: str, callback=None, **kwargs):
        """Delegate ``get_file`` after prefixing both paths."""
        # NOTE(review): ``path2`` is presumably a local destination in the
        # fsspec API, yet it is prefixed as well -- confirm this is intended.
        path1 = self._add_fs_prefix(path1)
        path2 = self._add_fs_prefix(path2)
        return self.filesystem.get_file(path1, path2, callback, **kwargs)

    def put_file(self, path1: str, path2: str, callback=None, **kwargs):
        """Delegate ``put_file`` after prefixing both paths."""
        # NOTE(review): ``path1`` is presumably a local source in the fsspec
        # API, yet it is prefixed as well -- confirm this is intended.
        path1 = self._add_fs_prefix(path1)
        path2 = self._add_fs_prefix(path2)
        return self.filesystem.put_file(path1, path2, callback, **kwargs)

    def mv_file(self, path1: str, path2: str, **kwargs):
        """Move ``path1`` to ``path2``; both are taken relative to the prefix."""
        path1 = self._add_fs_prefix(path1)
        path2 = self._add_fs_prefix(path2)
        return self.filesystem.mv_file(path1, path2, **kwargs)

    def rm_file(self, path: str):
        """Delete the single file ``path`` (relative to the prefix)."""
        path = self._add_fs_prefix(path)
        return self.filesystem.rm_file(path)

    def rm(self, path: str, recursive=False, maxdepth=None):
        """Delete ``path`` (relative to the prefix).

        ``recursive`` and ``maxdepth`` are forwarded to the wrapped
        filesystem unchanged.
        """
        path = self._add_fs_prefix(path)
        return self.filesystem.rm(path, recursive=recursive, maxdepth=maxdepth)

    def touch(self, path: str, **kwargs):
        """Delegate ``touch`` for ``path`` (relative to the prefix)."""
        path = self._add_fs_prefix(path)
        return self.filesystem.touch(path, **kwargs)

    def created(self, path: str):
        """Return what the wrapped filesystem reports as creation time of ``path``."""
        path = self._add_fs_prefix(path)
        return self.filesystem.created(path)

    def modified(self, path: str):
        """Return what the wrapped filesystem reports as modification time of ``path``."""
        path = self._add_fs_prefix(path)
        return self.filesystem.modified(path)

    def sign(self, path: str, expiration=100, **kwargs):
        """Delegate ``sign`` for ``path``, forwarding ``expiration`` unchanged."""
        path = self._add_fs_prefix(path)
        return self.filesystem.sign(path, expiration=expiration, **kwargs)

    def cat(
        self,
        path: str,
        recursive: bool = False,
        on_error: str = "raise",
        **kwargs: Any,
    ):
        """Delegate ``cat`` for ``path`` (a single path or an iterable of paths).

        The result of the wrapped filesystem is returned as-is.
        """
        # NOTE(review): for multiple paths the wrapped filesystem presumably
        # returns a mapping keyed by prefixed paths; keys are not stripped here.
        path = self._add_fs_prefix(path)
        return self.filesystem.cat(
            path, recursive=recursive, on_error=on_error, **kwargs
        )

    def __repr__(self) -> str:
        """Return a debug representation showing the prefix and wrapped filesystem."""
        return f"{self.__class__.__qualname__}(prefix='{self.prefix}', filesystem={self.filesystem})"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
from __future__ import absolute_import, division, print_function | ||
|
||
import os | ||
import os.path | ||
import tempfile | ||
from contextlib import contextmanager | ||
|
||
import pytest | ||
|
||
import fsspec | ||
from fsspec.core import OpenFile | ||
from fsspec.implementations.local import make_path_posix | ||
from fsspec.implementations.prefix import PrefixFileSystem | ||
|
||
# JSON-lines fixtures, keyed by the file name they are written under.
files = {
    ".test.accounts.1.json": b"".join(
        [
            b'{"amount": 100, "name": "Alice"}\n',
            b'{"amount": 200, "name": "Bob"}\n',
            b'{"amount": 300, "name": "Charlie"}\n',
            b'{"amount": 400, "name": "Dennis"}\n',
        ]
    ),
    ".test.accounts.2.json": b"".join(
        [
            b'{"amount": 500, "name": "Alice"}\n',
            b'{"amount": 600, "name": "Bob"}\n',
            b'{"amount": 700, "name": "Charlie"}\n',
            b'{"amount": 800, "name": "Dennis"}\n',
        ]
    ),
}

# Two-line CSV fixtures (header row plus one data row each).
csv_files = {
    ".test.fakedata.1.csv": b"a,b\n1,2\n",
    ".test.fakedata.2.csv": b"a,b\n3,4\n",
}

# Working directory captured once, at import time.
odir = os.getcwd()
|
||
|
||
@contextmanager
def filetexts(d, open=open, mode="t"):
    """Write a number of files into a fresh temporary directory.

    The current working directory is switched to that temporary directory
    for the duration of the ``with`` block, so relative file names in ``d``
    land inside it and parallel tests do not collide.

    Parameters
    ----------
    d : dict
        Mapping from file name to content, e.g. ``{'a.csv': '1,1\\n2,2'}``.
    open : callable
        Opener called as ``open(filename, "w" + mode)``.
    mode : str
        Suffix appended to ``"w"``: ``"t"`` for text, ``"b"`` for bytes.

    Yields
    ------
    list
        The file names (the keys of ``d``).

    On exit -- whether or not the ``with`` body raised -- the written files
    are removed, the working directory active on entry is restored, and the
    temporary directory is deleted.
    """
    import shutil

    previous_dir = os.getcwd()
    dirname = tempfile.mkdtemp()
    try:
        os.chdir(dirname)
        try:
            for filename, text in d.items():
                f = open(filename, "w" + mode)
                try:
                    f.write(text)
                finally:
                    # Custom openers may hand back objects without close().
                    try:
                        f.close()
                    except AttributeError:
                        pass

            yield list(d)
        finally:
            # Remove each file explicitly (best effort): a key may be an
            # absolute path outside the temporary directory.
            for filename in d:
                try:
                    os.remove(filename)
                except OSError:
                    pass
            # Leave the temporary directory before deleting it; some
            # platforms refuse to remove the current working directory.
            os.chdir(previous_dir)
    finally:
        shutil.rmtree(dirname, ignore_errors=True)
|
||
|
||
def test_cats():
    """``cat`` through the prefix wrapper: single path, path list, byte ranges."""
    first = ".test.fakedata.1.csv"
    second = ".test.fakedata.2.csv"
    first_data = b"a,b\n" b"1,2\n"
    second_data = b"a,b\n" b"3,4\n"

    with filetexts(csv_files, mode="b"):
        fs = PrefixFileSystem(prefix=".", filesystem=fsspec.filesystem("file"))

        # Whole-file reads.
        assert fs.cat(first) == first_data
        whole = fs.cat([first, second])
        assert set(whole.values()) == {first_data, second_data}
        assert fs.cat(first, None, None) == first_data

        # Byte-range reads on a single file.
        assert fs.cat(first, start=1, end=6) == first_data[1:6]
        assert fs.cat(first, start=-1) == first_data[-1:]
        assert fs.cat(first, start=1, end=-2) == first_data[1:-2]

        # Byte-range read applied to several files at once.
        ranged = fs.cat([first, second], start=1, end=-1)
        assert set(ranged.values()) == {first_data[1:-1], second_data[1:-1]}
|
||
|
||
def test_not_found():
    """Opening a missing file through the wrapper raises a not-found/OS error."""
    wrapped = fsspec.filesystem("file")
    fs = PrefixFileSystem(prefix=".", filesystem=wrapped)
    missing = "not-a-file"
    expected_errors = (FileNotFoundError, OSError)
    with pytest.raises(expected_errors):
        with OpenFile(fs, missing, mode="rb"):
            pass
|
||
|
||
def test_isfile():
    """``isfile`` is true for the fixtures (with or without protocol) and false otherwise."""
    fs = PrefixFileSystem(prefix=".", filesystem=fsspec.filesystem("file"))
    with filetexts(files, mode="b"):
        for name in files:
            assert fs.isfile(name)
            assert fs.isfile("file://" + name)
        for missing in ("not-a-file", "file://not-a-file"):
            assert not fs.isfile(missing)
|
||
|
||
def test_isdir():
    """``isdir`` is true for the fixtures' parent directory and false for files."""
    fs = PrefixFileSystem(prefix=".", filesystem=fsspec.filesystem("file"))
    with filetexts(files, mode="b"):
        for name in files:
            parent = os.path.dirname(os.path.abspath(name))
            assert fs.isdir(parent)
            assert not fs.isdir(name)
        assert not fs.isdir("not-a-dir")
|
||
|
||
@pytest.mark.parametrize("prefix", ["/", "/tmp"])
def test_directories(tmpdir, prefix):
    """Create, list and remove a directory addressed relative to ``prefix``."""
    tmpdir = make_path_posix(str(tmpdir))
    rel_tmpdir = os.path.relpath(tmpdir, prefix)
    new_dir = rel_tmpdir + "/dir"

    fs = PrefixFileSystem(prefix=prefix, filesystem=fsspec.filesystem("file"))

    fs.mkdir(new_dir)

    # The freshly created directory is empty.
    assert not fs.ls(tmpdir + "/dir")

    # The parent lists it, by prefix-relative name and as a directory entry.
    assert new_dir in fs.ls(rel_tmpdir)
    detailed = fs.ls(rel_tmpdir, True)
    assert detailed[0]["type"] == "directory"

    fs.rmdir(new_dir)
    assert not fs.ls(rel_tmpdir)