# schema.py (7.6 KB)
  1. import copy
  2. import datetime
  3. import re
  4. from django.db.backends.base.schema import BaseDatabaseSchemaEditor
  5. from django.db.utils import DatabaseError
  class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
      """
      Schema editor that replaces the base SQL templates and several hooks
      with variants that react to ORA-* errors, handle identity columns, and
      clean up the sequence associated with a deleted table.
      """

      # Column-level DDL templates.
      sql_create_column = "ALTER TABLE %(table)s ADD %(column)s %(definition)s"
      sql_alter_column_type = "MODIFY %(column)s %(type)s"
      sql_alter_column_null = "MODIFY %(column)s NULL"
      sql_alter_column_not_null = "MODIFY %(column)s NOT NULL"
      sql_alter_column_default = "MODIFY %(column)s DEFAULT %(default)s"
      sql_alter_column_no_default = "MODIFY %(column)s DEFAULT NULL"
      sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
      sql_create_column_inline_fk = (
          "CONSTRAINT %(name)s REFERENCES %(to_table)s(%(to_column)s)%(deferrable)s"
      )
      # Table- and index-level DDL templates.
      sql_delete_table = "DROP TABLE %(table)s CASCADE CONSTRAINTS"
      sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

      def quote_value(self, value):
          """
          Return ``value`` rendered as a SQL literal string.

          Dates/times are single-quoted, strings are single-quoted with
          embedded quotes doubled and ``%`` doubled, binary values are
          single-quoted hex, booleans become ``1``/``0`` and anything else is
          passed through ``str()``.
          """
          if isinstance(value, (datetime.date, datetime.time, datetime.datetime)):
              return "'%s'" % value
          if isinstance(value, str):
              escaped = value.replace("'", "''").replace('%', '%%')
              return "'%s'" % escaped
          if isinstance(value, (bytes, bytearray, memoryview)):
              return "'%s'" % value.hex()
          if isinstance(value, bool):
              return "1" if value else "0"
          return str(value)

      def remove_field(self, model, field):
          """
          Remove ``field`` from ``model``; an identity column has its identity
          dropped before the field is removed.
          """
          table = model._meta.db_table
          column = field.column
          if self._is_identity_column(table, column):
              self._drop_identity(table, column)
          super().remove_field(model, field)

      def delete_model(self, model):
          """
          Delete the model's table via the superclass, then drop the manually
          created sequence for that table if it exists.
          """
          super().delete_model(model)
          sequence_name = self.connection.ops._get_no_autofield_sequence_name(
              model._meta.db_table
          )
          # The PL/SQL block only issues DROP SEQUENCE when exactly one
          # matching row is found in USER_SEQUENCES.
          cleanup_sql = """
              DECLARE
                  i INTEGER;
              BEGIN
                  SELECT COUNT(1) INTO i FROM USER_SEQUENCES
                      WHERE SEQUENCE_NAME = '%(sq_name)s';
                  IF i = 1 THEN
                      EXECUTE IMMEDIATE 'DROP SEQUENCE "%(sq_name)s"';
                  END IF;
              END;
          /"""
          self.execute(cleanup_sql % {'sq_name': sequence_name})

      def alter_field(self, model, old_field, new_field, strict=False):
          """
          Alter a field through the superclass, recovering from selected
          ORA-* errors; any other ``DatabaseError`` is re-raised.
          """
          try:
              super().alter_field(model, old_field, new_field, strict)
          except DatabaseError as exc:
              message = str(exc)
              unsupported_type_change = (
                  'ORA-22858' in message or 'ORA-22859' in message
              )
              if unsupported_type_change:
                  # Changing to an unsupported type needs the add/copy/drop/
                  # rename workaround (similar to what is done for SQLite).
                  self._alter_field_type_workaround(model, old_field, new_field)
              elif 'ORA-30675' in message:
                  # An identity column is changing to a non-numeric type:
                  # drop the identity first, then retry.
                  self._drop_identity(model._meta.db_table, old_field.column)
                  self.alter_field(model, old_field, new_field, strict)
              elif 'ORA-30673' in message and old_field.primary_key:
                  # A primary key column is changing to an identity column:
                  # drop the primary key first.
                  self._delete_primary_key(model, strict=True)
                  self._alter_field_type_workaround(model, old_field, new_field)
              else:
                  raise

      def _alter_field_type_workaround(self, model, old_field, new_field):
          """
          Change a column's type when Oracle refuses a direct conversion.

          Steps:
          - Add a copy of the desired field under a temporary column name. The
            copy is nullable unless the new field is an auto field.
          - Copy the values from the old column into the temporary one,
            converting explicitly where needed.
          - Drop the old column.
          - Rename the temporary column and possibly drop the nullable
            property.
          """
          auto_field_types = ('AutoField', 'BigAutoField', 'SmallAutoField')

          # Clone the target field, pointing it at a temporary column.
          temp_field = copy.deepcopy(new_field)
          temp_field.null = new_field.get_internal_type() not in auto_field_types
          temp_field.column = self._generate_temp_name(new_field.column)
          self.add_field(model, temp_field)

          # Explicit data type conversion
          # https://docs.oracle.com/en/database/oracle/oracle-database/18/sqlrf
          # /Data-Type-Comparison-Rules.html#GUID-D0C5A47E-6F93-4C2D-9E49-4F2B86B359DD
          source_expr = self.quote_name(old_field.column)
          source_type = old_field.db_type(self.connection)
          if re.match('^N?CLOB', source_type):
              # After TO_CHAR the value is handled like a VARCHAR2 below.
              source_expr = "TO_CHAR(%s)" % source_expr
              source_type = 'VARCHAR2'
          if re.match('^N?VARCHAR2', source_type):
              conversions = {
                  'DateField': "TO_DATE(%s, 'YYYY-MM-DD')",
                  'DateTimeField': "TO_TIMESTAMP(%s, 'YYYY-MM-DD HH24:MI:SS.FF')",
                  # TimeField are stored as TIMESTAMP with a 1900-01-01 date
                  # part.
                  'TimeField': (
                      "TO_TIMESTAMP(CONCAT('1900-01-01 ', %s), "
                      "'YYYY-MM-DD HH24:MI:SS.FF')"
                  ),
              }
              template = conversions.get(new_field.get_internal_type())
              if template is not None:
                  source_expr = template % source_expr

          # Transfer values across.
          copy_sql = "UPDATE %s set %s=%s" % (
              self.quote_name(model._meta.db_table),
              self.quote_name(temp_field.column),
              source_expr,
          )
          self.execute(copy_sql)
          # Drop the old field.
          self.remove_field(model, old_field)
          # Rename and possibly make the new field NOT NULL.
          super().alter_field(model, temp_field, new_field)

      def normalize_name(self, name):
          """
          Return the identifier produced by quote_name() with its surrounding
          double quotes removed, if both are present.
          """
          quoted = self.quote_name(name)
          if quoted[0] == '"' and quoted[-1] == '"':
              return quoted[1:-1]
          return quoted

      def _generate_temp_name(self, for_name):
          """Build a temporary column name derived from ``for_name``."""
          # Upper-cased hex of the name's hash with the first character cut.
          hashed = hex(hash(for_name)).upper()[1:]
          return self.normalize_name(for_name + "_" + hashed)

      def prepare_default(self, value):
          """Return the default ``value`` as a quoted SQL literal."""
          return self.quote_value(value)

      def _field_should_be_indexed(self, model, field):
          """
          Return False for fields whose database type is listed in the
          connection's ``_limited_data_types``; otherwise defer to the
          superclass decision.
          """
          parent_decision = super()._field_should_be_indexed(model, field)
          column_type = field.db_type(self.connection)
          is_limited = (
              column_type is not None and
              column_type.lower() in self.connection._limited_data_types
          )
          return False if is_limited else parent_decision

      def _unique_should_be_added(self, old_field, new_field):
          """
          Follow the superclass decision, except when the field has become a
          primary key.
          """
          parent_decision = super()._unique_should_be_added(old_field, new_field)
          return (
              parent_decision and
              not self._field_became_primary_key(old_field, new_field)
          )

      def _is_identity_column(self, table_name, column_name):
          """
          Query ``user_tab_cols`` and return 1 if the column is an identity
          column, 0 if it is not, or False when no matching row is found.
          """
          lookup_sql = """
                  SELECT
                      CASE WHEN identity_column = 'YES' THEN 1 ELSE 0 END
                  FROM user_tab_cols
                  WHERE table_name = %s AND
                        column_name = %s
              """
          with self.connection.cursor() as cursor:
              params = [
                  self.normalize_name(table_name),
                  self.normalize_name(column_name),
              ]
              cursor.execute(lookup_sql, params)
              row = cursor.fetchone()
              if not row:
                  return False
              return row[0]

      def _drop_identity(self, table_name, column_name):
          """Execute ALTER TABLE ... MODIFY ... DROP IDENTITY for the column."""
          names = {
              'table': self.quote_name(table_name),
              'column': self.quote_name(column_name),
          }
          statement = 'ALTER TABLE %(table)s MODIFY %(column)s DROP IDENTITY' % names
          self.execute(statement)