move.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2008 Nicolas Kandel
  3. # Copyright © 2008 Pascal Halter
  4. # Copyright © 2008-2017 Guillaume Ayoub
  5. # Copyright © 2017-2023 Unrud <unrud@outlook.com>
  6. # Copyright © 2023-2025 Peter Bieringer <pb@bieringer.de>
  7. #
  8. # This library is free software: you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation, either version 3 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License
  19. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  20. import posixpath
  21. import re
  22. from http import client
  23. from urllib.parse import urlparse
  24. from radicale import httputils, pathutils, storage, types
  25. from radicale.app.base import Access, ApplicationBase
  26. from radicale.log import logger
  27. def get_server_netloc(environ: types.WSGIEnviron, force_port: bool = False):
  28. if environ.get("HTTP_X_FORWARDED_HOST"):
  29. host = environ["HTTP_X_FORWARDED_HOST"]
  30. proto = environ.get("HTTP_X_FORWARDED_PROTO") or "http"
  31. port = "443" if proto == "https" else "80"
  32. port = environ["HTTP_X_FORWARDED_PORT"] or port
  33. else:
  34. host = environ.get("HTTP_HOST") or environ["SERVER_NAME"]
  35. proto = environ["wsgi.url_scheme"]
  36. port = environ["SERVER_PORT"]
  37. if (not force_port and port == ("443" if proto == "https" else "80") or
  38. re.search(r":\d+$", host)):
  39. return host
  40. return host + ":" + port
  41. class ApplicationPartMove(ApplicationBase):
  42. def do_MOVE(self, environ: types.WSGIEnviron, base_prefix: str,
  43. path: str, user: str) -> types.WSGIResponse:
  44. """Manage MOVE request."""
  45. raw_dest = environ.get("HTTP_DESTINATION", "")
  46. to_url = urlparse(raw_dest)
  47. to_netloc_with_port = to_url.netloc
  48. if to_url.port is None:
  49. to_netloc_with_port += (":443" if to_url.scheme == "https"
  50. else ":80")
  51. if to_netloc_with_port != get_server_netloc(environ, force_port=True):
  52. logger.info("Unsupported destination address: %r", raw_dest)
  53. # Remote destination server, not supported
  54. return httputils.REMOTE_DESTINATION
  55. access = Access(self._rights, user, path)
  56. if not access.check("w"):
  57. return httputils.NOT_ALLOWED
  58. to_path = pathutils.sanitize_path(to_url.path)
  59. if not (to_path + "/").startswith(base_prefix + "/"):
  60. logger.warning("Destination %r from MOVE request on %r doesn't "
  61. "start with base prefix", to_path, path)
  62. return httputils.NOT_ALLOWED
  63. to_path = to_path[len(base_prefix):]
  64. to_access = Access(self._rights, user, to_path)
  65. if not to_access.check("w"):
  66. return httputils.NOT_ALLOWED
  67. with self._storage.acquire_lock("w", user):
  68. item = next(iter(self._storage.discover(path)), None)
  69. if not item:
  70. return httputils.NOT_FOUND
  71. if (not access.check("w", item) or
  72. not to_access.check("w", item)):
  73. return httputils.NOT_ALLOWED
  74. if isinstance(item, storage.BaseCollection):
  75. # TODO: support moving collections
  76. return httputils.METHOD_NOT_ALLOWED
  77. to_item = next(iter(self._storage.discover(to_path)), None)
  78. if isinstance(to_item, storage.BaseCollection):
  79. return httputils.FORBIDDEN
  80. to_parent_path = pathutils.unstrip_path(
  81. posixpath.dirname(pathutils.strip_path(to_path)), True)
  82. to_collection = next(iter(
  83. self._storage.discover(to_parent_path)), None)
  84. if not to_collection:
  85. return httputils.CONFLICT
  86. assert isinstance(to_collection, storage.BaseCollection)
  87. assert item.collection is not None
  88. collection_tag = item.collection.tag
  89. if not collection_tag or collection_tag != to_collection.tag:
  90. return httputils.FORBIDDEN
  91. if to_item and environ.get("HTTP_OVERWRITE", "F") != "T":
  92. return httputils.PRECONDITION_FAILED
  93. if (to_item and item.uid != to_item.uid or
  94. not to_item and
  95. to_collection.path != item.collection.path and
  96. to_collection.has_uid(item.uid)):
  97. return self._webdav_error_response(
  98. client.CONFLICT, "%s:no-uid-conflict" % (
  99. "C" if collection_tag == "VCALENDAR" else "CR"))
  100. to_href = posixpath.basename(pathutils.strip_path(to_path))
  101. try:
  102. self._storage.move(item, to_collection, to_href)
  103. except ValueError as e:
  104. logger.warning(
  105. "Bad MOVE request on %r: %s", path, e, exc_info=True)
  106. return httputils.BAD_REQUEST
  107. return client.NO_CONTENT if to_item else client.CREATED, {}, None