Viewing file: test_ddl.py (8.85 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import random
from . import testing from .. import config from .. import fixtures from .. import util from ..assertions import eq_ from ..config import requirements from ..schema import Table from ... import CheckConstraint from ... import Column from ... import ForeignKeyConstraint from ... import Index from ... import inspect from ... import Integer from ... import schema from ... import String from ... import UniqueConstraint
class TableDDLTest(fixtures.TestBase): __backend__ = True
def _simple_fixture(self, schema=None): return Table( "test_table", self.metadata, Column("id", Integer, primary_key=True, autoincrement=False), Column("data", String(50)), schema=schema, )
def _underscore_fixture(self): return Table( "_test_table", self.metadata, Column("id", Integer, primary_key=True, autoincrement=False), Column("_data", String(50)), )
def _simple_roundtrip(self, table): with config.db.begin() as conn: conn.execute(table.insert().values((1, "some data"))) result = conn.execute(table.select()) eq_(result.first(), (1, "some data"))
@requirements.create_table @util.provide_metadata def test_create_table(self): table = self._simple_fixture() table.create(config.db, checkfirst=False) self._simple_roundtrip(table)
@requirements.create_table @requirements.schemas @util.provide_metadata def test_create_table_schema(self): table = self._simple_fixture(schema=config.test_schema) table.create(config.db, checkfirst=False) self._simple_roundtrip(table)
@requirements.drop_table @util.provide_metadata def test_drop_table(self): table = self._simple_fixture() table.create(config.db, checkfirst=False) table.drop(config.db, checkfirst=False)
@requirements.create_table @util.provide_metadata def test_underscore_names(self): table = self._underscore_fixture() table.create(config.db, checkfirst=False) self._simple_roundtrip(table)
@requirements.comment_reflection @util.provide_metadata def test_add_table_comment(self): table = self._simple_fixture() table.create(config.db, checkfirst=False) table.comment = "a comment" config.db.execute(schema.SetTableComment(table)) eq_( inspect(config.db).get_table_comment("test_table"), {"text": "a comment"}, )
@requirements.comment_reflection @util.provide_metadata def test_drop_table_comment(self): table = self._simple_fixture() table.create(config.db, checkfirst=False) table.comment = "a comment" config.db.execute(schema.SetTableComment(table)) config.db.execute(schema.DropTableComment(table)) eq_(inspect(config.db).get_table_comment("test_table"), {"text": None})
class LongNameBlowoutTest(fixtures.TestBase): """test the creation of a variety of DDL structures and ensure label length limits pass on backends
"""
__backend__ = True
def fk(self, metadata, connection): convention = { "fk": "foreign_key_%(table_name)s_" "%(column_0_N_name)s_" "%(referred_table_name)s_" + ( "_".join( "".join(random.choice("abcdef") for j in range(20)) for i in range(10) ) ), } metadata.naming_convention = convention
Table( "a_things_with_stuff", metadata, Column("id_long_column_name", Integer, primary_key=True), test_needs_fk=True, )
cons = ForeignKeyConstraint( ["aid"], ["a_things_with_stuff.id_long_column_name"] ) Table( "b_related_things_of_value", metadata, Column( "aid", ), cons, test_needs_fk=True, ) actual_name = cons.name
metadata.create_all(connection)
if testing.requires.foreign_key_constraint_name_reflection.enabled: insp = inspect(connection) fks = insp.get_foreign_keys("b_related_things_of_value") reflected_name = fks[0]["name"]
return actual_name, reflected_name else: return actual_name, None
def pk(self, metadata, connection): convention = { "pk": "primary_key_%(table_name)s_" "%(column_0_N_name)s" + ( "_".join( "".join(random.choice("abcdef") for j in range(30)) for i in range(10) ) ), } metadata.naming_convention = convention
a = Table( "a_things_with_stuff", metadata, Column("id_long_column_name", Integer, primary_key=True), Column("id_another_long_name", Integer, primary_key=True), ) cons = a.primary_key actual_name = cons.name
metadata.create_all(connection) insp = inspect(connection) pk = insp.get_pk_constraint("a_things_with_stuff") reflected_name = pk["name"] return actual_name, reflected_name
def ix(self, metadata, connection): convention = { "ix": "index_%(table_name)s_" "%(column_0_N_name)s" + ( "_".join( "".join(random.choice("abcdef") for j in range(30)) for i in range(10) ) ), } metadata.naming_convention = convention
a = Table( "a_things_with_stuff", metadata, Column("id_long_column_name", Integer, primary_key=True), Column("id_another_long_name", Integer), ) cons = Index(None, a.c.id_long_column_name, a.c.id_another_long_name) actual_name = cons.name
metadata.create_all(connection) insp = inspect(connection) ix = insp.get_indexes("a_things_with_stuff") reflected_name = ix[0]["name"] return actual_name, reflected_name
def uq(self, metadata, connection): convention = { "uq": "unique_constraint_%(table_name)s_" "%(column_0_N_name)s" + ( "_".join( "".join(random.choice("abcdef") for j in range(30)) for i in range(10) ) ), } metadata.naming_convention = convention
cons = UniqueConstraint("id_long_column_name", "id_another_long_name") Table( "a_things_with_stuff", metadata, Column("id_long_column_name", Integer, primary_key=True), Column("id_another_long_name", Integer), cons, ) actual_name = cons.name
metadata.create_all(connection) insp = inspect(connection) uq = insp.get_unique_constraints("a_things_with_stuff") reflected_name = uq[0]["name"] return actual_name, reflected_name
def ck(self, metadata, connection): convention = { "ck": "check_constraint_%(table_name)s" + ( "_".join( "".join(random.choice("abcdef") for j in range(30)) for i in range(10) ) ), } metadata.naming_convention = convention
cons = CheckConstraint("some_long_column_name > 5") Table( "a_things_with_stuff", metadata, Column("id_long_column_name", Integer, primary_key=True), Column("some_long_column_name", Integer), cons, ) actual_name = cons.name
metadata.create_all(connection) insp = inspect(connection) ck = insp.get_check_constraints("a_things_with_stuff") reflected_name = ck[0]["name"] return actual_name, reflected_name
@testing.combinations( ("fk",), ("pk",), ("ix",), ("ck", testing.requires.check_constraint_reflection.as_skips()), ("uq", testing.requires.unique_constraint_reflection.as_skips()), argnames="type_", ) @testing.provide_metadata def test_long_convention_name(self, type_, connection): metadata = self.metadata
actual_name, reflected_name = getattr(self, type_)( metadata, connection )
assert len(actual_name) > 255
if reflected_name is not None: overlap = actual_name[0 : len(reflected_name)] if len(overlap) < len(actual_name): eq_(overlap[0:-5], reflected_name[0 : len(overlap) - 5]) else: eq_(overlap, reflected_name)
__all__ = ("TableDDLTest", "LongNameBlowoutTest")
|