move.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2008 Nicolas Kandel
  3. # Copyright © 2008 Pascal Halter
  4. # Copyright © 2008-2017 Guillaume Ayoub
  5. # Copyright © 2017-2023 Unrud <unrud@outlook.com>
  6. # Copyright © 2023-2025 Peter Bieringer <pb@bieringer.de>
  7. #
  8. # This library is free software: you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation, either version 3 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License
  19. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  20. import errno
  21. import posixpath
  22. import re
  23. from http import client
  24. from urllib.parse import urlparse
  25. from radicale import httputils, pathutils, storage, types
  26. from radicale.app.base import Access, ApplicationBase
  27. from radicale.log import logger
  28. def get_server_netloc(environ: types.WSGIEnviron, force_port: bool = False):
  29. if environ.get("HTTP_X_FORWARDED_HOST"):
  30. host = environ["HTTP_X_FORWARDED_HOST"]
  31. proto = environ.get("HTTP_X_FORWARDED_PROTO") or "http"
  32. port = "443" if proto == "https" else "80"
  33. port = environ["HTTP_X_FORWARDED_PORT"] or port
  34. else:
  35. host = environ.get("HTTP_HOST") or environ["SERVER_NAME"]
  36. proto = environ["wsgi.url_scheme"]
  37. port = environ["SERVER_PORT"]
  38. if (not force_port and port == ("443" if proto == "https" else "80") or
  39. re.search(r":\d+$", host)):
  40. return host
  41. return host + ":" + port
  42. class ApplicationPartMove(ApplicationBase):
  43. def do_MOVE(self, environ: types.WSGIEnviron, base_prefix: str,
  44. path: str, user: str) -> types.WSGIResponse:
  45. """Manage MOVE request."""
  46. raw_dest = environ.get("HTTP_DESTINATION", "")
  47. to_url = urlparse(raw_dest)
  48. to_netloc_with_port = to_url.netloc
  49. if to_url.port is None:
  50. to_netloc_with_port += (":443" if to_url.scheme == "https"
  51. else ":80")
  52. if to_netloc_with_port != get_server_netloc(environ, force_port=True):
  53. logger.info("Unsupported destination address: %r", raw_dest)
  54. # Remote destination server, not supported
  55. return httputils.REMOTE_DESTINATION
  56. access = Access(self._rights, user, path)
  57. if not access.check("w"):
  58. return httputils.NOT_ALLOWED
  59. to_path = pathutils.sanitize_path(to_url.path)
  60. if not (to_path + "/").startswith(base_prefix + "/"):
  61. logger.warning("Destination %r from MOVE request on %r doesn't "
  62. "start with base prefix", to_path, path)
  63. return httputils.NOT_ALLOWED
  64. to_path = to_path[len(base_prefix):]
  65. to_access = Access(self._rights, user, to_path)
  66. if not to_access.check("w"):
  67. return httputils.NOT_ALLOWED
  68. with self._storage.acquire_lock("w", user):
  69. item = next(iter(self._storage.discover(path)), None)
  70. if not item:
  71. return httputils.NOT_FOUND
  72. if (not access.check("w", item) or
  73. not to_access.check("w", item)):
  74. return httputils.NOT_ALLOWED
  75. if isinstance(item, storage.BaseCollection):
  76. # TODO: support moving collections
  77. return httputils.METHOD_NOT_ALLOWED
  78. to_item = next(iter(self._storage.discover(to_path)), None)
  79. if isinstance(to_item, storage.BaseCollection):
  80. return httputils.FORBIDDEN
  81. to_parent_path = pathutils.unstrip_path(
  82. posixpath.dirname(pathutils.strip_path(to_path)), True)
  83. to_collection = next(iter(
  84. self._storage.discover(to_parent_path)), None)
  85. if not to_collection:
  86. return httputils.CONFLICT
  87. assert isinstance(to_collection, storage.BaseCollection)
  88. assert item.collection is not None
  89. collection_tag = item.collection.tag
  90. if not collection_tag or collection_tag != to_collection.tag:
  91. return httputils.FORBIDDEN
  92. if to_item and environ.get("HTTP_OVERWRITE", "F") != "T":
  93. return httputils.PRECONDITION_FAILED
  94. if (to_item and item.uid != to_item.uid or
  95. not to_item and
  96. to_collection.path != item.collection.path and
  97. to_collection.has_uid(item.uid)):
  98. return self._webdav_error_response(
  99. client.CONFLICT, "%s:no-uid-conflict" % (
  100. "C" if collection_tag == "VCALENDAR" else "CR"))
  101. to_href = posixpath.basename(pathutils.strip_path(to_path))
  102. try:
  103. self._storage.move(item, to_collection, to_href)
  104. except ValueError as e:
  105. # return better matching HTTP result in case errno is provided and catched
  106. errno_match = re.search("\\[Errno ([0-9]+)\\]", str(e))
  107. if errno_match:
  108. logger.error(
  109. "Failed MOVE request on %r: %s", path, e, exc_info=True)
  110. errno_e = int(errno_match.group(1))
  111. if errno_e == errno.ENOSPC:
  112. return httputils.INSUFFICIENT_STORAGE
  113. elif errno_e in [errno.EPERM, errno.EACCES]:
  114. return httputils.FORBIDDEN
  115. else:
  116. return httputils.INTERNAL_SERVER_ERROR
  117. else:
  118. logger.warning(
  119. "Bad MOVE request on %r: %s", path, e, exc_info=True)
  120. return httputils.BAD_REQUEST
  121. return client.NO_CONTENT if to_item else client.CREATED, {}, None