diff --git a/migrate/changeset/databases/sqlite.py b/migrate/changeset/databases/sqlite.py index a601593..8a5ba90 100644 --- a/migrate/changeset/databases/sqlite.py +++ b/migrate/changeset/databases/sqlite.py @@ -8,8 +8,10 @@ try: # Python 3 except ImportError: # Python 2 from UserDict import DictMixin from copy import copy +import re from sqlalchemy.databases import sqlite as sa_base +from sqlalchemy.schema import UniqueConstraint from migrate import exceptions from migrate.changeset import ansisql @@ -27,7 +29,38 @@ class SQLiteCommon(object): class SQLiteHelper(SQLiteCommon): - def recreate_table(self,table,column=None,delta=None): + def _get_unique_constraints(self, table): + """Retrieve information about existing unique constraints of the table + + This feature is needed for recreate_table() to work properly. + """ + + data = table.metadata.bind.execute( + """SELECT sql + FROM sqlite_master + WHERE + type='table' AND + name=:table_name""", + table_name=table.name + ).fetchone()[0] + + UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)" + constraints = [] + for name, cols in re.findall(UNIQUE_PATTERN, data): + # Filter out any columns that were dropped from the table. + columns = [] + for c in cols.split(","): + if c in table.columns: + # There was a bug in reflection of SQLite columns with + # reserved identifiers as names (SQLite can return them + # wrapped with double quotes), so strip double quotes. + columns.extend(c.strip(' "')) + if columns: + constraints.extend(UniqueConstraint(*columns, name=name)) + return constraints + + def recreate_table(self, table, column=None, delta=None, + omit_uniques=None): table_name = self.preparer.format_table(table) # we remove all indexes so as not to have @@ -35,6 +68,15 @@ class SQLiteHelper(SQLiteCommon): for index in table.indexes: index.drop() + # reflect existing unique constraints + for uc in self._get_unique_constraints(table): + table.append_constraint(uc) + # omit given unique constraints when creating a new table if required + table.constraints = set([ + cons for cons in table.constraints + if omit_uniques is None or cons.name not in omit_uniques + ]) + self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name) self.execute() @@ -126,9 +168,12 @@ class SQLiteConstraintGenerator(ansisql.ANSIConstraintGenerator, SQLiteHelper, S class SQLiteConstraintDropper(ansisql.ANSIColumnDropper, - SQLiteCommon, + SQLiteHelper, ansisql.ANSIConstraintCommon): + def _modify_table(self, table, column, delta): + return 'INSERT INTO %(table_name)s SELECT * from migration_tmp' + def visit_migrate_primary_key_constraint(self, constraint): tmpl = "DROP INDEX %s " name = self.get_constraint_name(constraint) @@ -143,7 +188,7 @@ class SQLiteConstraintDropper(ansisql.ANSIColumnDropper, self._not_supported('ALTER TABLE DROP CONSTRAINT') def visit_migrate_unique_constraint(self, *p, **k): - self._not_supported('ALTER TABLE DROP CONSTRAINT') + self.recreate_table(p[0].table, omit_uniques=[p[0].name]) # TODO: technically primary key is a NOT NULL + UNIQUE constraint, should add NOT NULL to index diff --git a/migrate/tests/changeset/test_constraint.py b/migrate/tests/changeset/test_constraint.py index d27554e..6421206 100644 --- a/migrate/tests/changeset/test_constraint.py +++ b/migrate/tests/changeset/test_constraint.py @@ -274,7 +274,7 @@ class TestAutoname(CommonTestConstraint): self.table.insert(values={'id': 2, 'fkey': 2}).execute() self.table.insert(values={'id': 1, 'fkey': 3}).execute() - @fixture.usedb(not_supported=['oracle', 'sqlite']) + @fixture.usedb(not_supported=['oracle']) def test_autoname_unique(self): """UniqueConstraints can guess their name if None is given""" cons = UniqueConstraint(self.table.c.fkey)