| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823 |
- # This file is part of Radicale - CalDAV and CardDAV server
- # Copyright © 2008 Nicolas Kandel
- # Copyright © 2008 Pascal Halter
- # Copyright © 2008-2017 Guillaume Ayoub
- # Copyright © 2017-2021 Unrud <unrud@outlook.com>
- # Copyright © 2024-2024 Pieter Hijma <pieterhijma@users.noreply.github.com>
- # Copyright © 2024-2024 Ray <ray@react0r.com>
- # Copyright © 2024-2025 Georgiy <metallerok@gmail.com>
- # Copyright © 2024-2025 Peter Bieringer <pb@bieringer.de>
- # Copyright © 2025-2025 David Greaves <david@dgreaves.com>
- #
- # This library is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
- import contextlib
- import copy
- import datetime
- import posixpath
- import socket
- import xml.etree.ElementTree as ET
- from http import client
- from typing import (Callable, Iterable, Iterator, List, Optional, Sequence,
- Tuple, Union)
- from urllib.parse import unquote, urlparse
- import vobject
- import vobject.base
- from vobject.base import ContentLine
- import radicale.item as radicale_item
- from radicale import httputils, pathutils, storage, types, xmlutils
- from radicale.app.base import Access, ApplicationBase
- from radicale.item import filter as radicale_filter
- from radicale.log import logger
- DT_FORMAT_TIMESTAMP: str = '%Y%m%dT%H%M%SZ'
- DT_FORMAT_DATE: str = '%Y%m%d'
- def free_busy_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
- collection: storage.BaseCollection, encoding: str,
- unlock_storage_fn: Callable[[], None],
- max_occurrence: int
- ) -> Tuple[int, Union[ET.Element, str]]:
- # NOTE: this function returns both an Element and a string because
- # free-busy reports are an edge-case on the return type according
- # to the spec.
- multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
- if xml_request is None:
- return client.MULTI_STATUS, multistatus
- root = xml_request
- if (root.tag == xmlutils.make_clark("C:free-busy-query") and
- collection.tag != "VCALENDAR"):
- logger.warning("Invalid REPORT method %r on %r requested",
- xmlutils.make_human_tag(root.tag), path)
- return client.FORBIDDEN, xmlutils.webdav_error("D:supported-report")
- time_range_element = root.find(xmlutils.make_clark("C:time-range"))
- assert isinstance(time_range_element, ET.Element)
- # Build a single filter from the free busy query for retrieval
- # TODO: filter for VFREEBUSY in additional to VEVENT but
- # test_filter doesn't support that yet.
- vevent_cf_element = ET.Element(xmlutils.make_clark("C:comp-filter"),
- attrib={'name': 'VEVENT'})
- vevent_cf_element.append(time_range_element)
- vcalendar_cf_element = ET.Element(xmlutils.make_clark("C:comp-filter"),
- attrib={'name': 'VCALENDAR'})
- vcalendar_cf_element.append(vevent_cf_element)
- filter_element = ET.Element(xmlutils.make_clark("C:filter"))
- filter_element.append(vcalendar_cf_element)
- filters = (filter_element,)
- # First pull from storage
- retrieved_items = list(collection.get_filtered(filters))
- # !!! Don't access storage after this !!!
- unlock_storage_fn()
- cal = vobject.iCalendar()
- collection_tag = collection.tag
- while retrieved_items:
- # Second filtering before evaluating occurrences.
- # ``item.vobject_item`` might be accessed during filtering.
- # Don't keep reference to ``item``, because VObject requires a lot of
- # memory.
- item, filter_matched = retrieved_items.pop(0)
- if not filter_matched:
- try:
- if not test_filter(collection_tag, item, filter_element):
- continue
- except ValueError as e:
- raise ValueError("Failed to free-busy filter item %r from %r: %s" %
- (item.href, collection.path, e)) from e
- except Exception as e:
- raise RuntimeError("Failed to free-busy filter item %r from %r: %s" %
- (item.href, collection.path, e)) from e
- fbtype = None
- if item.component_name == 'VEVENT':
- transp = getattr(item.vobject_item.vevent, 'transp', None)
- if transp and transp.value != 'OPAQUE':
- continue
- status = getattr(item.vobject_item.vevent, 'status', None)
- if not status or status.value == 'CONFIRMED':
- fbtype = 'BUSY'
- elif status.value == 'CANCELLED':
- fbtype = 'FREE'
- elif status.value == 'TENTATIVE':
- fbtype = 'BUSY-TENTATIVE'
- else:
- # Could do fbtype = status.value for x-name, I prefer this
- fbtype = 'BUSY'
- # TODO: coalesce overlapping periods
- if max_occurrence > 0:
- n_occurrences = max_occurrence+1
- else:
- n_occurrences = 0
- occurrences = radicale_filter.time_range_fill(item.vobject_item,
- time_range_element,
- "VEVENT",
- n=n_occurrences)
- if len(occurrences) >= max_occurrence:
- raise ValueError("FREEBUSY occurrences limit of {} hit"
- .format(max_occurrence))
- for occurrence in occurrences:
- vfb = cal.add('vfreebusy')
- vfb.add('dtstamp').value = item.vobject_item.vevent.dtstamp.value
- vfb.add('dtstart').value, vfb.add('dtend').value = occurrence
- if fbtype:
- vfb.add('fbtype').value = fbtype
- return (client.OK, cal.serialize())
- def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
- collection: storage.BaseCollection, encoding: str,
- unlock_storage_fn: Callable[[], None],
- max_occurrence: int = 0,
- ) -> Tuple[int, ET.Element]:
- """Read and answer REPORT requests that return XML.
- Read rfc3253-3.6 for info.
- """
- multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
- if xml_request is None:
- return client.MULTI_STATUS, multistatus
- root = xml_request
- if root.tag in (xmlutils.make_clark("D:principal-search-property-set"),
- xmlutils.make_clark("D:principal-property-search"),
- xmlutils.make_clark("D:expand-property")):
- # We don't support searching for principals or indirect retrieving of
- # properties, just return an empty result.
- # InfCloud asks for expand-property reports (even if we don't announce
- # support for them) and stops working if an error code is returned.
- logger.warning("Unsupported REPORT method %r on %r requested",
- xmlutils.make_human_tag(root.tag), path)
- return client.MULTI_STATUS, multistatus
- if (root.tag == xmlutils.make_clark("C:calendar-multiget") and
- collection.tag != "VCALENDAR" or
- root.tag == xmlutils.make_clark("CR:addressbook-multiget") and
- collection.tag != "VADDRESSBOOK" or
- root.tag == xmlutils.make_clark("D:sync-collection") and
- collection.tag not in ("VADDRESSBOOK", "VCALENDAR")):
- logger.warning("Invalid REPORT method %r on %r requested",
- xmlutils.make_human_tag(root.tag), path)
- return client.FORBIDDEN, xmlutils.webdav_error("D:supported-report")
- props: Union[ET.Element, List]
- if root.find(xmlutils.make_clark("D:prop")) is not None:
- props = root.find(xmlutils.make_clark("D:prop")) # type: ignore[assignment]
- else:
- props = []
- hreferences: Iterable[str]
- if root.tag in (
- xmlutils.make_clark("C:calendar-multiget"),
- xmlutils.make_clark("CR:addressbook-multiget")):
- # Read rfc4791-7.9 for info
- hreferences = set()
- for href_element in root.findall(xmlutils.make_clark("D:href")):
- temp_url_path = urlparse(href_element.text).path
- assert isinstance(temp_url_path, str)
- href_path = pathutils.sanitize_path(unquote(temp_url_path))
- if (href_path + "/").startswith(base_prefix + "/"):
- hreferences.add(href_path[len(base_prefix):])
- else:
- logger.warning("Skipping invalid path %r in REPORT request on "
- "%r", href_path, path)
- elif root.tag == xmlutils.make_clark("D:sync-collection"):
- old_sync_token_element = root.find(
- xmlutils.make_clark("D:sync-token"))
- old_sync_token = ""
- if old_sync_token_element is not None and old_sync_token_element.text:
- old_sync_token = old_sync_token_element.text.strip()
- logger.debug("Client provided sync token: %r", old_sync_token)
- try:
- sync_token, names = collection.sync(old_sync_token)
- except ValueError as e:
- # Invalid sync token
- logger.warning("Client provided invalid sync token %r: %s",
- old_sync_token, e, exc_info=True)
- # client.CONFLICT doesn't work with some clients (e.g. InfCloud)
- return (client.FORBIDDEN,
- xmlutils.webdav_error("D:valid-sync-token"))
- hreferences = (pathutils.unstrip_path(
- posixpath.join(collection.path, n)) for n in names)
- # Append current sync token to response
- sync_token_element = ET.Element(xmlutils.make_clark("D:sync-token"))
- sync_token_element.text = sync_token
- multistatus.append(sync_token_element)
- else:
- hreferences = (path,)
- filters = (
- root.findall(xmlutils.make_clark("C:filter")) +
- root.findall(xmlutils.make_clark("CR:filter")))
- expand = root.find(".//" + xmlutils.make_clark("C:expand"))
- # if we have expand prop we use "filter (except time range) -> expand -> filter (only time range)" approach
- time_range_element = None
- main_filters = []
- for filter_ in filters:
- # extract time-range filter for processing after main filters
- # for expand request
- filter_copy = copy.deepcopy(filter_)
- if expand is not None:
- for comp_filter in filter_copy.findall(".//" + xmlutils.make_clark("C:comp-filter")):
- if comp_filter.get("name", "").upper() == "VCALENDAR":
- continue
- time_range_element = comp_filter.find(xmlutils.make_clark("C:time-range"))
- if time_range_element is not None:
- comp_filter.remove(time_range_element)
- main_filters.append(filter_copy)
- # Retrieve everything required for finishing the request.
- retrieved_items = list(retrieve_items(
- base_prefix, path, collection, hreferences, main_filters, multistatus))
- collection_tag = collection.tag
- # !!! Don't access storage after this !!!
- unlock_storage_fn()
- n_vevents = 0
- while retrieved_items:
- # ``item.vobject_item`` might be accessed during filtering.
- # Don't keep reference to ``item``, because VObject requires a lot of
- # memory.
- item, filters_matched = retrieved_items.pop(0)
- if filters and not filters_matched:
- try:
- if not all(test_filter(collection_tag, item, filter_)
- for filter_ in main_filters):
- continue
- except ValueError as e:
- raise ValueError("Failed to filter item %r from %r: %s" %
- (item.href, collection.path, e)) from e
- except Exception as e:
- raise RuntimeError("Failed to filter item %r from %r: %s" %
- (item.href, collection.path, e)) from e
- found_props = []
- not_found_props = []
- item_etag: str = ""
- for prop in props:
- element = ET.Element(prop.tag)
- if prop.tag == xmlutils.make_clark("D:getetag"):
- if expand is not None:
- item_etag = item.etag
- else:
- element.text = item.etag
- found_props.append(element)
- elif prop.tag == xmlutils.make_clark("D:getcontenttype"):
- element.text = xmlutils.get_content_type(item, encoding)
- found_props.append(element)
- elif prop.tag in (
- xmlutils.make_clark("C:calendar-data"),
- xmlutils.make_clark("CR:address-data")):
- element.text = item.serialize()
- if (expand is not None) and item.component_name == 'VEVENT':
- starts = expand.get('start')
- ends = expand.get('end')
- if (starts is None) or (ends is None):
- return client.FORBIDDEN, \
- xmlutils.webdav_error("C:expand")
- start = datetime.datetime.strptime(
- starts, DT_FORMAT_TIMESTAMP
- ).replace(tzinfo=datetime.timezone.utc)
- end = datetime.datetime.strptime(
- ends, DT_FORMAT_TIMESTAMP
- ).replace(tzinfo=datetime.timezone.utc)
- time_range_start = None
- time_range_end = None
- if time_range_element is not None:
- time_range_start, time_range_end = radicale_filter.parse_time_range(time_range_element)
- (expanded_element, n_vev) = _expand(
- element=element, item=copy.copy(item),
- start=start, end=end,
- time_range_start=time_range_start, time_range_end=time_range_end,
- max_occurrence=max_occurrence,
- )
- if n_vev == 0:
- logger.debug("No VEVENTs found after expansion for %r, skipping", item.href)
- continue
- n_vevents += n_vev
- found_props.append(expanded_element)
- else:
- found_props.append(element)
- if hasattr(item.vobject_item, "vevent_list"):
- n_vevents += len(item.vobject_item.vevent_list)
- # Avoid DoS with too many events
- if max_occurrence and n_vevents > max_occurrence:
- raise ValueError("REPORT occurrences limit of {} hit"
- .format(max_occurrence))
- else:
- not_found_props.append(element)
- assert item.href
- uri = pathutils.unstrip_path(
- posixpath.join(collection.path, item.href))
- if found_props or not_found_props:
- multistatus.append(xml_item_response(
- base_prefix, uri, found_props=found_props,
- not_found_props=not_found_props, found_item=True, item_etag=item_etag))
- return client.MULTI_STATUS, multistatus
- def _expand(
- element: ET.Element,
- item: radicale_item.Item,
- start: datetime.datetime,
- end: datetime.datetime,
- time_range_start: Optional[datetime.datetime] = None,
- time_range_end: Optional[datetime.datetime] = None,
- max_occurrence: int = 0,
- ) -> Tuple[ET.Element, int]:
- vevent_component: vobject.base.Component = copy.copy(item.vobject_item)
- logger.info("Expanding event %s", item.href)
- logger.debug(f"Expand range: {start} to {end}")
- logger.debug(f"Time range: {time_range_start} to {time_range_end}")
- # Split the vevents included in the component into one that contains the
- # recurrence information and others that contain a recurrence id to
- # override instances.
- base_vevent, vevents_overridden = _split_overridden_vevents(vevent_component)
- dt_format = DT_FORMAT_TIMESTAMP
- all_day_event = False
- if type(base_vevent.dtstart.value) is datetime.date:
- # If an event comes to us with a dtstart specified as a date
- # then in the response we return the date, not datetime
- dt_format = DT_FORMAT_DATE
- all_day_event = True
- # In case of dates, we need to remove timezone information since
- # rruleset.between computes with datetimes without timezone information
- start = start.replace(tzinfo=None)
- end = end.replace(tzinfo=None)
- if time_range_start is not None and time_range_end is not None:
- time_range_start = time_range_start.replace(tzinfo=None)
- time_range_end = time_range_end.replace(tzinfo=None)
- for vevent in vevents_overridden:
- _strip_single_event(vevent, dt_format)
- duration = None
- if hasattr(base_vevent, "dtend"):
- duration = base_vevent.dtend.value - base_vevent.dtstart.value
- elif hasattr(base_vevent, "duration"):
- try:
- duration = base_vevent.duration.value
- if duration.total_seconds() <= 0:
- logger.warning("Invalid DURATION: %s", duration)
- duration = None
- except (AttributeError, TypeError) as e:
- logger.warning("Failed to parse DURATION: %s", e)
- duration = None
- # Generate EXDATE to remove from expansion range
- exdates_set: set[datetime.datetime] = set()
- if hasattr(base_vevent, 'exdate'):
- exdates = base_vevent.exdate.value
- if not isinstance(exdates, list):
- exdates = [exdates]
- exdates_set = {
- exdate.astimezone(datetime.timezone.utc) if isinstance(exdate, datetime.datetime)
- else datetime.datetime.fromordinal(exdate.toordinal()).replace(tzinfo=None)
- for exdate in exdates
- }
- logger.debug("EXDATE values: %s", exdates_set)
- events_for_filtering = vevents_overridden
- rruleset = None
- if hasattr(base_vevent, 'rrule'):
- rruleset = base_vevent.getrruleset()
- else:
- # if event does not have rrule, only include base event
- events_for_filtering = [base_vevent]
- filtered_vevents = []
- if rruleset:
- # This function uses datetimes internally without timezone info for dates
- # A vobject rruleset is for the event dtstart.
- # Expanded over a given time range this will not include
- # events which started before the time range but are still
- # ongoing at the start of the range
- # To accomodate this, reduce the start time by the duration of
- # the event. If this introduces an extra reccurence point then
- # that event should be included as it is still ongoing. If no
- # extra point is generated then it was a no-op.
- rstart = start - duration if duration and duration.total_seconds() > 0 else start
- recurrences = rruleset.between(rstart, end, inc=True, count=max_occurrence)
- if max_occurrence and len(recurrences) >= max_occurrence:
- # this shouldn't be > and if it's == then assume a limit
- # was hit and ignore that maybe some would be filtered out
- # by EXDATE etc. This is anti-DoS, not precise limits
- raise ValueError("REPORT occurrences limit of {} hit"
- .format(max_occurrence))
- _strip_component(vevent_component)
- _strip_single_event(base_vevent, dt_format)
- i_overridden = 0
- for recurrence_dt in recurrences:
- recurrence_utc = recurrence_dt if all_day_event else recurrence_dt.astimezone(datetime.timezone.utc)
- logger.debug("Processing recurrence: %s (all_day_event: %s)", recurrence_utc, all_day_event)
- # Apply time-range filter
- if time_range_start is not None and time_range_end is not None:
- dtstart = recurrence_utc
- dtend = dtstart + duration if duration else dtstart
- # Start includes the time, end does not
- if not (dtstart <= time_range_end and dtend > time_range_start):
- logger.debug("Recurrence %s filtered out by time-range", recurrence_utc)
- continue
- # Check exdate
- if recurrence_utc in exdates_set:
- logger.debug("Recurrence %s excluded by EXDATE", recurrence_utc)
- continue
- # Check for overridden instances
- i_overridden, vevent = _find_overridden(i_overridden, vevents_overridden, recurrence_utc, dt_format)
- if not vevent:
- # Create new instance from recurrence
- vevent = copy.deepcopy(base_vevent)
- # For all day events, the system timezone may influence the
- # results, so use recurrence_dt
- recurrence_id = recurrence_dt if all_day_event else recurrence_utc
- logger.debug("Creating new VEVENT with RECURRENCE-ID: %s", recurrence_id)
- vevent.recurrence_id = ContentLine(
- name='RECURRENCE-ID',
- value=recurrence_id, params={}
- )
- _convert_to_utc(vevent, 'recurrence_id', dt_format)
- suffix = ''
- if (dt_format == DT_FORMAT_DATE):
- suffix = ';VALUE=DATE'
- else:
- suffix = ''
- vevent.dtstart = ContentLine(
- name='DTSTART' + suffix,
- value=recurrence_id.strftime(dt_format), params={}
- )
- # if there is a DTEND, override it. Duration does not need changing
- if hasattr(vevent, "dtend"):
- vevent.dtend = ContentLine(
- name='DTEND' + suffix,
- value=(recurrence_id + duration).strftime(dt_format), params={}
- )
- filtered_vevents.append(vevent)
- # Filter overridden and non-recurring events
- if time_range_start is not None and time_range_end is not None:
- for vevent in events_for_filtering:
- dtstart = vevent.dtstart.value
- # Handle string values for DTSTART/DTEND
- if isinstance(dtstart, str):
- try:
- dtstart = datetime.datetime.strptime(dtstart, dt_format)
- if all_day_event:
- dtstart = dtstart.date()
- except ValueError as e:
- logger.warning("Invalid DTSTART format: %s, error: %s", dtstart, e)
- continue
- dtend = dtstart + duration if duration else dtstart
- logger.debug(
- "Filtering VEVENT with DTSTART: %s (type: %s), DTEND: %s (type: %s)",
- dtstart, type(dtstart), dtend, type(dtend))
- # Convert to datetime for comparison
- if all_day_event and isinstance(dtstart, datetime.date) and not isinstance(dtstart, datetime.datetime):
- dtstart = datetime.datetime.fromordinal(dtstart.toordinal()).replace(tzinfo=None)
- dtend = datetime.datetime.fromordinal(dtend.toordinal()).replace(tzinfo=None)
- elif not all_day_event and isinstance(dtstart, datetime.datetime) \
- and isinstance(dtend, datetime.datetime):
- dtstart = dtstart.replace(tzinfo=datetime.timezone.utc)
- dtend = dtend.replace(tzinfo=datetime.timezone.utc)
- else:
- logger.warning("Unexpected DTSTART/DTEND type: dtstart=%s, dtend=%s", type(dtstart), type(dtend))
- continue
- if dtstart < time_range_end and dtend > time_range_start:
- if vevent not in filtered_vevents: # Avoid duplicates
- logger.debug("VEVENT passed time-range filter: %s", dtstart)
- filtered_vevents.append(vevent)
- else:
- logger.debug("VEVENT filtered out: %s", dtstart)
- # Rebuild component
- if not filtered_vevents:
- element.text = ""
- return element, 0
- else:
- vevent_component.vevent_list = filtered_vevents
- logger.debug("lbt: vevent_component %s", vevent_component)
- element.text = vevent_component.serialize()
- return element, len(filtered_vevents)
- def _convert_timezone(vevent: vobject.icalendar.RecurringComponent,
- name_prop: str,
- name_content_line: str):
- prop = getattr(vevent, name_prop, None)
- if prop:
- if type(prop.value) is datetime.date:
- date_time = datetime.datetime.fromordinal(
- prop.value.toordinal()
- ).replace(tzinfo=datetime.timezone.utc)
- else:
- date_time = prop.value.astimezone(datetime.timezone.utc)
- setattr(vevent, name_prop, ContentLine(name=name_content_line, value=date_time, params=[]))
- def _convert_to_utc(vevent: vobject.icalendar.RecurringComponent,
- name_prop: str,
- dt_format: str):
- prop = getattr(vevent, name_prop, None)
- if prop:
- setattr(vevent, name_prop, ContentLine(name=prop.name, value=prop.value.strftime(dt_format), params=[]))
- def _strip_single_event(vevent: vobject.icalendar.RecurringComponent, dt_format: str) -> None:
- _convert_timezone(vevent, 'dtstart', 'DTSTART')
- _convert_timezone(vevent, 'dtend', 'DTEND')
- _convert_timezone(vevent, 'recurrence_id', 'RECURRENCE-ID')
- # There is something strange behaviour during serialization native datetime, so converting manually
- _convert_to_utc(vevent, 'dtstart', dt_format)
- _convert_to_utc(vevent, 'dtend', dt_format)
- _convert_to_utc(vevent, 'recurrence_id', dt_format)
- try:
- delattr(vevent, 'rrule')
- delattr(vevent, 'exdate')
- delattr(vevent, 'exrule')
- delattr(vevent, 'rdate')
- except AttributeError:
- pass
- def _strip_component(vevent: vobject.base.Component) -> None:
- timezones_to_remove = []
- for component in vevent.components():
- if component.name == 'VTIMEZONE':
- timezones_to_remove.append(component)
- for timezone in timezones_to_remove:
- vevent.remove(timezone)
- def _split_overridden_vevents(
- component: vobject.base.Component,
- ) -> Tuple[
- vobject.icalendar.RecurringComponent,
- List[vobject.icalendar.RecurringComponent]
- ]:
- vevent_recurrence = None
- vevents_overridden = []
- for vevent in component.vevent_list:
- if hasattr(vevent, 'recurrence_id'):
- vevents_overridden += [vevent]
- elif vevent_recurrence:
- raise ValueError(
- f"component with UID {vevent.uid} "
- f"has more than one vevent with recurrence information"
- )
- else:
- vevent_recurrence = vevent
- if vevent_recurrence:
- return (
- vevent_recurrence, sorted(
- vevents_overridden,
- key=lambda vevent: vevent.recurrence_id.value
- )
- )
- else:
- raise ValueError(
- f"component with UID {vevent.uid} "
- f"does not have a vevent without a recurrence_id"
- )
- def _find_overridden(
- start: int,
- vevents: List[vobject.icalendar.RecurringComponent],
- dt: datetime.datetime,
- dt_format: str
- ) -> Tuple[int, Optional[vobject.icalendar.RecurringComponent]]:
- for i in range(start, len(vevents)):
- dt_event = datetime.datetime.strptime(
- vevents[i].recurrence_id.value,
- dt_format
- ).replace(tzinfo=datetime.timezone.utc)
- if dt_event == dt:
- return (i + 1, vevents[i])
- return (start, None)
- def xml_item_response(base_prefix: str, href: str,
- found_props: Sequence[ET.Element] = (),
- not_found_props: Sequence[ET.Element] = (),
- found_item: bool = True, item_etag: str = "") -> ET.Element:
- response = ET.Element(xmlutils.make_clark("D:response"))
- href_element = ET.Element(xmlutils.make_clark("D:href"))
- href_element.text = xmlutils.make_href(base_prefix, href)
- response.append(href_element)
- if found_item:
- for code, props in ((200, found_props), (404, not_found_props)):
- if props:
- propstat = ET.Element(xmlutils.make_clark("D:propstat"))
- status = ET.Element(xmlutils.make_clark("D:status"))
- status.text = xmlutils.make_response(code)
- prop_element = ET.Element(xmlutils.make_clark("D:prop"))
- if (item_etag != "") and (code == 200):
- prop_etag = ET.Element(xmlutils.make_clark("D:getetag"))
- prop_etag.text = item_etag
- prop_element.append(prop_etag)
- for prop in props:
- prop_element.append(prop)
- propstat.append(prop_element)
- propstat.append(status)
- response.append(propstat)
- else:
- status = ET.Element(xmlutils.make_clark("D:status"))
- status.text = xmlutils.make_response(404)
- response.append(status)
- return response
- def retrieve_items(
- base_prefix: str, path: str, collection: storage.BaseCollection,
- hreferences: Iterable[str], filters: Sequence[ET.Element],
- multistatus: ET.Element) -> Iterator[Tuple[radicale_item.Item, bool]]:
- """Retrieves all items that are referenced in ``hreferences`` from
- ``collection`` and adds 404 responses for missing and invalid items
- to ``multistatus``."""
- collection_requested = False
- def get_names() -> Iterator[str]:
- """Extracts all names from references in ``hreferences`` and adds
- 404 responses for invalid references to ``multistatus``.
- If the whole collections is referenced ``collection_requested``
- gets set to ``True``."""
- nonlocal collection_requested
- for hreference in hreferences:
- try:
- name = pathutils.name_from_path(hreference, collection)
- except ValueError as e:
- logger.warning("Skipping invalid path %r in REPORT request on "
- "%r: %s", hreference, path, e)
- response = xml_item_response(base_prefix, hreference,
- found_item=False)
- multistatus.append(response)
- continue
- if name:
- # Reference is an item
- yield name
- else:
- # Reference is a collection
- collection_requested = True
- for name, item in collection.get_multi(get_names()):
- if not item:
- uri = pathutils.unstrip_path(posixpath.join(collection.path, name))
- response = xml_item_response(base_prefix, uri, found_item=False)
- multistatus.append(response)
- else:
- yield item, False
- if collection_requested:
- yield from collection.get_filtered(filters)
- def test_filter(collection_tag: str, item: radicale_item.Item,
- filter_: ET.Element) -> bool:
- """Match an item against a filter."""
- if (collection_tag == "VCALENDAR" and
- filter_.tag != xmlutils.make_clark("C:%s" % filter_)):
- if len(filter_) == 0:
- return True
- if len(filter_) > 1:
- raise ValueError("Filter with %d children" % len(filter_))
- if filter_[0].tag != xmlutils.make_clark("C:comp-filter"):
- raise ValueError("Unexpected %r in filter" % filter_[0].tag)
- return radicale_filter.comp_match(item, filter_[0])
- if (collection_tag == "VADDRESSBOOK" and
- filter_.tag != xmlutils.make_clark("CR:%s" % filter_)):
- for child in filter_:
- if child.tag != xmlutils.make_clark("CR:prop-filter"):
- raise ValueError("Unexpected %r in filter" % child.tag)
- test = filter_.get("test", "anyof")
- if test == "anyof":
- return any(radicale_filter.prop_match(item.vobject_item, f, "CR")
- for f in filter_)
- if test == "allof":
- return all(radicale_filter.prop_match(item.vobject_item, f, "CR")
- for f in filter_)
- raise ValueError("Unsupported filter test: %r" % test)
- raise ValueError("Unsupported filter %r for %r" %
- (filter_.tag, collection_tag))
- class ApplicationPartReport(ApplicationBase):
- def do_REPORT(self, environ: types.WSGIEnviron, base_prefix: str,
- path: str, user: str) -> types.WSGIResponse:
- """Manage REPORT request."""
- access = Access(self._rights, user, path)
- if not access.check("r"):
- return httputils.NOT_ALLOWED
- try:
- xml_content = self._read_xml_request_body(environ)
- except RuntimeError as e:
- logger.warning("Bad REPORT request on %r: %s", path, e,
- exc_info=True)
- return httputils.BAD_REQUEST
- except socket.timeout:
- logger.debug("Client timed out", exc_info=True)
- return httputils.REQUEST_TIMEOUT
- with contextlib.ExitStack() as lock_stack:
- lock_stack.enter_context(self._storage.acquire_lock("r", user))
- item = next(iter(self._storage.discover(path)), None)
- if not item:
- return httputils.NOT_FOUND
- if not access.check("r", item):
- return httputils.NOT_ALLOWED
- if isinstance(item, storage.BaseCollection):
- collection = item
- else:
- assert item.collection is not None
- collection = item.collection
- max_occurrence = self.configuration.get("reporting", "max_freebusy_occurrence")
- if xml_content is not None and \
- xml_content.tag == xmlutils.make_clark("C:free-busy-query"):
- try:
- status, body = free_busy_report(
- base_prefix, path, xml_content, collection, self._encoding,
- lock_stack.close, max_occurrence)
- except ValueError as e:
- logger.warning(
- "Bad REPORT request on %r: %s", path, e, exc_info=True)
- return httputils.BAD_REQUEST
- headers = {"Content-Type": "text/calendar; charset=%s" % self._encoding}
- return status, headers, str(body)
- else:
- try:
- status, xml_answer = xml_report(
- base_prefix, path, xml_content, collection, self._encoding,
- lock_stack.close, max_occurrence)
- except ValueError as e:
- logger.warning(
- "Bad REPORT request on %r: %s", path, e, exc_info=True)
- return httputils.BAD_REQUEST
- headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
- return status, headers, self._xml_response(xml_answer)
|