This commit is contained in:
2025-08-27 21:11:48 +08:00
parent fe77c5869e
commit 695ec0b000
2490 changed files with 458653 additions and 11 deletions

View File

@@ -0,0 +1,10 @@
from .test_cte import * # noqa
from .test_ddl import * # noqa
from .test_dialect import * # noqa
from .test_insert import * # noqa
from .test_reflection import * # noqa
from .test_results import * # noqa
from .test_select import * # noqa
from .test_sequence import * # noqa
from .test_types import * # noqa
from .test_update_delete import * # noqa

View File

@@ -0,0 +1,211 @@
from .. import config
from .. import fixtures
from ..assertions import eq_
from ..schema import Column
from ..schema import Table
from ... import ForeignKey
from ... import Integer
from ... import select
from ... import String
from ... import testing
class CTETest(fixtures.TablesTest):
__backend__ = True
__requires__ = ("ctes",)
run_inserts = "each"
run_deletes = "each"
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
Column("parent_id", ForeignKey("some_table.id")),
)
Table(
"some_other_table",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
Column("parent_id", Integer),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "data": "d1", "parent_id": None},
{"id": 2, "data": "d2", "parent_id": 1},
{"id": 3, "data": "d3", "parent_id": 1},
{"id": 4, "data": "d4", "parent_id": 3},
{"id": 5, "data": "d5", "parent_id": 3},
],
)
def test_select_nonrecursive_round_trip(self):
some_table = self.tables.some_table
with config.db.connect() as conn:
cte = (
select([some_table])
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
.cte("some_cte")
)
result = conn.execute(
select([cte.c.data]).where(cte.c.data.in_(["d4", "d5"]))
)
eq_(result.fetchall(), [("d4",)])
def test_select_recursive_round_trip(self):
some_table = self.tables.some_table
with config.db.connect() as conn:
cte = (
select([some_table])
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
.cte("some_cte", recursive=True)
)
cte_alias = cte.alias("c1")
st1 = some_table.alias()
# note that SQL Server requires this to be UNION ALL,
# can't be UNION
cte = cte.union_all(
select([st1]).where(st1.c.id == cte_alias.c.parent_id)
)
result = conn.execute(
select([cte.c.data])
.where(cte.c.data != "d2")
.order_by(cte.c.data.desc())
)
eq_(
result.fetchall(),
[("d4",), ("d3",), ("d3",), ("d1",), ("d1",), ("d1",)],
)
def test_insert_from_select_round_trip(self):
some_table = self.tables.some_table
some_other_table = self.tables.some_other_table
with config.db.connect() as conn:
cte = (
select([some_table])
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
.cte("some_cte")
)
conn.execute(
some_other_table.insert().from_select(
["id", "data", "parent_id"], select([cte])
)
)
eq_(
conn.execute(
select([some_other_table]).order_by(some_other_table.c.id)
).fetchall(),
[(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)],
)
@testing.requires.ctes_with_update_delete
@testing.requires.update_from
def test_update_from_round_trip(self):
some_table = self.tables.some_table
some_other_table = self.tables.some_other_table
with config.db.connect() as conn:
conn.execute(
some_other_table.insert().from_select(
["id", "data", "parent_id"], select([some_table])
)
)
cte = (
select([some_table])
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
.cte("some_cte")
)
conn.execute(
some_other_table.update()
.values(parent_id=5)
.where(some_other_table.c.data == cte.c.data)
)
eq_(
conn.execute(
select([some_other_table]).order_by(some_other_table.c.id)
).fetchall(),
[
(1, "d1", None),
(2, "d2", 5),
(3, "d3", 5),
(4, "d4", 5),
(5, "d5", 3),
],
)
@testing.requires.ctes_with_update_delete
@testing.requires.delete_from
def test_delete_from_round_trip(self):
some_table = self.tables.some_table
some_other_table = self.tables.some_other_table
with config.db.connect() as conn:
conn.execute(
some_other_table.insert().from_select(
["id", "data", "parent_id"], select([some_table])
)
)
cte = (
select([some_table])
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
.cte("some_cte")
)
conn.execute(
some_other_table.delete().where(
some_other_table.c.data == cte.c.data
)
)
eq_(
conn.execute(
select([some_other_table]).order_by(some_other_table.c.id)
).fetchall(),
[(1, "d1", None), (5, "d5", 3)],
)
@testing.requires.ctes_with_update_delete
def test_delete_scalar_subq_round_trip(self):
some_table = self.tables.some_table
some_other_table = self.tables.some_other_table
with config.db.connect() as conn:
conn.execute(
some_other_table.insert().from_select(
["id", "data", "parent_id"], select([some_table])
)
)
cte = (
select([some_table])
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
.cte("some_cte")
)
conn.execute(
some_other_table.delete().where(
some_other_table.c.data
== select([cte.c.data]).where(
cte.c.id == some_other_table.c.id
)
)
)
eq_(
conn.execute(
select([some_other_table]).order_by(some_other_table.c.id)
).fetchall(),
[(1, "d1", None), (5, "d5", 3)],
)

View File

@@ -0,0 +1,91 @@
from .. import config
from .. import fixtures
from .. import util
from ..assertions import eq_
from ..config import requirements
from ... import Column
from ... import inspect
from ... import Integer
from ... import schema
from ... import String
from ... import Table
class TableDDLTest(fixtures.TestBase):
__backend__ = True
def _simple_fixture(self, schema=None):
return Table(
"test_table",
self.metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("data", String(50)),
schema=schema,
)
def _underscore_fixture(self):
return Table(
"_test_table",
self.metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("_data", String(50)),
)
def _simple_roundtrip(self, table):
with config.db.begin() as conn:
conn.execute(table.insert().values((1, "some data")))
result = conn.execute(table.select())
eq_(result.first(), (1, "some data"))
@requirements.create_table
@util.provide_metadata
def test_create_table(self):
table = self._simple_fixture()
table.create(config.db, checkfirst=False)
self._simple_roundtrip(table)
@requirements.create_table
@util.provide_metadata
def test_create_table_schema(self):
table = self._simple_fixture(schema=config.test_schema)
table.create(config.db, checkfirst=False)
self._simple_roundtrip(table)
@requirements.drop_table
@util.provide_metadata
def test_drop_table(self):
table = self._simple_fixture()
table.create(config.db, checkfirst=False)
table.drop(config.db, checkfirst=False)
@requirements.create_table
@util.provide_metadata
def test_underscore_names(self):
table = self._underscore_fixture()
table.create(config.db, checkfirst=False)
self._simple_roundtrip(table)
@requirements.comment_reflection
@util.provide_metadata
def test_add_table_comment(self):
table = self._simple_fixture()
table.create(config.db, checkfirst=False)
table.comment = "a comment"
config.db.execute(schema.SetTableComment(table))
eq_(
inspect(config.db).get_table_comment("test_table"),
{"text": "a comment"},
)
@requirements.comment_reflection
@util.provide_metadata
def test_drop_table_comment(self):
table = self._simple_fixture()
table.create(config.db, checkfirst=False)
table.comment = "a comment"
config.db.execute(schema.SetTableComment(table))
config.db.execute(schema.DropTableComment(table))
eq_(inspect(config.db).get_table_comment("test_table"), {"text": None})
__all__ = ("TableDDLTest",)

View File

