Viewing file: backports.py (2.74 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import sys
if sys.version_info[:2] < (3, 8):
import asyncio, functools from asyncio.coroutines import _is_coroutine from inspect import ismethod, isfunction, CO_COROUTINE from unittest import TestCase
def _unwrap_partial(func):
    """Return the callable underneath any ``functools.partial`` wrappers.

    Objects that are not ``functools.partial`` instances come back unchanged.
    """
    target = func
    while True:
        if not isinstance(target, functools.partial):
            return target
        # Step one wrapper layer down and check again.
        target = target.func
def _has_code_flag(f, flag):
    """Tell whether the function underlying ``f`` has ``flag`` in its code flags.

    Bound methods are reduced to their ``__func__`` and ``functools.partial``
    wrappers to the callable they wrap before the check. If what remains is
    not a plain Python function, the result is ``False``.
    """
    candidate = f
    # Peel bound-method layers first, then any partial wrappers.
    while ismethod(candidate):
        candidate = candidate.__func__
    candidate = _unwrap_partial(candidate)
    if isfunction(candidate):
        return bool(candidate.__code__.co_flags & flag)
    return False
def iscoroutinefunction(obj):
    """Return true if the object is a coroutine function.

    An object qualifies when its underlying function was defined with
    ``async def`` (its code object carries ``CO_COROUTINE``), or when its
    ``_is_coroutine`` attribute is the marker object imported from
    ``asyncio.coroutines``.
    """
    if _has_code_flag(obj, CO_COROUTINE):
        return True
    marker = getattr(obj, '_is_coroutine', None)
    return marker is _is_coroutine
class IsolatedAsyncioTestCase(TestCase):
    """Test case that runs each test with its own, freshly created event loop.

    :meth:`run` creates a new loop, installs it as the current event loop
    (with debug mode enabled) before delegating to ``TestCase.run``, and
    closes it again afterwards, even when the test run raises.

    NOTE(review): nothing in this class awaits coroutine test methods; tests
    presumably drive their coroutines on the loop themselves -- confirm
    against callers.
    """

    def __init__(self, methodName='runTest'):
        super().__init__(methodName)
        self._asyncioTestLoop = None
        self._asyncioCallsQueue = None
        self._asyncioCallsTask = None

    async def _asyncioLoopRunner(self, fut):
        """Background task: resolve ``fut`` once ready, then wait for shutdown.

        The queue is created inside the coroutine so it is built while the
        test loop is running. ``fut`` is resolved as soon as the queue exists,
        which is what ``_setupAsyncioLoop`` blocks on.
        """
        self._asyncioCallsQueue = queue = asyncio.Queue()
        fut.set_result(None)
        while True:
            query = await queue.get()
            queue.task_done()
            # Only the ``None`` shutdown sentinel is ever queued by this class.
            assert query is None
            if query is None:
                # Finish the task instead of waiting on the queue again, so
                # no pending task is left behind when the loop is closed.
                return

    def _setupAsyncioLoop(self):
        """Create the per-test loop and start the background runner task."""
        assert self._asyncioTestLoop is None
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.set_debug(True)
        self._asyncioTestLoop = loop
        fut = loop.create_future()
        self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
        # Block until the runner has created its queue.
        loop.run_until_complete(fut)

    def _tearDownAsyncioLoop(self):
        """Stop the runner task, shut down async generators and close the loop."""
        assert self._asyncioTestLoop is not None
        loop = self._asyncioTestLoop
        self._asyncioTestLoop = None
        # Send the shutdown sentinel and wait until the runner has consumed it.
        self._asyncioCallsQueue.put_nowait(None)
        loop.run_until_complete(self._asyncioCallsQueue.join())

        try:
            # shutdown asyncgens
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    def run(self, result=None):
        """Run the test with a dedicated event loop; return ``TestCase.run``'s result."""
        self._setupAsyncioLoop()
        try:
            return super().run(result)
        finally:
            self._tearDownAsyncioLoop()
else:
from asyncio import iscoroutinefunction from unittest import IsolatedAsyncioTestCase
|