| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- import json
- from enum import Enum
- from typing import Sequence
- from radicale import pathutils, utils
# Hook type names handed to ``utils.load_plugin`` by ``load()`` as the set of
# "internal" choices.
INTERNAL_TYPES: Sequence[str] = (
    "none",
    "rabbitmq",
)
def load(configuration):
    """Load the hook plugin chosen in ``configuration``.

    Delegates to ``utils.load_plugin`` with the internal hook type names,
    the module name ``"hook"``, the class name ``"Hook"``, the ``BaseHook``
    base class and ``configuration``, and returns its result unchanged.
    """
    module_name = "hook"
    class_name = "Hook"
    return utils.load_plugin(
        INTERNAL_TYPES, module_name, class_name, BaseHook, configuration)
class BaseHook:
    """Base class for hook implementations.

    Stores the configuration and declares ``notify``, which this base class
    leaves unimplemented.
    """

    def __init__(self, configuration):
        """Create the hook and keep a reference to ``configuration``.

        ``configuration`` is described in the ``radicale.config`` module.

        The object is stored by reference, not copied, so it must stay
        unchanged for as long as this hook instance is alive.

        """
        self.configuration = configuration

    def notify(self, notification_item):
        """Handle ``notification_item``.

        The base implementation always raises ``NotImplementedError``;
        subclasses are expected to override it.

        """
        raise NotImplementedError
class HookNotificationItemTypes(Enum):
    """Kinds of notification a ``HookNotificationItem`` can carry.

    The member value (a lowercase string) is what ``HookNotificationItem``
    stores in its ``type`` attribute.
    """

    # NOTE(review): presumably a property patch on a collection -- confirm
    # against the code that emits it.
    CPATCH = "cpatch"
    # Named after "update or insert".
    UPSERT = "upsert"
    DELETE = "delete"
def _cleanup(path):
    """Reduce ``path`` to its first two ``/``-separated components.

    The path is first passed through ``pathutils.strip_path``
    (NOTE(review): presumably trims the surrounding slashes -- confirm).
    Returns ``"<first>/<second>"``, or an empty string when the stripped
    path is empty or has fewer than two components.
    """
    stripped = pathutils.strip_path(path)
    if not stripped:
        return ""
    segments = stripped.split("/")
    if len(segments) < 2:
        return ""
    # Anything below the second component is dropped.
    return "/".join(segments[:2])
class HookNotificationItem:
    """A single notification handed to a hook, serializable as JSON."""

    def __init__(self, notification_item_type, path, content):
        """Build the item.

        ``notification_item_type`` is an enum member; only its ``value`` is
        kept (as ``type``). ``path`` is reduced by ``_cleanup`` and stored
        as ``point``. ``content`` is stored as given.

        """
        self.type = notification_item_type.value
        self.point = _cleanup(path)
        self.content = content

    def to_json(self):
        """Return this item as a JSON string.

        Keys are sorted and the output is indented by four spaces. Any
        object the JSON encoder cannot handle natively (including this item
        itself) is serialized through its ``__dict__``.

        """
        def _attributes_of(obj):
            return obj.__dict__

        return json.dumps(self, default=_attributes_of, sort_keys=True,
                          indent=4)
|