Guillaume Ayoub 9 лет назад
Родитель
Сommit
263f31c84b
5 измененных файлов с 297 добавлено и 319 удалено
  1. 232 269
      radicale/__init__.py
  2. 6 8
      radicale/rights.py
  3. 27 33
      radicale/storage.py
  4. 30 0
      radicale/tests/test_base.py
  5. 2 9
      radicale/xmlutils.py

+ 232 - 269
radicale/__init__.py

@@ -29,6 +29,7 @@ should have been included in this package.
 import base64
 import contextlib
 import os
+import posixpath
 import pprint
 import shlex
 import socket
@@ -38,6 +39,7 @@ import subprocess
 import threading
 import wsgiref.simple_server
 import zlib
+from contextlib import contextmanager
 from http import client
 from urllib.parse import unquote, urlparse
 
@@ -184,60 +186,31 @@ class Application:
 
     def collect_allowed_items(self, items, user):
         """Get items from request that user is allowed to access."""
-        read_last_collection_allowed = None
-        write_last_collection_allowed = None
         read_allowed_items = []
         write_allowed_items = []
-
         for item in items:
             if isinstance(item, self.Collection):
-                if self.authorized(user, item, "r"):
-                    self.logger.debug(
-                        "%s has read access to collection %s" %
-                        (user or "Anonymous", item.path or "/"))
-                    read_last_collection_allowed = True
-                    read_allowed_items.append(item)
-                else:
-                    self.logger.debug(
-                        "%s has NO read access to collection %s" %
-                        (user or "Anonymous", item.path or "/"))
-                    read_last_collection_allowed = False
-
-                if self.authorized(user, item, "w"):
-                    self.logger.debug(
-                        "%s has write access to collection %s" %
-                        (user or "Anonymous", item.path or "/"))
-                    write_last_collection_allowed = True
-                    write_allowed_items.append(item)
-                else:
-                    self.logger.debug(
-                        "%s has NO write access to collection %s" %
-                        (user or "Anonymous", item.path or "/"))
-                    write_last_collection_allowed = False
+                path = item.path
             else:
-                # item is not a collection, it's the child of the last
-                # collection we've met in the loop. Only add this item
-                # if this last collection was allowed.
-                if read_last_collection_allowed:
-                    self.logger.debug(
-                        "%s has read access to item %s" %
-                        (user or "Anonymous", item.href))
-                    read_allowed_items.append(item)
-                else:
-                    self.logger.debug(
-                        "%s has NO read access to item %s" %
-                        (user or "Anonymous", item.href))
-
-                if write_last_collection_allowed:
-                    self.logger.debug(
-                        "%s has write access to item %s" %
-                        (user or "Anonymous", item.href))
-                    write_allowed_items.append(item)
-                else:
-                    self.logger.debug(
-                        "%s has NO write access to item %s" %
-                        (user or "Anonymous", item.href))
-
+                path = item.collection.path
+            if self.authorized(user, path, "r"):
+                self.logger.debug(
+                    "%s has read access to collection %s" %
+                    (user or "Anonymous", path or "/"))
+                read_allowed_items.append(item)
+            else:
+                self.logger.debug(
+                    "%s has NO read access to collection %s" %
+                    (user or "Anonymous", path or "/"))
+            if self.authorized(user, path, "w"):
+                self.logger.debug(
+                    "%s has write access to collection %s" %
+                    (user or "Anonymous", path or "/"))
+                write_allowed_items.append(item)
+            else:
+                self.logger.debug(
+                    "%s has NO write access to collection %s" %
+                    (user or "Anonymous", path or "/"))
         return read_allowed_items, write_allowed_items
 
     def __call__(self, environ, start_response):
@@ -305,12 +278,11 @@ class Application:
         # Create principal collection
         if user and is_authenticated:
             principal_path = "/%s/" % user
-            collection = self.Collection(principal_path, True)
-            if self.authorized(user, collection, "w"):
+            if self.authorized(user, principal_path, "w"):
                 with self.Collection.acquire_lock("r"):
                     principal = next(
                         self.Collection.discover(principal_path), None)
-                if not principal or principal.path != user:
+                if not principal:
                     with self.Collection.acquire_lock("w"):
                         self.Collection.create_collection(principal_path)
 
