diff --git a/pydantic_pkgr/__init__.py b/pydantic_pkgr/__init__.py index 4a67897..d9ccfb7 100644 --- a/pydantic_pkgr/__init__.py +++ b/pydantic_pkgr/__init__.py @@ -2,6 +2,8 @@ SemVer, ) from .binprovider import ( + PATHStr, + BinDirPath, BinName, BinProviderName, ProviderLookupDict, @@ -10,6 +12,7 @@ AptProvider, BrewProvider, PipProvider, + NpmProvider, InstallStr, ShallowBinary, bin_version, diff --git a/pydantic_pkgr/binary.py b/pydantic_pkgr/binary.py index 5c4c4d3..49c81d6 100644 --- a/pydantic_pkgr/binary.py +++ b/pydantic_pkgr/binary.py @@ -6,7 +6,7 @@ from pathlib import Path -from typing import Any, Optional, Dict, List +from typing import Any, Optional, Dict, List, Iterable from typing_extensions import Self from subprocess import run, PIPE, CompletedProcess @@ -19,6 +19,7 @@ from .binprovider import ( BinName, BinProviderName, + BinDirPath, HostBinPath, ShallowBinary, BinProvider, @@ -29,6 +30,7 @@ ProviderLookupDict, bin_name, bin_abspath, + bin_abspaths, path_is_script, path_is_executable, ) @@ -91,47 +93,32 @@ def serialize_overrides(self, provider_overrides: Dict[BinProviderName, Provider for provider_name, overrides in provider_overrides.items() } - @computed_field # type: ignore[misc] # see mypy issue #1362 + @computed_field @property - def bin_filename(self) -> BinName: - if self.is_script: - # e.g. '.../Python.framework/Versions/3.11/lib/python3.11/sqlite3/__init__.py' -> sqlite - name = self.name - elif self.loaded_abspath: - # e.g. '/opt/homebrew/bin/wget' -> wget - name = bin_name(self.loaded_abspath) - else: - # e.g. 'ytdlp' -> 'yt-dlp' - name = bin_name(self.name) - return name - - @computed_field # type: ignore[misc] # see mypy issue #1362 - @property - def is_executable(self) -> bool: - try: - assert self.loaded_abspath and path_is_executable(self.loaded_abspath) - return True - except (ValidationError, AssertionError): - return False - - @computed_field # type: ignore[misc] # see mypy issue #1362 - @property - def is_script(self) -> bool: - try: - assert self.loaded_abspath and path_is_script(self.loaded_abspath) - return True - except (ValidationError, AssertionError): - return False - - @computed_field # type: ignore[misc] # see mypy issue #1362 + def loaded_abspaths(self) -> Dict[BinProviderName, List[HostBinPath]]: + assert self.loaded_abspath, 'Binary must be loaded before getting abspath list' + all_bin_abspaths = {self.loaded_provider: [self.loaded_abspath]} if self.loaded_provider else {} + for provider in self.providers_supported: + if not provider.PATH: + # print('skipping provider', provider.name, provider.PATH) + continue + for bin_abspath in bin_abspaths(self.name, PATH=provider.PATH): + existing = all_bin_abspaths.get(provider.name, []) + if bin_abspath not in existing: + all_bin_abspaths[provider.name] = [ + *existing, + bin_abspath, + ] + return all_bin_abspaths + + + @computed_field @property - def is_valid(self) -> bool: - return bool( - self.name - and self.loaded_abspath - and self.loaded_version - and (self.is_executable or self.is_script) - ) + def loaded_bin_dirs(self) -> Dict[BinProviderName, BinDirPath]: + return { + provider_name: ':'.join([str(bin_abspath.parent) for bin_abspath in bin_abspaths]) + for provider_name, bin_abspaths in self.loaded_abspaths.items() + } @validate_call def install(self) -> Self: @@ -199,14 +186,6 @@ def load_or_install(self, cache=True) -> Self: inner_exc = err raise outer_exc from inner_exc - @validate_call - def exec(self, args=(), pwd='.') -> CompletedProcess: - assert self.loaded_abspath - assert self.loaded_version - return run([self.loaded_abspath, *args], stdout=PIPE, stderr=PIPE, pwd=pwd) - - - class SystemPythonHelpers: @staticmethod diff --git a/pydantic_pkgr/binprovider.py b/pydantic_pkgr/binprovider.py index 102f301..630f6c3 100644 --- a/pydantic_pkgr/binprovider.py +++ b/pydantic_pkgr/binprovider.py @@ -2,15 +2,17 @@ import sys import shutil import operator +import site +import sysconfig -from typing import Callable, Any, Optional, Type, List, Dict, Annotated, ClassVar, Literal, cast, TYPE_CHECKING +from typing import Callable, Iterable, Any, Optional, Type, List, Dict, Annotated, ClassVar, Literal, cast, TYPE_CHECKING from typing_extensions import Self from collections import namedtuple from pathlib import Path from subprocess import run, PIPE, CompletedProcess from pydantic_core import core_schema, ValidationError -from pydantic import BaseModel, Field, TypeAdapter, AfterValidator, validate_call, GetCoreSchemaHandler, ConfigDict, computed_field +from pydantic import BaseModel, Field, TypeAdapter, AfterValidator, BeforeValidator, validate_call, GetCoreSchemaHandler, ConfigDict, computed_field, field_validator def validate_bin_provider_name(name: str) -> str: @@ -26,6 +28,20 @@ def validate_bin_provider_name(name: str) -> str: from .semver import SemVer +def validate_bin_dir(path: Path) -> Path: + path = path.expanduser().absolute() + assert path.resolve() + assert path.is_dir(), f'path entries to add to $PATH must be absolute paths to directories {dir}' + return path + +BinDirPath = Annotated[Path, AfterValidator(validate_bin_dir)] + +def validate_PATH(PATH: str | List[str]) -> str: + paths = PATH.split(':') if isinstance(PATH, str) else list(PATH) + assert all(Path(bin_dir) for bin_dir in paths) + return ':'.join(paths) + +PATHStr = Annotated[str, BeforeValidator(validate_PATH)] def func_takes_args_or_kwargs(lambda_func: Callable[..., Any]) -> bool: """returns True if a lambda func takes args/kwargs of any kind, otherwise false if it's pure/argless""" @@ -79,16 +95,40 @@ def path_is_abspath(path: Path) -> Path: # not all bins need to be executable to be bins, some are scripts @validate_call -def bin_abspath(bin_path_or_name: BinName | Path) -> HostBinPath | None: +def bin_abspath(bin_path_or_name: BinName | Path, PATH: PATHStr | None=None) -> HostBinPath | None: assert bin_path_or_name + if PATH is None: + PATH = os.environ.get('PATH', '/bin') + if PATH: + PATH = str(PATH) + else: + return None if str(bin_path_or_name).startswith('/'): # already a path, get its absolute form abspath = Path(bin_path_or_name).expanduser().absolute() else: # not a path yet, get path using shutil.which - binpath = shutil.which(bin_path_or_name) + binpath = shutil.which(bin_path_or_name, mode=os.X_OK, path=PATH) + # print(bin_path_or_name, PATH.split(':'), binpath, 'GOPINGNGN') if not binpath: + # some bins dont show up with shutil.which (e.g. django-admin.py) + for path in PATH.split(':'): + bin_dir = Path(path) + # print('BIN_DIR', bin_dir, bin_dir.is_dir()) + if not bin_dir.is_dir(): + # raise Exception(f'Found invalid dir in $PATH: {bin_dir}') + continue + bin_file = bin_dir / bin_path_or_name + # print(bin_file, path, bin_file.exists(), bin_file.is_file(), bin_file.is_symlink()) + if bin_file.exists(): + return bin_file + + return None + # print(binpath, PATH) + if str(Path(binpath).parent) not in PATH: + # print('WARNING, found bin but not in PATH', binpath, PATH) + # found bin but it was outside our search $PATH return None abspath = Path(binpath).expanduser().absolute() @@ -97,6 +137,28 @@ def bin_abspath(bin_path_or_name: BinName | Path) -> HostBinPath | None: except ValidationError: return None +@validate_call +def bin_abspaths(bin_path_or_name: BinName | Path, PATH: PATHStr | None=None) -> List[HostBinPath]: + assert bin_path_or_name + + PATH = PATH or os.environ.get('PATH', '/bin') + abspaths = [] + + if str(bin_path_or_name).startswith('/'): + # already a path, get its absolute form + abspaths.append(Path(bin_path_or_name).expanduser().absolute()) + else: + # not a path yet, get path using shutil.which + for path in PATH.split(':'): + binpath = shutil.which(bin_path_or_name, mode=os.X_OK, path=path) + if binpath and str(Path(binpath).parent) in PATH: + abspaths.append(binpath) + + try: + return TypeAdapter(List[HostBinPath]).validate_python(abspaths) + except ValidationError: + return [] + @validate_call def bin_version(bin_path: HostBinPath, args=('--version',)) -> SemVer | None: @@ -110,7 +172,6 @@ class ShallowBinary(BaseModel): """ model_config = ConfigDict(extra='ignore', populate_by_name=True, validate_defaults=True) - name: BinName = '' providers_supported: List['BinProvider'] = Field(default=[], alias='providers') @@ -119,6 +180,20 @@ class ShallowBinary(BaseModel): loaded_abspath: HostBinPath = Field(alias='abspath') loaded_version: SemVer = Field(alias='version') + @computed_field # type: ignore[misc] # see mypy issue #1362 + @property + def bin_filename(self) -> BinName: + if self.is_script: + # e.g. '.../Python.framework/Versions/3.11/lib/python3.11/sqlite3/__init__.py' -> sqlite + name = self.name + elif self.loaded_abspath: + # e.g. '/opt/homebrew/bin/wget' -> wget + name = bin_name(self.loaded_abspath) + else: + # e.g. 'ytdlp' -> 'yt-dlp' + name = bin_name(self.name) + return name + @computed_field # type: ignore[misc] # see mypy issue #1362 @property def is_executable(self) -> bool: @@ -147,13 +222,27 @@ def is_valid(self) -> bool: and (self.is_executable or self.is_script) ) + @computed_field + @property + def bin_dir(self) -> BinDirPath | None: + if not self.loaded_abspath: + return None + return TypeAdapter(BinDirPath).validate_python(self.loaded_abspath.parent) + @computed_field @property def loaded_respath(self) -> HostBinPath | None: return self.loaded_abspath and self.loaded_abspath.resolve() - def exec(self, args) -> CompletedProcess: - return run([self.abspath, *args], stdout=PIPE, stderr=PIPE, text=True) + @validate_call + def exec(self, bin_name: BinName | HostBinPath=None, cmd: Iterable[str | Path | int | float | bool]=(), cwd: str | Path='.', **kwargs) -> CompletedProcess: + bin_name = str(bin_name or self.loaded_abspath or self.name) + if bin_name == self.name: + assert self.loaded_abspath, 'Binary must have a loaded_abspath, make sure to load_or_install() first' + assert self.loaded_version, 'Binary must have a loaded_version, make sure to load_or_install() first' + assert Path(cwd).is_dir(), f'cwd must be a valid directory: {cwd}' + cmd = [str(bin_name), *(str(arg) for arg in cmd)] + return run(cmd, stdout=PIPE, stderr=PIPE, text=True, cwd=str(cwd), **kwargs) def is_valid_install_string(pkgs_str: str) -> str: @@ -191,6 +280,9 @@ class BinProvider(BaseModel): model_config = ConfigDict(extra='ignore', populate_by_name=True, validate_defaults=True) name: BinProviderName = '' + + PATH: PATHStr = Field(default='') # e.g. '/opt/homebrew/bin:/opt/archivebox/bin' + BIN: BinName = 'env' abspath_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_abspath'}, exclude=True) version_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_version'}, exclude=True) @@ -220,6 +312,23 @@ class BinProvider(BaseModel): # in_qemu=os.environ.get('IN_QEMU', '').lower() == 'true', # ) + @validate_call + def exec(self, bin_name: BinName | HostBinPath, cmd: Iterable[str | Path | int | float | bool]=(), cwd: Path | str='.', **kwargs) -> CompletedProcess: + bin_name = self.get_abspath(bin_name) if isinstance(bin_name, str) else bin_name + assert bin_name, f'Binary must have a reachable path, make sure to load_or_install() first: {bin_name}' + assert Path(cwd).is_dir(), f'cwd must be a valid directory: {cwd}' + cmd = [str(bin_name), *(str(arg) for arg in cmd)] + return run(cmd, stdout=PIPE, stderr=PIPE, text=True, cwd=str(cwd), **kwargs) + + @field_validator('PATH', mode='after') + @classmethod + def load_PATH(cls, PATH: PATHStr) -> PATHStr: + python_bin_dir = str(Path(sys.executable).parent) + + if python_bin_dir not in PATH: + PATH = ':'.join([python_bin_dir, *PATH.split(':')]) + return TypeAdapter(PATHStr).validate_pythong(PATH) + def get_default_providers(self): return self.get_providers_for_bin('*') @@ -308,13 +417,19 @@ def call_provider_for_action(self, bin_name: BinName, provider_type: ProviderTyp provider_func = cast(Callable[..., Any], provider_func) return provider_func(bin_name, **kwargs) + def setup_PATH(self): + for path in reversed(self.PATH.split(':')): + if path not in sys.path: + sys.path.insert(0, path) # e.g. /opt/archivebox/bin:/bin:/usr/local/bin:... - - def on_get_abspath(self, bin_name: BinName, **context) -> HostBinPath | None: + def on_get_abspath(self, bin_name: BinName | HostBinPath, **context) -> HostBinPath | None: # print(f'[*] {self.__class__.__name__}: Getting abspath for {bin_name}...') + if not self.PATH: + return None try: - return bin_abspath(bin_name) + return bin_abspath(bin_name, PATH=self.PATH) except ValidationError: + # raise return None def on_get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, **context) -> SemVer | None: @@ -322,9 +437,11 @@ def on_get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, if not abspath: return None # print(f'[*] {self.__class__.__name__}: Getting version for {bin_name}...') + version_stdout_str = self.exec(bin_name=abspath, cmd=['--version']).stdout.strip() try: - return bin_version(abspath) + return SemVer.parse(version_stdout_str) except ValidationError: + raise return None def on_get_subdeps(self, bin_name: BinName, **context) -> InstallStr: @@ -335,13 +452,21 @@ def on_get_subdeps(self, bin_name: BinName, **context) -> InstallStr: def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **context): subdeps = subdeps or self.get_subdeps(bin_name) + if not shutil.which(self.BIN): + raise Exception(f'{self.__class__.__name__}.BIN is not avaialable on this host: {self.BIN}') + print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})') # ... install logic here assert True + @validate_call + def get_abspaths(self, bin_name: BinName) -> List[HostBinPath]: + return bin_abspaths(bin_name, PATH=self.PATH) + @validate_call def get_abspath(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> HostBinPath | None: + self.setup_PATH() abspath = self.call_provider_for_action( bin_name=bin_name, provider_type='abspath', @@ -365,7 +490,7 @@ def get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, ov ) if not version: return None - result = SemVer(version) + result = SemVer.parse(version) self._version_cache[bin_name] = result return result @@ -385,7 +510,7 @@ def get_subdeps(self, bin_name: BinName, overrides: Optional[ProviderLookupDict] @validate_call def install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> ShallowBinary | None: subdeps = self.get_subdeps(bin_name, overrides=overrides) - + self.setup_PATH() self.call_provider_for_action( bin_name=bin_name, provider_type='install', @@ -441,56 +566,112 @@ def load(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, @validate_call def load_or_install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=True) -> ShallowBinary | None: - installed = self.load(bin_name, overrides=overrides, cache=cache) + installed = self.load(bin_name=bin_name, overrides=overrides, cache=cache) if not installed: - installed = self.install(bin_name, overrides=overrides) + installed = self.install(bin_name=bin_name, overrides=overrides) return installed class PipProvider(BinProvider): name: BinProviderName = 'pip' + BIN: BinName = 'pip' + + @field_validator('PATH', mode='after') + @classmethod + def load_PATH(cls, PATH: PATHStr) -> PATHStr: + paths = { + str(Path(site.getsitepackages()).parent.parent.parent / 'bin'), # /opt/homebrew/opt/python@3.11/Frameworks/Python.framework/Versions/3.11/bin + str(Path(site.getusersitepackages()).parent.parent.parent / 'bin'), # /Users/squash/Library/Python/3.9/bin + sysconfig.get_path('scripts'), # /opt/homebrew/bin + } + for bin_dir in paths: + if bin_dir not in PATH: + PATH = ':'.join([bin_dir, *PATH.split(':')]) + return TypeAdapter(PATHStr).validate_pythong(PATH) def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **context): subdeps = subdeps or self.on_get_subdeps(bin_name) + if not shutil.which(self.BIN): + raise Exception(f'{self.__class__.__name__}.BIN is not avaialable on this host: {self.BIN}') + print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})') - proc = run(['pip', 'install', '--upgrade', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE) + proc = self.exec(bin_name=self.BIN, cmd=['install', '--upgrade', *subdeps.split(' ')]) if proc.returncode != 0: - print(proc.stdout.strip().decode()) - print(proc.stderr.strip().decode()) + print(proc.stdout.strip()) + print(proc.stderr.strip()) raise Exception(f'{self.__class__.__name__}: install got returncode {proc.returncode} while installing {subdeps}: {subdeps}') class NpmProvider(BinProvider): name: BinProviderName = 'npm' + BIN: BinName = 'npm' + + @field_validator('PATH', mode='after') + @classmethod + def load_PATH(cls, PATH: PATHStr) -> PATHStr: + npm_global_dir = self.exec(bin_name=self.BIN, cmd=['prefix', '-g']).stdout.strip() + '/bin' # /opt/homebrew/bin + npm_bin_dirs = {npm_global_dir} + + search_dir = Path(self.exec(bin_name=self.BIN, cmd=['prefix']).stdout.strip()) + stop_if_reached = [str(Path('/')), str(Path('~').expanduser().absolute())] + num_hops, max_hops = 0, 6 + while num_hops < max_hops and str(search_dir) not in stop_if_reached: + try: + npm_bin_dirs.add(search_dir.glob('node_modules/.bin')[0]) + break + except (IndexError, OSError, Exception): + pass + search_dir = search_dir.parent + num_hops += 1 + + for bin_dir in npm_bin_dirs: + if bin_dir not in PATH: + PATH = ':'.join([bin_dir, *PATH.split(':')]) + return TypeAdapter(PATHStr).validate_pythong(PATH) def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **context): subdeps = subdeps or self.on_get_subdeps(bin_name) + if not shutil.which(self.BIN): + raise Exception(f'{self.__class__.__name__}.BIN is not avaialable on this host: {self.BIN}') + print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})') - - proc = run(['npm', 'install', '-g', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE) + proc = self.exec(bin_name=self.BIN, cmd=['install', '-g', *subdeps.split(' ')]) if proc.returncode != 0: - print(proc.stdout.strip().decode()) - print(proc.stderr.strip().decode()) + print(proc.stdout.strip()) + print(proc.stderr.strip()) raise Exception(f'{self.__class__.__name__}: install got returncode {proc.returncode} while installing {subdeps}: {subdeps}') class AptProvider(BinProvider): name: BinProviderName = 'apt' + BIN: BinName = 'apt' subdeps_provider: ProviderLookupDict = { **BinProvider.__fields__['subdeps_provider'].default, 'yt-dlp': lambda: 'yt-dlp ffmpeg', } + @field_validator('PATH', mode='after') + @classmethod + def load_PATH(cls, PATH: PATHStr) -> PATHStr: + dpkg_install_dirs = self.exec(bin_name='dpkg', cmd=['-L', 'bash']).stdout.strip().split('\n') + dpkg_bin_dirs = [path for path in dpkg_install_dirs if path.endswith('/bin')] + for bin_dir in dpkg_bin_dirs: + if bin_dir not in PATH: + PATH = ':'.join([bin_dir, *PATH.split(':')]) + return TypeAdapter(PATHStr).validate_pythong(PATH) + + def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **context): subdeps = subdeps or self.on_get_subdeps(bin_name) + if not (shutil.which(self.BIN) and shutil.which('dpkg')): + raise Exception(f'{self.__class__.__name__}.BIN is not avaialable on this host: {self.BIN}') + print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})') - try: - # if pyinfra is installed, use it - + # if pyinfra is installed, use it from pyinfra.operations import apt apt.update( @@ -505,32 +686,54 @@ def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **co update=True, _sudo=True, ) - except ImportError: - run(['apt-get', 'update', '-qq']) - proc = run(['apt-get', 'install', '-y', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE) + except (ImportError, ModuleNotFoundError): + self.exec(bin_name='apt-get', cmd=['update', '-qq']) + proc = self.exec(bin_name='apt-get', cmd=['install', '-y', *subdeps.split(' ')]) if proc.returncode != 0: - print(proc.stdout.strip().decode()) - print(proc.stderr.strip().decode()) + print(proc.stdout.strip()) + print(proc.stderr.strip()) raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}') class BrewProvider(BinProvider): name: BinProviderName = 'brew' + BIN: BinName = 'brew' + PATH: PATHStr = '/opt/homebrew/bin:/usr/local/bin' + + @field_validator('PATH', mode='after') + @classmethod + def load_PATH(cls, PATH: PATHStr) -> PATHStr: + brew_bin_dir = self.exec(bin_name=self.BIN, cmd=['--prefix']).stdout.strip() + '/bin' + if brew_bin_dir not in PATH: + PATH = ':'.join([brew_bin_dir, *PATH.split(':')]) + return TypeAdapter(PATHStr).validate_pythong(PATH) def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **context): subdeps = subdeps or self.on_get_subdeps(bin_name) - print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})') - proc = run(['brew', 'install', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE) + if not shutil.which(self.BIN): + raise Exception(f'{self.__class__.__name__}.BIN is not avaialable on this host: {self.BIN}') + + print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})') + proc = self.exec(bin_name=self.BIN, cmd=['install', *subdeps.split(' ')]) if proc.returncode != 0: - print(proc.stdout.strip().decode()) - print(proc.stderr.strip().decode()) + print(proc.stdout.strip()) + print(proc.stderr.strip()) raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}') +DEFAULT_ENV_PATH = os.environ.get('PATH', '/bin') +PYTHON_BIN_DIR = str(Path(sys.executable).parent) + +if PYTHON_BIN_DIR not in DEFAULT_ENV_PATH: + DEFAULT_ENV_PATH = PYTHON_BIN_DIR + ':' + DEFAULT_ENV_PATH + + class EnvProvider(BinProvider): name: BinProviderName = 'env' + BIN: BinName = 'env' + PATH: PATHStr = Field(default=DEFAULT_ENV_PATH) # add dir containing python to $PATH abspath_provider: ProviderLookupDict = { **BinProvider.__fields__['abspath_provider'].default, diff --git a/tests.py b/tests.py index 020de36..0723aec 100644 --- a/tests.py +++ b/tests.py @@ -4,7 +4,10 @@ from pathlib import Path -from pydantic_pkgr import BinProvider, EnvProvider, Binary, SemVer, ProviderLookupDict, bin_version +from pydantic_pkgr import ( + BinProvider, EnvProvider, Binary, SemVer, ProviderLookupDict, bin_version, + PipProvider, NpmProvider, AptProvider, BrewProvider, EnvProvider, +) class TestSemVer(unittest.TestCase): @@ -155,5 +158,75 @@ def test_python_bin(self): self.assertTrue(bool(str(python_bin))) # easy way to make sure serializing doesnt throw an error +def flatten(xss): + return [x for xs in xss for x in xs] + +class InstallTest(unittest.TestCase): + + def install_with_provider(self, provider, binary): + + binary_bin = binary.load_or_install() + provider_bin = provider.load_or_install(bin_name=binary.name) + # print(binary_bin, binary_bin.bin_dir, binary_bin.loaded_abspath) + # print('\n'.join(f'{provider}={path}' for provider, path in binary.loaded_abspaths.items()), '\n') + # print() + + self.assertEqual(binary_bin.loaded_provider, provider_bin.loaded_provider) + self.assertEqual(binary_bin.loaded_abspath, provider_bin.loaded_abspath) + self.assertEqual(binary_bin.loaded_version, provider_bin.loaded_version) + + + self.assertIn(binary_bin.loaded_abspath, flatten(binary_bin.loaded_abspaths.values())) + self.assertIn(str(binary_bin.bin_dir), flatten(PATH.split(':') for PATH in binary_bin.loaded_bin_dirs.values())) + + self.assertEqual(binary_bin.loaded_version, SemVer('{}.{}.{}'.format(*sys.version_info[:3]))) + self.assertIn(binary_bin.loaded_abspath, provider.get_abspaths(binary_bin.name)) + # self.assertIn(binary_bin.loaded_respath, provider.get_abspaths(binary_bin.name)) + self.assertTrue(binary_bin.is_valid) + self.assertTrue(binary_bin.is_executable) + self.assertFalse(binary_bin.is_script) + self.assertTrue(bool(str(binary_bin))) # easy way to make sure serializing doesnt throw an error + # print(provider.PATH) + # print() + # print() + # print(binary.bin_filename, binary.bin_dir, binary.loaded_abspaths) + # print() + # print() + # print(provider.name, 'PATH=', provider.PATH, 'ABSPATHS=', provider.get_abspaths(bin_name=binary_bin.name)) + + def test_env_provider(self): + provider = EnvProvider() + binary = Binary(name='python3.10', providers=[provider]).load() + self.install_with_provider(provider, binary) + + def test_pip_provider(self): + provider = PipProvider() + # print(provider.PATH) + binary = Binary(name='python3.10', providers=[provider]) + self.install_with_provider(provider, binary) + + def test_npm_provider(self): + provider = NpmProvider() + # print(provider.PATH) + binary = Binary(name='python3.10', providers=[provider]) + self.install_with_provider(provider, binary) + + def test_brew_provider(self): + provider = BrewProvider() + # print(provider.PATH) + binary = Binary(name='python3.10', providers=[provider]) + self.install_with_provider(provider, binary) + + def test_apt_provider(self): + provider = AptProvider() + # print(provider.PATH) + binary = Binary(name='python3.10', providers=[provider]) + try: + result = self.install_with_provider(provider, binary) + self.assertFalse(bool(result)) + except Exception: + pass + + if __name__ == '__main__': unittest.main()