Serotonin Storm

source>south>db>mysql.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from django.db import connection
from django.conf import settings
from south.db import generic

class DatabaseOperations(generic.DatabaseOperations):

    """
    MySQL implementation of database operations.

    Overrides the SQL templates and feature flags inherited from
    generic.DatabaseOperations, plus the operations for which MySQL needs
    its own statements (column/table renames, foreign-key-aware column
    drops, constraint lookup, default suppression for BLOB/TEXT columns).
    """

    backend_name = "mysql"
    # SQL fragments and statements; presumably filled in by the generic
    # base class using the named placeholders shown - confirm there.
    alter_string_set_type = ''
    alter_string_set_null = 'MODIFY %(column)s %(type)s NULL;'
    alter_string_drop_null = 'MODIFY %(column)s %(type)s NOT NULL;'
    drop_index_string = 'DROP INDEX %(index_name)s ON %(table_name)s'
    drop_primary_key_string = "ALTER TABLE %(table)s DROP PRIMARY KEY"
    # Capability flags for this backend.
    allows_combined_alters = False
    has_ddl_transactions = False
    has_check_constraints = False
    delete_unique_sql = "ALTER TABLE %s DROP INDEX %s"


    def connection_init(self):
        """
        Run before any SQL to let database-specific config be sent as a command,
        e.g. which storage engine (MySQL) or transaction serialisability level.

        Here: if settings.DATABASE_STORAGE_ENGINE exists and is non-empty,
        a "SET storage_engine=..." statement is issued on the connection.
        """
        engine = getattr(settings, "DATABASE_STORAGE_ENGINE", None)
        if engine:
            cursor = connection.cursor()
            # The engine name comes from project settings and is sent
            # unquoted, exactly as configured.
            cursor.execute("SET storage_engine=%s;" % engine)


    def rename_column(self, table_name, old, new):
        """
        Renames the column 'old' to 'new' on the table 'table_name'.

        MySQL's CHANGE COLUMN needs the full column definition again, so the
        existing type, nullability, default and extra attributes are read
        back with DESCRIBE and repeated in the ALTER statement.

        Returns an empty list without touching the database when the names
        are equal or during a dry run; returns None after a real rename.
        Raises ValueError if 'table_name' has no column called 'old'.
        """
        if old == new or self.dry_run:
            return []

        qn = connection.ops.quote_name

        rows = [x for x in self.execute('DESCRIBE %s' % (qn(table_name),)) if x[0] == old]

        if not rows:
            raise ValueError("No column '%s' in '%s'." % (old, table_name))

        # DESCRIBE yields (Field, Type, Null, Key, Default, Extra) per column.
        description = rows[0]
        column_type = description[1]
        default = description[4]
        extra = description[5] or ""

        if description[2] == "YES":
            null_sql = "NULL"
        else:
            null_sql = "NOT NULL"

        # Compare against None rather than testing truthiness: a default of
        # '' (empty string) is a real default and must survive the rename.
        has_default = default is not None
        if has_default:
            default_keyword = "DEFAULT "
            default_placeholder = "%s"
        else:
            default_keyword = ""
            default_placeholder = ""

        params = (
            qn(table_name),
            qn(old),
            qn(new),
            column_type,
            null_sql,
            default_keyword,
            default_placeholder,
            extra,
        )

        sql = 'ALTER TABLE %s CHANGE COLUMN %s %s %s %s %s %s %s;' % params

        # The default value itself is passed as a query parameter so the
        # database driver takes care of quoting it.
        if has_default:
            self.execute(sql, (default,))
        else:
            self.execute(sql)


    def delete_column(self, table_name, name):
        """
        Deletes the column 'name' from the table 'table_name'.

        Any FOREIGN KEY constraint that information_schema reports on the
        column is dropped first, then the deletion is handed to the generic
        implementation.
        """
        qn = connection.ops.quote_name
        db_name = settings.DATABASE_NAME

        # See if there is a foreign key on this column. The identifiers are
        # passed as bound parameters instead of being spliced into the SQL.
        cursor = connection.cursor()
        cursor.execute("""
            SELECT tc.constraint_name
            FROM information_schema.table_constraints tc,
                 information_schema.key_column_usage kcu
            WHERE tc.table_name = kcu.table_name
              AND tc.table_schema = kcu.table_schema
              AND tc.constraint_name = kcu.constraint_name
              AND tc.constraint_type = 'FOREIGN KEY'
              AND tc.table_schema = %s
              AND tc.table_name = %s
              AND kcu.column_name = %s
        """, [db_name, table_name, name])

        # Use the fetched rows rather than the return value of execute(),
        # which DB-API leaves undefined. Every foreign key found is dropped,
        # instead of asserting that there is exactly one.
        fkey_names = [row[0] for row in cursor.fetchall()]
        for fkey_name in fkey_names:
            drop_query = "ALTER TABLE %s DROP FOREIGN KEY %s"
            cursor.execute(drop_query % (qn(table_name), qn(fkey_name)))

        super(DatabaseOperations, self).delete_column(table_name, name)


    def rename_table(self, old_table_name, table_name):
        """
        Renames the table 'old_table_name' to 'table_name'.

        Does nothing when both names are equal.
        """
        if old_table_name == table_name:
            # No Operation
            return
        qn = connection.ops.quote_name
        params = (qn(old_table_name), qn(table_name))
        self.execute('RENAME TABLE %s TO %s;' % params)


    def _constraints_affecting_columns(self, table_name, columns, type="UNIQUE"):
        """
        Gets the names of the constraints affecting the given columns.

        This is a generator: it yields the name of every constraint of the
        given 'type' on 'table_name' whose column set equals 'columns'
        exactly. Raises ValueError (on first iteration) during a dry run.
        """

        if self.dry_run:
            raise ValueError("Cannot get constraints for columns during a dry run.")

        columns = set(columns)
        db_name = settings.DATABASE_NAME
        # First, load all constraint->col mappings for this table.
        rows = self.execute("""
            SELECT kc.constraint_name, kc.column_name
            FROM information_schema.key_column_usage AS kc
            JOIN information_schema.table_constraints AS c ON
                kc.table_schema = c.table_schema AND
                kc.table_name = c.table_name AND
                kc.constraint_name = c.constraint_name
            WHERE
                kc.table_schema = %s AND
                kc.table_catalog IS NULL AND
                kc.table_name = %s AND
                c.constraint_type = %s
        """, [db_name, table_name, type])
        # Group the column names by the constraint they belong to.
        mapping = {}
        for constraint, column in rows:
            mapping.setdefault(constraint, set()).add(column)
        # Only constraints covering exactly the requested columns match.
        for constraint, itscols in mapping.items():
            if itscols == columns:
                yield constraint


    def _field_sanity(self, field):
        """
        This particular override stops us sending DEFAULTs for BLOB/TEXT columns.

        Sets field._suppress_default when the field's database type is one
        of MySQL's BLOB/TEXT types and returns the (same) field object.
        Fields whose db_type() is None are returned untouched.
        """
        db_type = field.db_type()
        if db_type is None:
            # Nothing to inspect; avoids calling .upper() on None.
            return field
        no_default_types = (
            "BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB",
            "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT",
        )
        if db_type.upper() in no_default_types:
            field._suppress_default = True
        return field