@@ -333,34 +305,7 @@ class Application:
             content = None
 
         if is_valid_user:
-            if function in (
-                    self.do_GET, self.do_HEAD, self.do_OPTIONS,
-                    self.do_PROPFIND, self.do_REPORT):
-                lock_mode = "r"
-            else:
-                lock_mode = "w"
-            with self.Collection.acquire_lock(lock_mode):
-                items = self.Collection.discover(
-                    path, environ.get("HTTP_DEPTH", "0"))
-                read_allowed_items, write_allowed_items = (
-                    self.collect_allowed_items(items, user))
-                if (read_allowed_items or write_allowed_items or
-                        is_authenticated and function == self.do_PROPFIND or
-                        function == self.do_OPTIONS):
-                    status, headers, answer = function(
-                        environ, read_allowed_items, write_allowed_items,
-                        content, user)
-                    hook = self.configuration.get("storage", "hook")
-                    if lock_mode == "w" and hook:
-                        self.logger.debug("Running hook")
-                        folder = os.path.expanduser(
-                            self.configuration.get(
-                                "storage", "filesystem_folder"))
-                        subprocess.check_call(
-                            hook % {"user": shlex.quote(user or "Anonymous")},
-                            shell=True, cwd=folder)
-                else:
-                    status, headers, answer = NOT_ALLOWED
+            status, headers, answer = function(environ, path, content, user)
         else:
             status, headers, answer = NOT_ALLOWED
 
@@ -397,142 +342,154 @@ class Application:
 
         return response(status, headers, answer)
 
-    def do_DELETE(self, environ, read_collections, write_collections, content,
-                  user):
+    def _access(self, user, path, permission, item=None):
+        """Checks if ``user`` can access ``path`` or the parent collection
+           with ``permission``.
+
+           ``permission`` must either be "r" or "w".
+
+           If ``item`` is given, only access to that class of item is checked.
+
+        """
+        path = storage.sanitize_path(path)
+        parent_path = storage.sanitize_path(
+            "/%s/" % posixpath.dirname(path.strip("/")))
+        allowed = False
+        if not item or isinstance(item, self.Collection):
+            allowed |= self.authorized(user, path, permission)
+        if not item or not isinstance(item, self.Collection):
+            allowed |= self.authorized(user, parent_path, permission)
+        return allowed
+
+    @contextmanager
+    def _lock_collection(self, lock_mode, user):
+        """Lock the collection with ``permission`` and execute hook."""
+        with self.Collection.acquire_lock(lock_mode) as value:
+            yield value
+        hook = self.configuration.get("storage", "hook")
+        if lock_mode == "w" and hook:
+            self.logger.debug("Running hook")
+            folder = os.path.expanduser(self.configuration.get(
+                "storage", "filesystem_folder"))
+            subprocess.check_call(
+                hook % {"user": shlex.quote(user or "Anonymous")},
+                shell=True, cwd=folder)
+
+    def do_DELETE(self, environ, path, content, user):
         """Manage DELETE request."""
-        if not write_collections:
+        if not self._access(user, path, "w"):
             return NOT_ALLOWED
-
-        collection = write_collections[0]
-
-        if collection.path == environ["PATH_INFO"].strip("/"):
-            # Path matching the collection, the collection must be deleted
-            item = collection
-        else:
-            # Try to get an item matching the path
-            name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
-            item = collection.get(name)
-
-        if item:
+        with self._lock_collection("w", user):
+            item = next(self.Collection.discover(path, depth="0"), None)
+            if not self._access(user, path, "w", item):
+                return NOT_ALLOWED
+            if not item:
+                return client.GONE, {}, None
             if_match = environ.get("HTTP_IF_MATCH", "*")
-            if if_match in ("*", item.etag):
-                # No ETag precondition or precondition verified, delete item
-                answer = xmlutils.delete(environ["PATH_INFO"], collection)
-                return client.OK, {}, answer
-
-        # No item or ETag precondition not verified, do not delete item
-        return client.PRECONDITION_FAILED, {}, None
+            if if_match not in ("*", item.etag):
+                # ETag precondition not verified, do not delete item
+                return client.PRECONDITION_FAILED, {}, None
+            if isinstance(item, self.Collection):
+                answer = xmlutils.delete(path, item)
+            else:
+                answer = xmlutils.delete(path, item.collection, item.href)
+            return client.OK, {}, answer
 
