|
|
@@ -52,11 +52,7 @@ INTERNAL_TYPES = ("none", "authenticated", "owner_write", "owner_only",
|
|
|
def load(configuration):
|
|
|
"""Load the rights manager chosen in configuration."""
|
|
|
rights_type = configuration.get("rights", "type")
|
|
|
- if configuration.get("auth", "type") == "none":
|
|
|
- rights_type = "none"
|
|
|
- if rights_type == "none":
|
|
|
- rights_class = NoneRights
|
|
|
- elif rights_type == "authenticated":
|
|
|
+ if rights_type == "authenticated":
|
|
|
rights_class = AuthenticatedRights
|
|
|
elif rights_type == "owner_write":
|
|
|
rights_class = OwnerWriteRights
|
|
|
@@ -97,38 +93,54 @@ class BaseRights:
|
|
|
raise NotImplementedError
|
|
|
|
|
|
|
|
|
-class NoneRights(BaseRights):
|
|
|
- def authorized(self, user, path, permissions):
|
|
|
- return intersect_permissions(permissions)
|
|
|
-
|
|
|
-
|
|
|
class AuthenticatedRights(BaseRights):
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ super().__init__(*args, **kwargs)
|
|
|
+ self._verify_user = self.configuration.get("auth", "type") != "none"
|
|
|
+
|
|
|
def authorized(self, user, path, permissions):
|
|
|
- if not user:
|
|
|
+ if self._verify_user and not user:
|
|
|
return ""
|
|
|
- return intersect_permissions(permissions)
|
|
|
+ sane_path = storage.sanitize_path(path).strip("/")
|
|
|
+ if "/" not in sane_path:
|
|
|
+ return intersect_permissions(permissions, "RW")
|
|
|
+ if sane_path.count("/") == 1:
|
|
|
+ return intersect_permissions(permissions, "rw")
|
|
|
+ return ""
|
|
|
|
|
|
|
|
|
-class OwnerWriteRights(BaseRights):
|
|
|
+class OwnerWriteRights(AuthenticatedRights):
|
|
|
def authorized(self, user, path, permissions):
|
|
|
- if not user:
|
|
|
+ if self._verify_user and not user:
|
|
|
return ""
|
|
|
sane_path = storage.sanitize_path(path).strip("/")
|
|
|
- if user != sane_path.split("/", maxsplit=1)[0]:
|
|
|
- return intersect_permissions(permissions, "Rr")
|
|
|
- return intersect_permissions(permissions)
|
|
|
+ if not sane_path:
|
|
|
+ return intersect_permissions(permissions, "R")
|
|
|
+ if self._verify_user:
|
|
|
+ owned = user == sane_path.split("/", maxsplit=1)[0]
|
|
|
+ else:
|
|
|
+ owned = True
|
|
|
+ if "/" not in sane_path:
|
|
|
+ return intersect_permissions(permissions, "RW" if owned else "R")
|
|
|
+ if sane_path.count("/") == 1:
|
|
|
+ return intersect_permissions(permissions, "rw" if owned else "r")
|
|
|
+ return ""
|
|
|
|
|
|
|
|
|
-class OwnerOnlyRights(BaseRights):
|
|
|
+class OwnerOnlyRights(AuthenticatedRights):
|
|
|
def authorized(self, user, path, permissions):
|
|
|
- if not user:
|
|
|
+ if self._verify_user and not user:
|
|
|
return ""
|
|
|
sane_path = storage.sanitize_path(path).strip("/")
|
|
|
if not sane_path:
|
|
|
return intersect_permissions(permissions, "R")
|
|
|
- if user != sane_path.split("/", maxsplit=1)[0]:
|
|
|
+ if self._verify_user and user != sane_path.split("/", maxsplit=1)[0]:
|
|
|
return ""
|
|
|
- return intersect_permissions(permissions)
|
|
|
+ if "/" not in sane_path:
|
|
|
+ return intersect_permissions(permissions, "RW")
|
|
|
+ if sane_path.count("/") == 1:
|
|
|
+ return intersect_permissions(permissions, "rw")
|
|
|
+ return ""
|
|
|
|
|
|
|
|
|
class Rights(BaseRights):
|