proppatch.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2008 Nicolas Kandel
  3. # Copyright © 2008 Pascal Halter
  4. # Copyright © 2008-2017 Guillaume Ayoub
  5. # Copyright © 2017-2020 Unrud <unrud@outlook.com>
  6. # Copyright © 2020-2020 Tuna Celik <tuna@jakpark.com>
  7. # Copyright © 2025-2025 Peter Bieringer <pb@bieringer.de>
  8. #
  9. # This library is free software: you can redistribute it and/or modify
  10. # it under the terms of the GNU General Public License as published by
  11. # the Free Software Foundation, either version 3 of the License, or
  12. # (at your option) any later version.
  13. #
  14. # This library is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. # GNU General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU General Public License
  20. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  21. import socket
  22. import xml.etree.ElementTree as ET
  23. from http import client
  24. from typing import Dict, Optional, cast
  25. import defusedxml.ElementTree as DefusedET
  26. import radicale.item as radicale_item
  27. from radicale import httputils, storage, types, xmlutils
  28. from radicale.app.base import Access, ApplicationBase
  29. from radicale.hook import HookNotificationItem, HookNotificationItemTypes
  30. from radicale.log import logger
  31. def xml_proppatch(base_prefix: str, path: str,
  32. xml_request: Optional[ET.Element],
  33. collection: storage.BaseCollection) -> ET.Element:
  34. """Read and answer PROPPATCH requests.
  35. Read rfc4918-9.2 for info.
  36. """
  37. multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
  38. response = ET.Element(xmlutils.make_clark("D:response"))
  39. multistatus.append(response)
  40. href = ET.Element(xmlutils.make_clark("D:href"))
  41. href.text = xmlutils.make_href(base_prefix, path)
  42. response.append(href)
  43. # Create D:propstat element for props with status 200 OK
  44. propstat = ET.Element(xmlutils.make_clark("D:propstat"))
  45. status = ET.Element(xmlutils.make_clark("D:status"))
  46. status.text = xmlutils.make_response(200)
  47. props_ok = ET.Element(xmlutils.make_clark("D:prop"))
  48. propstat.append(props_ok)
  49. propstat.append(status)
  50. response.append(propstat)
  51. props_with_remove = xmlutils.props_from_request(xml_request)
  52. all_props_with_remove = cast(Dict[str, Optional[str]],
  53. dict(collection.get_meta()))
  54. all_props_with_remove.update(props_with_remove)
  55. all_props = radicale_item.check_and_sanitize_props(all_props_with_remove)
  56. collection.set_meta(all_props)
  57. for short_name in props_with_remove:
  58. props_ok.append(ET.Element(xmlutils.make_clark(short_name)))
  59. return multistatus
  60. class ApplicationPartProppatch(ApplicationBase):
  61. def do_PROPPATCH(self, environ: types.WSGIEnviron, base_prefix: str,
  62. path: str, user: str) -> types.WSGIResponse:
  63. """Manage PROPPATCH request."""
  64. access = Access(self._rights, user, path)
  65. if not access.check("w"):
  66. return httputils.NOT_ALLOWED
  67. try:
  68. xml_content = self._read_xml_request_body(environ)
  69. except RuntimeError as e:
  70. logger.warning(
  71. "Bad PROPPATCH request on %r: %s", path, e, exc_info=True)
  72. return httputils.BAD_REQUEST
  73. except socket.timeout:
  74. logger.debug("Client timed out", exc_info=True)
  75. return httputils.REQUEST_TIMEOUT
  76. with self._storage.acquire_lock("w", user):
  77. item = next(iter(self._storage.discover(path)), None)
  78. if not item:
  79. return httputils.NOT_FOUND
  80. if not access.check("w", item):
  81. return httputils.NOT_ALLOWED
  82. if not isinstance(item, storage.BaseCollection):
  83. return httputils.FORBIDDEN
  84. headers = {"DAV": httputils.DAV_HEADERS,
  85. "Content-Type": "text/xml; charset=%s" % self._encoding}
  86. try:
  87. xml_answer = xml_proppatch(base_prefix, path, xml_content,
  88. item)
  89. if xml_content is not None:
  90. hook_notification_item = HookNotificationItem(
  91. HookNotificationItemTypes.CPATCH,
  92. access.path,
  93. DefusedET.tostring(
  94. xml_content,
  95. encoding=self._encoding
  96. ).decode(encoding=self._encoding)
  97. )
  98. self._hook.notify(hook_notification_item)
  99. except ValueError as e:
  100. logger.warning(
  101. "Bad PROPPATCH request on %r: %s", path, e, exc_info=True)
  102. return httputils.BAD_REQUEST
  103. return client.MULTI_STATUS, headers, self._xml_response(xml_answer)