Viewing file: progress_bars.py (4.33 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import functools import sys from typing import Callable, Generator, Iterable, Iterator, Optional, Tuple, TypeVar
from pip._vendor.rich.progress import ( BarColumn, DownloadColumn, FileSizeColumn, MofNCompleteColumn, Progress, ProgressColumn, SpinnerColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn, TransferSpeedColumn, )
from pip._internal.cli.spinners import RateLimiter from pip._internal.req.req_install import InstallRequirement from pip._internal.utils.logging import get_console, get_indentation
# Generic item type carried through a progress renderer.
T = TypeVar("T")
# A progress renderer is a callable that takes an iterable of items and
# returns an iterator over items of the same type.
ProgressRenderer = Callable[[Iterable[T]], Iterator[T]]
def _rich_download_progress_bar(
    iterable: Iterable[bytes],
    *,
    bar_type: str,
    size: Optional[int],
    initial_progress: Optional[int] = None,
) -> Generator[bytes, None, None]:
    """Yield the chunks of *iterable* while driving a rich progress display.

    :param iterable: Source of byte chunks; each one is yielded unchanged.
    :param bar_type: Must be ``"on"``; anything else trips the assertion.
    :param size: Expected total number of bytes. A falsy value (``None`` or
        ``0``) selects the spinner layout with an infinite total.
    :param initial_progress: Amount to advance the task by before the first
        chunk is consumed; skipped when ``None``.
    """
    assert bar_type == "on", "This should only be used in the default mode."

    # Both layouts lead with the same description column.
    description = TextColumn("[progress.description]{task.description}")

    if size:
        # Total is known: bar, downloaded/total, speed and ETA.
        total: float = size
        columns: Tuple[ProgressColumn, ...] = (
            description,
            BarColumn(),
            DownloadColumn(),
            TransferSpeedColumn(),
            TextColumn("eta"),
            TimeRemainingColumn(),
        )
    else:
        # Total is unknown: spinner, bytes so far, speed and elapsed time.
        total = float("inf")
        columns = (
            description,
            SpinnerColumn("line", speed=1.5),
            FileSizeColumn(),
            TransferSpeedColumn(),
            TimeElapsedColumn(),
        )

    progress = Progress(*columns, refresh_per_second=5)
    # The task description is just padding: current indentation plus two.
    padding = " " * (get_indentation() + 2)
    task_id = progress.add_task(padding, total=total)

    if initial_progress is not None:
        progress.update(task_id, advance=initial_progress)

    with progress:
        for chunk in iterable:
            # Hand the chunk to the consumer first, then record its length.
            yield chunk
            progress.update(task_id, advance=len(chunk))
def _rich_install_progress_bar(
    iterable: Iterable[InstallRequirement], *, total: int
) -> Iterator[InstallRequirement]:
    """Yield each requirement of *iterable* while showing an install bar.

    :param iterable: The requirements to iterate over; yielded unchanged.
    :param total: Total passed to the progress task.
    """
    layout = (
        TextColumn("{task.fields[indent]}"),
        BarColumn(),
        MofNCompleteColumn(),
        TextColumn("{task.description}"),
    )
    bar = Progress(
        *layout,
        refresh_per_second=6,
        console=get_console(),
        transient=True,
    )

    # The task starts out invisible: hiding the bar at initialization forces
    # a refresh cycle to occur before it appears, which avoids very short
    # flashes of the bar.
    indent = " " * get_indentation()
    task = bar.add_task("", total=total, indent=indent, visible=False)

    with bar:
        for req in iterable:
            # The backslash escapes the opening bracket in the description.
            label = rf"\[{req.name}]"
            bar.update(task, description=label, visible=True)
            yield req
            bar.advance(task)
def _raw_progress_bar(
    iterable: Iterable[bytes],
    *,
    size: Optional[int],
    initial_progress: Optional[int] = None,
) -> Generator[bytes, None, None]:
    """Yield the chunks of *iterable* while printing plain progress lines.

    Each report is one ``Progress <current> of <total>`` line written to
    ``sys.stdout`` and flushed immediately. One report is written before the
    first chunk is consumed; afterwards a report is written whenever the rate
    limiter says it is ready or the running count equals the total.

    :param iterable: Source of byte chunks; each one is yielded unchanged.
    :param size: Expected total number of bytes; a falsy value is reported
        as ``0``.
    :param initial_progress: Starting byte count; a falsy value means ``0``.
    """

    def _report(current: int, total: int) -> None:
        sys.stdout.write(f"Progress {current} of {total}\n")
        sys.stdout.flush()

    received = initial_progress if initial_progress else 0
    expected = size if size else 0
    # NOTE(review): 0.25 is presumably the minimum interval between reports
    # in seconds -- confirm against RateLimiter.
    throttle = RateLimiter(0.25)

    _report(received, expected)
    for chunk in iterable:
        received += len(chunk)
        report_due = throttle.ready() or received == expected
        if report_due:
            _report(received, expected)
            throttle.reset()
        yield chunk
def get_download_progress_renderer(
    *, bar_type: str, size: Optional[int] = None, initial_progress: Optional[int] = None
) -> ProgressRenderer[bytes]:
    """Pick the callable used to render download progress.

    The returned callable takes the iterable of chunks to "wrap":

    * ``"on"``  -- the rich progress bar, bound to *size* and
      *initial_progress*.
    * ``"raw"`` -- the plain-text progress writer, bound to the same values.
    * anything else -- the builtin ``iter``, i.e. no progress output.
    """
    if bar_type == "raw":
        return functools.partial(
            _raw_progress_bar,
            size=size,
            initial_progress=initial_progress,
        )

    if bar_type != "on":
        return iter  # no-op, when passed an iterator

    return functools.partial(
        _rich_download_progress_bar,
        bar_type=bar_type,
        size=size,
        initial_progress=initial_progress,
    )
def get_install_progress_renderer(
    *, bar_type: str, total: int
) -> ProgressRenderer[InstallRequirement]:
    """Pick the callable used to render install progress.

    The returned callable takes the iterable of requirements to "wrap".
    For ``bar_type == "on"`` it is the rich install bar bound to *total*;
    for any other value it is the builtin ``iter``.
    """
    if bar_type != "on":
        return iter

    return functools.partial(_rich_install_progress_bar, total=total)
|