-    def do_GET(self, environ, read_collections, write_collections, content,
-               user):
+    def do_GET(self, environ, path, content, user):
         """Manage GET request."""
         # Display a "Radicale works!" message if the root URL is requested
-        if environ["PATH_INFO"] == "/":
+        if not path.strip("/"):
             headers = {"Content-type": "text/html"}
             answer = "<!DOCTYPE html>\n<title>Radicale</title>Radicale works!"
             return client.OK, headers, answer
-
-        if not read_collections:
+        if not self._access(user, path, "r"):
             return NOT_ALLOWED
-
-        collection = read_collections[0]
-
-        item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
-
-        if item_name:
-            # Get collection item
-            item = collection.get(item_name)
-            if item:
-                answer = item.serialize()
-                etag = item.etag
-            else:
-                return client.NOT_FOUND, {}, None
-        else:
-            # Get whole collection
-            answer = collection.serialize()
-            if answer is None:
+        with self._lock_collection("r", user):
+            item = next(self.Collection.discover(path, depth="0"), None)
+            if not self._access(user, path, "r", item):
+                return NOT_ALLOWED
+            if not item:
                 return client.NOT_FOUND, {}, None
+            if isinstance(item, self.Collection):
+                collection = item
             else:
-                etag = collection.etag
-
-        if answer:
+                collection = item.collection
+            content_type = storage.MIMETYPES.get(collection.get_meta("tag"),
+                                                 "text/plain")
             headers = {
-                "Content-Type": storage.MIMETYPES[collection.get_meta("tag")],
+                "Content-Type": content_type,
                 "Last-Modified": collection.last_modified,
-                "ETag": etag}
-        else:
-            headers = {}
-        return client.OK, headers, answer
+                "ETag": item.etag}
+            answer = item.serialize()
+            return client.OK, headers, answer
 
-    def do_HEAD(self, environ, read_collections, write_collections, content,
-                user):
+    def do_HEAD(self, environ, path, content, user):
         """Manage HEAD request."""
-        status, headers, answer = self.do_GET(
-            environ, read_collections, write_collections, content, user)
+        status, headers, answer = self.do_GET(environ, path, content, user)
         return status, headers, None
 
-    def do_MKCALENDAR(self, environ, read_collections, write_collections,
-                      content, user):
+    def do_MKCALENDAR(self, environ, path, content, user):
         """Manage MKCALENDAR request."""
-        if not write_collections:
+        if not self.authorized(user, path, "w"):
             return NOT_ALLOWED
 
-        collection = write_collections[0]
-
-        props = xmlutils.props_from_request(content)
-        props["tag"] = "VCALENDAR"
-        # TODO: use this?
-        # timezone = props.get("C:calendar-timezone")
-        collection = self.Collection.create_collection(
-            environ["PATH_INFO"], props=props)
-        return client.CREATED, {}, None
-
-    def do_MKCOL(self, environ, read_collections, write_collections, content,
-                 user):
+        with self._lock_collection("w", user):
+            item = next(self.Collection.discover(path, depth="0"), None)
+            if item:
+                return client.CONFLICT, {}, None
+            props = xmlutils.props_from_request(content)
+            props["tag"] = "VCALENDAR"
+            # TODO: use this?
+            # timezone = props.get("C:calendar-timezone")
+            self.Collection.create_collection(path, props=props)
+            return client.CREATED, {}, None
+
+    def do_MKCOL(self, environ, path, content, user):
         """Manage MKCOL request."""
-        if not write_collections:
+        if not self.authorized(user, path, "w"):
             return NOT_ALLOWED
-
-        collection = write_collections[0]
-
-        props = xmlutils.props_from_request(content)
-        collection = self.Collection.create_collection(
-            environ["PATH_INFO"], props=props)
-        return client.CREATED, {}, None
-
-    def do_MOVE(self, environ, read_collections, write_collections, content,
-                user):
+        with self._lock_collection("w", user):
+            item = next(self.Collection.discover(path, depth="0"), None)
+            if item:
+                return client.CONFLICT, {}, None
+            props = xmlutils.props_from_request(content)
+            collection = self.Collection.create_collection(path, props=props)
+            for key, value in props.items():
+                collection.set_meta(key, value)
+            return client.CREATED, {}, None
+
+    def do_MOVE(self, environ, path, content, user):
         """Manage MOVE request."""
