Răsfoiți Sursa

Addd hook capability

Tuna Celik 5 ani în urmă
părinte
comite
5253a464ab

+ 2 - 1
radicale/app/__init__.py

@@ -41,7 +41,7 @@ import defusedxml.ElementTree as DefusedET
 import pkg_resources
 
 from radicale import (auth, httputils, log, pathutils, rights, storage, web,
-                      xmlutils)
+                      xmlutils, hook)
 from radicale.app.delete import ApplicationDeleteMixin
 from radicale.app.get import ApplicationGetMixin
 from radicale.app.head import ApplicationHeadMixin
@@ -82,6 +82,7 @@ class Application(
         self._rights = rights.load(configuration)
         self._web = web.load(configuration)
         self._encoding = configuration.get("encoding", "request")
+        self._hook = hook.load(configuration)
 
     def _headers_log(self, environ):
         """Sanitize headers for logging."""

+ 4 - 0
radicale/app/delete.py

@@ -21,6 +21,7 @@ from http import client
 from xml.etree import ElementTree as ET
 
 from radicale import app, httputils, storage, xmlutils
+from radicale.hook.rabbitmq import QueueItem, QueueItemTypes
 
 
 def xml_delete(base_prefix, path, collection, href=None):
@@ -64,8 +65,11 @@ class ApplicationDeleteMixin:
                 return httputils.PRECONDITION_FAILED
             if isinstance(item, storage.BaseCollection):
                 xml_answer = xml_delete(base_prefix, path, item)
+                for item in item.get_all():
+                    self._hook.notify(QueueItem(QueueItemTypes.DELETE, item.uid))
             else:
                 xml_answer = xml_delete(
                     base_prefix, path, item.collection, item.href)
+                self._hook.notify(QueueItem(QueueItemTypes.DELETE, item.uid))
             headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
             return client.OK, headers, self._write_xml_content(xml_answer)

+ 4 - 0
radicale/app/put.py

@@ -29,6 +29,7 @@ from radicale import app, httputils
 from radicale import item as radicale_item
 from radicale import pathutils, rights, storage, xmlutils
 from radicale.log import logger
+from radicale.hook.rabbitmq import QueueItem, QueueItemTypes
 
 MIMETYPE_TAGS = {value: key for key, value in xmlutils.MIMETYPES.items()}
 
@@ -193,6 +194,8 @@ class ApplicationPutMixin:
                 try:
                     etag = self._storage.create_collection(
                         path, prepared_items, props).etag
+                    for item in prepared_items:
+                        self._hook.notify(QueueItem(QueueItemTypes.UPSERT, item.serialize()))
                 except ValueError as e:
                     logger.warning(
                         "Bad PUT request on %r: %s", path, e, exc_info=True)
@@ -208,6 +211,7 @@ class ApplicationPutMixin:
                 href = posixpath.basename(pathutils.strip_path(path))
                 try:
                     etag = parent_item.upload(href, prepared_item).etag
+                    self._hook.notify(QueueItem(QueueItemTypes.UPSERT, prepared_item.serialize()))
                 except ValueError as e:
                     logger.warning(
                         "Bad PUT request on %r: %s", path, e, exc_info=True)

+ 15 - 1
radicale/config.py

@@ -32,7 +32,7 @@ import string
 from collections import OrderedDict
 from configparser import RawConfigParser
 
-from radicale import auth, rights, storage, web
+from radicale import auth, rights, storage, web, hook
 
 DEFAULT_CONFIG_PATH = os.pathsep.join([
     "?/etc/radicale/config",
@@ -207,6 +207,20 @@ DEFAULT_CONFIG_SCHEMA = OrderedDict([
             "value": "True",
             "help": "sync all changes to filesystem during requests",
             "type": bool})])),
+    ("hook", OrderedDict([
+        ("type", {
+            "value": "none",
+            "help": "hook backend",
+            "type": str,
+            "internal": hook.INTERNAL_TYPES}),
+        ("rabbitmq_endpoint", {
+            "value": "",
+            "help": "endpoint where rabbitmq server is running",
+            "type": str}),
+        ("rabbitmq_topic", {
+            "value": "",
+            "help": "topic to declare queue",
+            "type": str})])),
     ("web", OrderedDict([
         ("type", {
             "value": "internal",

+ 25 - 0
radicale/hook/__init__.py

@@ -0,0 +1,25 @@
+from radicale import utils
+
+INTERNAL_TYPES = ("none", "rabbitmq")
+
+
+def load(configuration):
+    """Load the storage module chosen in configuration."""
+    return utils.load_plugin(
+        INTERNAL_TYPES, "hook", "Hook", configuration)
+
+
+class BaseHook:
+    def __init__(self, configuration):
+        """Initialize BaseHook.
+
+        ``configuration`` see ``radicale.config`` module.
+        The ``configuration`` must not change during the lifetime of
+        this object, it is kept as an internal reference.
+
+        """
+        self.configuration = configuration
+
+    def notify(self, content):
+        """Upload a new or replace an existing item."""
+        raise NotImplementedError

+ 6 - 0
radicale/hook/none.py

@@ -0,0 +1,6 @@
+from radicale import hook
+
+
+class Hook(hook.BaseHook):
+    def notify(self, content):
+        """Notify nothing. Empty hook."""

+ 47 - 0
radicale/hook/rabbitmq/__init__.py

@@ -0,0 +1,47 @@
+import pika
+import json
+
+from radicale import hook
+from enum import Enum
+
+
+class Hook(hook.BaseHook):
+
+    def __init__(self, configuration):
+        super().__init__(configuration)
+        endpoint = configuration.get("hook", "rabbitmq_endpoint")
+        self.topic = configuration.get("hook", "rabbitmq_topic")
+        self.encoding = configuration.get("encoding", "stock")
+
+        self._make_connection_synced(endpoint)
+        self._make_declare_queue_synced(self.topic)
+
+    def _make_connection_synced(self, endpoint):
+        parameters = pika.URLParameters(endpoint)
+        self.connection = pika.BlockingConnection(parameters)
+        self.channel = self.connection.channel()
+
+    def _make_declare_queue_synced(self, topic):
+        self.channel.queue_declare(queue=topic)
+
+    def notify(self, content):
+        if not isinstance(content, QueueItem):
+            return
+        self.channel.basic_publish(exchange='',
+                                   routing_key=self.topic,
+                                   body=content.to_json().encode(encoding=self.encoding))
+
+
+class QueueItemTypes(Enum):
+    UPSERT = "upsert"
+    DELETE = "delete"
+
+
+class QueueItem:
+
+    def __init__(self, queue_item_type, content):
+        self.type = queue_item_type.value
+        self.content = content
+
+    def to_json(self):
+        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)