indexes.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from django.db.backends.utils import names_digest, split_identifier
  2. from django.db.models.query_utils import Q
  3. from django.db.models.sql import Query
  4. __all__ = ['Index']
  5. class Index:
  6. suffix = 'idx'
  7. # The max length of the name of the index (restricted to 30 for
  8. # cross-database compatibility with Oracle)
  9. max_name_length = 30
  10. def __init__(self, *, fields=(), name=None, db_tablespace=None, opclasses=(), condition=None):
  11. if opclasses and not name:
  12. raise ValueError('An index must be named to use opclasses.')
  13. if not isinstance(condition, (type(None), Q)):
  14. raise ValueError('Index.condition must be a Q instance.')
  15. if condition and not name:
  16. raise ValueError('An index must be named to use condition.')
  17. if not isinstance(fields, (list, tuple)):
  18. raise ValueError('Index.fields must be a list or tuple.')
  19. if not isinstance(opclasses, (list, tuple)):
  20. raise ValueError('Index.opclasses must be a list or tuple.')
  21. if opclasses and len(fields) != len(opclasses):
  22. raise ValueError('Index.fields and Index.opclasses must have the same number of elements.')
  23. if not fields:
  24. raise ValueError('At least one field is required to define an index.')
  25. self.fields = list(fields)
  26. # A list of 2-tuple with the field name and ordering ('' or 'DESC').
  27. self.fields_orders = [
  28. (field_name[1:], 'DESC') if field_name.startswith('-') else (field_name, '')
  29. for field_name in self.fields
  30. ]
  31. self.name = name or ''
  32. self.db_tablespace = db_tablespace
  33. self.opclasses = opclasses
  34. self.condition = condition
  35. def _get_condition_sql(self, model, schema_editor):
  36. if self.condition is None:
  37. return None
  38. query = Query(model=model)
  39. where = query.build_where(self.condition)
  40. compiler = query.get_compiler(connection=schema_editor.connection)
  41. sql, params = where.as_sql(compiler, schema_editor.connection)
  42. return sql % tuple(schema_editor.quote_value(p) for p in params)
  43. def create_sql(self, model, schema_editor, using='', **kwargs):
  44. fields = [model._meta.get_field(field_name) for field_name, _ in self.fields_orders]
  45. col_suffixes = [order[1] for order in self.fields_orders]
  46. condition = self._get_condition_sql(model, schema_editor)
  47. return schema_editor._create_index_sql(
  48. model, fields, name=self.name, using=using, db_tablespace=self.db_tablespace,
  49. col_suffixes=col_suffixes, opclasses=self.opclasses, condition=condition,
  50. **kwargs,
  51. )
  52. def remove_sql(self, model, schema_editor, **kwargs):
  53. return schema_editor._delete_index_sql(model, self.name, **kwargs)
  54. def deconstruct(self):
  55. path = '%s.%s' % (self.__class__.__module__, self.__class__.__name__)
  56. path = path.replace('django.db.models.indexes', 'django.db.models')
  57. kwargs = {'fields': self.fields, 'name': self.name}
  58. if self.db_tablespace is not None:
  59. kwargs['db_tablespace'] = self.db_tablespace
  60. if self.opclasses:
  61. kwargs['opclasses'] = self.opclasses
  62. if self.condition:
  63. kwargs['condition'] = self.condition
  64. return (path, (), kwargs)
  65. def clone(self):
  66. """Create a copy of this Index."""
  67. _, _, kwargs = self.deconstruct()
  68. return self.__class__(**kwargs)
  69. def set_name_with_model(self, model):
  70. """
  71. Generate a unique name for the index.
  72. The name is divided into 3 parts - table name (12 chars), field name
  73. (8 chars) and unique hash + suffix (10 chars). Each part is made to
  74. fit its size by truncating the excess length.
  75. """
  76. _, table_name = split_identifier(model._meta.db_table)
  77. column_names = [model._meta.get_field(field_name).column for field_name, order in self.fields_orders]
  78. column_names_with_order = [
  79. (('-%s' if order else '%s') % column_name)
  80. for column_name, (field_name, order) in zip(column_names, self.fields_orders)
  81. ]
  82. # The length of the parts of the name is based on the default max
  83. # length of 30 characters.
  84. hash_data = [table_name] + column_names_with_order + [self.suffix]
  85. self.name = '%s_%s_%s' % (
  86. table_name[:11],
  87. column_names[0][:7],
  88. '%s_%s' % (names_digest(*hash_data, length=6), self.suffix),
  89. )
  90. assert len(self.name) <= self.max_name_length, (
  91. 'Index too long for multiple database support. Is self.suffix '
  92. 'longer than 3 characters?'
  93. )
  94. if self.name[0] == '_' or self.name[0].isdigit():
  95. self.name = 'D%s' % self.name[1:]
  96. def __repr__(self):
  97. return "<%s: fields='%s'%s>" % (
  98. self.__class__.__name__, ', '.join(self.fields),
  99. '' if self.condition is None else ', condition=%s' % self.condition,
  100. )
  101. def __eq__(self, other):
  102. return (self.__class__ == other.__class__) and (self.deconstruct() == other.deconstruct())