-        if not write_collections:
+        to_url = urlparse(environ["HTTP_DESTINATION"])
+        if to_url.netloc != environ["HTTP_HOST"]:
+            # Remote destination server, not supported
+            return client.BAD_GATEWAY, {}, None
+        to_path = storage.sanitize_path(to_url.path)
+        if (not self._access(user, path, "w") or
+                not self._access(user, to_path, "w")):
             return NOT_ALLOWED
-
-        from_collection = write_collections[0]
-        from_name = xmlutils.name_from_path(
-            environ["PATH_INFO"], from_collection)
-        item = from_collection.get(from_name)
-        if item:
-            # Move the item
-            to_url_parts = urlparse(environ["HTTP_DESTINATION"])
-            if to_url_parts.netloc == environ["HTTP_HOST"]:
-                to_url = to_url_parts.path
-                to_path, to_name = to_url.rstrip("/").rsplit("/", 1)
-                for to_collection in self.Collection.discover(
-                        to_path, depth="0"):
-                    if to_collection in write_collections:
-                        to_collection.upload(to_name, item)
-                        from_collection.delete(from_name)
-                        return client.CREATED, {}, None
-                    else:
-                        return NOT_ALLOWED
-            else:
-                # Remote destination server, not supported
-                return client.BAD_GATEWAY, {}, None
-        else:
-            # No item found
-            return client.GONE, {}, None
-
-    def do_OPTIONS(self, environ, read_collections, write_collections,
-                   content, user):
+        with self._lock_collection("w", user):
+            item = next(self.Collection.discover(path, depth="0"), None)
+            if (not self._access(user, path, "w", item) or
+                    not self._access(user, to_path, "w", item)):
+                return NOT_ALLOWED
+            if not item:
+                return client.GONE, {}, None
+            to_item = next(self.Collection.discover(to_path, depth="0"), None)
+            to_parent_path = storage.sanitize_path(
+                "/%s/" % posixpath.dirname(to_path.strip("/")))
+            to_href = posixpath.basename(to_path.strip("/"))
+            to_collection = next(self.Collection.discover(
+                to_parent_path, depth="0"), None)
+            print(path, isinstance(item, self.Collection))
+            if (isinstance(item, self.Collection) or not to_collection or
+                    to_item or to_path.strip("/").startswith(path.strip("/"))):
+                return client.CONFLICT, {}, None
+            to_collection.upload(to_href, item.item)
+            item.collection.delete(item.href)
+            return client.CREATED, {}, None
+
+    def do_OPTIONS(self, environ, path, content, user):
         """Manage OPTIONS request."""
         headers = {
             "Allow": ("DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, "
@@ -540,94 +497,100 @@ class Application:
             "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol"}
         return client.OK, headers, None
 
-    def do_PROPFIND(self, environ, read_collections, write_collections,
-                    content, user):
+    def do_PROPFIND(self, environ, path, content, user):
         """Manage PROPFIND request."""
-        if not read_collections:
-            return (client.NOT_FOUND, {}, None) if user else NOT_ALLOWED
-        headers = {
-            "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
-            "Content-Type": "text/xml"}
-        answer = xmlutils.propfind(
-            environ["PATH_INFO"], content, read_collections, write_collections,
-            user)
-        return client.MULTI_STATUS, headers, answer
-
-    def do_PROPPATCH(self, environ, read_collections, write_collections,
-                     content, user):
+        with self._lock_collection("r", user):
+            items = self.Collection.discover(path,
+                                             environ.get("HTTP_DEPTH", "0"))
+            read_items, write_items = self.collect_allowed_items(items, user)
+            if not read_items and not write_items:
+                return (client.NOT_FOUND, {}, None) if user else NOT_ALLOWED
+            headers = {
+                "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
+                "Content-Type": "text/xml"}
+            answer = xmlutils.propfind(
+                path, content, read_items, write_items, user)
+            return client.MULTI_STATUS, headers, answer
+
+    def do_PROPPATCH(self, environ, path, content, user):
         """Manage PROPPATCH request."""
-        if not write_collections:
+        if not self.authorized(user, path, "w"):
             return NOT_ALLOWED
+        with self._lock_collection("w", user):
+            item = next(self.Collection.discover(path, depth="0"), None)
+            if not isinstance(item, self.Collection):
+                return client.CONFLICT, {}, None
+            answer = xmlutils.proppatch(path, content, item)
+            headers = {
+                "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
+                "Content-Type": "text/xml"}
+            return client.MULTI_STATUS, headers, answer
 
-        collection = write_collections[0]
-
-        answer = xmlutils.proppatch(environ["PATH_INFO"], content, collection)
-        headers = {
-            "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
-            "Content-Type": "text/xml"}
-        return client.MULTI_STATUS, headers, answer
-
-    def do_PUT(self, environ, read_collections, write_collections, content,
-               user):
+    def do_PUT(self, environ, path, content, user):
         """Manage PUT request."""
-        if not write_collections:
+        if not self._access(user, path, "w"):
             return NOT_ALLOWED
 
-        collection = write_collections[0]
+        with self._lock_collection("w", user):
+            parent_path = storage.sanitize_path(
+                "/%s/" % posixpath.dirname(path.strip("/")))
+            item = next(self.Collection.discover(path, depth="0"), None)
+            parent_item = next(self.Collection.discover(
+                parent_path, depth="0"), None)
+            write_whole_collection = (
+                isinstance(item, self.Collection) or
+                not parent_item or
+                not next(parent_item.list(), None) and
+                parent_item.get_meta("tag") not in (
+                    "VADDRESSBOOK", "VCALENDAR"))
+            if (write_whole_collection and
+                    not self.authorized(user, path, "w") or
+                    not write_whole_collection and
+                    not self.authorized(user, parent_path, "w")):
+                return NOT_ALLOWED
+            etag = environ.get("HTTP_IF_MATCH", "")
+            match = environ.get("HTTP_IF_NONE_MATCH", "") == "*"
+
+            if ((not item and etag) or (item and etag and item.etag != etag) or
+                    (item and match)):
+                return client.PRECONDITION_FAILED, {}, None
 
-        content_type = environ.get("CONTENT_TYPE")
-        if content_type:
-            tags = {value: key for key, value in storage.MIMETYPES.items()}
-            tag = tags.get(content_type.split(";")[0])
-            if tag:
-                collection.set_meta({"tag": tag})
-        headers = {}
-        item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
-        item = collection.get(item_name)
-
-        etag = environ.get("HTTP_IF_MATCH", "")
-        match = environ.get("HTTP_IF_NONE_MATCH", "") == "*"
-        if (not item and not etag) or (
-                item and ((etag or item.etag) == item.etag) and not match):
-            # PUT allowed in 3 cases
-            # Case 1: No item and no ETag precondition: Add new item
-            # Case 2: Item and ETag precondition verified: Modify item
-            # Case 3: Item and no Etag precondition: Force modifying item
             items = list(vobject.readComponents(content or ""))
-            if item:
-                # PUT is modifying an existing item
-                if items:
-                    new_item = collection.update(item_name, items[0])
-                else:
-                    new_item = None
-            elif item_name:
-                # PUT is adding a new item
-                if items:
-                    new_item = collection.upload(item_name, items[0])
-                else:
-                    new_item = None
+            content_type = environ.get("CONTENT_TYPE")
+            tag = None
+            if content_type:
+                tags = {value: key for key, value in storage.MIMETYPES.items()}
+                tag = tags.get(content_type.split(";")[0])
+            if write_whole_collection:
+                if item:
+                    # Delete old collection
+                    item.delete()
+                new_item = self.Collection.create_collection(path, items, tag)
             else:
-                # PUT is replacing the whole collection
-                collection.delete()
-                new_item = self.Collection.create_collection(
-                    environ["PATH_INFO"], items)
-            if new_item:
-                headers["ETag"] = new_item.etag
-            status = client.CREATED
-        else:
-            # PUT rejected in all other cases
-            status = client.PRECONDITION_FAILED
-        return status, headers, None
+                if tag:
+                    parent_item.set_meta({"tag": tag})
+                href = posixpath.basename(path.strip("/"))
+                if item:
+                    new_item = parent_item.update(href, items[0])
+                else:
+                    new_item = parent_item.upload(href, items[0])
+            headers = {"ETag": new_item.etag}
+            return client.CREATED, headers, None
 
-    def do_REPORT(self, environ, read_collections, write_collections, content,
-                  user):
+    def do_REPORT(self, environ, path, content, user):
         """Manage REPORT request."""
-        if not read_collections:
+        if not self._access(user, path, "w"):
             return NOT_ALLOWED
-
-        collection = read_collections[0]
-
-        headers = {"Content-Type": "text/xml"}
-
-        answer = xmlutils.report(environ["PATH_INFO"], content, collection)
-        return client.MULTI_STATUS, headers, answer
+        with self._lock_collection("r", user):
+            item = next(self.Collection.discover(path, depth="0"), None)
+            if not self._access(user, path, "w", item):
+                return NOT_ALLOWED
+            if not item:
+                return client.NOT_FOUND, {}, None
+            if isinstance(item, self.Collection):
+                collection = item
+            else:
+                collection = item.collection
+            headers = {"Content-Type": "text/xml"}
+            answer = xmlutils.report(path, content, collection)
+            return client.MULTI_STATUS, headers, answer

+ 6 - 8
radicale/rights.py

@@ -107,19 +107,17 @@ class Rights(BaseRights):
         self.filename = os.path.expanduser(configuration.get("rights", "file"))
         self.rights_type = configuration.get("rights", "type").lower()
 
-    def authorized(self, user, collection, permission):
+    def authorized(self, user, path, permission):
         user = user or ""
         if user and not storage.is_safe_path_component(user):
             # Prevent usernames like "user/calendar.ics"
             raise ValueError("Unsafe username")
-        collection_url = collection.path.rstrip("/")
-        if collection_url in (".well-known/carddav", ".well-known/caldav"):
-            return permission == "r"
+        sane_path = storage.sanitize_path(path).strip("/")
         # Prevent "regex injection"
         user_escaped = re.escape(user)
-        collection_url_escaped = re.escape(collection_url)
+        sane_path_escaped = re.escape(sane_path)
         regex = ConfigParser(
-            {"login": user_escaped, "path": collection_url_escaped})
+            {"login": user_escaped, "path": sane_path_escaped})
         if self.rights_type in DEFINED_RIGHTS:
             self.logger.debug("Rights type '%s'" % self.rights_type)
             regex.readfp(StringIO(DEFINED_RIGHTS[self.rights_type]))
