Skip to content

Commit

Permalink
upath.core: fix .name and glob
Browse files Browse the repository at this point in the history
  • Loading branch information
ap-- committed Nov 20, 2024
1 parent a1d0049 commit ee8f4f1
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
5 changes: 4 additions & 1 deletion upath/_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,10 @@ def split(self, path: UPathLike) -> tuple[str, str]:
stripped_path = self.strip_protocol(path)
head = self.parent(stripped_path) or self.root_marker
if head:
return head, stripped_path[len(head) + 1 :]
tail = stripped_path[len(head) + 1 :]
return head, tail
elif not head and self.netloc_is_anchor:
return stripped_path, ""
else:
return "", stripped_path

Expand Down
64 changes: 64 additions & 0 deletions upath/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,35 @@ def __str__(self) -> str:
else:
return self.path

@property
def _stack(self):
"""
Split the path into a 2-tuple (anchor, parts), where *anchor* is the
uppermost parent of the path (equivalent to path.parents[-1]), and
*parts* is a reversed list of parts following the anchor.
"""
split = self.parser.split
path = self.parser.strip_protocol(self._raw_path)
parent, name = split(path)
names = []
while path != parent:
names.append(name)
path = parent
parent, name = split(path)
return path, names

@property
def name(self):
"""The final path component, if any."""
remainder, stack = self._stack
return next(filter(None, (*stack, *[remainder])), "")

def with_name(self, name):
"""Return a new path with the file name changed."""
if self.parser.sep in name:
raise ValueError(f"Invalid name {name!r}")
return self.with_segments(self.parser.split(self._raw_path)[0], name)


class UPath(PathBase, PureUPath):
"""a concrete version of UPath with filesystem access"""
Expand Down Expand Up @@ -600,6 +629,41 @@ def iterdir(self) -> Generator[Self, None, None]:
# only want the path name with iterdir
yield self.with_segments(name)

def glob(
self,
pattern: str,
*,
case_sensitive: bool | None = None,
recurse_symlinks: bool = _UNSET,
) -> Generator[UPath, None, None]:
path_pattern = self.joinpath(pattern).path
sep = self.parser.sep
base = self.fs._strip_protocol(self.path)
for name in self.fs.glob(path_pattern):
name = name.removeprefix(base).removeprefix(sep)
yield self.joinpath(name)

def rglob(
self,
pattern: str,
*,
case_sensitive: bool | None = None,
recurse_symlinks: bool = _UNSET,
) -> Generator[UPath, None, None]:
path_pattern = self.joinpath(pattern).path
r_path_pattern = self.joinpath("**", pattern).path
sep = self.parser.sep
base = self.fs._strip_protocol(self.path)
seen = set()
for p in (path_pattern, r_path_pattern):
for name in self.fs.glob(p):
name = name.removeprefix(base).removeprefix(sep)
if name in seen:
continue
else:
seen.add(name)
yield self.joinpath(name)

def absolute(self) -> Self:
return self

Expand Down

0 comments on commit ee8f4f1

Please sign in to comment.