2008-02-06 18:39:07 +00:00

127 lines
4.7 KiB
Python

import sqlalchemy
from sqlalchemy import schema
class ConstraintChangeset(object):
    """Shared create/drop and visitor plumbing for the constraint classes.

    ``accept_schema_visitor`` and ``autoname`` raise ``NotImplementedError``
    here and are meant to be supplied by subclasses.
    """

    def _normalize_columns(self, cols, fullname=False):
        """Turn a mix of column objects and column names into plain names.

        :param cols: iterable of ``schema.Column`` objects and/or values that
            are already names; non-``Column`` entries pass through unchanged.
        :param fullname: when true, ``Column`` entries are rendered as
            ``"<table name>.<column name>"`` instead of the bare column name.
        :returns: ``(names, table)``; ``table`` is the table of the first
            ``Column`` whose ``table`` is not ``None``, or ``None`` if there
            is no such column.
        """
        names = []
        found_table = None
        for item in cols:
            if not isinstance(item, schema.Column):
                # Already a name (or something else): keep it as given.
                names.append(item)
                continue
            # Only the first table encountered is remembered.
            if item.table is not None and found_table is None:
                found_table = item.table
            if fullname:
                names.append('.'.join((item.table.name, item.name)))
            else:
                names.append(item.name)
        return names, found_table

    def create(self, engine=None):
        """Ask ``engine`` (default: ``self.engine``) to create this object."""
        target = self.engine if engine is None else engine
        target.create(self)

    def drop(self, engine=None):
        """Ask ``engine`` (default: ``self.engine``) to drop this object."""
        target = self.engine if engine is None else engine
        target.drop(self)

    def _derived_metadata(self):
        """Delegate to the owning table's ``_derived_metadata()``."""
        owner = self.table
        return owner._derived_metadata()

    def accept_schema_visitor(self, visitor, *p, **k):
        """Visitor entry point; must be overridden by subclasses."""
        raise NotImplementedError()

    def _accept_schema_visitor(self, visitor, func, *p, **k):
        """Invoke ``visitor.<func>(self)`` when the visitor has that attribute.

        Returns the handler's result, or ``None`` when the visitor does not
        define ``func``. Extra positional/keyword arguments are accepted but
        not forwarded to the handler.
        """
        try:
            handler = getattr(visitor, func)
        except AttributeError:
            return None
        else:
            return handler(self)

    def autoname(self):
        """Return a default constraint name; must be overridden."""
        raise NotImplementedError()
def _engine_run_visitor(engine, visitorcallable, element, **kwargs):
    """Run a schema visitor over ``element`` on a connection from ``engine``.

    :param engine: object whose ``connect()`` returns a connection that
        has a ``close()`` method.
    :param visitorcallable: called with the connection to build the visitor.
    :param element: object whose ``accept_schema_visitor(visitor)`` is invoked.
    :param kwargs: accepted but not used.

    The connection is closed even if building or running the visitor raises.
    """
    connection = engine.connect()
    try:
        visitor = visitorcallable(connection)
        element.accept_schema_visitor(visitor)
    finally:
        connection.close()
