Viewing file: __init__.py (8.14 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
"""User sessions for aiohttp.web."""
import abc
import json import time
from collections.abc import MutableMapping
from aiohttp import web
__version__ = '2.9.0'
class Session(MutableMapping):
"""Session dict-like object."""
def __init__(self, identity, *, data, new, max_age=None): self._changed = False self._mapping = {} self._identity = identity if data != {} else None self._new = new self._new = new if data != {} else True self._max_age = max_age created = data.get('created', None) if data else None session_data = data.get('session', None) if data else None now = int(time.time()) age = now - created if created else now if max_age is not None and age > max_age: session_data = None if self._new or created is None: self._created = now else: self._created = created
if session_data is not None: self._mapping.update(session_data)
def __repr__(self): return '<{} [new:{}, changed:{}, created:{}] {!r}>'.format( self.__class__.__name__, self.new, self._changed, self.created, self._mapping)
@property def new(self): return self._new
@property def identity(self): return self._identity
@property def created(self): return self._created
@property def empty(self): return not bool(self._mapping)
@property def max_age(self): return self._max_age
@max_age.setter def max_age(self, value): self._max_age = value
def changed(self): self._changed = True
def invalidate(self): self._changed = True self._mapping = {}
def set_new_identity(self, identity): if not self._new: raise RuntimeError( "Can't change identity for a session which is not new")
self._identity = identity
def __len__(self): return len(self._mapping)
def __iter__(self): return iter(self._mapping)
def __contains__(self, key): return key in self._mapping
def __getitem__(self, key): return self._mapping[key]
def __setitem__(self, key, value): self._mapping[key] = value self._changed = True
def __delitem__(self, key): del self._mapping[key] self._changed = True
SESSION_KEY = 'aiohttp_session' STORAGE_KEY = 'aiohttp_session_storage'
async def get_session(request): session = request.get(SESSION_KEY) if session is None: storage = request.get(STORAGE_KEY) if storage is None: raise RuntimeError( "Install aiohttp_session middleware " "in your aiohttp.web.Application") else: session = await storage.load_session(request) if not isinstance(session, Session): raise RuntimeError( "Installed {!r} storage should return session instance " "on .load_session() call, got {!r}.".format(storage, session)) request[SESSION_KEY] = session return session
async def new_session(request): storage = request.get(STORAGE_KEY) if storage is None: raise RuntimeError( "Install aiohttp_session middleware " "in your aiohttp.web.Application") else: session = await storage.new_session() if not isinstance(session, Session): raise RuntimeError( "Installed {!r} storage should return session instance " "on .load_session() call, got {!r}.".format(storage, session)) request[SESSION_KEY] = session return session
def session_middleware(storage):
if not isinstance(storage, AbstractStorage): raise RuntimeError("Expected AbstractStorage got {}".format(storage))
@web.middleware async def factory(request, handler): request[STORAGE_KEY] = storage raise_response = False try: response = await handler(request) except web.HTTPException as exc: response = exc raise_response = True if not isinstance(response, web.StreamResponse): raise RuntimeError( "Expect response, not {!r}".format(type(response))) if not isinstance(response, web.Response): # likely got websocket or streaming return response if response.prepared: raise RuntimeError( "Cannot save session data into prepared response") session = request.get(SESSION_KEY) if session is not None: if session._changed: await storage.save_session(request, response, session) if raise_response: raise response return response
return factory
def setup(app, storage): """Setup the library in aiohttp fashion."""
app.middlewares.append(session_middleware(storage))
class AbstractStorage(metaclass=abc.ABCMeta):
def __init__(self, *, cookie_name="AIOHTTP_SESSION", domain=None, max_age=None, path='/', secure=None, httponly=True, encoder=json.dumps, decoder=json.loads): self._cookie_name = cookie_name self._cookie_params = dict(domain=domain, max_age=max_age, path=path, secure=secure, httponly=httponly) self._max_age = max_age self._encoder = encoder self._decoder = decoder
@property def cookie_name(self): return self._cookie_name
@property def max_age(self): return self._max_age
@property def cookie_params(self): return self._cookie_params
def _get_session_data(self, session): if not session.empty: data = { 'created': session.created, 'session': session._mapping } else: data = {} return data
async def new_session(self): return Session(None, data=None, new=True, max_age=self.max_age)
@abc.abstractmethod async def load_session(self, request): pass
@abc.abstractmethod async def save_session(self, request, response, session): pass
def load_cookie(self, request): cookie = request.cookies.get(self._cookie_name) return cookie
def save_cookie(self, response, cookie_data, *, max_age=None): params = dict(self._cookie_params) if max_age is not None: params['max_age'] = max_age params['expires'] = time.strftime( "%a, %d-%b-%Y %T GMT", time.gmtime(time.time() + max_age)) if not cookie_data: response.del_cookie( self._cookie_name, domain=params["domain"], path=params["path"], ) else: response.set_cookie(self._cookie_name, cookie_data, **params)
class SimpleCookieStorage(AbstractStorage): """Simple JSON storage.
Doesn't any encryption/validation, use it for tests only"""
def __init__(self, *, cookie_name="AIOHTTP_SESSION", domain=None, max_age=None, path='/', secure=None, httponly=True, encoder=json.dumps, decoder=json.loads): super().__init__(cookie_name=cookie_name, domain=domain, max_age=max_age, path=path, secure=secure, httponly=httponly, encoder=encoder, decoder=decoder)
async def load_session(self, request): cookie = self.load_cookie(request) if cookie is None: return Session(None, data=None, new=True, max_age=self.max_age) else: data = self._decoder(cookie) return Session(None, data=data, new=False, max_age=self.max_age)
async def save_session(self, request, response, session): cookie_data = self._encoder(self._get_session_data(session)) self.save_cookie(response, cookie_data, max_age=session.max_age)
|