@@ -135,11 +133,11 @@ class Rights(BaseRights):
             re_collection = regex.get(section, "collection")
             self.logger.debug(
                 "Test if '%s:%s' matches against '%s:%s' from section '%s'" % (
-                    user, collection_url, re_user, re_collection, section))
+                    user, sane_path, re_user, re_collection, section))
             user_match = re.fullmatch(re_user, user)
             if user_match:
                 re_collection = re_collection.format(*user_match.groups())
-                if re.fullmatch(re_collection, collection_url):
+                if re.fullmatch(re_collection, sane_path):
                     self.logger.debug("Section '%s' matches" % section)
                     return permission in regex.get(section, "permission")
                 else:

+ 27 - 33
radicale/storage.py

@@ -229,9 +229,7 @@ class BaseCollection:
         returned.
 
         If ``depth`` is anything but "0", it is considered as "1" and direct
-        children are included in the result. If ``include_container`` is
-        ``True`` (the default), the containing object is included in the
-        result.
+        children are included in the result.
 
         The ``path`` is relative.
 
@@ -398,41 +396,37 @@ class Collection(BaseCollection):
 
         # Try to guess if the path leads to a collection or an item
         folder = cls._get_collection_root_folder()
-        # HACK: Detection of principal collections fails if folder doesn't
-        #       exist. This can be removed, when this method stop returning
-        #       collections that don't exist.
-        os.makedirs(folder, exist_ok=True)
-        if not os.path.isdir(path_to_filesystem(folder, sane_path)):
-            # path is not a collection
-            if attributes and os.path.isfile(path_to_filesystem(folder,
-                                                                sane_path)):
-                # path is an item
-                attributes.pop()
-            elif attributes and os.path.isdir(path_to_filesystem(
-                    folder, *attributes[:-1])):
-                # path parent is a collection
-                attributes.pop()
-            # TODO: else: return?
+        try:
+            filesystem_path = path_to_filesystem(folder, sane_path)
+        except ValueError:
+            # Path is unsafe
+            return
+        href = None
+        if not os.path.isdir(filesystem_path):
+            if attributes and os.path.isfile(filesystem_path):
+                href = attributes.pop()
+            else:
+                return
 
         path = "/".join(attributes)
