test_hook_rabbitmq.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2025-2025 Peter Bieringer <pb@bieringer.de>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.

"""
Radicale tests related to hook 'rabbitmq'
"""
  19. import logging
  20. import os
  21. import pytest
  22. from radicale.tests import BaseTest
  23. from radicale.tests.helpers import get_file_content
  24. class TestHooks(BaseTest):
  25. """Tests with hooks."""
  26. # test for available pika module
  27. try:
  28. import pika
  29. except ImportError:
  30. has_pika = 0
  31. else:
  32. has_pika = 1
  33. def setup_method(self) -> None:
  34. BaseTest.setup_method(self)
  35. rights_file_path = os.path.join(self.colpath, "rights")
  36. with open(rights_file_path, "w") as f:
  37. f.write("""\
  38. [permit delete collection]
  39. user: .*
  40. collection: test-permit-delete
  41. permissions: RrWwD
  42. [forbid delete collection]
  43. user: .*
  44. collection: test-forbid-delete
  45. permissions: RrWwd
  46. [permit overwrite collection]
  47. user: .*
  48. collection: test-permit-overwrite
  49. permissions: RrWwO
  50. [forbid overwrite collection]
  51. user: .*
  52. collection: test-forbid-overwrite
  53. permissions: RrWwo
  54. [allow all]
  55. user: .*
  56. collection: .*
  57. permissions: RrWw""")
  58. self.configure({"rights": {"file": rights_file_path,
  59. "type": "from_file"}})
  60. self.configure({"hook": {"type": "rabbitmq",
  61. "dryrun": "True"}})
  62. @pytest.mark.skipif(has_pika == 0, reason="No pika module installed")
  63. def test_add_event(self, caplog) -> None:
  64. caplog.set_level(logging.WARNING)
  65. """Add an event."""
  66. self.mkcalendar("/calendar.ics/")
  67. event = get_file_content("event1.ics")
  68. path = "/calendar.ics/event1.ics"
  69. self.put(path, event)
  70. _, headers, answer = self.request("GET", path, check=200)
  71. assert "ETag" in headers
  72. assert headers["Content-Type"] == "text/calendar; charset=utf-8"
  73. assert "VEVENT" in answer
  74. assert "Event" in answer
  75. assert "UID:event" in answer
  76. found = False
  77. for line in caplog.messages:
  78. if line.find("notification_item: {'type': 'upsert'") != -1:
  79. found = True
  80. if (found is False):
  81. raise ValueError("Logging misses expected log line")
  82. @pytest.mark.skipif(has_pika == 0, reason="No pika module installed")
  83. def test_delete_event(self, caplog) -> None:
  84. caplog.set_level(logging.WARNING)
  85. """Delete an event."""
  86. self.mkcalendar("/calendar.ics/")
  87. event = get_file_content("event1.ics")
  88. path = "/calendar.ics/event1.ics"
  89. self.put(path, event)
  90. _, responses = self.delete(path)
  91. assert responses[path] == 200
  92. _, answer = self.get("/calendar.ics/")
  93. assert "VEVENT" not in answer
  94. found = False
  95. for line in caplog.messages:
  96. if line.find("notification_item: {'type': 'delete'") != -1:
  97. found = True
  98. if (found is False):
  99. raise ValueError("Logging misses expected log line")