diff --git a/migrate/changeset/databases/sqlite.py b/migrate/changeset/databases/sqlite.py index 8a5ba90..92d42f2 100644 --- a/migrate/changeset/databases/sqlite.py +++ b/migrate/changeset/databases/sqlite.py @@ -11,6 +11,7 @@ from copy import copy import re from sqlalchemy.databases import sqlite as sa_base +from sqlalchemy.schema import ForeignKeyConstraint from sqlalchemy.schema import UniqueConstraint from migrate import exceptions @@ -29,8 +30,24 @@ class SQLiteCommon(object): class SQLiteHelper(SQLiteCommon): - def _get_unique_constraints(self, table): - """Retrieve information about existing unique constraints of the table + def _filter_columns(self, cols, table): + """Splits the string of columns and returns those only in the table. + + :param cols: comma-delimited string of table columns + :param table: the table to check + :return: list of columns in the table + """ + columns = [] + for c in cols.split(","): + if c in table.columns: + # There was a bug in reflection of SQLite columns with + # reserved identifiers as names (SQLite can return them + # wrapped with double quotes), so strip double quotes. + columns.extend(c.strip(' "')) + return columns + + def _get_constraints(self, table): + """Retrieve information about existing constraints of the table This feature is needed for recreate_table() to work properly. """ @@ -48,19 +65,21 @@ class SQLiteHelper(SQLiteCommon): constraints = [] for name, cols in re.findall(UNIQUE_PATTERN, data): # Filter out any columns that were dropped from the table. - columns = [] - for c in cols.split(","): - if c in table.columns: - # There was a bug in reflection of SQLite columns with - # reserved identifiers as names (SQLite can return them - # wrapped with double quotes), so strip double quotes. - columns.extend(c.strip(' "')) + columns = self._filter_columns(cols, table) if columns: constraints.extend(UniqueConstraint(*columns, name=name)) + + FKEY_PATTERN = "CONSTRAINT (\w+) FOREIGN KEY \(([^\)]+)\)" + for name, cols in re.findall(FKEY_PATTERN, data): + # Filter out any columns that were dropped from the table. + columns = self._filter_columns(cols, table) + if columns: + constraints.extend(ForeignKeyConstraint(*columns, name=name)) + return constraints def recreate_table(self, table, column=None, delta=None, - omit_uniques=None): + omit_constraints=None): table_name = self.preparer.format_table(table) # we remove all indexes so as not to have @@ -68,13 +87,13 @@ class SQLiteHelper(SQLiteCommon): for index in table.indexes: index.drop() - # reflect existing unique constraints - for uc in self._get_unique_constraints(table): - table.append_constraint(uc) - # omit given unique constraints when creating a new table if required + # reflect existing constraints + for constraint in self._get_constraints(table): + table.append_constraint(constraint) + # omit given constraints when creating a new table if required table.constraints = set([ cons for cons in table.constraints - if omit_uniques is None or cons.name not in omit_uniques + if omit_constraints is None or cons.name not in omit_constraints ]) self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name) @@ -182,13 +201,13 @@ class SQLiteConstraintDropper(ansisql.ANSIColumnDropper, self.execute() def visit_migrate_foreign_key_constraint(self, *p, **k): - self._not_supported('ALTER TABLE DROP CONSTRAINT') + self.recreate_table(p[0].table, omit_constraints=[p[0].name]) def visit_migrate_check_constraint(self, *p, **k): self._not_supported('ALTER TABLE DROP CONSTRAINT') def visit_migrate_unique_constraint(self, *p, **k): - self.recreate_table(p[0].table, omit_uniques=[p[0].name]) + self.recreate_table(p[0].table, omit_constraints=[p[0].name]) # TODO: technically primary key is a NOT NULL + UNIQUE constraint, should add NOT NULL to index diff --git a/migrate/tests/changeset/test_constraint.py b/migrate/tests/changeset/test_constraint.py index 6421206..1faf4cc 100644 --- a/migrate/tests/changeset/test_constraint.py +++ b/migrate/tests/changeset/test_constraint.py @@ -71,7 +71,7 @@ class TestConstraint(CommonTestConstraint): self.assertTrue(isinstance(self.table.primary_key, schema.PrimaryKeyConstraint)) return pk - @fixture.usedb(not_supported='sqlite') + @fixture.usedb() def test_define_fk(self): """FK constraints can be defined, created, and dropped""" # FK target must be unique