-
         principal = len(attributes) == 1
         collection = cls(path, principal)
+        if href:
+            yield collection.get(href)
+            return
         yield collection
-        if depth != "0":
-            # TODO: fix this
-            items = list(collection.list())
-            if items:
-                for item in items:
-                    yield collection.get(item[0])
-            _, directories, _ = next(os.walk(collection._filesystem_path))
-            for sub_path in directories:
-                if not is_safe_filesystem_path_component(sub_path):
-                    cls.logger.debug("Skipping collection: %s", sub_path)
-                    continue
-                full_path = os.path.join(collection._filesystem_path, sub_path)
-                if os.path.exists(full_path):
-                    yield cls(posixpath.join(path, sub_path))
+        if depth == "0":
+            return
+        for item in collection.list():
+            yield collection.get(item[0])
+        for href in os.listdir(filesystem_path):
+            if not is_safe_filesystem_path_component(href):
+                cls.logger.debug("Skipping collection: %s", href)
+                continue
+            child_filesystem_path = path_to_filesystem(filesystem_path, href)
+            if os.path.isdir(child_filesystem_path):
+                child_principal = len(attributes) == 0
+                yield cls(child_filesystem_path, child_principal)
 
     @classmethod
     def create_collection(cls, href, collection=None, props=None):

