httputils.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2008 Nicolas Kandel
  3. # Copyright © 2008 Pascal Halter
  4. # Copyright © 2008-2017 Guillaume Ayoub
  5. # Copyright © 2017-2022 Unrud <unrud@outlook.com>
  6. # Copyright © 2024-2025 Peter Bieringer <pb@bieringer.de>
  7. #
  8. # This library is free software: you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation, either version 3 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License
  19. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  20. """
  21. Helper functions for HTTP.
  22. """
  23. import contextlib
  24. import logging
  25. import os
  26. import pathlib
  27. import sys
  28. import time
  29. from http import client
  30. from typing import List, Mapping, Union, cast
  31. from radicale import config, pathutils, types, utils
  32. from radicale.log import logger
# Select the resource-loading backend and the matching "traversable" type
# alias according to the running interpreter version.
if sys.version_info < (3, 9):
    # On these versions ``serve_resource`` resolves package data through
    # ``pkg_resources`` and wraps the result in a ``pathlib.Path``.
    import pkg_resources

    _TRAVERSABLE_LIKE_TYPE = pathlib.Path
else:
    import importlib.abc
    from importlib import resources

    if sys.version_info < (3, 13):
        # ``Traversable`` is referenced through ``importlib.abc`` here.
        _TRAVERSABLE_LIKE_TYPE = Union[importlib.abc.Traversable, pathlib.Path]
    else:
        # From 3.13 on, ``Traversable`` is referenced through
        # ``importlib.resources.abc`` instead.
        _TRAVERSABLE_LIKE_TYPE = Union[importlib.resources.abc.Traversable, pathlib.Path]
  43. NOT_ALLOWED: types.WSGIResponse = (
  44. client.FORBIDDEN, (("Content-Type", "text/plain"),),
  45. "Access to the requested resource forbidden.", None)
  46. FORBIDDEN: types.WSGIResponse = (
  47. client.FORBIDDEN, (("Content-Type", "text/plain"),),
  48. "Action on the requested resource refused.", None)
  49. BAD_REQUEST: types.WSGIResponse = (
  50. client.BAD_REQUEST, (("Content-Type", "text/plain"),), "Bad Request", None)
  51. NOT_FOUND: types.WSGIResponse = (
  52. client.NOT_FOUND, (("Content-Type", "text/plain"),),
  53. "The requested resource could not be found.", None)
  54. CONFLICT: types.WSGIResponse = (
  55. client.CONFLICT, (("Content-Type", "text/plain"),),
  56. "Conflict in the request.", None)
  57. METHOD_NOT_ALLOWED: types.WSGIResponse = (
  58. client.METHOD_NOT_ALLOWED, (("Content-Type", "text/plain"),),
  59. "The method is not allowed on the requested resource.", None)
  60. PRECONDITION_FAILED: types.WSGIResponse = (
  61. client.PRECONDITION_FAILED,
  62. (("Content-Type", "text/plain"),), "Precondition failed.", None)
  63. REQUEST_TIMEOUT: types.WSGIResponse = (
  64. client.REQUEST_TIMEOUT, (("Content-Type", "text/plain"),),
  65. "Connection timed out.", None)
  66. REQUEST_ENTITY_TOO_LARGE: types.WSGIResponse = (
  67. client.REQUEST_ENTITY_TOO_LARGE, (("Content-Type", "text/plain"),),
  68. "Request body too large.", None)
  69. REMOTE_DESTINATION: types.WSGIResponse = (
  70. client.BAD_GATEWAY, (("Content-Type", "text/plain"),),
  71. "Remote destination not supported.", None)
  72. DIRECTORY_LISTING: types.WSGIResponse = (
  73. client.FORBIDDEN, (("Content-Type", "text/plain"),),
  74. "Directory listings are not supported.", None)
  75. INSUFFICIENT_STORAGE: types.WSGIResponse = (
  76. client.INSUFFICIENT_STORAGE, (("Content-Type", "text/plain"),),
  77. "Insufficient Storage. Please contact the administrator.", None)
  78. INTERNAL_SERVER_ERROR: types.WSGIResponse = (
  79. client.INTERNAL_SERVER_ERROR, (("Content-Type", "text/plain"),),
  80. "A server error occurred. Please contact the administrator.", None)
  81. DAV_HEADERS: str = "1, 2, 3, calendar-access, addressbook, extended-mkcol"
  82. MIMETYPES: Mapping[str, str] = {
  83. ".css": "text/css",
  84. ".eot": "application/vnd.ms-fontobject",
  85. ".gif": "image/gif",
  86. ".html": "text/html",
  87. ".js": "application/javascript",
  88. ".manifest": "text/cache-manifest",
  89. ".png": "image/png",
  90. ".svg": "image/svg+xml",
  91. ".ttf": "application/font-sfnt",
  92. ".txt": "text/plain",
  93. ".woff": "application/font-woff",
  94. ".woff2": "font/woff2",
  95. ".xml": "text/xml"}
  96. FALLBACK_MIMETYPE: str = "application/octet-stream"
  97. def decode_request(configuration: "config.Configuration",
  98. environ: types.WSGIEnviron, text: bytes) -> str:
  99. """Try to magically decode ``text`` according to given ``environ``."""
  100. # List of charsets to try
  101. charsets: List[str] = []
  102. # First append content charset given in the request
  103. content_type = environ.get("CONTENT_TYPE")
  104. if content_type and "charset=" in content_type:
  105. charsets.append(
  106. content_type.split("charset=")[1].split(";")[0].strip())
  107. # Then append default Radicale charset
  108. charsets.append(cast(str, configuration.get("encoding", "request")))
  109. # Then append various fallbacks
  110. charsets.append("utf-8")
  111. charsets.append("iso8859-1")
  112. # Remove duplicates
  113. for i, s in reversed(list(enumerate(charsets))):
  114. if s in charsets[:i]:
  115. del charsets[i]
  116. # Try to decode
  117. for charset in charsets:
  118. with contextlib.suppress(UnicodeDecodeError):
  119. return text.decode(charset)
  120. raise UnicodeDecodeError("decode_request", text, 0, len(text),
  121. "all codecs failed [%s]" % ", ".join(charsets))
  122. def read_raw_request_body(configuration: "config.Configuration",
  123. environ: types.WSGIEnviron) -> bytes:
  124. content_length = int(environ.get("CONTENT_LENGTH") or 0)
  125. if not content_length:
  126. return b""
  127. content = environ["wsgi.input"].read(content_length)
  128. if len(content) < content_length:
  129. raise RuntimeError("Request body too short: %d" % len(content))
  130. return content
  131. def read_request_body(configuration: "config.Configuration",
  132. environ: types.WSGIEnviron) -> str:
  133. content = decode_request(configuration, environ,
  134. read_raw_request_body(configuration, environ))
  135. if configuration.get("logging", "request_content_on_debug"):
  136. if logger.isEnabledFor(logging.DEBUG):
  137. logger.debug("Request content (sha256sum): %s", utils.sha256_str(content))
  138. logger.debug("Request content:\n%s", utils.textwrap_str(content))
  139. else:
  140. if logger.isEnabledFor(logging.DEBUG):
  141. logger.debug("Request content: suppressed by config/option [logging] request_content_on_debug")
  142. return content
  143. def redirect(location: str, status: int = client.FOUND) -> types.WSGIResponse:
  144. return (status,
  145. {"Location": location, "Content-Type": "text/plain"},
  146. "Redirected to %s" % location, None)
  147. def _serve_traversable(
  148. traversable: _TRAVERSABLE_LIKE_TYPE, base_prefix: str, path: str,
  149. path_prefix: str, index_file: str, mimetypes: Mapping[str, str],
  150. fallback_mimetype: str) -> types.WSGIResponse:
  151. if path != path_prefix and not path.startswith(path_prefix):
  152. raise ValueError("path must start with path_prefix: %r --> %r" %
  153. (path_prefix, path))
  154. assert pathutils.sanitize_path(path) == path
  155. parts_path = path[len(path_prefix):].strip('/')
  156. parts = parts_path.split("/") if parts_path else []
  157. for part in parts:
  158. if not pathutils.is_safe_filesystem_path_component(part):
  159. logger.debug("Web content with unsafe path %r requested", path)
  160. return NOT_FOUND
  161. if (not traversable.is_dir() or
  162. all(part != entry.name for entry in traversable.iterdir())):
  163. return NOT_FOUND
  164. traversable = traversable.joinpath(part)
  165. if traversable.is_dir():
  166. if not path.endswith("/"):
  167. return redirect(base_prefix + path + "/")
  168. if not index_file:
  169. return NOT_FOUND
  170. traversable = traversable.joinpath(index_file)
  171. if not traversable.is_file():
  172. return NOT_FOUND
  173. content_type = MIMETYPES.get(
  174. os.path.splitext(traversable.name)[1].lower(), FALLBACK_MIMETYPE)
  175. headers = {"Content-Type": content_type}
  176. if isinstance(traversable, pathlib.Path):
  177. headers["Last-Modified"] = time.strftime(
  178. "%a, %d %b %Y %H:%M:%S GMT",
  179. time.gmtime(traversable.stat().st_mtime))
  180. answer = traversable.read_bytes()
  181. if path == "/.web/index.html" or path == "/.web/":
  182. # enable link on the fly in index.html if InfCloud index.html is existing
  183. # class="infcloudlink-hidden" -> class="infcloudlink"
  184. path_posix = str(traversable)
  185. path_posix_infcloud = path_posix.replace("/internal_data/index.html", "/internal_data/infcloud/index.html")
  186. if os.path.isfile(path_posix_infcloud):
  187. # logger.debug("Enable InfCloud link in served page: %r", path)
  188. answer = answer.replace(b"infcloudlink-hidden", b"infcloud")
  189. elif path == "/.web/infcloud/config.js":
  190. # adjust on the fly default config.js of InfCloud installation
  191. # logger.debug("Adjust on-the-fly default InfCloud config.js in served page: %r", path)
  192. answer = answer.replace(b"location.pathname.replace(RegExp('/+[^/]+/*(index\\.html)?$'),'')+", b"location.pathname.replace(RegExp('/\\.web\\.infcloud/(index\\.html)?$'),'')+")
  193. answer = answer.replace(b"'/caldav.php/',", b"'/',")
  194. answer = answer.replace(b"settingsAccount: true,", b"settingsAccount: false,")
  195. elif path == "/.web/infcloud/main.js":
  196. # adjust on the fly default main.js of InfCloud installation
  197. logger.debug("Adjust on-the-fly default InfCloud main.js in served page: %r", path)
  198. answer = answer.replace(b"'InfCloud - the open source CalDAV/CardDAV web client'", b"'InfCloud - the open source CalDAV/CardDAV web client - served through Radicale CalDAV/CardDAV server'")
  199. return client.OK, headers, answer, None
  200. def serve_resource(
  201. package: str, resource: str, base_prefix: str, path: str,
  202. path_prefix: str = "/.web", index_file: str = "index.html",
  203. mimetypes: Mapping[str, str] = MIMETYPES,
  204. fallback_mimetype: str = FALLBACK_MIMETYPE) -> types.WSGIResponse:
  205. if sys.version_info < (3, 9):
  206. traversable = pathlib.Path(
  207. pkg_resources.resource_filename(package, resource))
  208. else:
  209. traversable = resources.files(package).joinpath(resource)
  210. return _serve_traversable(traversable, base_prefix, path, path_prefix,
  211. index_file, mimetypes, fallback_mimetype)
  212. def serve_folder(
  213. folder: str, base_prefix: str, path: str,
  214. path_prefix: str = "/.web", index_file: str = "index.html",
  215. mimetypes: Mapping[str, str] = MIMETYPES,
  216. fallback_mimetype: str = FALLBACK_MIMETYPE) -> types.WSGIResponse:
  217. # deprecated: use `serve_resource` instead
  218. traversable = pathlib.Path(folder)
  219. return _serve_traversable(traversable, base_prefix, path, path_prefix,
  220. index_file, mimetypes, fallback_mimetype)