__init__.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import json
  2. from enum import Enum
  3. from typing import Sequence
  4. from radicale import pathutils, utils
# Hook type names handed to ``utils.load_plugin`` by ``load()``.
# NOTE(review): presumably these are the hook types shipped inside this
# package (as opposed to externally provided plugins) - confirm against
# ``utils.load_plugin``.
INTERNAL_TYPES: Sequence[str] = ("none", "rabbitmq")
  6. def load(configuration):
  7. """Load the storage module chosen in configuration."""
  8. return utils.load_plugin(
  9. INTERNAL_TYPES, "hook", "Hook", BaseHook, configuration)
  10. class BaseHook:
  11. def __init__(self, configuration):
  12. """Initialize BaseHook.
  13. ``configuration`` see ``radicale.config`` module.
  14. The ``configuration`` must not change during the lifetime of
  15. this object, it is kept as an internal reference.
  16. """
  17. self.configuration = configuration
  18. def notify(self, notification_item):
  19. """Upload a new or replace an existing item."""
  20. raise NotImplementedError
class HookNotificationItemTypes(Enum):
    """Closed set of notification type identifiers.

    Each member's value is a lowercase string. ``HookNotificationItem``
    stores the ``.value`` of the type object it is constructed with
    (presumably a member of this enum - confirm against the callers).

    """

    # NOTE(review): the exact meaning of each member is not defined in this
    # module - confirm with the code that emits the notifications.
    CPATCH = "cpatch"
    UPSERT = "upsert"
    DELETE = "delete"
  25. def _cleanup(path):
  26. sane_path = pathutils.strip_path(path)
  27. attributes = sane_path.split("/") if sane_path else []
  28. if len(attributes) < 2:
  29. return ""
  30. return attributes[0] + "/" + attributes[1]
  31. class HookNotificationItem:
  32. def __init__(self, notification_item_type, path, content):
  33. self.type = notification_item_type.value
  34. self.point = _cleanup(path)
  35. self.content = content
  36. def to_json(self):
  37. return json.dumps(
  38. self,
  39. default=lambda o: o.__dict__,
  40. sort_keys=True,
  41. indent=4
  42. )