+ 30 - 0
radicale/tests/test_base.py

@@ -133,6 +133,36 @@ class BaseRequests:
         status, headers, answer = self.request("GET", "/calendar.ics/")
         assert "VEVENT" not in answer
 
+    def test_mkcalendar(self):
+        """Make a calendar."""
+        self.request("MKCALENDAR", "/calendar.ics/")
+        status, headers, answer = self.request("GET", "/calendar.ics/")
+        assert status == 200
+
+    def test_move(self):
+        """Move a item."""
+        self.request("MKCALENDAR", "/calendar.ics/")
+        event = get_file_content("event1.ics")
+        path1 = "/calendar.ics/event1.ics"
+        path2 = "/calendar.ics/event2.ics"
+        status, headers, answer = self.request("PUT", path1, event)
+        status, headers, answer = self.request(
+            "MOVE", path1, HTTP_DESTINATION=path2, HTTP_HOST="")
+        assert status == 201
+        status, headers, answer = self.request("GET", path1)
+        assert status == 404
+        status, headers, answer = self.request("GET", path2)
+        assert status == 200
+
+    def test_head(self):
+        status, headers, answer = self.request("HEAD", "/")
+        assert status == 200
+
+    def test_options(self):
+        status, headers, answer = self.request("OPTIONS", "/")
+        assert status == 200
+        assert "DAV" in headers
+
     def test_multiple_events_with_same_uid(self):
         """Add two events with the same UID."""
         self.request("MKCOL", "/calendar.ics/")

+ 2 - 9
radicale/xmlutils.py

@@ -472,20 +472,13 @@ def props_from_request(root, actions=("set", "remove")):
     return result
 
 
-def delete(path, collection):
+def delete(path, collection, href=None):
     """Read and answer DELETE requests.
 
     Read rfc4918-9.6 for info.
 
     """
-    # Reading request
-    if collection.path == path.strip("/"):
-        # Delete the whole collection
-        collection.delete()
-    else:
-        # Remove an item from the collection
-        collection.delete(name_from_path(path, collection))
-
+    collection.delete(href)
     # Writing answer
     multistatus = ET.Element(_tag("D", "multistatus"))
     response = ET.Element(_tag("D", "response"))