@@ -0,0 +1,156 @@
#! coding: utf-8
from .. import assert_raises
from .. import config
from .. import eq_
from .. import fixtures
from .. import provide_metadata
from ..config import requirements
from ..schema import Column
from ..schema import Table
from ... import exc
from ... import Integer
from ... import literal_column
from ... import select
from ... import String
from ...util import compat
class ExceptionTest(fixtures.TablesTest):
"""Test basic exception wrapping.
DBAPIs vary a lot in exception behavior so to actually anticipate
specific exceptions from real round trips, we need to be conservative.
"""
run_deletes = "each"
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"manual_pk",
metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("data", String(50)),
)
@requirements.duplicate_key_raises_integrity_error
def test_integrity_error(self):
with config.db.connect() as conn:
trans = conn.begin()
conn.execute(
self.tables.manual_pk.insert(), {"id": 1, "data": "d1"}
)
assert_raises(
exc.IntegrityError,
conn.execute,
self.tables.manual_pk.insert(),
{"id": 1, "data": "d1"},
)
trans.rollback()
def test_exception_with_non_ascii(self):
with config.db.connect() as conn:
try:
# try to create an error message that likely has non-ascii
# characters in the DBAPI's message string. unfortunately
# there's no way to make this happen with some drivers like
# mysqlclient, pymysql. this at least does produce a non-
# ascii error message for cx_oracle, psycopg2
conn.execute(select([literal_column(u"méil")]))
assert False
except exc.DBAPIError as err:
err_str = str(err)
assert str(err.orig) in str(err)
# test that we are actually getting string on Py2k, unicode
# on Py3k.
if compat.py2k:
assert isinstance(err_str, str)
else:
assert isinstance(err_str, str)
class AutocommitTest(fixtures.TablesTest):
run_deletes = "each"
__requires__ = ("autocommit",)
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("data", String(50)),
test_needs_acid=True,
)
def _test_conn_autocommits(self, conn, autocommit):
trans = conn.begin()
conn.execute(
self.tables.some_table.insert(), {"id": 1, "data": "some data"}
)
trans.rollback()
eq_(
conn.scalar(select([self.tables.some_table.c.id])),
1 if autocommit else None,
)
conn.execute(self.tables.some_table.delete())
def test_autocommit_on(self):
conn = config.db.connect()
c2 = conn.execution_options(isolation_level="AUTOCOMMIT")
self._test_conn_autocommits(c2, True)
conn.invalidate()
self._test_conn_autocommits(conn, False)
def test_autocommit_off(self):
conn = config.db.connect()
self._test_conn_autocommits(conn, False)
class EscapingTest(fixtures.TestBase):
@provide_metadata
def test_percent_sign_round_trip(self):
"""test that the DBAPI accommodates for escaped / nonescaped
percent signs in a way that matches the compiler
"""
m = self.metadata
t = Table("t", m, Column("data", String(50)))
t.create(config.db)
with config.db.begin() as conn:
conn.execute(t.insert(), dict(data="some % value"))
conn.execute(t.insert(), dict(data="some %% other value"))
eq_(
conn.scalar(
select([t.c.data]).where(
t.c.data == literal_column("'some % value'")
)
),
"some % value",
)
eq_(
conn.scalar(
select([t.c.data]).where(
t.c.data == literal_column("'some %% other value'")
)
),
"some %% other value",
)

View File

