# File: test_dialect.py (6.7 KB)
#! coding: utf-8
from .. import assert_raises from .. import config from .. import eq_ from .. import fixtures from .. import ne_ from .. import provide_metadata from ..config import requirements from ..schema import Column from ..schema import Table from ... import exc from ... import Integer from ... import literal_column from ... import select from ... import String from ...util import compat
class ExceptionTest(fixtures.TablesTest):
    """Test basic exception wrapping.

    DBAPIs vary a lot in exception behavior so to actually anticipate
    specific exceptions from real round trips, we need to be conservative.

    """

    run_deletes = "each"

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Define ``manual_pk``: a non-autoincrementing integer primary key
        plus a ``data`` string column."""
        Table(
            "manual_pk",
            metadata,
            Column("id", Integer, primary_key=True, autoincrement=False),
            Column("data", String(50)),
        )

    @requirements.duplicate_key_raises_integrity_error
    def test_integrity_error(self):
        """Inserting the same primary key twice raises ``exc.IntegrityError``.

        Both inserts run inside one explicit transaction, which is rolled
        back once the expected error has been observed.
        """
        manual_pk = self.tables.manual_pk

        with config.db.connect() as conn:
            trans = conn.begin()
            conn.execute(manual_pk.insert(), {"id": 1, "data": "d1"})

            # the second insert reuses id=1 and must be rejected
            assert_raises(
                exc.IntegrityError,
                conn.execute,
                manual_pk.insert(),
                {"id": 1, "data": "d1"},
            )

            trans.rollback()

    def test_exception_with_non_ascii(self):
        """A wrapped DBAPI error stringifies to ``str`` and embeds the
        original DBAPI error message."""
        with config.db.connect() as conn:
            try:
                # try to create an error message that likely has non-ascii
                # characters in the DBAPI's message string.  unfortunately
                # there's no way to make this happen with some drivers like
                # mysqlclient, pymysql.  this at least does produce a non-
                # ascii error message for cx_oracle, psycopg2
                conn.execute(select([literal_column(u"méil")]))
            except exc.DBAPIError as err:
                err_str = str(err)

                assert str(err.orig) in str(err)

                # str() yields the native string type on both Py2k and
                # Py3k, so a single check covers both interpreters.
                assert isinstance(err_str, str)
            else:
                assert False, "expected exc.DBAPIError was not raised"
class IsolationLevelTest(fixtures.TestBase):
    """Exercise getting, setting and resetting connection isolation levels.

    The expected default and supported levels are taken from
    ``requirements.get_isolation_levels(config)``.
    """

    __backend__ = True

    __requires__ = ("isolation_level",)

    def _get_non_default_isolation_level(self):
        """Return one supported level that is neither the default nor
        ``AUTOCOMMIT``; call ``config.skip_test`` if there is none."""
        level_info = requirements.get_isolation_levels(config)

        default_level = level_info["default"]
        supported_levels = level_info["supported"]

        candidates = set(supported_levels) - {"AUTOCOMMIT", default_level}
        if not candidates:
            config.skip_test("no non-default isolation level available")
            return None
        return candidates.pop()

    def test_default_isolation_level(self):
        """The dialect's default level equals the one the requirements
        report."""
        actual = config.db.dialect.default_isolation_level
        expected = requirements.get_isolation_levels(config)["default"]
        eq_(actual, expected)

    def test_non_default_isolation_level(self):
        """Switch to a non-default level, then reset back to the original."""
        target_level = self._get_non_default_isolation_level()

        with config.db.connect() as conn:
            original_level = conn.get_isolation_level()

            ne_(original_level, target_level)

            conn.execution_options(isolation_level=target_level)

            eq_(conn.get_isolation_level(), target_level)

            conn.dialect.reset_isolation_level(conn.connection)

            eq_(conn.get_isolation_level(), original_level)

    def test_all_levels(self):
        """Each supported level other than ``AUTOCOMMIT`` can be set and is
        still in effect after a begin/rollback cycle; a connection obtained
        afterwards reports the default level."""
        level_info = requirements.get_isolation_levels(config)

        candidate_levels = set(level_info["supported"]) - {"AUTOCOMMIT"}

        for level in candidate_levels:
            with config.db.connect() as conn:
                conn.execution_options(isolation_level=level)

                eq_(conn.get_isolation_level(), level)

                # run one transaction and check the level is unchanged
                transaction = conn.begin()
                transaction.rollback()

                eq_(conn.get_isolation_level(), level)

            with config.db.connect() as conn:
                eq_(conn.get_isolation_level(), level_info["default"])
class AutocommitTest(fixtures.TablesTest):
    """Verify that the ``AUTOCOMMIT`` isolation level can be turned on and
    off for a connection.

    Every test obtains its connection through a ``with`` block so the
    connection is closed even when an assertion fails.
    """

    run_deletes = "each"

    __requires__ = ("autocommit",)

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Define ``some_table``: a non-autoincrementing integer primary key
        plus a ``data`` string column."""
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True, autoincrement=False),
            Column("data", String(50)),
            test_needs_acid=True,
        )

    def _test_conn_autocommits(self, conn, autocommit):
        """Assert whether ``conn`` is autocommitting.

        Inserts one row between ``begin()`` and ``rollback()``.  The row is
        expected to still be present afterwards only when ``autocommit`` is
        true.  All rows are deleted before returning.

        :param conn: connection under test.
        :param autocommit: whether the insert is expected to survive the
         rollback.
        """
        some_table = self.tables.some_table

        trans = conn.begin()
        conn.execute(some_table.insert(), {"id": 1, "data": "some data"})
        trans.rollback()

        eq_(
            conn.scalar(select([some_table.c.id])),
            1 if autocommit else None,
        )

        conn.execute(some_table.delete())

    def test_autocommit_on(self):
        """``AUTOCOMMIT`` takes effect, and resetting the isolation level
        through the dialect turns it off again."""
        with config.db.connect() as conn:
            c2 = conn.execution_options(isolation_level="AUTOCOMMIT")
            self._test_conn_autocommits(c2, True)

            c2.dialect.reset_isolation_level(c2.connection)

            self._test_conn_autocommits(conn, False)

    def test_autocommit_off(self):
        """A freshly obtained connection does not autocommit."""
        with config.db.connect() as conn:
            self._test_conn_autocommits(conn, False)

    def test_turn_autocommit_off_via_default_iso_level(self):
        """Selecting the default isolation level after ``AUTOCOMMIT``
        turns autocommit off."""
        with config.db.connect() as conn:
            conn.execution_options(isolation_level="AUTOCOMMIT")
            self._test_conn_autocommits(conn, True)

            conn.execution_options(
                isolation_level=requirements.get_isolation_levels(config)[
                    "default"
                ]
            )
            self._test_conn_autocommits(conn, False)
class EscapingTest(fixtures.TestBase):
    """Round-trip checks for string values containing percent signs."""

    @provide_metadata
    def test_percent_sign_round_trip(self):
        """test that the DBAPI handles escaped / non-escaped percent signs
        in a way that matches the compiler

        """
        table = Table("t", self.metadata, Column("data", String(50)))
        table.create(config.db)

        # (SQL literal used in the WHERE clause, value expected back)
        round_trips = [
            ("'some % value'", "some % value"),
            ("'some %% other value'", "some %% other value"),
        ]

        with config.db.begin() as conn:
            # both rows go in as bound parameters first
            conn.execute(table.insert(), {"data": "some % value"})
            conn.execute(table.insert(), {"data": "some %% other value"})

            # each row is then looked up through an inline SQL literal
            for sql_literal, expected in round_trips:
                stmt = select([table.c.data]).where(
                    table.c.data == literal_column(sql_literal)
                )
                eq_(conn.scalar(stmt), expected)
|