__init__.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2020-2024 Tuna Celik <tuna@jakpark.com>
  3. # Copyright © 2025-2025 Peter Bieringer <pb@bieringer.de>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. import pika
  18. from pika.exceptions import ChannelWrongStateError, StreamLostError
  19. from radicale import hook
  20. from radicale.hook import HookNotificationItem
  21. from radicale.log import logger
  22. class Hook(hook.BaseHook):
  23. def __init__(self, configuration):
  24. super().__init__(configuration)
  25. self._endpoint = configuration.get("hook", "rabbitmq_endpoint")
  26. self._topic = configuration.get("hook", "rabbitmq_topic")
  27. self._queue_type = configuration.get("hook", "rabbitmq_queue_type")
  28. self._encoding = configuration.get("encoding", "stock")
  29. self._dryrun = configuration.get("hook", "dryrun")
  30. logger.info("Hook 'rabbitmq': endpoint=%r topic=%r queue_type=%r dryrun=%s", self._endpoint, self._topic, self._queue_type, self._dryrun)
  31. self._make_connection_synced()
  32. self._make_declare_queue_synced()
  33. def _make_connection_synced(self):
  34. parameters = pika.URLParameters(self._endpoint)
  35. if self._dryrun is True:
  36. logger.warning("Hook 'rabbitmq': DRY-RUN _make_connection_synced / parameters=%r", parameters)
  37. return
  38. logger.debug("Hook 'rabbitmq': _make_connection_synced / parameters=%r", parameters)
  39. connection = pika.BlockingConnection(parameters)
  40. self._channel = connection.channel()
  41. def _make_declare_queue_synced(self):
  42. if self._dryrun is True:
  43. logger.warning("Hook 'rabbitmq': DRY-RUN _make_declare_queue_synced")
  44. return
  45. logger.debug("Hook 'rabbitmq': _make_declare_queue_synced")
  46. self._channel.queue_declare(queue=self._topic, durable=True, arguments={"x-queue-type": self._queue_type})
  47. def notify(self, notification_item):
  48. if isinstance(notification_item, HookNotificationItem):
  49. self._notify(notification_item, True)
  50. def _notify(self, notification_item, recall):
  51. if self._dryrun is True:
  52. logger.warning("Hook 'rabbitmq': DRY-RUN _notify / notification_item: %r", vars(notification_item))
  53. return
  54. try:
  55. self._channel.basic_publish(
  56. exchange='',
  57. routing_key=self._topic,
  58. body=notification_item.to_json().encode(
  59. encoding=self._encoding
  60. )
  61. )
  62. except Exception as e:
  63. if (isinstance(e, ChannelWrongStateError) or
  64. isinstance(e, StreamLostError)) and recall:
  65. self._make_connection_synced()
  66. self._notify(notification_item, False)
  67. return
  68. logger.error("An exception occurred during "
  69. "publishing hook notification item: %s",
  70. e, exc_info=True)