main.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. from datetime import datetime, timedelta
  2. from django import forms
  3. from django.conf import settings
  4. from django.contrib import messages
  5. from django.contrib.admin import FieldListFilter
  6. from django.contrib.admin.exceptions import (
  7. DisallowedModelAdminLookup, DisallowedModelAdminToField,
  8. )
  9. from django.contrib.admin.options import (
  10. IS_POPUP_VAR, TO_FIELD_VAR, IncorrectLookupParameters,
  11. )
  12. from django.contrib.admin.utils import (
  13. get_fields_from_path, lookup_needs_distinct, prepare_lookup_value, quote,
  14. )
  15. from django.core.exceptions import (
  16. FieldDoesNotExist, ImproperlyConfigured, SuspiciousOperation,
  17. )
  18. from django.core.paginator import InvalidPage
  19. from django.db import models
  20. from django.db.models.expressions import Combinable, F, OrderBy
  21. from django.urls import reverse
  22. from django.utils.http import urlencode
  23. from django.utils.timezone import make_aware
  24. from django.utils.translation import gettext
  # Changelist settings: names of the query-string parameters that the
  # changelist reads from request.GET.
  ALL_VAR = 'all'  # Presence of this key requests the "show all" view.
  ORDER_VAR = 'o'  # Dot-separated list of list_display column indexes to sort by.
  ORDER_TYPE_VAR = 'ot'  # Only referenced by IGNORED_PARAMS in this module.
  PAGE_VAR = 'p'  # Page number; parsed with int() and incremented by one for the paginator.
  SEARCH_VAR = 'q'  # Free-text search query.
  ERROR_FLAG = 'e'  # Stripped from the stored params together with PAGE_VAR.

  # Parameters that are never treated as filter lookups.
  IGNORED_PARAMS = (
      ALL_VAR,
      ORDER_VAR,
      ORDER_TYPE_VAR,
      SEARCH_VAR,
      IS_POPUP_VAR,
      TO_FIELD_VAR,
  )
  class ChangeListSearchForm(forms.Form):
      """Form exposing a single optional, unstripped search field."""

      def __init__(self, *args, **kwargs):
          super().__init__(*args, **kwargs)
          # The field is keyed by the value of SEARCH_VAR rather than by a
          # fixed attribute name, so it is installed on the instance instead
          # of being declared on the class.
          search_field = forms.CharField(required=False, strip=False)
          self.fields = {SEARCH_VAR: search_field}
  class ChangeList:
      """
      State holder for one rendering of a model admin changelist.

      Instantiation reads the filter, search, ordering and pagination
      parameters from ``request.GET``, builds the filtered and ordered
      queryset (``self.queryset``) and computes the results to display
      (``self.result_list``) together with the counts and the paginator.
      """
      search_form_class = ChangeListSearchForm

      def __init__(self, request, model, list_display, list_display_links,
                   list_filter, date_hierarchy, search_fields, list_select_related,
                   list_per_page, list_max_show_all, list_editable, model_admin, sortable_by):
          """
          Store the changelist options, then compute the queryset and results.

          Raise DisallowedModelAdminToField if the to-field named in the query
          string is rejected by ``model_admin.to_field_allowed()``. Exceptions
          raised by get_queryset() and get_results() propagate to the caller.
          """
          self.model = model
          self.opts = model._meta
          self.lookup_opts = self.opts
          self.root_queryset = model_admin.get_queryset(request)
          self.list_display = list_display
          self.list_display_links = list_display_links
          self.list_filter = list_filter
          # Assigned by get_queryset() once the list filters are collected.
          self.has_filters = None
          self.date_hierarchy = date_hierarchy
          self.search_fields = search_fields
          self.list_select_related = list_select_related
          self.list_per_page = list_per_page
          self.list_max_show_all = list_max_show_all
          self.model_admin = model_admin
          self.preserved_filters = model_admin.get_preserved_filters(request)
          self.sortable_by = sortable_by

          # Get search parameters from the query string. Validation errors are
          # reported through the messages framework rather than raised.
          _search_form = self.search_form_class(request.GET)
          if not _search_form.is_valid():
              for error in _search_form.errors.values():
                  messages.error(request, ', '.join(error))
          self.query = _search_form.cleaned_data.get(SEARCH_VAR) or ''

          # A non-integer page number falls back to the first page.
          try:
              self.page_num = int(request.GET.get(PAGE_VAR, 0))
          except ValueError:
              self.page_num = 0
          self.show_all = ALL_VAR in request.GET
          self.is_popup = IS_POPUP_VAR in request.GET

          to_field = request.GET.get(TO_FIELD_VAR)
          if to_field and not model_admin.to_field_allowed(request, to_field):
              raise DisallowedModelAdminToField("The field %s cannot be referenced." % to_field)
          self.to_field = to_field

          # Keep a plain dict of the query string without the page number and
          # the error flag.
          self.params = dict(request.GET.items())
          if PAGE_VAR in self.params:
              del self.params[PAGE_VAR]
          if ERROR_FLAG in self.params:
              del self.params[ERROR_FLAG]

          # No fields are editable in place when shown as a popup.
          if self.is_popup:
              self.list_editable = ()
          else:
              self.list_editable = list_editable

          self.queryset = self.get_queryset(request)
          self.get_results(request)

          if self.is_popup:
              title = gettext('Select %s')
          elif self.model_admin.has_change_permission(request):
              title = gettext('Select %s to change')
          else:
              title = gettext('Select %s to view')
          self.title = title % self.opts.verbose_name
          self.pk_attname = self.lookup_opts.pk.attname

      def get_filters_params(self, params=None):
          """
          Return all params except IGNORED_PARAMS.

          ``params`` defaults to ``self.params`` (also used when an empty
          mapping is passed). The input mapping is copied, not modified.
          """
          params = params or self.params
          lookup_params = params.copy()  # a dictionary of the query string
          # Remove all the parameters that are globally and systematically
          # ignored.
          for ignored in IGNORED_PARAMS:
              if ignored in lookup_params:
                  del lookup_params[ignored]
          return lookup_params

      def get_filters(self, request):
          """
          Build the list filter specs and the remaining lookup parameters.

          Return a 4-tuple ``(filter_specs, has_filters, lookup_params,
          use_distinct)`` where ``lookup_params`` holds the query-string
          lookups not consumed by a list filter, with prepared values.

          Raise DisallowedModelAdminLookup if ``model_admin.lookup_allowed()``
          rejects a lookup, and IncorrectLookupParameters if the date
          hierarchy values don't form a usable date range or a remaining
          lookup doesn't refer to an existing field.
          """
          lookup_params = self.get_filters_params()
          use_distinct = False

          for key, value in lookup_params.items():
              if not self.model_admin.lookup_allowed(key, value):
                  raise DisallowedModelAdminLookup("Filtering by %s not allowed" % key)

          filter_specs = []
          for list_filter in self.list_filter:
              if callable(list_filter):
                  # This is simply a custom list filter class.
                  spec = list_filter(request, lookup_params, self.model, self.model_admin)
              else:
                  field_path = None
                  if isinstance(list_filter, (tuple, list)):
                      # This is a custom FieldListFilter class for a given field.
                      field, field_list_filter_class = list_filter
                  else:
                      # This is simply a field name, so use the default
                      # FieldListFilter class that has been registered for the
                      # type of the given field.
                      field, field_list_filter_class = list_filter, FieldListFilter.create
                  if not isinstance(field, models.Field):
                      field_path = field
                      field = get_fields_from_path(self.model, field_path)[-1]

                  lookup_params_count = len(lookup_params)
                  spec = field_list_filter_class(
                      field, request, lookup_params,
                      self.model, self.model_admin, field_path=field_path,
                  )
                  # field_list_filter_class removes any lookup_params it
                  # processes. If that happened, check if distinct() is needed to
                  # remove duplicate results.
                  if lookup_params_count > len(lookup_params):
                      use_distinct = use_distinct or lookup_needs_distinct(self.lookup_opts, field_path)
              if spec and spec.has_output():
                  filter_specs.append(spec)

          if self.date_hierarchy:
              # Create bounded lookup parameters so that the query is more
              # efficient.
              year = lookup_params.pop('%s__year' % self.date_hierarchy, None)
              if year is not None:
                  month = lookup_params.pop('%s__month' % self.date_hierarchy, None)
                  day = lookup_params.pop('%s__day' % self.date_hierarchy, None)
                  try:
                      from_date = datetime(
                          int(year),
                          int(month if month is not None else 1),
                          int(day if day is not None else 1),
                      )
                      # Compute the upper bound on the naive value so that an
                      # out-of-range result (e.g. the year after 9999) is
                      # reported as a bad lookup instead of crashing.
                      if day:
                          to_date = from_date + timedelta(days=1)
                      elif month:
                          # In this branch, from_date will always be the first of a
                          # month, so advancing 32 days gives the next month.
                          to_date = (from_date + timedelta(days=32)).replace(day=1)
                      else:
                          to_date = from_date.replace(year=from_date.year + 1)
                  except (ValueError, OverflowError) as e:
                      raise IncorrectLookupParameters(e) from e
                  if settings.USE_TZ:
                      # Make each bound aware on its own so that each one gets
                      # the UTC offset in effect on its own date (the two may
                      # differ when a DST change falls inside the range).
                      from_date = make_aware(from_date)
                      to_date = make_aware(to_date)
                  lookup_params.update({
                      '%s__gte' % self.date_hierarchy: from_date,
                      '%s__lt' % self.date_hierarchy: to_date,
                  })

          # At this point, all the parameters used by the various ListFilters
          # have been removed from lookup_params, which now only contains other
          # parameters passed via the query string. We now loop through the
          # remaining parameters both to ensure that all the parameters are valid
          # fields and to determine if at least one of them needs distinct(). If
          # the lookup parameters aren't real fields, then bail out.
          try:
              for key, value in lookup_params.items():
                  lookup_params[key] = prepare_lookup_value(key, value)
                  use_distinct = use_distinct or lookup_needs_distinct(self.lookup_opts, key)
              return filter_specs, bool(filter_specs), lookup_params, use_distinct
          except FieldDoesNotExist as e:
              raise IncorrectLookupParameters(e) from e

      def get_query_string(self, new_params=None, remove=None):
          """
          Return a ``?``-prefixed query string derived from ``self.params``.

          Every stored key starting with one of the prefixes in ``remove`` is
          dropped. Then each item of ``new_params`` is applied: a value of
          None removes the key, any other value sets it. The resulting items
          are sorted before being URL-encoded.
          """
          if new_params is None:
              new_params = {}
          if remove is None:
              remove = []
          p = self.params.copy()
          for r in remove:
              for k in list(p):
                  if k.startswith(r):
                      del p[k]
          for k, v in new_params.items():
              if v is None:
                  if k in p:
                      del p[k]
              else:
                  p[k] = v
          return '?%s' % urlencode(sorted(p.items()))

      def get_results(self, request):
          """
          Compute the results to display and store them on the instance.

          Set ``result_count``, ``show_full_result_count``,
          ``show_admin_actions``, ``full_result_count``, ``result_list``,
          ``can_show_all``, ``multi_page`` and ``paginator``.

          Raise IncorrectLookupParameters if the requested page is invalid.
          """
          paginator = self.model_admin.get_paginator(request, self.queryset, self.list_per_page)
          # Get the number of objects, with admin filters applied.
          result_count = paginator.count

          # Get the total number of objects, with no admin filters applied.
          if self.model_admin.show_full_result_count:
              full_result_count = self.root_queryset.count()
          else:
              full_result_count = None
          can_show_all = result_count <= self.list_max_show_all
          multi_page = result_count > self.list_per_page

          # Get the list of objects to display on this page.
          if (self.show_all and can_show_all) or not multi_page:
              result_list = self.queryset._clone()
          else:
              try:
                  # page_num is zero-based while the paginator is one-based.
                  result_list = paginator.page(self.page_num + 1).object_list
              except InvalidPage:
                  raise IncorrectLookupParameters

          self.result_count = result_count
          self.show_full_result_count = self.model_admin.show_full_result_count
          # Admin actions are shown if there is at least one entry
          # or if entries are not counted because show_full_result_count is disabled
          self.show_admin_actions = not self.show_full_result_count or bool(full_result_count)
          self.full_result_count = full_result_count
          self.result_list = result_list
          self.can_show_all = can_show_all
          self.multi_page = multi_page
          self.paginator = paginator

      def _get_default_ordering(self):
          """
          Return the model admin's ``ordering`` if set, else the model
          options' ``ordering`` if set, else an empty list.
          """
          ordering = []
          if self.model_admin.ordering:
              ordering = self.model_admin.ordering
          elif self.lookup_opts.ordering:
              ordering = self.lookup_opts.ordering
          return ordering

      def get_ordering_field(self, field_name):
          """
          Return the proper model field name corresponding to the given
          field_name to use for ordering. field_name may either be the name of a
          proper model field or the name of a method (on the admin or model) or a
          callable with the 'admin_order_field' attribute. Return None if no
          proper model field name can be matched.
          """
          try:
              field = self.lookup_opts.get_field(field_name)
              return field.name
          except FieldDoesNotExist:
              # See whether field_name is a name of a non-field
              # that allows sorting.
              if callable(field_name):
                  attr = field_name
              elif hasattr(self.model_admin, field_name):
                  attr = getattr(self.model_admin, field_name)
              else:
                  attr = getattr(self.model, field_name)
              # For a property, the attribute is looked up on its getter.
              if isinstance(attr, property) and hasattr(attr, 'fget'):
                  attr = attr.fget
              return getattr(attr, 'admin_order_field', None)

      def get_ordering(self, request, queryset):
          """
          Return the list of ordering fields for the change list.
          First check the get_ordering() method in model admin, then check
          the object's default ordering. Then, any manually-specified ordering
          from the query string overrides anything. Finally, a deterministic
          order is guaranteed by calling _get_deterministic_ordering() with the
          constructed ordering.
          """
          params = self.params
          ordering = list(self.model_admin.get_ordering(request) or self._get_default_ordering())
          if ORDER_VAR in params:
              # Clear ordering and used params
              ordering = []
              order_params = params[ORDER_VAR].split('.')
              for p in order_params:
                  try:
                      # Each part is a list_display index, optionally prefixed
                      # with '-' for descending order.
                      _, pfx, idx = p.rpartition('-')
                      field_name = self.list_display[int(idx)]
                      order_field = self.get_ordering_field(field_name)
                      if not order_field:
                          continue  # No 'admin_order_field', skip it
                      if isinstance(order_field, OrderBy):
                          # asc()/desc() on an OrderBy flip it in place, and
                          # the object is shared via the 'admin_order_field'
                          # attribute, so reverse a copy instead.
                          if pfx == '-':
                              order_field = order_field.copy()
                              order_field.reverse_ordering()
                          ordering.append(order_field)
                      elif hasattr(order_field, 'resolve_expression'):
                          # order_field is an expression (this also covers F()
                          # objects).
                          ordering.append(order_field.desc() if pfx == '-' else order_field.asc())
                      # reverse order if order_field has already "-" as prefix
                      elif order_field.startswith('-') and pfx == '-':
                          ordering.append(order_field[1:])
                      else:
                          ordering.append(pfx + order_field)
                  except (IndexError, ValueError):
                      continue  # Invalid ordering specified, skip it.

          # Add the given query's ordering fields, if any.
          ordering.extend(queryset.query.order_by)

          return self._get_deterministic_ordering(ordering)

      def _get_deterministic_ordering(self, ordering):
          """
          Ensure a deterministic order across all database backends. Search for a
          single field or unique together set of fields providing a total
          ordering. If these are missing, augment the ordering with a descending
          primary key.

          Return a new list; the ``ordering`` argument is not modified.
          """
          ordering = list(ordering)
          ordering_fields = set()
          total_ordering_fields = {'pk'} | {
              field.attname for field in self.lookup_opts.fields
              if field.unique and not field.null
          }
          for part in ordering:
              # Search for single field providing a total ordering.
              field_name = None
              if isinstance(part, str):
                  field_name = part.lstrip('-')
              elif isinstance(part, F):
                  field_name = part.name
              elif isinstance(part, OrderBy) and isinstance(part.expression, F):
                  field_name = part.expression.name
              if field_name:
                  # Normalize attname references by using get_field().
                  try:
                      field = self.lookup_opts.get_field(field_name)
                  except FieldDoesNotExist:
                      # Could be "?" for random ordering or a related field
                      # lookup. Skip this part of introspection for now.
                      continue
                  # Ordering by a related field name orders by the referenced
                  # model's ordering. Skip this part of introspection for now.
                  if field.remote_field and field_name == field.name:
                      continue
                  if field.attname in total_ordering_fields:
                      break
                  ordering_fields.add(field.attname)
          else:
              # No single total ordering field, try unique_together.
              for field_names in self.lookup_opts.unique_together:
                  # Normalize attname references by using get_field().
                  fields = [self.lookup_opts.get_field(field_name) for field_name in field_names]
                  # Composite unique constraints containing a nullable column
                  # cannot ensure total ordering.
                  if any(field.null for field in fields):
                      continue
                  if ordering_fields.issuperset(field.attname for field in fields):
                      break
              else:
                  # If no set of unique fields is present in the ordering, rely
                  # on the primary key to provide total ordering.
                  ordering.append('-pk')
          return ordering

      def get_ordering_field_columns(self):
          """
          Return a dictionary of ordering field column numbers and asc/desc.

          Keys are list_display column indexes; values are 'asc' or 'desc'.
          """
          # We must cope with more than one column having the same underlying sort
          # field, so we base things on column numbers.
          ordering = self._get_default_ordering()
          ordering_fields = {}
          if ORDER_VAR not in self.params:
              # for ordering specified on ModelAdmin or model Meta, we don't know
              # the right column numbers absolutely, because there might be more
              # than one column associated with that ordering, so we guess.
              for field in ordering:
                  if isinstance(field, (Combinable, OrderBy)):
                      if not isinstance(field, OrderBy):
                          field = field.asc()
                      if isinstance(field.expression, F):
                          order_type = 'desc' if field.descending else 'asc'
                          field = field.expression.name
                      else:
                          # Expressions other than a plain F() can't be
                          # matched to a column.
                          continue
                  elif field.startswith('-'):
                      field = field[1:]
                      order_type = 'desc'
                  else:
                      order_type = 'asc'
                  # Only the first matching column is marked.
                  for index, attr in enumerate(self.list_display):
                      if self.get_ordering_field(attr) == field:
                          ordering_fields[index] = order_type
                          break
          else:
              for p in self.params[ORDER_VAR].split('.'):
                  _, pfx, idx = p.rpartition('-')
                  try:
                      idx = int(idx)
                  except ValueError:
                      continue  # skip it
                  ordering_fields[idx] = 'desc' if pfx == '-' else 'asc'
          return ordering_fields

      def get_queryset(self, request):
          """
          Return the queryset for the changelist: the root queryset with the
          list filters, the remaining query-string lookups, select_related,
          ordering and search applied, made distinct if a filter or the search
          requires it.

          Also set ``self.filter_specs`` and ``self.has_filters``.

          Raise IncorrectLookupParameters if applying the remaining lookups
          fails with anything other than SuspiciousOperation or
          ImproperlyConfigured, which are re-raised unchanged.
          """
          # First, we collect all the declared list filters.
          (self.filter_specs, self.has_filters, remaining_lookup_params,
           filters_use_distinct) = self.get_filters(request)

          # Then, we let every list filter modify the queryset to its liking.
          qs = self.root_queryset
          for filter_spec in self.filter_specs:
              new_qs = filter_spec.queryset(request, qs)
              if new_qs is not None:
                  qs = new_qs

          try:
              # Finally, we apply the remaining lookup parameters from the query
              # string (i.e. those that haven't already been processed by the
              # filters).
              qs = qs.filter(**remaining_lookup_params)
          except (SuspiciousOperation, ImproperlyConfigured):
              # Allow certain types of errors to be re-raised as-is so that the
              # caller can treat them in a special way.
              raise
          except Exception as e:
              # Every other error is caught with a broad except, because we don't
              # have any other way of validating lookup parameters. They might be
              # invalid if the keyword arguments are incorrect, or if the values
              # are not in the correct type, so we might get FieldError,
              # ValueError, ValidationError, or ?.
              raise IncorrectLookupParameters(e) from e

          if not qs.query.select_related:
              qs = self.apply_select_related(qs)

          # Set ordering.
          ordering = self.get_ordering(request, qs)
          qs = qs.order_by(*ordering)

          # Apply search results
          qs, search_use_distinct = self.model_admin.get_search_results(request, qs, self.query)

          # Remove duplicates from results, if necessary
          if filters_use_distinct or search_use_distinct:
              return qs.distinct()
          else:
              return qs

      def apply_select_related(self, qs):
          """
          Return ``qs`` with select_related() applied per list_select_related.

          True selects all relations; False does so only when list_display
          contains a related field; any other truthy value is unpacked as the
          arguments to select_related(). Otherwise ``qs`` is returned as is.
          """
          if self.list_select_related is True:
              return qs.select_related()

          if self.list_select_related is False:
              if self.has_related_field_in_list_display():
                  return qs.select_related()

          if self.list_select_related:
              return qs.select_related(*self.list_select_related)
          return qs

      def has_related_field_in_list_display(self):
          """
          Return True if list_display names a model field whose remote_field
          is a ManyToOneRel, other than through the field's attname.
          """
          for field_name in self.list_display:
              try:
                  field = self.lookup_opts.get_field(field_name)
              except FieldDoesNotExist:
                  pass
              else:
                  if isinstance(field.remote_field, models.ManyToOneRel):
                      # <FK>_id field names don't require a join.
                      if field_name != field.get_attname():
                          return True
          return False

      def url_for_result(self, result):
          """Return the admin change view URL for the given result object."""
          pk = getattr(result, self.pk_attname)
          return reverse('admin:%s_%s_change' % (self.opts.app_label,
                                                 self.opts.model_name),
                         args=(quote(pk),),
                         current_app=self.model_admin.admin_site.name)