Просмотр исходного кода

Added file-based rights management

Matthias Jordan 13 лет назад
Родитель
Сommit
e3bc6afdd3
5 измененных файлов с 352 добавлено и 2 удалено
  1. 4 1
      config
  2. 2 1
      radicale/config.py
  3. 176 0
      radicale/rights/from_file.py
  4. 7 0
      test/python/rights/__init__.py
  5. 163 0
      test/python/rights/test_from_file.py

+ 4 - 1
config

@@ -80,9 +80,12 @@ courier_socket =
 
 [rights]
 # Rights management method
-# Value: None | owner_only | owner_write
+# Value: None | owner_only | owner_write | file_based
 type = None
 
+# File for file_based rights management
+file = ~/.config/radicale/rights
+
 
 [storage]
 # Storage backend

+ 2 - 1
radicale/config.py

@@ -67,7 +67,8 @@ INITIAL_CONFIG = {
         "pam_group_membership": "",
         "courier_socket": ""},
     "rights": {
-        "type": "None"},
+        "type": "None",
+        "file" : "None"},
     "storage": {
         "type": "filesystem",
         "filesystem_folder": os.path.expanduser(

+ 176 - 0
radicale/rights/from_file.py

@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Radicale Server - Calendar Server
+# Copyright © 2012 Guillaume Ayoub
+#
+# This library is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Radicale.  If not, see <http://www.gnu.org/licenses/>.
+
+"""
+File-based rights.
+
+The owner is implied to have all rights on their collections.
+
+Rights are read from a file whose name is specified in the config 
+(section "right", key "file").
+
+The file's format is per line:
+
+collectionpath ":" principal " " rights {", " principal " " rights}*
+
+collectionpath is the path part of the collection's url
+
+principal is a user name (no whitespace allowed)
+rights is a string w/o whitespace that contains "r" for reading rights, 
+"w" for writing rights and a combination of these for all rights.
+
+Empty lines are ignored. Lines starting with "#" (hash sign) are comments.
+
+Example:
+
+# This means user1 may read, user2 may write, user3 has full access
+/user0/calendar : user1 r, user2 w, user3 rw
+# user0 can read /user1/cal
+/user1/cal : user0 r 
+
+If a collection /a/b is shared and other users than the owner are 
+supposed to find the collection in a propfind request, an additional
+line for /a has to be in the defintions. E.g.:
+
+/user0/cal: user
+
+"""
+
+from radicale import config, log
+from radicale.rights import owner_only
+
+
+READ_AUTHORIZED = None
+WRITE_AUTHORIZED = None
+
+
+class ParsingError(BaseException):
+    """Raised if the file cannot be parsed"""
+
+
+def read_authorized(user, collection):
+    """Check if the user is allowed to read the collection."""
+    if owner_only.read_authorized(user, collection):
+        return True
+    
+    curl = _normalize_trail_slash(collection.url)
+
+    return _dict_knows(READ_AUTHORIZED, curl, user)
+
+
+
+def write_authorized(user, collection):
+    """Check if the user is allowed to write the collection."""
+    if owner_only.read_authorized(user, collection):
+        return True
+
+    curl = _normalize_trail_slash(collection.url)
+
+    return _dict_knows(WRITE_AUTHORIZED, curl, user)
+
+
+
+def _dict_knows(adict, url, user):
+    return adict.has_key(url) and adict.get(url).count(user) != 0
+
+
+
+def _load():
+    read = {}
+    write = {}
+    file_name = config.get("rights", "file")
+    if file_name == "None":
+        log.LOGGER.error("No file name configured for rights type 'from_file'")
+        return
+    
+    log.LOGGER.debug("Reading rights from file %s" % file_name)
+
+    lines = open(file_name, "r").readlines()
+    
+    for line in lines:
+        _process(line, read, write)
+
+    global READ_AUTHORIZED, WRITE_AUTHORIZED
+    READ_AUTHORIZED = read
+    WRITE_AUTHORIZED = write
+
+
+
+def _process(line, read, write):
+    line = line.strip()   
+    if line == "":
+        """Empty line"""
+        return
+    
+    if line.startswith("#"):
+        """Comment"""
+        return
+        
+    collection, sep, rights_part = line.partition(":")
+    
+    rights_part = rights_part.strip()
+
+    if rights_part == "":
+        return
+
+    collection = collection.strip()
+    
+    if collection == "":
+        raise ParsingError
+    
+    collection = _normalize_trail_slash(collection)
+    
+    rights = rights_part.split(",")
+    for right in rights:
+        user, sep, right_defs = right.strip().partition(" ")
+        
+        if user == "" or right_defs == "":
+            raise ParsingError
+        
+        user = user.strip()
+        right_defs = right_defs.strip()
+        
+        for right_def in list(right_defs):
+            
+            if right_def == 'r':
+                _append(read, collection, user)
+            elif right_def == 'w':
+                _append(write, collection, user)
+            else:
+                raise ParsingError
+
+
+        
+def _append(rdict, key, value):
+    if rdict.has_key(key):
+        rlist = rdict[key]
+        rlist.append(value)
+    else:
+        rlist = [value]
+        rdict[key] = rlist
+        
+        
+
+def _normalize_trail_slash(s):
+    """Removes a maybe existing trailing slash"""
+    if s != "/" and s.endswith("/"):
+        s, sep, empty = s.rpartition("/")
+    return s
+
+
+_load()

+ 7 - 0
test/python/rights/__init__.py

@@ -0,0 +1,7 @@
+'''
+Created on 09.08.2012
+
+Tests for rights-related code.
+
+@author: mj
+'''

+ 163 - 0
test/python/rights/test_from_file.py

@@ -0,0 +1,163 @@
+"""
+
+Unit test for radicale.rights.from_file.
+
+Tests reading the file. The processing is untested, yet.
+
+"""
+
+
+from radicale.rights import from_file
+import unittest
+
+
+
+class Test1(unittest.TestCase):
+    
+    def testProcessEmptyLine(self):
+        """ Line with a comment """
+
+        # Input values        
+        line = " "
+        read = {}
+        write = {}
+        
+        try:
+            # Call SUT
+            from_file._process(line, read, write)
+        except from_file.ParsingError:
+            self.assertTrue(False)
+
+        self.assertTrue(len(read.keys()) == 0)
+        self.assertTrue(len(write.keys()) == 0)
+
+
+    def testProcessComment(self):
+        """ Line with a comment """
+
+        # Input values        
+        line = "# some comment"
+        read = {}
+        write = {}
+        
+        try:
+            # Call SUT
+            from_file._process(line, read, write)
+        except from_file.ParsingError:
+            self.assertTrue(False)
+
+        self.assertTrue(len(read.keys()) == 0)
+        self.assertTrue(len(write.keys()) == 0)
+
+
+    def testProcess0a(self):
+        """ Pointless line: no rights definitions """
+
+        # Input values        
+        line = "/user1/collection1 : "
+        read = {}
+        write = {}
+        
+        try:
+            # Call SUT
+            from_file._process(line, read, write)
+        except from_file.ParsingError:
+            self.fail("Unexpected exception")
+
+        self.assertTrue(len(read.keys()) == 0)
+        self.assertTrue(len(write.keys()) == 0)
+
+
+    def testProcess1a(self):
+        """ Malformed line: no collection definitions """
+
+        # Input values        
+        line = " : a b"
+        read = {}
+        write = {}
+        
+        try:
+            # Call SUT
+            from_file._process(line, read, write)
+        except from_file.ParsingError:
+            """Exception expected"""
+        else:
+            self.fail("Expected exception not raised")
+
+
+
+    def testProcess1b(self):
+        """ Malformed line: right "b" unknown """
+
+        # Input values        
+        line = "/user1/collection1 : a b"
+        read = {}
+        write = {}
+        
+        try:
+            # Call SUT
+            from_file._process(line, read, write)
+        except from_file.ParsingError:
+            """Exception expected"""
+        else:
+            self.fail("Expected exception not raised")
+
+
+    def testProcess1c(self):
+        """ Malformed line: user/right empty """
+
+        # Input values        
+        line = "/user1/collection1 : a"
+        read = {}
+        write = {}
+        
+        try:
+            # Call SUT
+            from_file._process(line, read, write)
+        except from_file.ParsingError:
+            """Exception expected"""
+        else:
+            self.fail("Expected exception not raised")
+
+
+    def testProcess2(self):
+        """Actual sensible input all of which means the same"""
+
+        lines = [
+                 "/user1/collection1 : other1 r, other2 w, other6 rw",
+                 "/user1/collection1/ : other1 r, other2 w, other6 rw",
+                 "/user1/collection1: other1 r, other2 w, other6 rw",
+                 "/user1/collection1/: other1 r, other2 w, other6 rw",
+                 "/user1/collection1: other1  r,    other2 w,other6 rw",
+                 "/user1/collection1 :other1 r,other2 w,   other6 rw",
+                 "/user1/collection1\t:other1 r,\tother2 w,\tother6 rw",
+                 ]
+
+        for line in lines:
+            # Input values        
+            read = {}
+            write = {}
+
+            try:            
+                # Call SUT
+                from_file._process(line, read, write)
+            except:
+                self.fail("unexpected exception for input %s" % line)
+            
+            # Check
+            self.assertEquals(len(read.keys()), 1, "keys in %s" % line)
+            self.assertEquals(len(read.get("/user1/collection1")), 2, "rights in %s" % line)
+            self.assertTrue(read.get("/user1/collection1").count("other1"), "other1 read in %s" % line)
+            self.assertTrue(read.get("/user1/collection1").count("other6"), "other6 read in %s" % line)
+    
+            self.assertEquals(len(write.keys()), 1, "keys in %s" % line)
+            self.assertEquals(len(write.get("/user1/collection1")), 2, "rights in %s" % line)
+            self.assertTrue(write.get("/user1/collection1").count("other2"), "other2 write in %s" % line)
+            self.assertTrue(write.get("/user1/collection1").count("other6"), "other6 write in %s" % line)
+
+
+
+
+
+if __name__ == "__main__":
+    unittest.main()