Viewing file: base.py (8.03 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import logging import os import re import zipfile from abc import ABCMeta, abstractmethod from configparser import ConfigParser from itertools import chain from pathlib import Path from tempfile import mkdtemp
from distlib.scripts import ScriptMaker, enquote_executable
from virtualenv.util.path import safe_delete
class PipInstall(metaclass=ABCMeta): def __init__(self, wheel, creator, image_folder): self._wheel = wheel self._creator = creator self._image_dir = image_folder self._extracted = False self.__dist_info = None self._console_entry_points = None
@abstractmethod def _sync(self, src, dst): raise NotImplementedError
def install(self, version_info): self._extracted = True self._uninstall_previous_version() # sync image for filename in self._image_dir.iterdir(): into = self._creator.purelib / filename.name self._sync(filename, into) # generate console executables consoles = set() script_dir = self._creator.script_dir for name, module in self._console_scripts.items(): consoles.update(self._create_console_entry_point(name, module, script_dir, version_info)) logging.debug("generated console scripts %s", " ".join(i.name for i in consoles))
def build_image(self): # 1. first extract the wheel logging.debug("build install image for %s to %s", self._wheel.name, self._image_dir) with zipfile.ZipFile(str(self._wheel)) as zip_ref: self._shorten_path_if_needed(zip_ref) zip_ref.extractall(str(self._image_dir)) self._extracted = True # 2. now add additional files not present in the distribution new_files = self._generate_new_files() # 3. finally fix the records file self._fix_records(new_files)
def _shorten_path_if_needed(self, zip_ref): if os.name == "nt": to_folder = str(self._image_dir) # https://docs.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation zip_max_len = max(len(i) for i in zip_ref.namelist()) path_len = zip_max_len + len(to_folder) if path_len > 260: self._image_dir.mkdir(exist_ok=True) # to get a short path must exist
from virtualenv.util.path import get_short_path_name
to_folder = get_short_path_name(to_folder) self._image_dir = Path(to_folder)
def _records_text(self, files): return "\n".join(f"{os.path.relpath(str(rec), str(self._image_dir))},," for rec in files)
def _generate_new_files(self): new_files = set() installer = self._dist_info / "INSTALLER" installer.write_text("pip\n", encoding="utf-8") new_files.add(installer) # inject a no-op root element, as workaround for bug in https://github.com/pypa/pip/issues/7226 marker = self._image_dir / f"{self._dist_info.stem}.virtualenv" marker.write_text("", encoding="utf-8") new_files.add(marker) folder = mkdtemp() try: to_folder = Path(folder) rel = os.path.relpath(str(self._creator.script_dir), str(self._creator.purelib)) version_info = self._creator.interpreter.version_info for name, module in self._console_scripts.items(): new_files.update( Path(os.path.normpath(str(self._image_dir / rel / i.name))) for i in self._create_console_entry_point(name, module, to_folder, version_info) ) finally: safe_delete(folder) return new_files
@property def _dist_info(self): if self._extracted is False: return None # pragma: no cover if self.__dist_info is None: files = [] for filename in self._image_dir.iterdir(): files.append(filename.name) if filename.suffix == ".dist-info": self.__dist_info = filename break else: raise RuntimeError(f"no .dist-info at {self._image_dir}, has {', '.join(files)}") # pragma: no cover return self.__dist_info
@abstractmethod def _fix_records(self, extra_record_data): raise NotImplementedError
@property def _console_scripts(self): if self._extracted is False: return None # pragma: no cover if self._console_entry_points is None: self._console_entry_points = {} entry_points = self._dist_info / "entry_points.txt" if entry_points.exists(): parser = ConfigParser() with entry_points.open(encoding="utf-8") as file_handler: parser.read_file(file_handler) if "console_scripts" in parser.sections(): for name, value in parser.items("console_scripts"): match = re.match(r"(.*?)-?\d\.?\d*", name) if match: name = match.groups(1)[0] self._console_entry_points[name] = value return self._console_entry_points
def _create_console_entry_point(self, name, value, to_folder, version_info): result = [] maker = ScriptMakerCustom(to_folder, version_info, self._creator.exe, name) specification = f"{name} = {value}" new_files = maker.make(specification) result.extend(Path(i) for i in new_files) return result
def _uninstall_previous_version(self): dist_name = self._dist_info.stem.split("-")[0] in_folders = chain.from_iterable([i.iterdir() for i in {self._creator.purelib, self._creator.platlib}]) paths = (p for p in in_folders if p.stem.split("-")[0] == dist_name and p.suffix == ".dist-info" and p.is_dir()) existing_dist = next(paths, None) if existing_dist is not None: self._uninstall_dist(existing_dist)
@staticmethod def _uninstall_dist(dist): dist_base = dist.parent logging.debug("uninstall existing distribution %s from %s", dist.stem, dist_base)
top_txt = dist / "top_level.txt" # add top level packages at folder level paths = ( {dist.parent / i.strip() for i in top_txt.read_text(encoding="utf-8").splitlines()} if top_txt.exists() else set() ) paths.add(dist) # add the dist-info folder itself
base_dirs, record = paths.copy(), dist / "RECORD" # collect entries in record that we did not register yet for name in ( (i.split(",")[0] for i in record.read_text(encoding="utf-8").splitlines()) if record.exists() else () ): path = dist_base / name if not any(p in base_dirs for p in path.parents): # only add if not already added as a base dir paths.add(path)
for path in sorted(paths): # actually remove stuff in a stable order if path.exists(): if path.is_dir() and not path.is_symlink(): safe_delete(path) else: path.unlink()
def clear(self): if self._image_dir.exists(): safe_delete(self._image_dir)
def has_image(self): return self._image_dir.exists() and next(self._image_dir.iterdir()) is not None
class ScriptMakerCustom(ScriptMaker): def __init__(self, target_dir, version_info, executable, name): super().__init__(None, str(target_dir)) self.clobber = True # overwrite self.set_mode = True # ensure they are executable self.executable = enquote_executable(str(executable)) self.version_info = version_info.major, version_info.minor self.variants = {"", "X", "X.Y"} self._name = name
def _write_script(self, names, shebang, script_bytes, filenames, ext): names.add(f"{self._name}{self.version_info[0]}.{self.version_info[1]}") super()._write_script(names, shebang, script_bytes, filenames, ext)
__all__ = [ "PipInstall", ]
|