@@ -0,0 +1,317 @@
from .. import config
from .. import engines
from .. import fixtures
from ..assertions import eq_
from ..config import requirements
from ..schema import Column
from ..schema import Table
from ... import Integer
from ... import literal
from ... import literal_column
from ... import select
from ... import String
class LastrowidTest(fixtures.TablesTest):
run_deletes = "each"
__backend__ = True
__requires__ = "implements_get_lastrowid", "autoincrement_insert"
__engine_options__ = {"implicit_returning": False}
@classmethod
def define_tables(cls, metadata):
Table(
"autoinc_pk",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("data", String(50)),
)
Table(
"manual_pk",
metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("data", String(50)),
)
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
eq_(row, (config.db.dialect.default_sequence_base, "some data"))
def test_autoincrement_on_insert(self):
config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
self._assert_round_trip(self.tables.autoinc_pk, config.db)
def test_last_inserted_id(self):
r = config.db.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(r.inserted_primary_key, [pk])
# failed on pypy1.9 but seems to be OK on pypy 2.1
# @exclusions.fails_if(lambda: util.pypy,
# "lastrowid not maintained after "
# "connection close")
@requirements.dbapi_lastrowid
def test_native_lastrowid_autoinc(self):
r = config.db.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
lastrowid = r.lastrowid
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(lastrowid, pk)
class InsertBehaviorTest(fixtures.TablesTest):
run_deletes = "each"
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"autoinc_pk",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("data", String(50)),
)
Table(
"manual_pk",
metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("data", String(50)),
)
Table(
"includes_defaults",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("data", String(50)),
Column("x", Integer, default=5),
Column(
"y",
Integer,
default=literal_column("2", type_=Integer) + literal(2),
),
)
def test_autoclose_on_insert(self):
if requirements.returning.enabled:
engine = engines.testing_engine(
options={"implicit_returning": False}
)
else:
engine = config.db
r = engine.execute(self.tables.autoinc_pk.insert(), data="some data")
assert r._soft_closed
assert not r.closed
assert r.is_insert
assert not r.returns_rows
@requirements.returning
def test_autoclose_on_insert_implicit_returning(self):
r = config.db.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
assert r._soft_closed
assert not r.closed
assert r.is_insert
assert not r.returns_rows
@requirements.empty_inserts
def test_empty_insert(self):
r = config.db.execute(self.tables.autoinc_pk.insert())
assert r._soft_closed
assert not r.closed
r = config.db.execute(
self.tables.autoinc_pk.select().where(
self.tables.autoinc_pk.c.id != None
)
)
assert len(r.fetchall())
@requirements.insert_from_select
def test_insert_from_select_autoinc(self):
src_table = self.tables.manual_pk
dest_table = self.tables.autoinc_pk
config.db.execute(
src_table.insert(),
[
dict(id=1, data="data1"),
dict(id=2, data="data2"),
dict(id=3, data="data3"),
],
)
result = config.db.execute(
dest_table.insert().from_select(
("data",),
select([src_table.c.data]).where(
src_table.c.data.in_(["data2", "data3"])
),
)
)
eq_(result.inserted_primary_key, [None])
result = config.db.execute(
select([dest_table.c.data]).order_by(dest_table.c.data)
)
eq_(result.fetchall(), [("data2",), ("data3",)])
@requirements.insert_from_select
def test_insert_from_select_autoinc_no_rows(self):
src_table = self.tables.manual_pk
dest_table = self.tables.autoinc_pk
result = config.db.execute(
dest_table.insert().from_select(
("data",),
select([src_table.c.data]).where(
src_table.c.data.in_(["data2", "data3"])
),
)
)
eq_(result.inserted_primary_key, [None])
result = config.db.execute(
select([dest_table.c.data]).order_by(dest_table.c.data)
)
eq_(result.fetchall(), [])
@requirements.insert_from_select
def test_insert_from_select(self):
table = self.tables.manual_pk
config.db.execute(
table.insert(),
[
dict(id=1, data="data1"),
dict(id=2, data="data2"),
dict(id=3, data="data3"),
],
)
config.db.execute(
table.insert(inline=True).from_select(
("id", "data"),
select([table.c.id + 5, table.c.data]).where(
table.c.data.in_(["data2", "data3"])
),
)
)
eq_(
config.db.execute(
select([table.c.data]).order_by(table.c.data)
).fetchall(),
[("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
)
@requirements.insert_from_select
def test_insert_from_select_with_defaults(self):
table = self.tables.includes_defaults
config.db.execute(
table.insert(),
[
dict(id=1, data="data1"),
dict(id=2, data="data2"),
dict(id=3, data="data3"),
],
)
config.db.execute(
table.insert(inline=True).from_select(
("id", "data"),
select([table.c.id + 5, table.c.data]).where(
table.c.data.in_(["data2", "data3"])
),
)
)
eq_(
config.db.execute(
select([table]).order_by(table.c.data, table.c.id)
).fetchall(),
[
(1, "data1", 5, 4),
(2, "data2", 5, 4),
(7, "data2", 5, 4),
(3, "data3", 5, 4),
(8, "data3", 5, 4),
],
)
class ReturningTest(fixtures.TablesTest):
run_create_tables = "each"
__requires__ = "returning", "autoincrement_insert"
__backend__ = True
__engine_options__ = {"implicit_returning": True}
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
eq_(row, (config.db.dialect.default_sequence_base, "some data"))
@classmethod
def define_tables(cls, metadata):
Table(
"autoinc_pk",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("data", String(50)),
)
@requirements.fetch_rows_post_commit
def test_explicit_returning_pk_autocommit(self):
engine = config.db
table = self.tables.autoinc_pk
r = engine.execute(
table.insert().returning(table.c.id), data="some data"
)
pk = r.first()[0]
fetched_pk = config.db.scalar(select([table.c.id]))
eq_(fetched_pk, pk)
def test_explicit_returning_pk_no_autocommit(self):
engine = config.db
table = self.tables.autoinc_pk
with engine.begin() as conn:
r = conn.execute(
table.insert().returning(table.c.id), data="some data"
)
pk = r.first()[0]
fetched_pk = config.db.scalar(select([table.c.id]))
eq_(fetched_pk, pk)
def test_autoincrement_on_insert_implicit_returning(self):
config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
self._assert_round_trip(self.tables.autoinc_pk, config.db)
def test_last_inserted_id_implicit_returning(self):
r = config.db.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(r.inserted_primary_key, [pk])
__all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,370 @@
import datetime
from .. import config
from .. import engines
from .. import fixtures
from ..assertions import eq_
from ..config import requirements
from ..schema import Column
from ..schema import Table
from ... import DateTime
from ... import func
from ... import Integer
from ... import select
from ... import sql
from ... import String
from ... import testing
from ... import text
class RowFetchTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"plain_pk",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
Table(
"has_dates",
metadata,
Column("id", Integer, primary_key=True),
Column("today", DateTime),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.plain_pk.insert(),
[
{"id": 1, "data": "d1"},
{"id": 2, "data": "d2"},
{"id": 3, "data": "d3"},
],
)
config.db.execute(
cls.tables.has_dates.insert(),
[{"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}],
)
def test_via_string(self):
row = config.db.execute(
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
eq_(row["id"], 1)
eq_(row["data"], "d1")
def test_via_int(self):
row = config.db.execute(
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
eq_(row[0], 1)
eq_(row[1], "d1")
def test_via_col_object(self):
row = config.db.execute(
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
eq_(row[self.tables.plain_pk.c.id], 1)
eq_(row[self.tables.plain_pk.c.data], "d1")
@requirements.duplicate_names_in_cursor_description
def test_row_with_dupe_names(self):
result = config.db.execute(
select(
[
self.tables.plain_pk.c.data,
self.tables.plain_pk.c.data.label("data"),
]
).order_by(self.tables.plain_pk.c.id)
)
row = result.first()
eq_(result.keys(), ["data", "data"])
eq_(row, ("d1", "d1"))
def test_row_w_scalar_select(self):
"""test that a scalar select as a column is returned as such
and that type conversion works OK.
(this is half a SQLAlchemy Core test and half to catch database
backends that may have unusual behavior with scalar selects.)
"""
datetable = self.tables.has_dates
s = select([datetable.alias("x").c.today]).as_scalar()
s2 = select([datetable.c.id, s.label("somelabel")])
row = config.db.execute(s2).first()
eq_(row["somelabel"], datetime.datetime(2006, 5, 12, 12, 0, 0))
class PercentSchemaNamesTest(fixtures.TablesTest):
"""tests using percent signs, spaces in table and column names.
This is a very fringe use case, doesn't work for MySQL
or PostgreSQL. the requirement, "percent_schema_names",
is marked "skip" by default.
"""
__requires__ = ("percent_schema_names",)
__backend__ = True
@classmethod
def define_tables(cls, metadata):
cls.tables.percent_table = Table(
"percent%table",
metadata,
Column("percent%", Integer),
Column("spaces % more spaces", Integer),
)
cls.tables.lightweight_percent_table = sql.table(
"percent%table",
sql.column("percent%"),
sql.column("spaces % more spaces"),
)
def test_single_roundtrip(self):
percent_table = self.tables.percent_table
for params in [
{"percent%": 5, "spaces % more spaces": 12},
{"percent%": 7, "spaces % more spaces": 11},
{"percent%": 9, "spaces % more spaces": 10},
{"percent%": 11, "spaces % more spaces": 9},
]:
config.db.execute(percent_table.insert(), params)
self._assert_table()
def test_executemany_roundtrip(self):
percent_table = self.tables.percent_table
config.db.execute(
percent_table.insert(), {"percent%": 5, "spaces % more spaces": 12}
)
config.db.execute(
percent_table.insert(),
[
{"percent%": 7, "spaces % more spaces": 11},
{"percent%": 9, "spaces % more spaces": 10},
{"percent%": 11, "spaces % more spaces": 9},
],
)
self._assert_table()
def _assert_table(self):
percent_table = self.tables.percent_table
lightweight_percent_table = self.tables.lightweight_percent_table
for table in (
percent_table,
percent_table.alias(),
lightweight_percent_table,
lightweight_percent_table.alias(),
):
eq_(
list(
config.db.execute(
table.select().order_by(table.c["percent%"])
)
),
[(5, 12), (7, 11), (9, 10), (11, 9)],
)
eq_(
list(
config.db.execute(
table.select()
.where(table.c["spaces % more spaces"].in_([9, 10]))
.order_by(table.c["percent%"])
)
),
[(9, 10), (11, 9)],
)
row = config.db.execute(
table.select().order_by(table.c["percent%"])
).first()
eq_(row["percent%"], 5)
eq_(row["spaces % more spaces"], 12)
eq_(row[table.c["percent%"]], 5)
eq_(row[table.c["spaces % more spaces"]], 12)
config.db.execute(
percent_table.update().values(
{percent_table.c["spaces % more spaces"]: 15}
)
)
eq_(
list(
config.db.execute(
percent_table.select().order_by(
percent_table.c["percent%"]
)
)
),
[(5, 15), (7, 15), (9, 15), (11, 15)],
)
class ServerSideCursorsTest(
fixtures.TestBase, testing.AssertsExecutionResults
):
__requires__ = ("server_side_cursors",)
__backend__ = True
def _is_server_side(self, cursor):
if self.engine.dialect.driver == "psycopg2":
return cursor.name
elif self.engine.dialect.driver == "pymysql":
sscursor = __import__("pymysql.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
elif self.engine.dialect.driver == "mysqldb":
sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
else:
return False
def _fixture(self, server_side_cursors):
self.engine = engines.testing_engine(
options={"server_side_cursors": server_side_cursors}
)
return self.engine
def tearDown(self):
engines.testing_reaper.close_all()
self.engine.dispose()
def test_global_string(self):
engine = self._fixture(True)
result = engine.execute("select 1")
assert self._is_server_side(result.cursor)
def test_global_text(self):
engine = self._fixture(True)
result = engine.execute(text("select 1"))
assert self._is_server_side(result.cursor)
def test_global_expr(self):
engine = self._fixture(True)
result = engine.execute(select([1]))
assert self._is_server_side(result.cursor)
def test_global_off_explicit(self):
engine = self._fixture(False)
result = engine.execute(text("select 1"))
# It should be off globally ...
assert not self._is_server_side(result.cursor)
def test_stmt_option(self):
engine = self._fixture(False)
s = select([1]).execution_options(stream_results=True)
result = engine.execute(s)
# ... but enabled for this one.
assert self._is_server_side(result.cursor)
def test_conn_option(self):
engine = self._fixture(False)
# and this one
result = (
engine.connect()
.execution_options(stream_results=True)
.execute("select 1")
)
assert self._is_server_side(result.cursor)
def test_stmt_enabled_conn_option_disabled(self):
engine = self._fixture(False)
s = select([1]).execution_options(stream_results=True)
# not this one
result = (
engine.connect().execution_options(stream_results=False).execute(s)
)
assert not self._is_server_side(result.cursor)
def test_stmt_option_disabled(self):
engine = self._fixture(True)
s = select([1]).execution_options(stream_results=False)
result = engine.execute(s)
assert not self._is_server_side(result.cursor)
def test_aliases_and_ss(self):
engine = self._fixture(False)
s1 = select([1]).execution_options(stream_results=True).alias()
result = engine.execute(s1)
assert self._is_server_side(result.cursor)
# s1's options shouldn't affect s2 when s2 is used as a
# from_obj.
s2 = select([1], from_obj=s1)
result = engine.execute(s2)
assert not self._is_server_side(result.cursor)
def test_for_update_expr(self):
engine = self._fixture(True)
s1 = select([1]).with_for_update()
result = engine.execute(s1)
assert self._is_server_side(result.cursor)
def test_for_update_string(self):
engine = self._fixture(True)
result = engine.execute("SELECT 1 FOR UPDATE")
assert self._is_server_side(result.cursor)
def test_text_no_ss(self):
engine = self._fixture(False)
s = text("select 42")
result = engine.execute(s)
assert not self._is_server_side(result.cursor)
def test_text_ss_option(self):
engine = self._fixture(False)
s = text("select 42").execution_options(stream_results=True)
result = engine.execute(s)
assert self._is_server_side(result.cursor)
@testing.provide_metadata
def test_roundtrip(self):
md = self.metadata
self._fixture(True)
test_table = Table(
"test_table",
md,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
test_table.create(checkfirst=True)
test_table.insert().execute(data="data1")
test_table.insert().execute(data="data2")
eq_(
test_table.select().order_by(test_table.c.id).execute().fetchall(),
[(1, "data1"), (2, "data2")],
)
test_table.update().where(test_table.c.id == 2).values(
data=test_table.c.data + " updated"
).execute()
eq_(
test_table.select().order_by(test_table.c.id).execute().fetchall(),
[(1, "data1"), (2, "data2 updated")],
)
test_table.delete().execute()
eq_(select([func.count("*")]).select_from(test_table).scalar(), 0)

View File

@@ -0,0 +1,658 @@
from .. import config
from .. import fixtures
from ..assertions import eq_
from ..assertions import in_
from ..schema import Column
from ..schema import Table
from ... import bindparam
from ... import case
from ... import false
from ... import func
from ... import Integer
from ... import literal_column
from ... import null
from ... import select
from ... import String
from ... import testing
from ... import true
from ... import tuple_
from ... import union
from ... import util
class CollateTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(100)),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "data": "collate data1"},
{"id": 2, "data": "collate data2"},
],
)
def _assert_result(self, select, result):
eq_(config.db.execute(select).fetchall(), result)
@testing.requires.order_by_collation
def test_collate_order_by(self):
collation = testing.requires.get_order_by_collation(testing.config)
self._assert_result(
select([self.tables.some_table]).order_by(
self.tables.some_table.c.data.collate(collation).asc()
),
[(1, "collate data1"), (2, "collate data2")],
)
class OrderByLabelTest(fixtures.TablesTest):
"""Test the dialect sends appropriate ORDER BY expressions when
labels are used.
This essentially exercises the "supports_simple_order_by_label"
setting.
"""
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True),
Column("x", Integer),
Column("y", Integer),
Column("q", String(50)),
Column("p", String(50)),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "x": 1, "y": 2, "q": "q1", "p": "p3"},
{"id": 2, "x": 2, "y": 3, "q": "q2", "p": "p2"},
{"id": 3, "x": 3, "y": 4, "q": "q3", "p": "p1"},
],
)
def _assert_result(self, select, result):
eq_(config.db.execute(select).fetchall(), result)
def test_plain(self):
table = self.tables.some_table
lx = table.c.x.label("lx")
self._assert_result(select([lx]).order_by(lx), [(1,), (2,), (3,)])
def test_composed_int(self):
table = self.tables.some_table
lx = (table.c.x + table.c.y).label("lx")
self._assert_result(select([lx]).order_by(lx), [(3,), (5,), (7,)])
def test_composed_multiple(self):
table = self.tables.some_table
lx = (table.c.x + table.c.y).label("lx")
ly = (func.lower(table.c.q) + table.c.p).label("ly")
self._assert_result(
select([lx, ly]).order_by(lx, ly.desc()),
[(3, util.u("q1p3")), (5, util.u("q2p2")), (7, util.u("q3p1"))],
)
def test_plain_desc(self):
table = self.tables.some_table
lx = table.c.x.label("lx")
self._assert_result(
select([lx]).order_by(lx.desc()), [(3,), (2,), (1,)]
)
def test_composed_int_desc(self):
table = self.tables.some_table
lx = (table.c.x + table.c.y).label("lx")
self._assert_result(
select([lx]).order_by(lx.desc()), [(7,), (5,), (3,)]
)
@testing.requires.group_by_complex_expression
def test_group_by_composed(self):
table = self.tables.some_table
expr = (table.c.x + table.c.y).label("lx")
stmt = (
select([func.count(table.c.id), expr])
.group_by(expr)
.order_by(expr)
)
self._assert_result(stmt, [(1, 3), (1, 5), (1, 7)])
class LimitOffsetTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True),
Column("x", Integer),
Column("y", Integer),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "x": 1, "y": 2},
{"id": 2, "x": 2, "y": 3},
{"id": 3, "x": 3, "y": 4},
{"id": 4, "x": 4, "y": 5},
],
)
def _assert_result(self, select, result, params=()):
eq_(config.db.execute(select, params).fetchall(), result)
def test_simple_limit(self):
table = self.tables.some_table
self._assert_result(
select([table]).order_by(table.c.id).limit(2),
[(1, 1, 2), (2, 2, 3)],
)
@testing.requires.offset
def test_simple_offset(self):
table = self.tables.some_table
self._assert_result(
select([table]).order_by(table.c.id).offset(2),
[(3, 3, 4), (4, 4, 5)],
)
@testing.requires.offset
def test_simple_limit_offset(self):
table = self.tables.some_table
self._assert_result(
select([table]).order_by(table.c.id).limit(2).offset(1),
[(2, 2, 3), (3, 3, 4)],
)
@testing.requires.offset
def test_limit_offset_nobinds(self):
"""test that 'literal binds' mode works - no bound params."""
table = self.tables.some_table
stmt = select([table]).order_by(table.c.id).limit(2).offset(1)
sql = stmt.compile(
dialect=config.db.dialect, compile_kwargs={"literal_binds": True}
)
sql = str(sql)
self._assert_result(sql, [(2, 2, 3), (3, 3, 4)])
@testing.requires.bound_limit_offset
def test_bound_limit(self):
table = self.tables.some_table
self._assert_result(
select([table]).order_by(table.c.id).limit(bindparam("l")),
[(1, 1, 2), (2, 2, 3)],
params={"l": 2},
)
@testing.requires.bound_limit_offset
def test_bound_offset(self):
table = self.tables.some_table
self._assert_result(
select([table]).order_by(table.c.id).offset(bindparam("o")),
[(3, 3, 4), (4, 4, 5)],
params={"o": 2},
)
@testing.requires.bound_limit_offset
def test_bound_limit_offset(self):
table = self.tables.some_table
self._assert_result(
select([table])
.order_by(table.c.id)
.limit(bindparam("l"))
.offset(bindparam("o")),
[(2, 2, 3), (3, 3, 4)],
params={"l": 2, "o": 1},
)
class CompoundSelectTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True),
Column("x", Integer),
Column("y", Integer),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "x": 1, "y": 2},
{"id": 2, "x": 2, "y": 3},
{"id": 3, "x": 3, "y": 4},
{"id": 4, "x": 4, "y": 5},
],
)
def _assert_result(self, select, result, params=()):
eq_(config.db.execute(select, params).fetchall(), result)
def test_plain_union(self):
table = self.tables.some_table
s1 = select([table]).where(table.c.id == 2)
s2 = select([table]).where(table.c.id == 3)
u1 = union(s1, s2)
self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
def test_select_from_plain_union(self):
table = self.tables.some_table
s1 = select([table]).where(table.c.id == 2)
s2 = select([table]).where(table.c.id == 3)
u1 = union(s1, s2).alias().select()
self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
@testing.requires.order_by_col_from_union
@testing.requires.parens_in_union_contained_select_w_limit_offset
def test_limit_offset_selectable_in_unions(self):
table = self.tables.some_table
s1 = (
select([table])
.where(table.c.id == 2)
.limit(1)
.order_by(table.c.id)
)
s2 = (
select([table])
.where(table.c.id == 3)
.limit(1)
.order_by(table.c.id)
)
u1 = union(s1, s2).limit(2)
self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
@testing.requires.parens_in_union_contained_select_wo_limit_offset
def test_order_by_selectable_in_unions(self):
table = self.tables.some_table
s1 = select([table]).where(table.c.id == 2).order_by(table.c.id)
s2 = select([table]).where(table.c.id == 3).order_by(table.c.id)
u1 = union(s1, s2).limit(2)
self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
def test_distinct_selectable_in_unions(self):
table = self.tables.some_table
s1 = select([table]).where(table.c.id == 2).distinct()
s2 = select([table]).where(table.c.id == 3).distinct()
u1 = union(s1, s2).limit(2)
self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
@testing.requires.parens_in_union_contained_select_w_limit_offset
def test_limit_offset_in_unions_from_alias(self):
table = self.tables.some_table
s1 = (
select([table])
.where(table.c.id == 2)
.limit(1)
.order_by(table.c.id)
)
s2 = (
select([table])
.where(table.c.id == 3)
.limit(1)
.order_by(table.c.id)
)
# this necessarily has double parens
u1 = union(s1, s2).alias()
self._assert_result(
u1.select().limit(2).order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)]
)
def test_limit_offset_aliased_selectable_in_unions(self):
table = self.tables.some_table
s1 = (
select([table])
.where(table.c.id == 2)
.limit(1)
.order_by(table.c.id)
.alias()
.select()
)
s2 = (
select([table])
.where(table.c.id == 3)
.limit(1)
.order_by(table.c.id)
.alias()
.select()
)
u1 = union(s1, s2).limit(2)
self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
class ExpandingBoundInTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True),
Column("x", Integer),
Column("y", Integer),
Column("z", String(50)),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "x": 1, "y": 2, "z": "z1"},
{"id": 2, "x": 2, "y": 3, "z": "z2"},
{"id": 3, "x": 3, "y": 4, "z": "z3"},
{"id": 4, "x": 4, "y": 5, "z": "z4"},
],
)
def _assert_result(self, select, result, params=()):
eq_(config.db.execute(select, params).fetchall(), result)
def test_multiple_empty_sets(self):
# test that any anonymous aliasing used by the dialect
# is fine with duplicates
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(table.c.x.in_(bindparam("q", expanding=True)))
.where(table.c.y.in_(bindparam("p", expanding=True)))
.order_by(table.c.id)
)
self._assert_result(stmt, [], params={"q": [], "p": []})
@testing.requires.tuple_in
def test_empty_heterogeneous_tuples(self):
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(
tuple_(table.c.x, table.c.z).in_(
bindparam("q", expanding=True)
)
)
.order_by(table.c.id)
)
self._assert_result(stmt, [], params={"q": []})
@testing.requires.tuple_in
def test_empty_homogeneous_tuples(self):
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(
tuple_(table.c.x, table.c.y).in_(
bindparam("q", expanding=True)
)
)
.order_by(table.c.id)
)
self._assert_result(stmt, [], params={"q": []})
def test_bound_in_scalar(self):
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(table.c.x.in_(bindparam("q", expanding=True)))
.order_by(table.c.id)
)
self._assert_result(stmt, [(2,), (3,), (4,)], params={"q": [2, 3, 4]})
@testing.requires.tuple_in
def test_bound_in_two_tuple(self):
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(
tuple_(table.c.x, table.c.y).in_(
bindparam("q", expanding=True)
)
)
.order_by(table.c.id)
)
self._assert_result(
stmt, [(2,), (3,), (4,)], params={"q": [(2, 3), (3, 4), (4, 5)]}
)
@testing.requires.tuple_in
def test_bound_in_heterogeneous_two_tuple(self):
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(
tuple_(table.c.x, table.c.z).in_(
bindparam("q", expanding=True)
)
)
.order_by(table.c.id)
)
self._assert_result(
stmt,
[(2,), (3,), (4,)],
params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
)
def test_empty_set_against_integer(self):
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(table.c.x.in_(bindparam("q", expanding=True)))
.order_by(table.c.id)
)
self._assert_result(stmt, [], params={"q": []})
def test_empty_set_against_integer_negation(self):
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(table.c.x.notin_(bindparam("q", expanding=True)))
.order_by(table.c.id)
)
self._assert_result(stmt, [(1,), (2,), (3,), (4,)], params={"q": []})
def test_empty_set_against_string(self):
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(table.c.z.in_(bindparam("q", expanding=True)))
.order_by(table.c.id)
)
self._assert_result(stmt, [], params={"q": []})
def test_empty_set_against_string_negation(self):
table = self.tables.some_table
stmt = (
select([table.c.id])
.where(table.c.z.notin_(bindparam("q", expanding=True)))
.order_by(table.c.id)
)
self._assert_result(stmt, [(1,), (2,), (3,), (4,)], params={"q": []})
def test_null_in_empty_set_is_false(self):
stmt = select(
[
case(
[
(
null().in_(
bindparam("foo", value=(), expanding=True)
),
true(),
)
],
else_=false(),
)
]
)
in_(config.db.execute(stmt).fetchone()[0], (False, 0))
class LikeFunctionsTest(fixtures.TablesTest):
__backend__ = True
run_inserts = "once"
run_deletes = None
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "data": "abcdefg"},
{"id": 2, "data": "ab/cdefg"},
{"id": 3, "data": "ab%cdefg"},
{"id": 4, "data": "ab_cdefg"},
{"id": 5, "data": "abcde/fg"},
{"id": 6, "data": "abcde%fg"},
{"id": 7, "data": "ab#cdefg"},
{"id": 8, "data": "ab9cdefg"},
{"id": 9, "data": "abcde#fg"},
{"id": 10, "data": "abcd9fg"},
],
)
def _test(self, expr, expected):
some_table = self.tables.some_table
with config.db.connect() as conn:
rows = {
value
for value, in conn.execute(
select([some_table.c.id]).where(expr)
)
}
eq_(rows, expected)
def test_startswith_unescaped(self):
col = self.tables.some_table.c.data
self._test(col.startswith("ab%c"), {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
def test_startswith_autoescape(self):
col = self.tables.some_table.c.data
self._test(col.startswith("ab%c", autoescape=True), {3})
def test_startswith_sqlexpr(self):
col = self.tables.some_table.c.data
self._test(
col.startswith(literal_column("'ab%c'")),
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
)
def test_startswith_escape(self):
col = self.tables.some_table.c.data
self._test(col.startswith("ab##c", escape="#"), {7})
def test_startswith_autoescape_escape(self):
col = self.tables.some_table.c.data
self._test(col.startswith("ab%c", autoescape=True, escape="#"), {3})
self._test(col.startswith("ab#c", autoescape=True, escape="#"), {7})
def test_endswith_unescaped(self):
col = self.tables.some_table.c.data
self._test(col.endswith("e%fg"), {1, 2, 3, 4, 5, 6, 7, 8, 9})
def test_endswith_sqlexpr(self):
col = self.tables.some_table.c.data
self._test(
col.endswith(literal_column("'e%fg'")), {1, 2, 3, 4, 5, 6, 7, 8, 9}
)
def test_endswith_autoescape(self):
col = self.tables.some_table.c.data
self._test(col.endswith("e%fg", autoescape=True), {6})
def test_endswith_escape(self):
col = self.tables.some_table.c.data
self._test(col.endswith("e##fg", escape="#"), {9})
def test_endswith_autoescape_escape(self):
col = self.tables.some_table.c.data
self._test(col.endswith("e%fg", autoescape=True, escape="#"), {6})
self._test(col.endswith("e#fg", autoescape=True, escape="#"), {9})
def test_contains_unescaped(self):
col = self.tables.some_table.c.data
self._test(col.contains("b%cde"), {1, 2, 3, 4, 5, 6, 7, 8, 9})
def test_contains_autoescape(self):
col = self.tables.some_table.c.data
self._test(col.contains("b%cde", autoescape=True), {3})
def test_contains_escape(self):
col = self.tables.some_table.c.data
self._test(col.contains("b##cde", escape="#"), {7})
def test_contains_autoescape_escape(self):
col = self.tables.some_table.c.data
self._test(col.contains("b%cd", autoescape=True, escape="#"), {3})
self._test(col.contains("b#cd", autoescape=True, escape="#"), {7})

View File

@@ -0,0 +1,156 @@
from .. import config
from .. import fixtures
from ..assertions import eq_
from ..config import requirements
from ..schema import Column
from ..schema import Table
from ... import Integer
from ... import MetaData
from ... import schema
from ... import Sequence
from ... import String
from ... import testing
class SequenceTest(fixtures.TablesTest):
__requires__ = ("sequences",)
__backend__ = True
run_create_tables = "each"
@classmethod
def define_tables(cls, metadata):
Table(
"seq_pk",
metadata,
Column("id", Integer, Sequence("tab_id_seq"), primary_key=True),
Column("data", String(50)),
)
Table(
"seq_opt_pk",
metadata,
Column(
"id",
Integer,
Sequence("tab_id_seq", optional=True),
primary_key=True,
),
Column("data", String(50)),
)
def test_insert_roundtrip(self):
config.db.execute(self.tables.seq_pk.insert(), data="some data")
self._assert_round_trip(self.tables.seq_pk, config.db)
def test_insert_lastrowid(self):
r = config.db.execute(self.tables.seq_pk.insert(), data="some data")
eq_(r.inserted_primary_key, [1])
def test_nextval_direct(self):
r = config.db.execute(self.tables.seq_pk.c.id.default)
eq_(r, 1)
@requirements.sequences_optional
def test_optional_seq(self):
r = config.db.execute(
self.tables.seq_opt_pk.insert(), data="some data"
)
eq_(r.inserted_primary_key, [1])
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
eq_(row, (1, "some data"))
class SequenceCompilerTest(testing.AssertsCompiledSQL, fixtures.TestBase):
__requires__ = ("sequences",)
__backend__ = True
def test_literal_binds_inline_compile(self):
table = Table(
"x",
MetaData(),
Column("y", Integer, Sequence("y_seq")),
Column("q", Integer),
)
stmt = table.insert().values(q=5)
seq_nextval = testing.db.dialect.statement_compiler(
statement=None, dialect=testing.db.dialect
).visit_sequence(Sequence("y_seq"))
self.assert_compile(
stmt,
"INSERT INTO x (y, q) VALUES (%s, 5)" % (seq_nextval,),
literal_binds=True,
dialect=testing.db.dialect,
)
class HasSequenceTest(fixtures.TestBase):
__requires__ = ("sequences",)
__backend__ = True
def test_has_sequence(self):
s1 = Sequence("user_id_seq")
testing.db.execute(schema.CreateSequence(s1))
try:
eq_(
testing.db.dialect.has_sequence(testing.db, "user_id_seq"),
True,
)
finally:
testing.db.execute(schema.DropSequence(s1))
@testing.requires.schemas
def test_has_sequence_schema(self):
s1 = Sequence("user_id_seq", schema=config.test_schema)
testing.db.execute(schema.CreateSequence(s1))
try:
eq_(
testing.db.dialect.has_sequence(
testing.db, "user_id_seq", schema=config.test_schema
),
True,
)
finally:
testing.db.execute(schema.DropSequence(s1))
def test_has_sequence_neg(self):
eq_(testing.db.dialect.has_sequence(testing.db, "user_id_seq"), False)
@testing.requires.schemas
def test_has_sequence_schemas_neg(self):
eq_(
testing.db.dialect.has_sequence(
testing.db, "user_id_seq", schema=config.test_schema
),
False,
)
@testing.requires.schemas
def test_has_sequence_default_not_in_remote(self):
s1 = Sequence("user_id_seq")
testing.db.execute(schema.CreateSequence(s1))
try:
eq_(
testing.db.dialect.has_sequence(
testing.db, "user_id_seq", schema=config.test_schema
),
False,
)
finally:
testing.db.execute(schema.DropSequence(s1))
@testing.requires.schemas
def test_has_sequence_remote_not_in_default(self):
s1 = Sequence("user_id_seq", schema=config.test_schema)
testing.db.execute(schema.CreateSequence(s1))
try:
eq_(
testing.db.dialect.has_sequence(testing.db, "user_id_seq"),
False,
)
finally:
testing.db.execute(schema.DropSequence(s1))

View File

@@ -0,0 +1,998 @@
# coding: utf-8
import datetime
import decimal
import json
import mock
from .. import config
from .. import engines
from .. import fixtures
from ..assertions import eq_
from ..config import requirements
from ..schema import Column
from ..schema import Table
from ... import and_
from ... import BigInteger
from ... import Boolean
from ... import cast
from ... import Date
from ... import DateTime
from ... import Float
from ... import Integer
from ... import JSON
from ... import literal
from ... import MetaData
from ... import null
from ... import Numeric
from ... import select
from ... import String
from ... import testing
from ... import Text
from ... import Time
from ... import TIMESTAMP
from ... import type_coerce
from ... import Unicode
from ... import UnicodeText
from ... import util
from ...ext.declarative import declarative_base
from ...orm import Session
from ...util import u
class _LiteralRoundTripFixture(object):
supports_whereclause = True
@testing.provide_metadata
def _literal_round_trip(self, type_, input_, output, filter_=None):
"""test literal rendering """
# for literal, we test the literal render in an INSERT
# into a typed column. we can then SELECT it back as its
# official type; ideally we'd be able to use CAST here
# but MySQL in particular can't CAST fully
t = Table("t", self.metadata, Column("x", type_))
t.create()
with testing.db.connect() as conn:
for value in input_:
ins = (
t.insert()
.values(x=literal(value))
.compile(
dialect=testing.db.dialect,
compile_kwargs=dict(literal_binds=True),
)
)
conn.execute(ins)
if self.supports_whereclause:
stmt = t.select().where(t.c.x == literal(value))
else:
stmt = t.select()
stmt = stmt.compile(
dialect=testing.db.dialect,
compile_kwargs=dict(literal_binds=True),
)
for row in conn.execute(stmt):
value = row[0]
if filter_ is not None:
value = filter_(value)
assert value in output
class _UnicodeFixture(_LiteralRoundTripFixture):
__requires__ = ("unicode_data",)
data = u(
"Alors vous imaginez ma 🐍 surprise, au lever du jour, "
"quand une drôle de petite 🐍 voix ma réveillé. Elle "
"disait: « Sil vous plaît… dessine-moi 🐍 un mouton! »"
)
@property
def supports_whereclause(self):
return config.requirements.expressions_against_unbounded_text.enabled
@classmethod
def define_tables(cls, metadata):
Table(
"unicode_table",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("unicode_data", cls.datatype),
)
def test_round_trip(self):
unicode_table = self.tables.unicode_table
config.db.execute(unicode_table.insert(), {"unicode_data": self.data})
row = config.db.execute(select([unicode_table.c.unicode_data])).first()
eq_(row, (self.data,))
assert isinstance(row[0], util.text_type)
def test_round_trip_executemany(self):
unicode_table = self.tables.unicode_table
config.db.execute(
unicode_table.insert(),
[{"unicode_data": self.data} for i in range(3)],
)
rows = config.db.execute(
select([unicode_table.c.unicode_data])
).fetchall()
eq_(rows, [(self.data,) for i in range(3)])
for row in rows:
assert isinstance(row[0], util.text_type)
def _test_empty_strings(self):
unicode_table = self.tables.unicode_table
config.db.execute(unicode_table.insert(), {"unicode_data": u("")})
row = config.db.execute(select([unicode_table.c.unicode_data])).first()
eq_(row, (u(""),))
def test_literal(self):
self._literal_round_trip(self.datatype, [self.data], [self.data])
def test_literal_non_ascii(self):
self._literal_round_trip(
self.datatype, [util.u("réve🐍 illé")], [util.u("réve🐍 illé")]
)
class UnicodeVarcharTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = ("unicode_data",)
__backend__ = True
datatype = Unicode(255)
@requirements.empty_strings_varchar
def test_empty_strings_varchar(self):
self._test_empty_strings()
class UnicodeTextTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = "unicode_data", "text_type"
__backend__ = True
datatype = UnicodeText()
@requirements.empty_strings_text
def test_empty_strings_text(self):
self._test_empty_strings()
class TextTest(_LiteralRoundTripFixture, fixtures.TablesTest):
__requires__ = ("text_type",)
__backend__ = True
@property
def supports_whereclause(self):
return config.requirements.expressions_against_unbounded_text.enabled
@classmethod
def define_tables(cls, metadata):
Table(
"text_table",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("text_data", Text),
)
def test_text_roundtrip(self):
text_table = self.tables.text_table
config.db.execute(text_table.insert(), {"text_data": "some text"})
row = config.db.execute(select([text_table.c.text_data])).first()
eq_(row, ("some text",))
def test_text_empty_strings(self):
text_table = self.tables.text_table
config.db.execute(text_table.insert(), {"text_data": ""})
row = config.db.execute(select([text_table.c.text_data])).first()
eq_(row, ("",))
def test_literal(self):
self._literal_round_trip(Text, ["some text"], ["some text"])
def test_literal_non_ascii(self):
self._literal_round_trip(
Text, [util.u("réve🐍 illé")], [util.u("réve🐍 illé")]
)
def test_literal_quoting(self):
data = """some 'text' hey "hi there" that's text"""
self._literal_round_trip(Text, [data], [data])
def test_literal_backslashes(self):
data = r"backslash one \ backslash two \\ end"
self._literal_round_trip(Text, [data], [data])
def test_literal_percentsigns(self):
data = r"percent % signs %% percent"
self._literal_round_trip(Text, [data], [data])
class StringTest(_LiteralRoundTripFixture, fixtures.TestBase):
__backend__ = True
@requirements.unbounded_varchar
def test_nolength_string(self):
metadata = MetaData()
foo = Table("foo", metadata, Column("one", String))
foo.create(config.db)
foo.drop(config.db)
def test_literal(self):
# note that in Python 3, this invokes the Unicode
# datatype for the literal part because all strings are unicode
self._literal_round_trip(String(40), ["some text"], ["some text"])
def test_literal_non_ascii(self):
self._literal_round_trip(
String(40), [util.u("réve🐍 illé")], [util.u("réve🐍 illé")]
)
def test_literal_quoting(self):
data = """some 'text' hey "hi there" that's text"""
self._literal_round_trip(String(40), [data], [data])
def test_literal_backslashes(self):
data = r"backslash one \ backslash two \\ end"
self._literal_round_trip(String(40), [data], [data])
class _DateFixture(_LiteralRoundTripFixture):
compare = None
@classmethod
def define_tables(cls, metadata):
Table(
"date_table",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("date_data", cls.datatype),
)
def test_round_trip(self):
date_table = self.tables.date_table
config.db.execute(date_table.insert(), {"date_data": self.data})
row = config.db.execute(select([date_table.c.date_data])).first()
compare = self.compare or self.data
eq_(row, (compare,))
assert isinstance(row[0], type(compare))
def test_null(self):
date_table = self.tables.date_table
config.db.execute(date_table.insert(), {"date_data": None})
row = config.db.execute(select([date_table.c.date_data])).first()
eq_(row, (None,))
@testing.requires.datetime_literals
def test_literal(self):
compare = self.compare or self.data
self._literal_round_trip(self.datatype, [self.data], [compare])
class DateTimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("datetime",)
__backend__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
class DateTimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("datetime_microseconds",)
__backend__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18, 396)
class TimestampMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("timestamp_microseconds",)
__backend__ = True
datatype = TIMESTAMP
data = datetime.datetime(2012, 10, 15, 12, 57, 18, 396)
class TimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("time",)
__backend__ = True
datatype = Time
data = datetime.time(12, 57, 18)
class TimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("time_microseconds",)
__backend__ = True
datatype = Time
data = datetime.time(12, 57, 18, 396)
class DateTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("date",)
__backend__ = True
datatype = Date
data = datetime.date(2012, 10, 15)
class DateTimeCoercedToDateTimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = "date", "date_coerces_from_datetime"
__backend__ = True
datatype = Date
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
compare = datetime.date(2012, 10, 15)
class DateTimeHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("datetime_historic",)
__backend__ = True
datatype = DateTime
data = datetime.datetime(1850, 11, 10, 11, 52, 35)
class DateHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("date_historic",)
__backend__ = True
datatype = Date
data = datetime.date(1727, 4, 1)
class IntegerTest(_LiteralRoundTripFixture, fixtures.TestBase):
__backend__ = True
def test_literal(self):
self._literal_round_trip(Integer, [5], [5])
def test_huge_int(self):
self._round_trip(BigInteger, 1376537018368127)
@testing.provide_metadata
def _round_trip(self, datatype, data):
metadata = self.metadata
int_table = Table(
"integer_table",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("integer_data", datatype),
)
metadata.create_all(config.db)
config.db.execute(int_table.insert(), {"integer_data": data})
row = config.db.execute(select([int_table.c.integer_data])).first()
eq_(row, (data,))
if util.py3k:
assert isinstance(row[0], int)
else:
assert isinstance(row[0], (long, int)) # noqa
class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
__backend__ = True
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
@testing.provide_metadata
def _do_test(self, type_, input_, output, filter_=None, check_scale=False):
metadata = self.metadata
t = Table("t", metadata, Column("x", type_))
t.create()
t.insert().execute([{"x": x} for x in input_])
result = {row[0] for row in t.select().execute()}
output = set(output)
if filter_:
result = set(filter_(x) for x in result)
output = set(filter_(x) for x in output)
eq_(result, output)
if check_scale:
eq_([str(x) for x in result], [str(x) for x in output])
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_render_literal_numeric(self):
self._literal_round_trip(
Numeric(precision=8, scale=4),
[15.7563, decimal.Decimal("15.7563")],
[decimal.Decimal("15.7563")],
)
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_render_literal_numeric_asfloat(self):
self._literal_round_trip(
Numeric(precision=8, scale=4, asdecimal=False),
[15.7563, decimal.Decimal("15.7563")],
[15.7563],
)
def test_render_literal_float(self):
self._literal_round_trip(
Float(4),
[15.7563, decimal.Decimal("15.7563")],
[15.7563],
filter_=lambda n: n is not None and round(n, 5) or None,
)
@testing.requires.precision_generic_float_type
def test_float_custom_scale(self):
self._do_test(
Float(None, decimal_return_scale=7, asdecimal=True),
[15.7563827, decimal.Decimal("15.7563827")],
[decimal.Decimal("15.7563827")],
check_scale=True,
)
def test_numeric_as_decimal(self):
self._do_test(
Numeric(precision=8, scale=4),
[15.7563, decimal.Decimal("15.7563")],
[decimal.Decimal("15.7563")],
)
def test_numeric_as_float(self):
self._do_test(
Numeric(precision=8, scale=4, asdecimal=False),
[15.7563, decimal.Decimal("15.7563")],
[15.7563],
)
@testing.requires.fetch_null_from_numeric
def test_numeric_null_as_decimal(self):
self._do_test(Numeric(precision=8, scale=4), [None], [None])
@testing.requires.fetch_null_from_numeric
def test_numeric_null_as_float(self):
self._do_test(
Numeric(precision=8, scale=4, asdecimal=False), [None], [None]
)
@testing.requires.floats_to_four_decimals
def test_float_as_decimal(self):
self._do_test(
Float(precision=8, asdecimal=True),
[15.7563, decimal.Decimal("15.7563"), None],
[decimal.Decimal("15.7563"), None],
)
def test_float_as_float(self):
self._do_test(
Float(precision=8),
[15.7563, decimal.Decimal("15.7563")],
[15.7563],
filter_=lambda n: n is not None and round(n, 5) or None,
)
def test_float_coerce_round_trip(self):
expr = 15.7563
val = testing.db.scalar(select([literal(expr)]))
eq_(val, expr)
# this does not work in MySQL, see #4036, however we choose not
# to render CAST unconditionally since this is kind of an edge case.
@testing.requires.implicit_decimal_binds
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_decimal_coerce_round_trip(self):
expr = decimal.Decimal("15.7563")
val = testing.db.scalar(select([literal(expr)]))
eq_(val, expr)
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_decimal_coerce_round_trip_w_cast(self):
expr = decimal.Decimal("15.7563")
val = testing.db.scalar(select([cast(expr, Numeric(10, 4))]))
eq_(val, expr)
@testing.requires.precision_numerics_general
def test_precision_decimal(self):
numbers = set(
[
decimal.Decimal("54.234246451650"),
decimal.Decimal("0.004354"),
decimal.Decimal("900.0"),
]
)
self._do_test(Numeric(precision=18, scale=12), numbers, numbers)
@testing.requires.precision_numerics_enotation_large
def test_enotation_decimal(self):
"""test exceedingly small decimals.
Decimal reports values with E notation when the exponent
is greater than 6.
"""
numbers = set(
[
decimal.Decimal("1E-2"),
decimal.Decimal("1E-3"),
decimal.Decimal("1E-4"),
decimal.Decimal("1E-5"),
decimal.Decimal("1E-6"),
decimal.Decimal("1E-7"),
decimal.Decimal("1E-8"),
decimal.Decimal("0.01000005940696"),
decimal.Decimal("0.00000005940696"),
decimal.Decimal("0.00000000000696"),
decimal.Decimal("0.70000000000696"),
decimal.Decimal("696E-12"),
]
)
self._do_test(Numeric(precision=18, scale=14), numbers, numbers)
@testing.requires.precision_numerics_enotation_large
def test_enotation_decimal_large(self):
"""test exceedingly large decimals.
"""
numbers = set(
[
decimal.Decimal("4E+8"),
decimal.Decimal("5748E+15"),
decimal.Decimal("1.521E+15"),
decimal.Decimal("00000000000000.1E+12"),
]
)
self._do_test(Numeric(precision=25, scale=2), numbers, numbers)
@testing.requires.precision_numerics_many_significant_digits
def test_many_significant_digits(self):
numbers = set(
[
decimal.Decimal("31943874831932418390.01"),
decimal.Decimal("319438950232418390.273596"),
decimal.Decimal("87673.594069654243"),
]
)
self._do_test(Numeric(precision=38, scale=12), numbers, numbers)
@testing.requires.precision_numerics_retains_significant_digits
def test_numeric_no_decimal(self):
numbers = set([decimal.Decimal("1.000")])
self._do_test(
Numeric(precision=5, scale=3), numbers, numbers, check_scale=True
)
class BooleanTest(_LiteralRoundTripFixture, fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"boolean_table",
metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("value", Boolean),
Column("unconstrained_value", Boolean(create_constraint=False)),
)
def test_render_literal_bool(self):
self._literal_round_trip(Boolean(), [True, False], [True, False])
def test_round_trip(self):
boolean_table = self.tables.boolean_table
config.db.execute(
boolean_table.insert(),
{"id": 1, "value": True, "unconstrained_value": False},
)
row = config.db.execute(
select(
[boolean_table.c.value, boolean_table.c.unconstrained_value]
)
).first()
eq_(row, (True, False))
assert isinstance(row[0], bool)
def test_null(self):
boolean_table = self.tables.boolean_table
config.db.execute(
boolean_table.insert(),
{"id": 1, "value": None, "unconstrained_value": None},
)
row = config.db.execute(
select(
[boolean_table.c.value, boolean_table.c.unconstrained_value]
)
).first()
eq_(row, (None, None))
def test_whereclause(self):
# testing "WHERE <column>" renders a compatible expression
boolean_table = self.tables.boolean_table
with config.db.connect() as conn:
conn.execute(
boolean_table.insert(),
[
{"id": 1, "value": True, "unconstrained_value": True},
{"id": 2, "value": False, "unconstrained_value": False},
],
)
eq_(
conn.scalar(
select([boolean_table.c.id]).where(boolean_table.c.value)
),
1,
)
eq_(
conn.scalar(
select([boolean_table.c.id]).where(
boolean_table.c.unconstrained_value
)
),
1,
)
eq_(
conn.scalar(
select([boolean_table.c.id]).where(~boolean_table.c.value)
),
2,
)
eq_(
conn.scalar(
select([boolean_table.c.id]).where(
~boolean_table.c.unconstrained_value
)
),
2,
)
class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
__requires__ = ("json_type",)
__backend__ = True
datatype = JSON
data1 = {"key1": "value1", "key2": "value2"}
data2 = {
"Key 'One'": "value1",
"key two": "value2",
"key three": "value ' three '",
}
data3 = {
"key1": [1, 2, 3],
"key2": ["one", "two", "three"],
"key3": [{"four": "five"}, {"six": "seven"}],
}
data4 = ["one", "two", "three"]
data5 = {
"nested": {
"elem1": [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}],
"elem2": {"elem3": {"elem4": "elem5"}},
}
}
data6 = {"a": 5, "b": "some value", "c": {"foo": "bar"}}
@classmethod
def define_tables(cls, metadata):
Table(
"data_table",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String(30), nullable=False),
Column("data", cls.datatype),
Column("nulldata", cls.datatype(none_as_null=True)),
)
def test_round_trip_data1(self):
self._test_round_trip(self.data1)
def _test_round_trip(self, data_element):
data_table = self.tables.data_table
config.db.execute(
data_table.insert(), {"name": "row1", "data": data_element}
)
row = config.db.execute(select([data_table.c.data])).first()
eq_(row, (data_element,))
def test_round_trip_custom_json(self):
data_table = self.tables.data_table
data_element = {"key1": "data1"}
js = mock.Mock(side_effect=json.dumps)
jd = mock.Mock(side_effect=json.loads)
engine = engines.testing_engine(
options=dict(json_serializer=js, json_deserializer=jd)
)
# support sqlite :memory: database...
data_table.create(engine, checkfirst=True)
engine.execute(
data_table.insert(), {"name": "row1", "data": data_element}
)
row = engine.execute(select([data_table.c.data])).first()
eq_(row, (data_element,))
eq_(js.mock_calls, [mock.call(data_element)])
eq_(jd.mock_calls, [mock.call(json.dumps(data_element))])
def test_round_trip_none_as_sql_null(self):
col = self.tables.data_table.c["nulldata"]
with config.db.connect() as conn:
conn.execute(
self.tables.data_table.insert(), {"name": "r1", "data": None}
)
eq_(
conn.scalar(
select([self.tables.data_table.c.name]).where(
col.is_(null())
)
),
"r1",
)
eq_(conn.scalar(select([col])), None)
def test_round_trip_json_null_as_json_null(self):
col = self.tables.data_table.c["data"]
with config.db.connect() as conn:
conn.execute(
self.tables.data_table.insert(),
{"name": "r1", "data": JSON.NULL},
)
eq_(
conn.scalar(
select([self.tables.data_table.c.name]).where(
cast(col, String) == "null"
)
),
"r1",
)
eq_(conn.scalar(select([col])), None)
def test_round_trip_none_as_json_null(self):
col = self.tables.data_table.c["data"]
with config.db.connect() as conn:
conn.execute(
self.tables.data_table.insert(), {"name": "r1", "data": None}
)
eq_(
conn.scalar(
select([self.tables.data_table.c.name]).where(
cast(col, String) == "null"
)
),
"r1",
)
eq_(conn.scalar(select([col])), None)
def _criteria_fixture(self):
config.db.execute(
self.tables.data_table.insert(),
[
{"name": "r1", "data": self.data1},
{"name": "r2", "data": self.data2},
{"name": "r3", "data": self.data3},
{"name": "r4", "data": self.data4},
{"name": "r5", "data": self.data5},
{"name": "r6", "data": self.data6},
],
)
def _test_index_criteria(self, crit, expected, test_literal=True):
self._criteria_fixture()
with config.db.connect() as conn:
stmt = select([self.tables.data_table.c.name]).where(crit)
eq_(conn.scalar(stmt), expected)
if test_literal:
literal_sql = str(
stmt.compile(
config.db, compile_kwargs={"literal_binds": True}
)
)
eq_(conn.scalar(literal_sql), expected)
def test_crit_spaces_in_key(self):
name = self.tables.data_table.c.name
col = self.tables.data_table.c["data"]
# limit the rows here to avoid PG error
# "cannot extract field from a non-object", which is
# fixed in 9.4 but may exist in 9.3
self._test_index_criteria(
and_(
name.in_(["r1", "r2", "r3"]),
cast(col["key two"], String) == '"value2"',
),
"r2",
)
@config.requirements.json_array_indexes
def test_crit_simple_int(self):
name = self.tables.data_table.c.name
col = self.tables.data_table.c["data"]
# limit the rows here to avoid PG error
# "cannot extract array element from a non-array", which is
# fixed in 9.4 but may exist in 9.3
self._test_index_criteria(
and_(name == "r4", cast(col[1], String) == '"two"'), "r4"
)
def test_crit_mixed_path(self):
col = self.tables.data_table.c["data"]
self._test_index_criteria(
cast(col[("key3", 1, "six")], String) == '"seven"', "r3"
)
def test_crit_string_path(self):
col = self.tables.data_table.c["data"]
self._test_index_criteria(
cast(col[("nested", "elem2", "elem3", "elem4")], String)
== '"elem5"',
"r5",
)
def test_crit_against_string_basic(self):
name = self.tables.data_table.c.name
col = self.tables.data_table.c["data"]
self._test_index_criteria(
and_(name == "r6", cast(col["b"], String) == '"some value"'), "r6"
)
def test_crit_against_string_coerce_type(self):
name = self.tables.data_table.c.name
col = self.tables.data_table.c["data"]
self._test_index_criteria(
and_(
name == "r6",
cast(col["b"], String) == type_coerce("some value", JSON),
),
"r6",
test_literal=False,
)
def test_crit_against_int_basic(self):
name = self.tables.data_table.c.name
col = self.tables.data_table.c["data"]
self._test_index_criteria(
and_(name == "r6", cast(col["a"], String) == "5"), "r6"
)
def test_crit_against_int_coerce_type(self):
name = self.tables.data_table.c.name
col = self.tables.data_table.c["data"]
self._test_index_criteria(
and_(name == "r6", cast(col["a"], String) == type_coerce(5, JSON)),
"r6",
test_literal=False,
)
def test_unicode_round_trip(self):
with config.db.connect() as conn:
conn.execute(
self.tables.data_table.insert(),
{
"name": "r1",
"data": {
util.u("réve🐍 illé"): util.u("réve🐍 illé"),
"data": {"k1": util.u("drôl🐍e")},
},
},
)
eq_(
conn.scalar(select([self.tables.data_table.c.data])),
{
util.u("réve🐍 illé"): util.u("réve🐍 illé"),
"data": {"k1": util.u("drôl🐍e")},
},
)
def test_eval_none_flag_orm(self):
Base = declarative_base()
class Data(Base):
__table__ = self.tables.data_table
s = Session(testing.db)
d1 = Data(name="d1", data=None, nulldata=None)
s.add(d1)
s.commit()
s.bulk_insert_mappings(
Data, [{"name": "d2", "data": None, "nulldata": None}]
)
eq_(
s.query(
cast(self.tables.data_table.c.data, String()),
cast(self.tables.data_table.c.nulldata, String),
)
.filter(self.tables.data_table.c.name == "d1")
.first(),
("null", None),
)
eq_(
s.query(
cast(self.tables.data_table.c.data, String()),
cast(self.tables.data_table.c.nulldata, String),
)
.filter(self.tables.data_table.c.name == "d2")
.first(),
("null", None),
)
__all__ = (
"UnicodeVarcharTest",
"UnicodeTextTest",
"JSONTest",
"DateTest",
"DateTimeTest",
"TextTest",
"NumericTest",
"IntegerTest",
"DateTimeHistoricTest",
"DateTimeCoercedToDateTimeTest",
"TimeMicrosecondsTest",
"TimestampMicrosecondsTest",
"TimeTest",
"DateTimeMicrosecondsTest",
"DateHistoricTest",
"StringTest",
"BooleanTest",
)

View File

@@ -0,0 +1,56 @@
from .. import config
from .. import fixtures
from ..assertions import eq_
from ..schema import Column
from ..schema import Table
from ... import Integer
from ... import String
class SimpleUpdateDeleteTest(fixtures.TablesTest):
run_deletes = "each"
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"plain_pk",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.plain_pk.insert(),
[
{"id": 1, "data": "d1"},
{"id": 2, "data": "d2"},
{"id": 3, "data": "d3"},
],
)
def test_update(self):
t = self.tables.plain_pk
r = config.db.execute(t.update().where(t.c.id == 2), data="d2_new")
assert not r.is_insert
assert not r.returns_rows
eq_(
config.db.execute(t.select().order_by(t.c.id)).fetchall(),
[(1, "d1"), (2, "d2_new"), (3, "d3")],
)
def test_delete(self):
t = self.tables.plain_pk
r = config.db.execute(t.delete().where(t.c.id == 2))
assert not r.is_insert
assert not r.returns_rows
eq_(
config.db.execute(t.select().order_by(t.c.id)).fetchall(),
[(1, "d1"), (3, "d3")],
)
__all__ = ("SimpleUpdateDeleteTest",)