class PrimaryKeyConstraint(ConstraintChangeset, schema.PrimaryKeyConstraint):
    """Primary key constraint that can create and drop itself via visitors.

    Columns may be given as ``schema.Column`` objects or as column names.
    The owning table is taken from the ``table`` keyword argument if given,
    otherwise from the first ``Column`` that is attached to a table.
    """

    def __init__(self, *cols, **kwargs):
        """Build the constraint and attach it to its table when one is known.

        :param cols: ``schema.Column`` objects and/or column names.
        :param kwargs: ``table`` (optional) overrides the table inferred from
            ``cols``; everything else is passed to the SQLAlchemy constructor.
        """
        colnames, table = self._normalize_columns(cols)
        table = kwargs.pop('table', table)
        super(PrimaryKeyConstraint, self).__init__(*colnames, **kwargs)
        if table is not None:
            self._set_parent(table)

    def _set_parent(self, table):
        """Remember ``table`` on the instance, then attach to it."""
        self.table = table
        # Lookup starts after ConstraintChangeset in the MRO, i.e. at the
        # SQLAlchemy constraint class.
        return super(ConstraintChangeset, self)._set_parent(table)

    def create(self, *args, **kwargs):
        """Run the 'constraintgenerator' visitor for the table's bind.

        Positional and keyword arguments are accepted but ignored.

        :returns: ``self``, matching ``drop()`` and the foreign key
            constraint's ``create()``.
        """
        # Imported lazily, as in the rest of this module's create/drop methods.
        from migrate.changeset.databases.visitor import get_engine_visitor
        bind = self.table.bind
        visitorcallable = get_engine_visitor(bind, 'constraintgenerator')
        _engine_run_visitor(bind, visitorcallable, self)
        return self

    def autoname(self):
        """Mimic the database's automatic constraint names"""
        return "%(table)s_pkey" % dict(
            table=self.table.name,
        )

    def drop(self, *args, **kwargs):
        """Run the 'constraintdropper' visitor, then clear ``self.columns``.

        Positional and keyword arguments are accepted but ignored.

        :returns: ``self``.
        """
        from migrate.changeset.databases.visitor import get_engine_visitor
        bind = self.table.bind
        visitorcallable = get_engine_visitor(bind, 'constraintdropper')
        _engine_run_visitor(bind, visitorcallable, self)
        self.columns.clear()
        return self

    def accept_schema_visitor(self, visitor, *p, **k):
        """Dispatch to ``visitor.visit_migrate_primary_key_constraint``.

        Visitors that do not define that method are skipped silently.
        """
        func = 'visit_migrate_primary_key_constraint'
        return self._accept_schema_visitor(visitor, func, *p, **k)
class ForeignKeyConstraint(ConstraintChangeset, schema.ForeignKeyConstraint):
    """Foreign key constraint that can create and drop itself via visitors.

    Local and referenced columns may be given as ``schema.Column`` objects
    or as names; referenced ``Column`` objects are converted to
    ``"<table name>.<column name>"`` strings.
    """

    def __init__(self, columns, refcolumns, *p, **k):
        """Build the constraint and attach it to its table when one is known.

        :param columns: local columns (``Column`` objects and/or names).
        :param refcolumns: referenced columns (``Column`` objects and/or names).
        :param k: ``table`` (optional) overrides the table inferred from
            ``columns``; the rest goes to the SQLAlchemy constructor.
        """
        local_names, owner = self._normalize_columns(columns)
        owner = k.pop('table', owner)
        # Only the names of the referenced columns are needed here.
        remote_names = self._normalize_columns(refcolumns, fullname=True)[0]
        super(ForeignKeyConstraint, self).__init__(
            local_names, remote_names, *p, **k)
        if owner is None:
            return
        self._set_parent(owner)

    def _get_referenced(self):
        """Return the ``column`` of every entry in ``self.elements``."""
        targets = []
        for element in self.elements:
            targets.append(element.column)
        return targets
    referenced = property(_get_referenced)

    def _get_reftable(self):
        """Return the table of the first referenced column."""
        first_target = self.referenced[0]
        return first_target.table
    reftable = property(_get_reftable)

    def autoname(self):
        """Mimic the database's automatic constraint names"""
        parts = {
            'table': self.table.name,
            'reftable': self.reftable.name,
        }
        return "%(table)s_%(reftable)s_fkey" % parts

    def create(self, *args, **kwargs):
        """Run the 'constraintgenerator' visitor for the table's bind.

        Extra arguments are accepted but ignored. Returns ``self``.
        """
        from migrate.changeset.databases.visitor import get_engine_visitor
        make_visitor = get_engine_visitor(
            self.table.bind, 'constraintgenerator')
        _engine_run_visitor(self.table.bind, make_visitor, self)
        return self

    def drop(self, *args, **kwargs):
        """Run the 'constraintdropper' visitor, then clear ``self.columns``.

        Extra arguments are accepted but ignored. Returns ``self``.
        """
        from migrate.changeset.databases.visitor import get_engine_visitor
        make_visitor = get_engine_visitor(
            self.table.bind, 'constraintdropper')
        _engine_run_visitor(self.table.bind, make_visitor, self)
        self.columns.clear()
        return self

    def accept_schema_visitor(self, visitor, *p, **k):
        """Dispatch to ``visitor.visit_migrate_foreign_key_constraint``.

        Visitors that do not define that method are skipped silently.
        """
        return self._accept_schema_visitor(
            visitor, 'visit_migrate_foreign_key_constraint', *p, **k)