|
|
@@ -282,13 +282,23 @@ class TestBaseAuthRequests(BaseTest):
|
|
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
|
|
|
def _test_dovecot(
|
|
|
- self, user, password, expected_status,
|
|
|
- response=b'FAIL\n1\n', mech=[b'PLAIN'], broken=None):
|
|
|
+ self, user, password, expected_status, expected_rip=None,
|
|
|
+ response=b'FAIL\t1', mech=[b'PLAIN'], broken=None,
|
|
|
+ extra_config=None, extra_env=None):
|
|
|
import socket
|
|
|
from unittest.mock import DEFAULT, patch
|
|
|
|
|
|
- self.configure({"auth": {"type": "dovecot",
|
|
|
- "dovecot_socket": "./dovecot.sock"}})
|
|
|
+ if extra_env is None:
|
|
|
+ extra_env = {}
|
|
|
+ if extra_config is None:
|
|
|
+ extra_config = {}
|
|
|
+
|
|
|
+ config = {"auth": {"type": "dovecot",
|
|
|
+ "dovecot_socket": "./dovecot.sock"}}
|
|
|
+ for toplvl, entries in extra_config.items():
|
|
|
+ for key, val in entries.items():
|
|
|
+ config[toplvl][key] = val
|
|
|
+ self.configure(config)
|
|
|
|
|
|
if broken is None:
|
|
|
broken = []
|
|
|
@@ -311,10 +321,18 @@ class TestBaseAuthRequests(BaseTest):
|
|
|
if "done" not in broken:
|
|
|
handshake += b'DONE\n'
|
|
|
|
|
|
+ sent_rip = None
|
|
|
+
|
|
|
+ def record_sent_data(s, data, flags=None):
|
|
|
+ nonlocal sent_rip
|
|
|
+ if b'\trip=' in data:
|
|
|
+ sent_rip = data.split(b'\trip=')[1].split(b'\t')[0]
|
|
|
+ return len(data)
|
|
|
+
|
|
|
with patch.multiple(
|
|
|
'socket.socket',
|
|
|
connect=DEFAULT,
|
|
|
- send=DEFAULT,
|
|
|
+ send=record_sent_data,
|
|
|
recv=DEFAULT
|
|
|
) as mock_socket:
|
|
|
if "socket" in broken:
|
|
|
@@ -325,7 +343,9 @@ class TestBaseAuthRequests(BaseTest):
|
|
|
status, _, answer = self.request(
|
|
|
"PROPFIND", "/",
|
|
|
HTTP_AUTHORIZATION="Basic %s" % base64.b64encode(
|
|
|
- ("%s:%s" % (user, password)).encode()).decode())
|
|
|
+ ("%s:%s" % (user, password)).encode()).decode(),
|
|
|
+ **extra_env)
|
|
|
+ assert sent_rip == expected_rip
|
|
|
assert status == expected_status
|
|
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
|
|
|
@@ -392,6 +412,36 @@ class TestBaseAuthRequests(BaseTest):
|
|
|
def test_dovecot_auth_id_mismatch(self):
|
|
|
self._test_dovecot("user", "password", 401, response=b'OK\t2')
|
|
|
|
|
|
+ @pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
|
|
|
+ def test_dovecot_remote_addr(self):
|
|
|
+ self._test_dovecot("user", "password", 401, expected_rip=b'172.17.16.15',
|
|
|
+ extra_env={
|
|
|
+ 'REMOTE_ADDR': '172.17.16.15',
|
|
|
+ 'HTTP_X_REMOTE_ADDR': '127.0.0.1',
|
|
|
+ })
|
|
|
+
|
|
|
+ @pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
|
|
|
+ def test_dovecot_x_remote_addr(self):
|
|
|
+ self._test_dovecot("user", "password", 401, expected_rip=b'172.17.16.15',
|
|
|
+ extra_env={
|
|
|
+ 'REMOTE_ADDR': '127.0.0.1',
|
|
|
+ 'HTTP_X_REMOTE_ADDR': '172.17.16.15',
|
|
|
+ },
|
|
|
+ extra_config={
|
|
|
+ 'auth': {"dovecot_rip_x_remote_addr": "True"},
|
|
|
+ })
|
|
|
+
|
|
|
+ @pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
|
|
|
+ def test_dovecot_x_remote_addr_whitespace(self):
|
|
|
+ self._test_dovecot("user", "password", 401, expected_rip=b'172.17.16.15rip=127.0.0.1',
|
|
|
+ extra_env={
|
|
|
+ 'REMOTE_ADDR': '127.0.0.1',
|
|
|
+ 'HTTP_X_REMOTE_ADDR': '172.17.16.15\trip=127.0.0.1',
|
|
|
+ },
|
|
|
+ extra_config={
|
|
|
+ 'auth': {"dovecot_rip_x_remote_addr": "True"},
|
|
|
+ })
|
|
|
+
|
|
|
def test_custom(self) -> None:
|
|
|
"""Custom authentication."""
|
|
|
self.configure({"auth": {"type": "radicale.tests.custom.auth"}})
|