test_storage.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2019 Unrud <unrud@outlook.com>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. """
  18. Tests for storage backends.
  19. """
  20. import os
  21. import shutil
  22. from typing import ClassVar, cast
  23. import pytest
  24. import radicale.tests.custom.storage_simple_sync
  25. from radicale.tests import BaseTest
  26. from radicale.tests.helpers import get_file_content
  27. from radicale.tests.test_base import TestBaseRequests as _TestBaseRequests
  28. class TestMultiFileSystem(BaseTest):
  29. """Tests for multifilesystem."""
  30. def setup_method(self) -> None:
  31. _TestBaseRequests.setup_method(cast(_TestBaseRequests, self))
  32. self.configure({"storage": {"type": "multifilesystem"}})
  33. def test_folder_creation(self) -> None:
  34. """Verify that the folder is created."""
  35. folder = os.path.join(self.colpath, "subfolder")
  36. self.configure({"storage": {"filesystem_folder": folder}})
  37. assert os.path.isdir(folder)
  38. def test_fsync(self) -> None:
  39. """Create a directory and file with syncing enabled."""
  40. self.configure({"storage": {"_filesystem_fsync": "True"}})
  41. self.mkcalendar("/calendar.ics/")
  42. def test_hook(self) -> None:
  43. """Run hook."""
  44. self.configure({"storage": {"hook": "mkdir %s" % os.path.join(
  45. "collection-root", "created_by_hook")}})
  46. self.mkcalendar("/calendar.ics/")
  47. self.propfind("/created_by_hook/")
  48. def test_hook_read_access(self) -> None:
  49. """Verify that hook is not run for read accesses."""
  50. self.configure({"storage": {"hook": "mkdir %s" % os.path.join(
  51. "collection-root", "created_by_hook")}})
  52. self.propfind("/")
  53. self.propfind("/created_by_hook/", check=404)
  54. @pytest.mark.skipif(not shutil.which("flock"),
  55. reason="flock command not found")
  56. def test_hook_storage_locked(self) -> None:
  57. """Verify that the storage is locked when the hook runs."""
  58. self.configure({"storage": {"hook": (
  59. "flock -n .Radicale.lock || exit 0; exit 1")}})
  60. self.mkcalendar("/calendar.ics/")
  61. def test_hook_principal_collection_creation(self) -> None:
  62. """Verify that the hooks runs when a new user is created."""
  63. self.configure({"storage": {"hook": "mkdir %s" % os.path.join(
  64. "collection-root", "created_by_hook")}})
  65. self.propfind("/", login="user:")
  66. self.propfind("/created_by_hook/")
  67. def test_hook_fail(self) -> None:
  68. """Verify that a request fails if the hook fails."""
  69. self.configure({"storage": {"hook": "exit 1"}})
  70. self.mkcalendar("/calendar.ics/", check=500)
  71. def test_item_cache_rebuild(self) -> None:
  72. """Delete the item cache and verify that it is rebuild."""
  73. self.mkcalendar("/calendar.ics/")
  74. event = get_file_content("event1.ics")
  75. path = "/calendar.ics/event1.ics"
  76. self.put(path, event)
  77. _, answer1 = self.get(path)
  78. cache_folder = os.path.join(self.colpath, "collection-root",
  79. "calendar.ics", ".Radicale.cache", "item")
  80. assert os.path.exists(os.path.join(cache_folder, "event1.ics"))
  81. shutil.rmtree(cache_folder)
  82. _, answer2 = self.get(path)
  83. assert answer1 == answer2
  84. assert os.path.exists(os.path.join(cache_folder, "event1.ics"))
  85. def test_put_whole_calendar_uids_used_as_file_names(self) -> None:
  86. """Test if UIDs are used as file names."""
  87. _TestBaseRequests.test_put_whole_calendar(
  88. cast(_TestBaseRequests, self))
  89. for uid in ("todo", "event"):
  90. _, answer = self.get("/calendar.ics/%s.ics" % uid)
  91. assert "\r\nUID:%s\r\n" % uid in answer
  92. def test_put_whole_calendar_random_uids_used_as_file_names(self) -> None:
  93. """Test if UIDs are used as file names."""
  94. _TestBaseRequests.test_put_whole_calendar_without_uids(
  95. cast(_TestBaseRequests, self))
  96. _, answer = self.get("/calendar.ics")
  97. assert answer is not None
  98. uids = []
  99. for line in answer.split("\r\n"):
  100. if line.startswith("UID:"):
  101. uids.append(line[len("UID:"):])
  102. for uid in uids:
  103. _, answer = self.get("/calendar.ics/%s.ics" % uid)
  104. assert answer is not None
  105. assert "\r\nUID:%s\r\n" % uid in answer
  106. def test_put_whole_addressbook_uids_used_as_file_names(self) -> None:
  107. """Test if UIDs are used as file names."""
  108. _TestBaseRequests.test_put_whole_addressbook(
  109. cast(_TestBaseRequests, self))
  110. for uid in ("contact1", "contact2"):
  111. _, answer = self.get("/contacts.vcf/%s.vcf" % uid)
  112. assert "\r\nUID:%s\r\n" % uid in answer
  113. def test_put_whole_addressbook_random_uids_used_as_file_names(
  114. self) -> None:
  115. """Test if UIDs are used as file names."""
  116. _TestBaseRequests.test_put_whole_addressbook_without_uids(
  117. cast(_TestBaseRequests, self))
  118. _, answer = self.get("/contacts.vcf")
  119. assert answer is not None
  120. uids = []
  121. for line in answer.split("\r\n"):
  122. if line.startswith("UID:"):
  123. uids.append(line[len("UID:"):])
  124. for uid in uids:
  125. _, answer = self.get("/contacts.vcf/%s.vcf" % uid)
  126. assert answer is not None
  127. assert "\r\nUID:%s\r\n" % uid in answer
  128. class TestMultiFileSystemNoLock(BaseTest):
  129. """Tests for multifilesystem_nolock."""
  130. def setup_method(self) -> None:
  131. _TestBaseRequests.setup_method(cast(_TestBaseRequests, self))
  132. self.configure({"storage": {"type": "multifilesystem_nolock"}})
  133. test_add_event = _TestBaseRequests.test_add_event
  134. test_item_cache_rebuild = TestMultiFileSystem.test_item_cache_rebuild
  135. class TestCustomStorageSystem(BaseTest):
  136. """Test custom backend loading."""
  137. def setup_method(self) -> None:
  138. _TestBaseRequests.setup_method(cast(_TestBaseRequests, self))
  139. self.configure({"storage": {
  140. "type": "radicale.tests.custom.storage_simple_sync"}})
  141. full_sync_token_support: ClassVar[bool] = False
  142. test_add_event = _TestBaseRequests.test_add_event
  143. _report_sync_token = _TestBaseRequests._report_sync_token
  144. # include tests related to sync token
  145. s: str = ""
  146. for s in dir(_TestBaseRequests):
  147. if s.startswith("test_") and "sync" in s.split("_"):
  148. locals()[s] = getattr(_TestBaseRequests, s)
  149. del s
  150. class TestCustomStorageSystemCallable(BaseTest):
  151. """Test custom backend loading with ``callable``."""
  152. def setup_method(self) -> None:
  153. _TestBaseRequests.setup_method(cast(_TestBaseRequests, self))
  154. self.configure({"storage": {
  155. "type": radicale.tests.custom.storage_simple_sync.Storage}})
  156. test_add_event = _TestBaseRequests.test_add_event