Browse Source

Merge pull request #1092 from freakmaxi/master

Hook capability for event changes and deletions
Peter Bieringer 2 years ago
parent
commit
989cbefc64

+ 35 - 1
DOCUMENTATION.md

@@ -891,7 +891,41 @@ An example to relax the same-origin policy:
 Access-Control-Allow-Origin = *
 ```
 
-### Supported Clients
+#### hook
+##### type
+
+Hook binding for event changes and deletion notifications.
+
+Available types:
+
+`none`
+: Disabled. Nothing will be notified.
+
+`rabbitmq`
+: Push the message to the rabbitmq server.
+
+Default: `none`
+
+#### rabbitmq_endpoint
+
+End-point address for rabbitmq server.
+Ex: amqp://user:password@localhost:5672/   
+
+Default: 
+
+#### rabbitmq_topic
+
+RabbitMQ topic to publish message.
+
+Default:
+
+#### rabbitmq_queue_type
+
+RabbitMQ queue type for the topic.
+
+Default: classic
+
+## Supported Clients
 
 Radicale has been tested with:
 

+ 9 - 0
config

@@ -118,3 +118,12 @@
 
 # Additional HTTP headers
 #Access-Control-Allow-Origin = *
+
+[hook]
+
+# Hook types
+# Value: none | rabbitmq
+#type = none
+#rabbitmq_endpoint =
+#rabbitmq_topic =
+#rabbitmq_queue_type = classic

+ 4 - 2
radicale/app/base.py

@@ -21,8 +21,8 @@ import sys
 import xml.etree.ElementTree as ET
 from typing import Optional
 
-from radicale import (auth, config, httputils, pathutils, rights, storage,
-                      types, web, xmlutils)
+from radicale import (auth, config, hook, httputils, pathutils, rights,
+                      storage, types, web, xmlutils)
 from radicale.log import logger
 
 # HACK: https://github.com/tiran/defusedxml/issues/54
@@ -38,6 +38,7 @@ class ApplicationBase:
     _rights: rights.BaseRights
     _web: web.BaseWeb
     _encoding: str
+    _hook: hook.BaseHook
 
     def __init__(self, configuration: config.Configuration) -> None:
         self.configuration = configuration
@@ -46,6 +47,7 @@ class ApplicationBase:
         self._rights = rights.load(configuration)
         self._web = web.load(configuration)
         self._encoding = configuration.get("encoding", "request")
+        self._hook = hook.load(configuration)
 
     def _read_xml_request_body(self, environ: types.WSGIEnviron
                                ) -> Optional[ET.Element]:

+ 19 - 0
radicale/app/delete.py

@@ -23,6 +23,7 @@ from typing import Optional
 
 from radicale import httputils, storage, types, xmlutils
 from radicale.app.base import Access, ApplicationBase
+from radicale.hook import HookNotificationItem, HookNotificationItemTypes
 
 
 def xml_delete(base_prefix: str, path: str, collection: storage.BaseCollection,
@@ -67,12 +68,30 @@ class ApplicationPartDelete(ApplicationBase):
             if if_match not in ("*", item.etag):
                 # ETag precondition not verified, do not delete item
                 return httputils.PRECONDITION_FAILED
+            hook_notification_item_list = []
             if isinstance(item, storage.BaseCollection):
+                for i in item.get_all():
+                    hook_notification_item_list.append(
+                        HookNotificationItem(
+                            HookNotificationItemTypes.DELETE,
+                            access.path,
+                            i.uid
+                        )
+                    )
                 xml_answer = xml_delete(base_prefix, path, item)
             else:
                 assert item.collection is not None
                 assert item.href is not None
+                hook_notification_item_list.append(
+                    HookNotificationItem(
+                        HookNotificationItemTypes.DELETE,
+                        access.path,
+                        item.uid
+                    )
+                )
                 xml_answer = xml_delete(
                     base_prefix, path, item.collection, item.href)
+            for notification_item in hook_notification_item_list:
+                self._hook.notify(notification_item)
             headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
             return client.OK, headers, self._xml_response(xml_answer)

+ 13 - 0
radicale/app/proppatch.py

@@ -22,9 +22,12 @@ import xml.etree.ElementTree as ET
 from http import client
 from typing import Dict, Optional, cast
 
+import defusedxml.ElementTree as DefusedET
+
 import radicale.item as radicale_item
 from radicale import httputils, storage, types, xmlutils
 from radicale.app.base import Access, ApplicationBase
+from radicale.hook import HookNotificationItem, HookNotificationItemTypes
 from radicale.log import logger
 
 
@@ -93,6 +96,16 @@ class ApplicationPartProppatch(ApplicationBase):
             try:
                 xml_answer = xml_proppatch(base_prefix, path, xml_content,
                                            item)
+                if xml_content is not None:
+                    hook_notification_item = HookNotificationItem(
+                        HookNotificationItemTypes.CPATCH,
+                        access.path,
+                        DefusedET.tostring(
+                            xml_content,
+                            encoding=self._encoding
+                        ).decode(encoding=self._encoding)
+                    )
+                    self._hook.notify(hook_notification_item)
             except ValueError as e:
                 logger.warning(
                     "Bad PROPPATCH request on %r: %s", path, e, exc_info=True)

+ 14 - 0
radicale/app/put.py

@@ -30,6 +30,7 @@ import vobject
 import radicale.item as radicale_item
 from radicale import httputils, pathutils, rights, storage, types, xmlutils
 from radicale.app.base import Access, ApplicationBase
+from radicale.hook import HookNotificationItem, HookNotificationItemTypes
 from radicale.log import logger
 
 MIMETYPE_TAGS: Mapping[str, str] = {value: key for key, value in
@@ -206,6 +207,13 @@ class ApplicationPartPut(ApplicationBase):
                 try:
                     etag = self._storage.create_collection(
                         path, prepared_items, props).etag
+                    for item in prepared_items:
+                        hook_notification_item = HookNotificationItem(
+                            HookNotificationItemTypes.UPSERT,
+                            access.path,
+                            item.serialize()
+                        )
+                        self._hook.notify(hook_notification_item)
                 except ValueError as e:
                     logger.warning(
                         "Bad PUT request on %r: %s", path, e, exc_info=True)
@@ -222,6 +230,12 @@ class ApplicationPartPut(ApplicationBase):
                 href = posixpath.basename(pathutils.strip_path(path))
                 try:
                     etag = parent_item.upload(href, prepared_item).etag
+                    hook_notification_item = HookNotificationItem(
+                        HookNotificationItemTypes.UPSERT,
+                        access.path,
+                        prepared_item.serialize()
+                    )
+                    self._hook.notify(hook_notification_item)
                 except ValueError as e:
                     logger.warning(
                         "Bad PUT request on %r: %s", path, e, exc_info=True)

+ 19 - 1
radicale/config.py

@@ -35,7 +35,7 @@ from configparser import RawConfigParser
 from typing import (Any, Callable, ClassVar, Iterable, List, Optional,
                     Sequence, Tuple, TypeVar, Union)
 
-from radicale import auth, rights, storage, types, web
+from radicale import auth, hook, rights, storage, types, web
 
 DEFAULT_CONFIG_PATH: str = os.pathsep.join([
     "?/etc/radicale/config",
@@ -210,6 +210,24 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([
             "value": "True",
             "help": "sync all changes to filesystem during requests",
             "type": bool})])),
+    ("hook", OrderedDict([
+        ("type", {
+            "value": "none",
+            "help": "hook backend",
+            "type": str,
+            "internal": hook.INTERNAL_TYPES}),
+        ("rabbitmq_endpoint", {
+            "value": "",
+            "help": "endpoint where rabbitmq server is running",
+            "type": str}),
+        ("rabbitmq_topic", {
+            "value": "",
+            "help": "topic to declare queue",
+            "type": str}),
+        ("rabbitmq_queue_type", {
+            "value": "",
+            "help": "queue type for topic declaration",
+            "type": str})])),
     ("web", OrderedDict([
         ("type", {
             "value": "internal",

+ 60 - 0
radicale/hook/__init__.py

@@ -0,0 +1,60 @@
+import json
+from enum import Enum
+from typing import Sequence
+
+from radicale import pathutils, utils
+
+INTERNAL_TYPES: Sequence[str] = ("none", "rabbitmq")
+
+
+def load(configuration):
+    """Load the storage module chosen in configuration."""
+    return utils.load_plugin(
+        INTERNAL_TYPES, "hook", "Hook", BaseHook, configuration)
+
+
+class BaseHook:
+    def __init__(self, configuration):
+        """Initialize BaseHook.
+
+        ``configuration`` see ``radicale.config`` module.
+        The ``configuration`` must not change during the lifetime of
+        this object, it is kept as an internal reference.
+
+        """
+        self.configuration = configuration
+
+    def notify(self, notification_item):
+        """Upload a new or replace an existing item."""
+        raise NotImplementedError
+
+
+class HookNotificationItemTypes(Enum):
+    CPATCH = "cpatch"
+    UPSERT = "upsert"
+    DELETE = "delete"
+
+
+def _cleanup(path):
+    sane_path = pathutils.strip_path(path)
+    attributes = sane_path.split("/") if sane_path else []
+
+    if len(attributes) < 2:
+        return ""
+    return attributes[0] + "/" + attributes[1]
+
+
+class HookNotificationItem:
+
+    def __init__(self, notification_item_type, path, content):
+        self.type = notification_item_type.value
+        self.point = _cleanup(path)
+        self.content = content
+
+    def to_json(self):
+        return json.dumps(
+            self,
+            default=lambda o: o.__dict__,
+            sort_keys=True,
+            indent=4
+        )

+ 6 - 0
radicale/hook/none.py

@@ -0,0 +1,6 @@
+from radicale import hook
+
+
+class Hook(hook.BaseHook):
+    def notify(self, notification_item):
+        """Notify nothing. Empty hook."""

+ 50 - 0
radicale/hook/rabbitmq/__init__.py

@@ -0,0 +1,50 @@
+import pika
+from pika.exceptions import ChannelWrongStateError, StreamLostError
+
+from radicale import hook
+from radicale.hook import HookNotificationItem
+from radicale.log import logger
+
+
+class Hook(hook.BaseHook):
+
+    def __init__(self, configuration):
+        super().__init__(configuration)
+        self._endpoint = configuration.get("hook", "rabbitmq_endpoint")
+        self._topic = configuration.get("hook", "rabbitmq_topic")
+        self._queue_type = configuration.get("hook", "rabbitmq_queue_type")
+        self._encoding = configuration.get("encoding", "stock")
+
+        self._make_connection_synced()
+        self._make_declare_queue_synced()
+
+    def _make_connection_synced(self):
+        parameters = pika.URLParameters(self._endpoint)
+        connection = pika.BlockingConnection(parameters)
+        self._channel = connection.channel()
+
+    def _make_declare_queue_synced(self):
+        self._channel.queue_declare(queue=self._topic, durable=True, arguments={"x-queue-type": self._queue_type})
+
+    def notify(self, notification_item):
+        if isinstance(notification_item, HookNotificationItem):
+            self._notify(notification_item, True)
+
+    def _notify(self, notification_item, recall):
+        try:
+            self._channel.basic_publish(
+                exchange='',
+                routing_key=self._topic,
+                body=notification_item.to_json().encode(
+                    encoding=self._encoding
+                )
+            )
+        except Exception as e:
+            if (isinstance(e, ChannelWrongStateError) or
+                    isinstance(e, StreamLostError)) and recall:
+                self._make_connection_synced()
+                self._notify(notification_item, False)
+                return
+            logger.error("An exception occurred during "
+                         "publishing hook notification item: %s",
+                         e, exc_info=True)

+ 1 - 1
setup.py

@@ -29,7 +29,7 @@ web_files = ["web/internal_data/css/icon.png",
              "web/internal_data/index.html"]
 
 install_requires = ["defusedxml", "passlib", "vobject>=0.9.6",
-                    "python-dateutil>=2.7.3",
+                    "python-dateutil>=2.7.3", "pika>=1.1.0",
                     "setuptools; python_version<'3.9'"]
 bcrypt_requires = ["passlib[bcrypt]", "bcrypt"]
 # typeguard requires pytest<7