# introspection.py
import re
from collections import namedtuple

import sqlparse

from django.db.backends.base.introspection import (
    BaseDatabaseIntrospection, FieldInfo as BaseFieldInfo, TableInfo,
)
from django.db.models.indexes import Index
  8. FieldInfo = namedtuple('FieldInfo', BaseFieldInfo._fields + ('pk',))
  9. field_size_re = re.compile(r'^\s*(?:var)?char\s*\(\s*(\d+)\s*\)\s*$')
  10. def get_field_size(name):
  11. """ Extract the size number from a "varchar(11)" type name """
  12. m = field_size_re.search(name)
  13. return int(m.group(1)) if m else None
  14. # This light wrapper "fakes" a dictionary interface, because some SQLite data
  15. # types include variables in them -- e.g. "varchar(30)" -- and can't be matched
  16. # as a simple dictionary lookup.
  17. class FlexibleFieldLookupDict:
  18. # Maps SQL types to Django Field types. Some of the SQL types have multiple
  19. # entries here because SQLite allows for anything and doesn't normalize the
  20. # field type; it uses whatever was given.
  21. base_data_types_reverse = {
  22. 'bool': 'BooleanField',
  23. 'boolean': 'BooleanField',
  24. 'smallint': 'SmallIntegerField',
  25. 'smallint unsigned': 'PositiveSmallIntegerField',
  26. 'smallinteger': 'SmallIntegerField',
  27. 'int': 'IntegerField',
  28. 'integer': 'IntegerField',
  29. 'bigint': 'BigIntegerField',
  30. 'integer unsigned': 'PositiveIntegerField',
  31. 'decimal': 'DecimalField',
  32. 'real': 'FloatField',
  33. 'text': 'TextField',
  34. 'char': 'CharField',
  35. 'varchar': 'CharField',
  36. 'blob': 'BinaryField',
  37. 'date': 'DateField',
  38. 'datetime': 'DateTimeField',
  39. 'time': 'TimeField',
  40. }
  41. def __getitem__(self, key):
  42. key = key.lower().split('(', 1)[0].strip()
  43. return self.base_data_types_reverse[key]
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """Introspect tables, columns, relations and constraints of a SQLite database."""
    # Lookup object that maps a SQLite column type name, including
    # parameterized ones such as "varchar(30)", to a Django field class name.
    data_types_reverse = FlexibleFieldLookupDict()
  46. def get_field_type(self, data_type, description):
  47. field_type = super().get_field_type(data_type, description)
  48. if description.pk and field_type in {'BigIntegerField', 'IntegerField', 'SmallIntegerField'}:
  49. # No support for BigAutoField or SmallAutoField as SQLite treats
  50. # all integer primary keys as signed 64-bit integers.
  51. return 'AutoField'
  52. return field_type
  53. def get_table_list(self, cursor):
  54. """Return a list of table and view names in the current database."""
  55. # Skip the sqlite_sequence system table used for autoincrement key
  56. # generation.
  57. cursor.execute("""
  58. SELECT name, type FROM sqlite_master
  59. WHERE type in ('table', 'view') AND NOT name='sqlite_sequence'
  60. ORDER BY name""")
  61. return [TableInfo(row[0], row[1][0]) for row in cursor.fetchall()]
  62. def get_table_description(self, cursor, table_name):
  63. """
  64. Return a description of the table with the DB-API cursor.description
  65. interface.
  66. """
  67. cursor.execute('PRAGMA table_info(%s)' % self.connection.ops.quote_name(table_name))
  68. return [
  69. FieldInfo(
  70. name, data_type, None, get_field_size(data_type), None, None,
  71. not notnull, default, pk == 1,
  72. )
  73. for cid, name, data_type, notnull, default, pk in cursor.fetchall()
  74. ]
  75. def get_sequences(self, cursor, table_name, table_fields=()):
  76. pk_col = self.get_primary_key_column(cursor, table_name)
  77. return [{'table': table_name, 'column': pk_col}]
  78. def get_relations(self, cursor, table_name):
  79. """
  80. Return a dictionary of {field_name: (field_name_other_table, other_table)}
  81. representing all relationships to the given table.
  82. """
  83. # Dictionary of relations to return
  84. relations = {}
  85. # Schema for this table
  86. cursor.execute(
  87. "SELECT sql, type FROM sqlite_master "
  88. "WHERE tbl_name = %s AND type IN ('table', 'view')",
  89. [table_name]
  90. )
  91. create_sql, table_type = cursor.fetchone()
  92. if table_type == 'view':
  93. # It might be a view, then no results will be returned
  94. return relations
  95. results = create_sql[create_sql.index('(') + 1:create_sql.rindex(')')]
  96. # Walk through and look for references to other tables. SQLite doesn't
  97. # really have enforced references, but since it echoes out the SQL used
  98. # to create the table we can look for REFERENCES statements used there.
  99. for field_desc in results.split(','):
  100. field_desc = field_desc.strip()
  101. if field_desc.startswith("UNIQUE"):
  102. continue
  103. m = re.search(r'references (\S*) ?\(["|]?(.*)["|]?\)', field_desc, re.I)
  104. if not m:
  105. continue
  106. table, column = [s.strip('"') for s in m.groups()]
  107. if field_desc.startswith("FOREIGN KEY"):
  108. # Find name of the target FK field
  109. m = re.match(r'FOREIGN KEY\s*\(([^\)]*)\).*', field_desc, re.I)
  110. field_name = m.groups()[0].strip('"')
  111. else:
  112. field_name = field_desc.split()[0].strip('"')
  113. cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table])
  114. result = cursor.fetchall()[0]
  115. other_table_results = result[0].strip()
  116. li, ri = other_table_results.index('('), other_table_results.rindex(')')
  117. other_table_results = other_table_results[li + 1:ri]
  118. for other_desc in other_table_results.split(','):
  119. other_desc = other_desc.strip()
  120. if other_desc.startswith('UNIQUE'):
  121. continue
  122. other_name = other_desc.split(' ', 1)[0].strip('"')
  123. if other_name == column:
  124. relations[field_name] = (other_name, table)
  125. break
  126. return relations
  127. def get_key_columns(self, cursor, table_name):
  128. """
  129. Return a list of (column_name, referenced_table_name, referenced_column_name)
  130. for all key columns in given table.
  131. """
  132. key_columns = []
  133. # Schema for this table
  134. cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s AND type = %s", [table_name, "table"])
  135. results = cursor.fetchone()[0].strip()
  136. results = results[results.index('(') + 1:results.rindex(')')]
  137. # Walk through and look for references to other tables. SQLite doesn't
  138. # really have enforced references, but since it echoes out the SQL used
  139. # to create the table we can look for REFERENCES statements used there.
  140. for field_index, field_desc in enumerate(results.split(',')):
  141. field_desc = field_desc.strip()
  142. if field_desc.startswith("UNIQUE"):
  143. continue
  144. m = re.search(r'"(.*)".*references (.*) \(["|](.*)["|]\)', field_desc, re.I)
  145. if not m:
  146. continue
  147. # This will append (column_name, referenced_table_name, referenced_column_name) to key_columns
  148. key_columns.append(tuple(s.strip('"') for s in m.groups()))
  149. return key_columns
  150. def get_primary_key_column(self, cursor, table_name):
  151. """Return the column name of the primary key for the given table."""
  152. # Don't use PRAGMA because that causes issues with some transactions
  153. cursor.execute(
  154. "SELECT sql, type FROM sqlite_master "
  155. "WHERE tbl_name = %s AND type IN ('table', 'view')",
  156. [table_name]
  157. )
  158. row = cursor.fetchone()
  159. if row is None:
  160. raise ValueError("Table %s does not exist" % table_name)
  161. create_sql, table_type = row
  162. if table_type == 'view':
  163. # Views don't have a primary key.
  164. return None
  165. fields_sql = create_sql[create_sql.index('(') + 1:create_sql.rindex(')')]
  166. for field_desc in fields_sql.split(','):
  167. field_desc = field_desc.strip()
  168. m = re.match(r'(?:(?:["`\[])(.*)(?:["`\]])|(\w+)).*PRIMARY KEY.*', field_desc)
  169. if m:
  170. return m.group(1) if m.group(1) else m.group(2)
  171. return None
  172. def _get_foreign_key_constraints(self, cursor, table_name):
  173. constraints = {}
  174. cursor.execute('PRAGMA foreign_key_list(%s)' % self.connection.ops.quote_name(table_name))
  175. for row in cursor.fetchall():
  176. # Remaining on_update/on_delete/match values are of no interest.
  177. id_, _, table, from_, to = row[:5]
  178. constraints['fk_%d' % id_] = {
  179. 'columns': [from_],
  180. 'primary_key': False,
  181. 'unique': False,
  182. 'foreign_key': (table, to),
  183. 'check': False,
  184. 'index': False,
  185. }
  186. return constraints
    def _parse_column_or_constraint_definition(self, tokens, columns):
        """
        Parse one column or constraint definition from a token iterator.

        Tokens are consumed from ``tokens`` until a ',' at nesting depth zero,
        a ')' that takes the nesting depth below zero, or the end of the
        iterator. ``columns`` is the collection of column names used to decide
        which names inside a CHECK clause are recorded.

        Return a 4-tuple ``(constraint_name, unique_constraint,
        check_constraint, token)``. The two constraints are dictionaries, or
        None when no columns were collected for them. ``token`` is the last
        token consumed, or None if ``tokens`` yielded nothing.

        NOTE(review): ``tokens`` is presumably the whitespace-free, flattened
        sqlparse token stream of a CREATE TABLE statement -- confirm against
        the caller.
        """
        token = None
        # None until the first token has been inspected; afterwards tells
        # whether the definition started with the CONSTRAINT keyword.
        is_constraint_definition = None
        field_name = None
        constraint_name = None
        unique = False
        unique_columns = []
        check = False
        check_columns = []
        # Current parenthesis nesting depth relative to the definition start.
        braces_deep = 0
        for token in tokens:
            if token.match(sqlparse.tokens.Punctuation, '('):
                braces_deep += 1
            elif token.match(sqlparse.tokens.Punctuation, ')'):
                braces_deep -= 1
                if braces_deep < 0:
                    # End of columns and constraints for table definition.
                    break
            elif braces_deep == 0 and token.match(sqlparse.tokens.Punctuation, ','):
                # End of current column or constraint definition.
                break
            # Detect column or constraint definition by first token.
            if is_constraint_definition is None:
                is_constraint_definition = token.match(sqlparse.tokens.Keyword, 'CONSTRAINT')
                if is_constraint_definition:
                    continue
            if is_constraint_definition:
                # Detect constraint name by second token.
                if constraint_name is None:
                    if token.ttype in (sqlparse.tokens.Name, sqlparse.tokens.Keyword):
                        constraint_name = token.value
                    elif token.ttype == sqlparse.tokens.Literal.String.Symbol:
                        # Drop the first and last character (the quoting).
                        constraint_name = token.value[1:-1]
                # Start constraint columns parsing after UNIQUE keyword.
                if token.match(sqlparse.tokens.Keyword, 'UNIQUE'):
                    unique = True
                    # Depth at which UNIQUE was seen; its column list sits
                    # one level deeper.
                    unique_braces_deep = braces_deep
                elif unique:
                    if unique_braces_deep == braces_deep:
                        if unique_columns:
                            # Stop constraint parsing.
                            unique = False
                        continue
                    if token.ttype in (sqlparse.tokens.Name, sqlparse.tokens.Keyword):
                        unique_columns.append(token.value)
                    elif token.ttype == sqlparse.tokens.Literal.String.Symbol:
                        unique_columns.append(token.value[1:-1])
            else:
                # Detect field name by first token.
                if field_name is None:
                    if token.ttype in (sqlparse.tokens.Name, sqlparse.tokens.Keyword):
                        field_name = token.value
                    elif token.ttype == sqlparse.tokens.Literal.String.Symbol:
                        field_name = token.value[1:-1]
                # UNIQUE inside a column definition applies to that column.
                if token.match(sqlparse.tokens.Keyword, 'UNIQUE'):
                    unique_columns = [field_name]
            # Start constraint columns parsing after CHECK keyword.
            if token.match(sqlparse.tokens.Keyword, 'CHECK'):
                check = True
                check_braces_deep = braces_deep
            elif check:
                if check_braces_deep == braces_deep:
                    if check_columns:
                        # Stop constraint parsing.
                        check = False
                    continue
                # Inside the CHECK expression only names found in ``columns``
                # are recorded.
                if token.ttype in (sqlparse.tokens.Name, sqlparse.tokens.Keyword):
                    if token.value in columns:
                        check_columns.append(token.value)
                elif token.ttype == sqlparse.tokens.Literal.String.Symbol:
                    if token.value[1:-1] in columns:
                        check_columns.append(token.value[1:-1])
        unique_constraint = {
            'unique': True,
            'columns': unique_columns,
            'primary_key': False,
            'foreign_key': None,
            'check': False,
            'index': False,
        } if unique_columns else None
        check_constraint = {
            'check': True,
            'columns': check_columns,
            'primary_key': False,
            'unique': False,
            'foreign_key': None,
            'index': False,
        } if check_columns else None
        return constraint_name, unique_constraint, check_constraint, token
  276. def _parse_table_constraints(self, sql, columns):
  277. # Check constraint parsing is based of SQLite syntax diagram.
  278. # https://www.sqlite.org/syntaxdiagrams.html#table-constraint
  279. statement = sqlparse.parse(sql)[0]
  280. constraints = {}
  281. unnamed_constrains_index = 0
  282. tokens = (token for token in statement.flatten() if not token.is_whitespace)
  283. # Go to columns and constraint definition
  284. for token in tokens:
  285. if token.match(sqlparse.tokens.Punctuation, '('):
  286. break
  287. # Parse columns and constraint definition
  288. while True:
  289. constraint_name, unique, check, end_token = self._parse_column_or_constraint_definition(tokens, columns)
  290. if unique:
  291. if constraint_name:
  292. constraints[constraint_name] = unique
  293. else:
  294. unnamed_constrains_index += 1
  295. constraints['__unnamed_constraint_%s__' % unnamed_constrains_index] = unique
  296. if check:
  297. if constraint_name:
  298. constraints[constraint_name] = check
  299. else:
  300. unnamed_constrains_index += 1
  301. constraints['__unnamed_constraint_%s__' % unnamed_constrains_index] = check
  302. if end_token.match(sqlparse.tokens.Punctuation, ')'):
  303. break
  304. return constraints
  305. def get_constraints(self, cursor, table_name):
  306. """
  307. Retrieve any constraints or keys (unique, pk, fk, check, index) across
  308. one or more columns.
  309. """
  310. constraints = {}
  311. # Find inline check constraints.
  312. try:
  313. table_schema = cursor.execute(
  314. "SELECT sql FROM sqlite_master WHERE type='table' and name=%s" % (
  315. self.connection.ops.quote_name(table_name),
  316. )
  317. ).fetchone()[0]
  318. except TypeError:
  319. # table_name is a view.
  320. pass
  321. else:
  322. columns = {info.name for info in self.get_table_description(cursor, table_name)}
  323. constraints.update(self._parse_table_constraints(table_schema, columns))
  324. # Get the index info
  325. cursor.execute("PRAGMA index_list(%s)" % self.connection.ops.quote_name(table_name))
  326. for row in cursor.fetchall():
  327. # SQLite 3.8.9+ has 5 columns, however older versions only give 3
  328. # columns. Discard last 2 columns if there.
  329. number, index, unique = row[:3]
  330. cursor.execute(
  331. "SELECT sql FROM sqlite_master "
  332. "WHERE type='index' AND name=%s" % self.connection.ops.quote_name(index)
  333. )
  334. # There's at most one row.
  335. sql, = cursor.fetchone() or (None,)
  336. # Inline constraints are already detected in
  337. # _parse_table_constraints(). The reasons to avoid fetching inline
  338. # constraints from `PRAGMA index_list` are:
  339. # - Inline constraints can have a different name and information
  340. # than what `PRAGMA index_list` gives.
  341. # - Not all inline constraints may appear in `PRAGMA index_list`.
  342. if not sql:
  343. # An inline constraint
  344. continue
  345. # Get the index info for that index
  346. cursor.execute('PRAGMA index_info(%s)' % self.connection.ops.quote_name(index))
  347. for index_rank, column_rank, column in cursor.fetchall():
  348. if index not in constraints:
  349. constraints[index] = {
  350. "columns": [],
  351. "primary_key": False,
  352. "unique": bool(unique),
  353. "foreign_key": None,
  354. "check": False,
  355. "index": True,
  356. }
  357. constraints[index]['columns'].append(column)
  358. # Add type and column orders for indexes
  359. if constraints[index]['index'] and not constraints[index]['unique']:
  360. # SQLite doesn't support any index type other than b-tree
  361. constraints[index]['type'] = Index.suffix
  362. order_info = sql.split('(')[-1].split(')')[0].split(',')
  363. orders = ['DESC' if info.endswith('DESC') else 'ASC' for info in order_info]
  364. constraints[index]['orders'] = orders
  365. # Get the PK
  366. pk_column = self.get_primary_key_column(cursor, table_name)
  367. if pk_column:
  368. # SQLite doesn't actually give a name to the PK constraint,
  369. # so we invent one. This is fine, as the SQLite backend never
  370. # deletes PK constraints by name, as you can't delete constraints
  371. # in SQLite; we remake the table with a new PK instead.
  372. constraints["__primary__"] = {
  373. "columns": [pk_column],
  374. "primary_key": True,
  375. "unique": False, # It's not actually a unique constraint.
  376. "foreign_key": None,
  377. "check": False,
  378. "index": False,
  379. }
  380. constraints.update(self._get_foreign_key_constraints(cursor, table_name))
  381. return constraints