Viewing file: kv.py (5.48 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import operator
from peewee import * from peewee import sqlite3 from peewee import Expression from playhouse.fields import PickleField try: from playhouse.sqlite_ext import CSqliteExtDatabase as SqliteExtDatabase except ImportError: from playhouse.sqlite_ext import SqliteExtDatabase
Sentinel = type('Sentinel', (object,), {})
class KeyValue(object): """ Persistent dictionary.
:param Field key_field: field to use for key. Defaults to CharField. :param Field value_field: field to use for value. Defaults to PickleField. :param bool ordered: data should be returned in key-sorted order. :param Database database: database where key/value data is stored. :param str table_name: table name for data. """ def __init__(self, key_field=None, value_field=None, ordered=False, database=None, table_name='keyvalue'): if key_field is None: key_field = CharField(max_length=255, primary_key=True) if not key_field.primary_key: raise ValueError('key_field must have primary_key=True.')
if value_field is None: value_field = PickleField()
self._key_field = key_field self._value_field = value_field self._ordered = ordered self._database = database or SqliteExtDatabase(':memory:') self._table_name = table_name support_on_conflict = (isinstance(self._database, PostgresqlDatabase) or (isinstance(self._database, SqliteDatabase) and self._database.server_version >= (3, 24))) if support_on_conflict: self.upsert = self._postgres_upsert self.update = self._postgres_update else: self.upsert = self._upsert self.update = self._update
self.model = self.create_model() self.key = self.model.key self.value = self.model.value
# Ensure table exists. self.model.create_table()
def create_model(self): class KeyValue(Model): key = self._key_field value = self._value_field class Meta: database = self._database table_name = self._table_name return KeyValue
def query(self, *select): query = self.model.select(*select).tuples() if self._ordered: query = query.order_by(self.key) return query
def convert_expression(self, expr): if not isinstance(expr, Expression): return (self.key == expr), True return expr, False
def __contains__(self, key): expr, _ = self.convert_expression(key) return self.model.select().where(expr).exists()
def __len__(self): return len(self.model)
def __getitem__(self, expr): converted, is_single = self.convert_expression(expr) query = self.query(self.value).where(converted) item_getter = operator.itemgetter(0) result = [item_getter(row) for row in query] if len(result) == 0 and is_single: raise KeyError(expr) elif is_single: return result[0] return result
def _upsert(self, key, value): (self.model .insert(key=key, value=value) .on_conflict('replace') .execute())
def _postgres_upsert(self, key, value): (self.model .insert(key=key, value=value) .on_conflict(conflict_target=[self.key], preserve=[self.value]) .execute())
def __setitem__(self, expr, value): if isinstance(expr, Expression): self.model.update(value=value).where(expr).execute() else: self.upsert(expr, value)
def __delitem__(self, expr): converted, _ = self.convert_expression(expr) self.model.delete().where(converted).execute()
def __iter__(self): return iter(self.query().execute())
def keys(self): return map(operator.itemgetter(0), self.query(self.key))
def values(self): return map(operator.itemgetter(0), self.query(self.value))
def items(self): return iter(self.query().execute())
def _update(self, __data=None, **mapping): if __data is not None: mapping.update(__data) return (self.model .insert_many(list(mapping.items()), fields=[self.key, self.value]) .on_conflict('replace') .execute())
def _postgres_update(self, __data=None, **mapping): if __data is not None: mapping.update(__data) return (self.model .insert_many(list(mapping.items()), fields=[self.key, self.value]) .on_conflict(conflict_target=[self.key], preserve=[self.value]) .execute())
def get(self, key, default=None): try: return self[key] except KeyError: return default
def setdefault(self, key, default=None): try: return self[key] except KeyError: self[key] = default return default
def pop(self, key, default=Sentinel): with self._database.atomic(): try: result = self[key] except KeyError: if default is Sentinel: raise return default del self[key]
return result
def clear(self): self.model.delete().execute()
|