# introspection.py (12 KB)
  1. from collections import namedtuple
  2. import cx_Oracle
  3. from django.db import models
  4. from django.db.backends.base.introspection import (
  5. BaseDatabaseIntrospection, FieldInfo as BaseFieldInfo, TableInfo,
  6. )
  # Extend the base FieldInfo record with one extra trailing field,
  # 'is_autofield'.
  FieldInfo = namedtuple('FieldInfo', (*BaseFieldInfo._fields, 'is_autofield'))
  8. class DatabaseIntrospection(BaseDatabaseIntrospection):
  # Maps cx_Oracle type objects to the name of the Django Field type used
  # to represent them. NUMBER columns are refined further in
  # get_field_type() based on precision and scale.
  data_types_reverse = {
      cx_Oracle.BLOB: 'BinaryField',
      cx_Oracle.CLOB: 'TextField',
      cx_Oracle.DATETIME: 'DateField',
      cx_Oracle.FIXED_CHAR: 'CharField',
      cx_Oracle.FIXED_NCHAR: 'CharField',
      cx_Oracle.INTERVAL: 'DurationField',
      cx_Oracle.NATIVE_FLOAT: 'FloatField',
      cx_Oracle.NCHAR: 'CharField',
      cx_Oracle.NCLOB: 'TextField',
      cx_Oracle.NUMBER: 'DecimalField',
      cx_Oracle.STRING: 'CharField',
      cx_Oracle.TIMESTAMP: 'DateTimeField',
  }

  # Incremented by get_table_description() and embedded in its probe query
  # so that every probe statement has different text.
  cache_bust_counter = 1
  def get_field_type(self, data_type, description):
      """
      Return the Django field type name for a column.

      NUMBER columns are classified here from the precision and scale found
      at description[4] and description[5]; everything else (including
      NUMBER columns with any other scale) is delegated to the base class.
      """
      if data_type == cx_Oracle.NUMBER:
          precision, scale = description[4:6]
          if scale == -127:
              return 'FloatField'
          if scale == 0:
              # Integer-like column: pick the field by precision and by
              # whether the column is flagged as an auto field.
              if precision > 11:
                  if description.is_autofield:
                      return 'BigAutoField'
                  return 'BigIntegerField'
              if 1 < precision < 6 and description.is_autofield:
                  return 'SmallAutoField'
              if precision == 1:
                  return 'BooleanField'
              if description.is_autofield:
                  return 'AutoField'
              return 'IntegerField'
      return super().get_field_type(data_type, description)
  def get_table_list(self, cursor):
      """
      Return a list of table and view names in the current database.

      Each item is a TableInfo(name, type) where name has been passed
      through identifier_converter() and type is 't' or 'v'.
      """
      # The NOT EXISTS clause drops user_tables rows that share a name with
      # a materialized view; those are reported by the user_mviews branch
      # with type 'v' instead.
      query = """
          SELECT table_name, 't'
          FROM user_tables
          WHERE
              NOT EXISTS (
                  SELECT 1
                  FROM user_mviews
                  WHERE user_mviews.mview_name = user_tables.table_name
              )
          UNION ALL
          SELECT view_name, 'v' FROM user_views
          UNION ALL
          SELECT mview_name, 'v' FROM user_mviews
      """
      cursor.execute(query)
      tables = []
      for name, kind in cursor.fetchall():
          tables.append(TableInfo(self.identifier_converter(name), kind))
      return tables
  def get_table_description(self, cursor, table_name):
      """
      Return a description of the table with the DB-API cursor.description
      interface.

      The result is a list of FieldInfo tuples, one per column reported by
      cursor.description for a probe SELECT on the table. internal_size,
      default and is_autofield come from user_tab_cols; precision and scale
      fall back to 0 when cursor.description reports a falsy value.
      """
      # First pass: collect per-column size, default and identity flag from
      # the data dictionary, keyed by the column name as Oracle reports it.
      cursor.execute("""
          SELECT
              column_name,
              data_default,
              CASE
                  WHEN char_used IS NULL THEN data_length
                  ELSE char_length
              END as internal_size,
              CASE
                  WHEN identity_column = 'YES' THEN 1
                  ELSE 0
              END as is_autofield
          FROM user_tab_cols
          WHERE table_name = UPPER(%s)""", [table_name])
      extras_by_column = {}
      for column, default, internal_size, is_autofield in cursor.fetchall():
          # A literal 'NULL' default is treated as "no default".
          extras_by_column[column] = (
              internal_size,
              default if default != 'NULL' else None,
              is_autofield,
          )

      # Second pass: run a probe query returning at most one row, only to
      # obtain cursor.description. The counter makes each probe's text unique.
      self.cache_bust_counter += 1
      quoted_table = self.connection.ops.quote_name(table_name)
      cursor.execute("SELECT * FROM {} WHERE ROWNUM < 2 AND {} > 0".format(
          quoted_table,
          self.cache_bust_counter))

      fields = []
      for column_desc in cursor.description:
          raw_name = column_desc[0]
          internal_size, default, is_autofield = extras_by_column[raw_name]
          # cx_Oracle, for some reason, doubles percent signs; formatting
          # against an empty mapping collapses '%%' back to '%'.
          column_name = raw_name % {}
          fields.append(FieldInfo(
              self.identifier_converter(column_name),
              *column_desc[1:3],
              internal_size,
              column_desc[4] or 0,
              column_desc[5] or 0,
              *column_desc[6:],
              default,
              is_autofield,
          ))
      return fields
  def identifier_converter(self, name):
      """
      Return the lowercased form of an identifier.

      Identifier comparison is case insensitive under Oracle.
      """
      lowered = name.lower()
      return lowered
  def get_sequences(self, cursor, table_name, table_fields=()):
      """
      Return sequence information for the given table.

      If the table has an identity column that is part of its primary key,
      return a one-item list with the 'name', 'table' and 'column' of that
      sequence (all passed through identifier_converter()). Otherwise, if
      table_fields contains an AutoField, return a one-item list with just
      'table' and 'column' for the first such field. Otherwise return [].
      """
      cursor.execute("""
          SELECT
              user_tab_identity_cols.sequence_name,
              user_tab_identity_cols.column_name
          FROM
              user_tab_identity_cols,
              user_constraints,
              user_cons_columns cols
          WHERE
              user_constraints.constraint_name = cols.constraint_name
              AND user_constraints.table_name = user_tab_identity_cols.table_name
              AND cols.column_name = user_tab_identity_cols.column_name
              AND user_constraints.constraint_type = 'P'
              AND user_tab_identity_cols.table_name = UPPER(%s)
      """, [table_name])
      # Oracle allows only one identity column per table, so a single row
      # is enough.
      identity = cursor.fetchone()
      if identity:
          convert = self.identifier_converter
          return [{
              'name': convert(identity[0]),
              'table': convert(table_name),
              'column': convert(identity[1]),
          }]
      # To keep backward compatibility for AutoFields that aren't Oracle
      # identity columns: report the first AutoField, without a sequence
      # name and with the table name exactly as it was passed in.
      legacy_field = next(
          (field for field in table_fields if isinstance(field, models.AutoField)),
          None,
      )
      if legacy_field is None:
          return []
      return [{'table': table_name, 'column': legacy_field.column}]
  def get_relations(self, cursor, table_name):
      """
      Return a dictionary of {field_name: (field_name_other_table, other_table)}
      representing all relationships to the given table.

      All names in the result are passed through identifier_converter().
      """
      # The table name is upper-cased in Python before being bound.
      upper_table = table_name.upper()
      cursor.execute("""
          SELECT ca.column_name, cb.table_name, cb.column_name
          FROM user_constraints, USER_CONS_COLUMNS ca, USER_CONS_COLUMNS cb
          WHERE user_constraints.table_name = %s AND
              user_constraints.constraint_name = ca.constraint_name AND
              user_constraints.r_constraint_name = cb.constraint_name AND
              ca.position = cb.position""", [upper_table])
      relations = {}
      for local_column, other_table, other_column in cursor.fetchall():
          key = self.identifier_converter(local_column)
          relations[key] = (
              self.identifier_converter(other_column),
              self.identifier_converter(other_table),
          )
      return relations
  def get_key_columns(self, cursor, table_name):
      """
      Return one (column, referenced_table, referenced_column) tuple per row
      of the foreign key ('R') constraint query for the given table, with
      every name passed through identifier_converter().
      """
      cursor.execute("""
          SELECT ccol.column_name, rcol.table_name AS referenced_table, rcol.column_name AS referenced_column
          FROM user_constraints c
          JOIN user_cons_columns ccol
              ON ccol.constraint_name = c.constraint_name
          JOIN user_cons_columns rcol
              ON rcol.constraint_name = c.r_constraint_name
          WHERE c.table_name = %s AND c.constraint_type = 'R'""", [table_name.upper()])
      key_columns = []
      for row in cursor.fetchall():
          key_columns.append(tuple(map(self.identifier_converter, row)))
      return key_columns
  def get_primary_key_column(self, cursor, table_name):
      """
      Return the name of the column at position 1 of the table's primary
      key constraint (passed through identifier_converter()), or None if
      the query returns no row.
      """
      cursor.execute("""
          SELECT
              cols.column_name
          FROM
              user_constraints,
              user_cons_columns cols
          WHERE
              user_constraints.constraint_name = cols.constraint_name AND
              user_constraints.constraint_type = 'P' AND
              user_constraints.table_name = UPPER(%s) AND
              cols.position = 1
      """, [table_name])
      pk_row = cursor.fetchone()
      if not pk_row:
          return None
      return self.identifier_converter(pk_row[0])
  def get_constraints(self, cursor, table_name):
      """
      Retrieve any constraints or keys (unique, pk, fk, check, index) across
      one or more columns.

      Return a dict keyed by constraint/index name (passed through
      identifier_converter()). Each value is a dict with the keys 'columns',
      'primary_key', 'unique', 'foreign_key', 'check' and 'index'; entries
      for plain indexes additionally carry 'type' and 'orders'. Column names
      are lower-cased by the queries themselves.
      """
      convert = self.identifier_converter
      found = {}

      # 1. Primary keys, unique constraints and check constraints. The
      #    flags come back from the query as 1/0 and are stored as-is.
      cursor.execute("""
          SELECT
              user_constraints.constraint_name,
              LISTAGG(LOWER(cols.column_name), ',') WITHIN GROUP (ORDER BY cols.position),
              CASE user_constraints.constraint_type
                  WHEN 'P' THEN 1
                  ELSE 0
              END AS is_primary_key,
              CASE
                  WHEN user_constraints.constraint_type IN ('P', 'U') THEN 1
                  ELSE 0
              END AS is_unique,
              CASE user_constraints.constraint_type
                  WHEN 'C' THEN 1
                  ELSE 0
              END AS is_check_constraint
          FROM
              user_constraints
          LEFT OUTER JOIN
              user_cons_columns cols ON user_constraints.constraint_name = cols.constraint_name
          WHERE
              user_constraints.constraint_type = ANY('P', 'U', 'C')
              AND user_constraints.table_name = UPPER(%s)
          GROUP BY user_constraints.constraint_name, user_constraints.constraint_type
      """, [table_name])
      for name, column_csv, is_pk, is_unique, is_check in cursor.fetchall():
          found[convert(name)] = {
              'columns': column_csv.split(','),
              'primary_key': is_pk,
              'unique': is_unique,
              'foreign_key': None,
              'check': is_check,
              'index': is_unique,  # All uniques come with an index
          }

      # 2. Foreign key constraints. The referenced side is taken from the
      #    referenced constraint's column at position 1.
      cursor.execute("""
          SELECT
              cons.constraint_name,
              LISTAGG(LOWER(cols.column_name), ',') WITHIN GROUP (ORDER BY cols.position),
              LOWER(rcols.table_name),
              LOWER(rcols.column_name)
          FROM
              user_constraints cons
          INNER JOIN
              user_cons_columns rcols ON rcols.constraint_name = cons.r_constraint_name AND rcols.position = 1
          LEFT OUTER JOIN
              user_cons_columns cols ON cons.constraint_name = cols.constraint_name
          WHERE
              cons.constraint_type = 'R' AND
              cons.table_name = UPPER(%s)
          GROUP BY cons.constraint_name, rcols.table_name, rcols.column_name
      """, [table_name])
      for name, column_csv, ref_table, ref_column in cursor.fetchall():
          found[convert(name)] = {
              'primary_key': False,
              'unique': False,
              'foreign_key': (ref_table, ref_column),
              'check': False,
              'index': False,
              'columns': column_csv.split(','),
          }

      # 3. Indexes that are not referenced by any constraint (the NOT EXISTS
      #    clause skips indexes named in user_constraints.index_name).
      cursor.execute("""
          SELECT
              ind.index_name,
              LOWER(ind.index_type),
              LISTAGG(LOWER(cols.column_name), ',') WITHIN GROUP (ORDER BY cols.column_position),
              LISTAGG(cols.descend, ',') WITHIN GROUP (ORDER BY cols.column_position)
          FROM
              user_ind_columns cols, user_indexes ind
          WHERE
              cols.table_name = UPPER(%s) AND
              NOT EXISTS (
                  SELECT 1
                  FROM user_constraints cons
                  WHERE ind.index_name = cons.index_name
              ) AND cols.index_name = ind.index_name
          GROUP BY ind.index_name, ind.index_type
      """, [table_name])
      for name, index_type, column_csv, order_csv in cursor.fetchall():
          # The 'normal' index type is reported as 'idx'; any other type
          # is passed through unchanged (already lower-cased by the query).
          reported_type = 'idx' if index_type == 'normal' else index_type
          found[convert(name)] = {
              'primary_key': False,
              'unique': False,
              'foreign_key': None,
              'check': False,
              'index': True,
              'type': reported_type,
              'columns': column_csv.split(','),
              'orders': order_csv.split(','),
          }
      return found