From 892ed61bcb8f4c8521ace74dc51a77d6a61a3c35 Mon Sep 17 00:00:00 2001 From: Ryan N Johnson Date: Wed, 13 Feb 2019 09:07:27 +0000 Subject: [PATCH] Refactor icalendar helpers into their own module --- khal/controllers.py | 9 +- khal/icalendar.py | 501 ++++++++++++++++++++++++++++++++++ khal/khalendar/backend.py | 15 +- khal/khalendar/event.py | 4 +- khal/ui/__init__.py | 6 +- khal/utils.py | 478 +------------------------------- tests/icalendar_test.py | 73 +++++ tests/khalendar_test.py | 3 +- tests/khalendar_utils_test.py | 84 +++--- tests/parse_datetime_test.py | 2 +- tests/utils_test.py | 73 ----- 11 files changed, 639 insertions(+), 609 deletions(-) create mode 100644 khal/icalendar.py create mode 100644 tests/icalendar_test.py diff --git a/khal/controllers.py b/khal/controllers.py index e774d6cbd..d45fa898a 100644 --- a/khal/controllers.py +++ b/khal/controllers.py @@ -36,9 +36,10 @@ from khal.khalendar.exceptions import DuplicateUid, ReadOnlyCalendarError from .exceptions import ConfigurationError +from .icalendar import (cal_from_ics, new_event as new_vevent, split_ics, + sort_key as sort_vevent_key) from .khalendar.vdir import Item from .terminal import merge_columns -from .utils import cal_from_ics logger = logging.getLogger('khal') @@ -358,7 +359,7 @@ def new_from_args(collection, calendar_name, conf, dtstart=None, dtend=None, if isinstance(categories, str): categories = list([category.strip() for category in categories.split(',')]) try: - event = utils.new_event( + event = new_vevent( locale=conf['locale'], location=location, categories=categories, repeat=repeat, until=until, alarms=alarms, dtstart=dtstart, dtend=dtend, summary=summary, description=description, timezone=timezone, @@ -554,7 +555,7 @@ def import_ics(collection, conf, ics, batch=False, random_uid=False, format=None """ if format is None: format = conf['view']['event_format'] - vevents = utils.split_ics(ics, random_uid, conf['locale']['default_timezone']) + vevents = split_ics(ics, random_uid, conf['locale']['default_timezone']) for vevent in vevents: import_event(vevent, collection, conf['locale'], batch, format, env) @@ -624,7 +625,7 @@ def print_ics(conf, name, ics, format): vevents = list() for uid in events_grouped: - vevents.append(sorted(events_grouped[uid], key=utils.sort_key)) + vevents.append(sorted(events_grouped[uid], key=sort_vevent_key)) echo('{} events found in {}'.format(len(vevents), name)) for sub_event in vevents: diff --git a/khal/icalendar.py b/khal/icalendar.py new file mode 100644 index 000000000..869615c11 --- /dev/null +++ b/khal/icalendar.py @@ -0,0 +1,501 @@ +# Copyright (c) 2013-2017 Christian Geier et al. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +"""collection of icalendar helper functions""" + +import datetime as dt +import dateutil.rrule +import icalendar +import logging +import pytz +from collections import defaultdict + +from .exceptions import UnsupportedRecurrence +from .parse_datetime import guesstimedeltafstr, rrulefstr +from .utils import generate_random_uid, localize_strip_tz, to_unix_time + +logger = logging.getLogger('khal') + + +def split_ics(ics, random_uid=False, default_timezone=None): + """split an ics string into several according to VEVENT's UIDs + + and sort the right VTIMEZONEs accordingly + ignores all other ics components + :type ics: str + :param random_uid: assign random uids to all events + :type random_uid: bool + :rtype list: + """ + cal = cal_from_ics(ics) + tzs = {item['TZID']: item for item in cal.walk() if item.name == 'VTIMEZONE'} + + events_grouped = defaultdict(list) + for item in cal.walk(): + if item.name == 'VEVENT': + events_grouped[item['UID']].append(item) + else: + continue + return [ics_from_list(events, tzs, random_uid, default_timezone) for uid, events in + sorted(events_grouped.items())] + + +def new_event(locale, dtstart=None, dtend=None, summary=None, timezone=None, + allday=False, description=None, location=None, categories=None, + repeat=None, until=None, alarms=None): + """create a new event + + :param dtstart: starttime of that event + :type dtstart: datetime + :param dtend: end time of that event, if this is a *date*, this value is + interpreted as being the last date the event is scheduled on, i.e. + the VEVENT DTEND will be *one day later* + :type dtend: datetime + :param summary: description of the event, used in the SUMMARY property + :type summary: unicode + :param timezone: timezone of the event (start and end) + :type timezone: pytz.timezone + :param allday: if set to True, we will not transform dtstart and dtend to + datetime + :type allday: bool + :returns: event + :rtype: icalendar.Event + """ + + if dtstart is None: + raise ValueError("no start given") + if dtend is None: + raise ValueError("no end given") + if summary is None: + raise ValueError("no summary given") + + if not allday and timezone is not None: + dtstart = timezone.localize(dtstart) + dtend = timezone.localize(dtend) + + event = icalendar.Event() + event.add('dtstart', dtstart) + event.add('dtend', dtend) + event.add('dtstamp', dt.datetime.now()) + event.add('summary', summary) + event.add('uid', generate_random_uid()) + # event.add('sequence', 0) + + if description: + event.add('description', description) + if location: + event.add('location', location) + if categories: + event.add('categories', categories) + if repeat and repeat != "none": + rrule = rrulefstr(repeat, until, locale) + event.add('rrule', rrule) + if alarms: + for alarm in alarms.split(","): + alarm = alarm.strip() + alarm_trig = -1 * guesstimedeltafstr(alarm) + new_alarm = icalendar.Alarm() + new_alarm.add('ACTION', 'DISPLAY') + new_alarm.add('TRIGGER', alarm_trig) + new_alarm.add('DESCRIPTION', description) + event.add_component(new_alarm) + return event + + +def ics_from_list(events, tzs, random_uid=False, default_timezone=None): + """convert an iterable of icalendar.Events to an icalendar.Calendar + + :params events: list of events all with the same uid + :type events: list(icalendar.cal.Event) + :param random_uid: assign random uids to all events + :type random_uid: bool + :param tzs: collection of timezones + :type tzs: dict(icalendar.cal.Vtimzone + """ + calendar = icalendar.Calendar() + calendar.add('version', '2.0') + calendar.add( + 'prodid', '-//PIMUTILS.ORG//NONSGML khal / icalendar //EN' + ) + + if random_uid: + new_uid = generate_random_uid() + + needed_tz, missing_tz = set(), set() + for sub_event in events: + sub_event = sanitize(sub_event, default_timezone=default_timezone) + if random_uid: + sub_event['UID'] = new_uid + # icalendar round-trip converts `TZID=a b` to `TZID="a b"` investigate, file bug XXX + for prop in ['DTSTART', 'DTEND', 'DUE', 'EXDATE', 'RDATE', 'RECURRENCE-ID', 'DUE']: + if isinstance(sub_event.get(prop), list): + items = sub_event.get(prop) + else: + items = [sub_event.get(prop)] + + for item in items: + if not (hasattr(item, 'dt') or hasattr(item, 'dts')): + continue + # if prop is a list, all items have the same parameters + datetime_ = item.dts[0].dt if hasattr(item, 'dts') else item.dt + + if not hasattr(datetime_, 'tzinfo'): + continue + + # check for datetimes' timezones which are not understood by + # icalendar + if datetime_.tzinfo is None and 'TZID' in item.params and \ + item.params['TZID'] not in missing_tz: + logger.warning( + 'Cannot find timezone `{}` in .ics file, using default timezone. ' + 'This can lead to erroneous time shifts'.format(item.params['TZID']) + ) + missing_tz.add(item.params['TZID']) + elif datetime_.tzinfo and datetime_.tzinfo != pytz.UTC and \ + datetime_.tzinfo not in needed_tz: + needed_tz.add(datetime_.tzinfo) + + for tzid in needed_tz: + if str(tzid) in tzs: + calendar.add_component(tzs[str(tzid)]) + else: + logger.warning( + 'Cannot find timezone `{}` in .ics file, this could be a bug, ' + 'please report this issue at http://github.com/pimutils/khal/.'.format(tzid)) + for sub_event in events: + calendar.add_component(sub_event) + return calendar.to_ical().decode('utf-8') + + +def expand(vevent, href=''): + """ + Constructs a list of start and end dates for all recurring instances of the + event defined in vevent. + + It considers RRULE as well as RDATE and EXDATE properties. In case of + unsupported recursion rules an UnsupportedRecurrence exception is thrown. + + If the vevent contains a RECURRENCE-ID property, no expansion is done, + the function still returns a tuple of start and end (date)times. + + :param vevent: vevent to be expanded + :type vevent: icalendar.cal.Event + :param href: the href of the vevent, used for more informative logging and + nothing else + :type href: str + :returns: list of start and end (date)times of the expanded event + :rtype: list(tuple(datetime, datetime)) + """ + # we do this now and than never care about the "real" end time again + if 'DURATION' in vevent: + duration = vevent['DURATION'].dt + else: + duration = vevent['DTEND'].dt - vevent['DTSTART'].dt + + # if this vevent has a RECURRENCE_ID property, no expansion will be + # performed + expand = not bool(vevent.get('RECURRENCE-ID')) + + events_tz = getattr(vevent['DTSTART'].dt, 'tzinfo', None) + allday = not isinstance(vevent['DTSTART'].dt, dt.datetime) + + def sanitize_datetime(date): + if allday and isinstance(date, dt.datetime): + date = date.date() + if events_tz is not None: + date = events_tz.localize(date) + return date + + rrule_param = vevent.get('RRULE') + if expand and rrule_param is not None: + vevent = sanitize_rrule(vevent) + + # dst causes problem while expanding the rrule, therefore we transform + # everything to naive datetime objects and transform back after + # expanding + # See https://github.com/dateutil/dateutil/issues/102 + dtstart = vevent['DTSTART'].dt + if events_tz: + dtstart = dtstart.replace(tzinfo=None) + if events_tz and 'Z' not in rrule_param.to_ical().decode(): + logger.warning( + "In event {}, DTSTART has a timezone, but UNTIL does not. This " + "might lead to errenous repeating instances (like missing the " + "last intended instance or adding an extra one)." + "".format(href)) + elif not events_tz and 'Z' in rrule_param.to_ical().decode(): + logger.warning( + "In event {}, DTSTART has no timezone, but UNTIL has one. This " + "might lead to errenous repeating instances (like missing the " + "last intended instance or adding an extra one)." + "".format(href)) + + rrule = dateutil.rrule.rrulestr( + rrule_param.to_ical().decode(), + dtstart=dtstart, + ignoretz=True, + ) + + if rrule._until is None: + # rrule really doesn't like to calculate all recurrences until + # eternity, so we only do it until 2037, because a) I'm not sure + # if python can deal with larger datetime values yet and b) pytz + # doesn't know any larger transition times + rrule._until = dt.datetime(2037, 12, 31) + elif events_tz and 'Z' in rrule_param.to_ical().decode(): + rrule._until = pytz.UTC.localize( + rrule._until).astimezone(events_tz).replace(tzinfo=None) + + rrule = map(sanitize_datetime, rrule) + + logger.debug('calculating recurrence dates for {}, this might take some time.'.format(href)) + + # RRULE and RDATE may specify the same date twice, it is recommended by + # the RFC to consider this as only one instance + dtstartl = set(rrule) + if not dtstartl: + raise UnsupportedRecurrence() + else: + dtstartl = {vevent['DTSTART'].dt} + + def get_dates(vevent, key): + # TODO replace with get_all_properties + dates = vevent.get(key) + if dates is None: + return + if not isinstance(dates, list): + dates = [dates] + + dates = (leaf.dt for tree in dates for leaf in tree.dts) + dates = localize_strip_tz(dates, events_tz) + return map(sanitize_datetime, dates) + + # include explicitly specified recursion dates + if expand: + dtstartl.update(get_dates(vevent, 'RDATE') or ()) + + # remove excluded dates + if expand: + for date in get_dates(vevent, 'EXDATE') or (): + try: + dtstartl.remove(date) + except KeyError: + logger.warning( + 'In event {}, excluded instance starting at {} not found, ' + 'event might be invalid.'.format(href, date)) + + dtstartend = [(start, start + duration) for start in dtstartl] + # not necessary, but I prefer deterministic output + dtstartend.sort() + return dtstartend + + +def assert_only_one_uid(cal: icalendar.Calendar): + """assert the all VEVENTs in cal have the same UID""" + uids = set() + for item in cal.walk(): + if item.name == 'VEVENT': + uids.add(item['UID']) + if len(uids) > 1: + return False + else: + return True + + +def sanitize(vevent, default_timezone, href='', calendar=''): + """ + clean up vevents we do not understand + + :param vevent: the vevent that needs to be cleaned + :type vevent: icalendar.cal.Event + :param default_timezone: timezone to apply to start and/or end dates which + were supposed to be localized but which timezone was not understood + by icalendar + :type timezone: pytz.timezone + :param href: used for logging to inform user which .ics files are + problematic + :type href: str + :param calendar: used for logging to inform user which .ics files are + problematic + :type calendar: str + :returns: clean vevent + :rtype: icalendar.cal.Event + """ + # convert localized datetimes with timezone information we don't + # understand to the default timezone + # TODO do this for everything where a TZID can appear (RDATE, EXDATE) + for prop in ['DTSTART', 'DTEND', 'DUE', 'RECURRENCE-ID']: + if prop in vevent and invalid_timezone(vevent[prop]): + timezone = vevent[prop].params.get('TZID') + value = default_timezone.localize(vevent.pop(prop).dt) + vevent.add(prop, value) + logger.warning( + "{} localized in invalid or incomprehensible timezone `{}` in {}/{}. " + "This could lead to this event being wrongly displayed." + "".format(prop, timezone, calendar, href) + ) + + vdtstart = vevent.pop('DTSTART', None) + vdtend = vevent.pop('DTEND', None) + dtstart = getattr(vdtstart, 'dt', None) + dtend = getattr(vdtend, 'dt', None) + + # event with missing DTSTART + if dtstart is None: + raise ValueError('Event has no start time (DTSTART).') + dtstart, dtend = sanitize_timerange( + dtstart, dtend, duration=vevent.get('DURATION', None)) + + vevent.add('DTSTART', dtstart) + if dtend is not None: + vevent.add('DTEND', dtend) + return vevent + + +def sanitize_timerange(dtstart, dtend, duration=None): + '''return sensible dtstart and end for events that have an invalid or + missing DTEND, assuming the event just lasts one hour.''' + + if isinstance(dtstart, dt.datetime) and isinstance(dtend, dt.datetime): + if dtstart.tzinfo and not dtend.tzinfo: + logger.warning( + "Event end time has no timezone. " + "Assuming it's the same timezone as the start time" + ) + dtend = dtstart.tzinfo.localize(dtend) + if not dtstart.tzinfo and dtend.tzinfo: + logger.warning( + "Event start time has no timezone. " + "Assuming it's the same timezone as the end time" + ) + dtstart = dtend.tzinfo.localize(dtstart) + + if dtend is None and duration is None: + if isinstance(dtstart, dt.datetime): + dtstart = dtstart.date() + dtend = dtstart + dt.timedelta(days=1) + elif dtend is not None: + if dtend < dtstart: + raise ValueError('The event\'s end time (DTEND) is older than ' + 'the event\'s start time (DTSTART).') + elif dtend == dtstart: + logger.warning( + "Event start time and end time are the same. " + "Assuming the event's duration is one hour." + ) + dtend += dt.timedelta(hours=1) + + return dtstart, dtend + + +def sanitize_rrule(vevent): + """fix problems with RRULE:UNTIL""" + if 'rrule' in vevent and 'UNTIL' in vevent['rrule']: + until = vevent['rrule']['UNTIL'][0] + dtstart = vevent['dtstart'].dt + # DTSTART is date, UNTIL is datetime + if not isinstance(dtstart, dt.datetime) and isinstance(until, dt.datetime): + vevent['rrule']['until'] = until.date() + return vevent + + +def invalid_timezone(prop): + """check if an icalendar property has a timezone attached we don't understand""" + if hasattr(prop.dt, 'tzinfo') and prop.dt.tzinfo is None and 'TZID' in prop.params: + return True + else: + return False + + +def _get_all_properties(vevent, prop): + """Get all properties from a vevent, even if there are several entries + + example input: + EXDATE:1234,4567 + EXDATE:7890 + + returns: [1234, 4567, 7890] + + :type vevent: icalendar.cal.Event + :type prop: str + """ + if prop not in vevent: + return list() + if isinstance(vevent[prop], list): + rdates = [leaf.dt for tree in vevent[prop] for leaf in tree.dts] + else: + rdates = [vddd.dt for vddd in vevent[prop].dts] + return rdates + + +def delete_instance(vevent, instance): + """remove a recurrence instance from a VEVENT's RRDATE list or add it + to the EXDATE list + + :type vevent: icalendar.cal.Event + :type instance: datetime.datetime + """ + # TODO check where this instance is coming from and only call the + # appropriate function + if 'RRULE' in vevent: + exdates = _get_all_properties(vevent, 'EXDATE') + exdates += [instance] + vevent.pop('EXDATE') + vevent.add('EXDATE', exdates) + if 'RDATE' in vevent: + rdates = [one for one in _get_all_properties(vevent, 'RDATE') if one != instance] + vevent.pop('RDATE') + if rdates != []: + vevent.add('RDATE', rdates) + + +def sort_key(vevent): + """helper function to determine order of VEVENTS + so that recurrence-id events come after the corresponding rrule event, etc + :param vevent: icalendar.Event + :rtype: tuple(str, int) + """ + assert isinstance(vevent, icalendar.Event) + uid = str(vevent['UID']) + rec_id = vevent.get('RECURRENCE-ID') + if rec_id is None: + return uid, 0 + rrange = rec_id.params.get('RANGE') + if rrange == 'THISANDFUTURE': + return uid, to_unix_time(rec_id.dt) + else: + return uid, 1 + + +def cal_from_ics(ics): + try: + cal = icalendar.Calendar.from_ical(ics) + except ValueError as error: + if (len(error.args) > 0 and isinstance(error.args[0], str) and + error.args[0].startswith('Offset must be less than 24 hours')): + logger.warning( + 'Invalid timezone offset encountered, ' + 'timezone information may be wrong: ' + str(error.args[0]) + ) + icalendar.vUTCOffset.ignore_exceptions = True + cal = icalendar.Calendar.from_ical(ics) + icalendar.vUTCOffset.ignore_exceptions = False + return cal diff --git a/khal/khalendar/backend.py b/khal/khalendar/backend.py index b54421916..34cb2b7d4 100644 --- a/khal/khalendar/backend.py +++ b/khal/khalendar/backend.py @@ -35,6 +35,9 @@ from dateutil import parser from .. import utils +from ..icalendar import (cal_from_ics, assert_only_one_uid, + expand as expand_vevent, sanitize as sanitize_vevent, + sort_key as sort_vevent_key) from .exceptions import (CouldNotCreateDbDir, OutdatedDbVersionError, UpdateFailed, NonUniqueUID) @@ -211,9 +214,9 @@ def update(self, vevent_str: str, href: str, etag: str='', calendar: str=None) - """ assert calendar is not None assert href is not None - ical = utils.cal_from_ics(vevent_str) + ical = cal_from_ics(vevent_str) check_for_errors(ical, calendar, href) - if not utils.assert_only_one_uid(ical): + if not assert_only_one_uid(ical): logger.warning( "The .ics file at {}/{} contains multiple UIDs.\n" "This should not occur in vdir .ics files.\n" @@ -223,7 +226,7 @@ def update(self, vevent_str: str, href: str, etag: str='', calendar: str=None) - "".format(calendar, href) ) raise NonUniqueUID - vevents = (utils.sanitize(c, self.locale['default_timezone'], href, calendar) for + vevents = (sanitize_vevent(c, self.locale['default_timezone'], href, calendar) for c in ical.walk() if c.name == 'VEVENT') # Need to delete the whole event in case we are updating a # recurring event with an event which is either not recurring any @@ -231,7 +234,7 @@ def update(self, vevent_str: str, href: str, etag: str='', calendar: str=None) - # tables. There are obviously better ways to achieve the same # result. self.delete(href, calendar=calendar) - for vevent in sorted(vevents, key=utils.sort_key): + for vevent in sorted(vevents, key=sort_vevent_key): check_for_errors(vevent, calendar, href) check_support(vevent, href, calendar) self._update_impl(vevent, href, calendar) @@ -261,7 +264,7 @@ def update_vcf_dates(self, vevent_str: str, href: str, etag: str='', assert href is not None # Delete all event entries for this contact self.deletelike(href + '%', calendar=calendar) - ical = utils.cal_from_ics(vevent_str) + ical = cal_from_ics(vevent_str) vcard = ical.walk()[0] for key in vcard.keys(): if key in ['BDAY', 'X-ANNIVERSARY', 'ANNIVERSARY'] or key.endswith('X-ABDATE'): @@ -350,7 +353,7 @@ def _update_impl(self, vevent: icalendar.cal.Event, href: str, calendar: str) -> start_shift_seconds = start_shift.days * 3600 * 24 + start_shift.seconds duration_seconds = duration.days * 3600 * 24 + duration.seconds - dtstartend = utils.expand(vevent, href) + dtstartend = expand_vevent(vevent, href) if not dtstartend: # Does this event even have dates? Technically it is possible for # events to be empty/non-existent by deleting all their recurrences diff --git a/khal/khalendar/event.py b/khal/khalendar/event.py index 9c0acf241..deface69a 100644 --- a/khal/khalendar/event.py +++ b/khal/khalendar/event.py @@ -31,9 +31,9 @@ from click import style from ..exceptions import FatalError +from ..icalendar import cal_from_ics, delete_instance, invalid_timezone from ..terminal import get_color -from ..utils import (cal_from_ics, delete_instance, generate_random_uid, - invalid_timezone, is_aware, to_naive_utc, to_unix_time) +from ..utils import generate_random_uid, is_aware, to_naive_utc, to_unix_time from ..parse_datetime import timedelta2str logger = logging.getLogger('khal') diff --git a/khal/ui/__init__.py b/khal/ui/__init__.py index f37a7631a..b2324daa3 100644 --- a/khal/ui/__init__.py +++ b/khal/ui/__init__.py @@ -27,7 +27,7 @@ import click import urwid -from .. import utils +from .. import icalendar as icalendar_helpers, utils from ..khalendar.event import Event from ..khalendar.exceptions import ReadOnlyCalendarError from . import colors @@ -874,13 +874,13 @@ def new(self, date, end=None): if end is None: start = dt.datetime.combine(date, dt.time(dt.datetime.now().hour)) end = start + dt.timedelta(minutes=60) - event = utils.new_event( + event = icalendar_helpers.new_event( dtstart=start, dtend=end, summary='', timezone=self._conf['locale']['default_timezone'], locale=self._conf['locale'], ) else: - event = utils.new_event( + event = icalendar_helpers.new_event( dtstart=date, dtend=end + dt.timedelta(days=1), summary='', allday=True, locale=self._conf['locale'], ) diff --git a/khal/utils.py b/khal/utils.py index 6e6013b34..1f402982a 100644 --- a/khal/utils.py +++ b/khal/utils.py @@ -23,23 +23,13 @@ import datetime as dt -import logging +import pytz import random import re import string from calendar import month_abbr, timegm -from collections import defaultdict from textwrap import wrap -import dateutil.rrule -import icalendar -import khal.parse_datetime as parse_datetime # TODO get this out of here -import pytz - -from .exceptions import UnsupportedRecurrence - -logger = logging.getLogger('khal') - def generate_random_uid(): """generate a random uid @@ -50,156 +40,6 @@ def generate_random_uid(): return ''.join([random.choice(choice) for _ in range(36)]) -def new_event(locale, dtstart=None, dtend=None, summary=None, timezone=None, - allday=False, description=None, location=None, categories=None, - repeat=None, until=None, alarms=None): - """create a new event - - :param dtstart: starttime of that event - :type dtstart: datetime - :param dtend: end time of that event, if this is a *date*, this value is - interpreted as being the last date the event is scheduled on, i.e. - the VEVENT DTEND will be *one day later* - :type dtend: datetime - :param summary: description of the event, used in the SUMMARY property - :type summary: unicode - :param timezone: timezone of the event (start and end) - :type timezone: pytz.timezone - :param allday: if set to True, we will not transform dtstart and dtend to - datetime - :type allday: bool - :returns: event - :rtype: icalendar.Event - """ - - if dtstart is None: - raise ValueError("no start given") - if dtend is None: - raise ValueError("no end given") - if summary is None: - raise ValueError("no summary given") - - if not allday and timezone is not None: - dtstart = timezone.localize(dtstart) - dtend = timezone.localize(dtend) - - event = icalendar.Event() - event.add('dtstart', dtstart) - event.add('dtend', dtend) - event.add('dtstamp', dt.datetime.now()) - event.add('summary', summary) - event.add('uid', generate_random_uid()) - # event.add('sequence', 0) - - if description: - event.add('description', description) - if location: - event.add('location', location) - if categories: - event.add('categories', categories) - if repeat and repeat != "none": - rrule = parse_datetime.rrulefstr(repeat, until, locale) - event.add('rrule', rrule) - if alarms: - for alarm in alarms.split(","): - alarm = alarm.strip() - alarm_trig = -1 * parse_datetime.guesstimedeltafstr(alarm) - new_alarm = icalendar.Alarm() - new_alarm.add('ACTION', 'DISPLAY') - new_alarm.add('TRIGGER', alarm_trig) - new_alarm.add('DESCRIPTION', description) - event.add_component(new_alarm) - return event - - -def split_ics(ics, random_uid=False, default_timezone=None): - """split an ics string into several according to VEVENT's UIDs - - and sort the right VTIMEZONEs accordingly - ignores all other ics components - :type ics: str - :param random_uid: assign random uids to all events - :type random_uid: bool - :rtype list: - """ - cal = cal_from_ics(ics) - tzs = {item['TZID']: item for item in cal.walk() if item.name == 'VTIMEZONE'} - - events_grouped = defaultdict(list) - for item in cal.walk(): - if item.name == 'VEVENT': - events_grouped[item['UID']].append(item) - else: - continue - return [ics_from_list(events, tzs, random_uid, default_timezone) for uid, events in - sorted(events_grouped.items())] - - -def ics_from_list(events, tzs, random_uid=False, default_timezone=None): - """convert an iterable of icalendar.Events to an icalendar.Calendar - - :params events: list of events all with the same uid - :type events: list(icalendar.cal.Event) - :param random_uid: assign random uids to all events - :type random_uid: bool - :param tzs: collection of timezones - :type tzs: dict(icalendar.cal.Vtimzone - """ - calendar = icalendar.Calendar() - calendar.add('version', '2.0') - calendar.add( - 'prodid', '-//PIMUTILS.ORG//NONSGML khal / icalendar //EN' - ) - - if random_uid: - new_uid = generate_random_uid() - - needed_tz, missing_tz = set(), set() - for sub_event in events: - sub_event = sanitize(sub_event, default_timezone=default_timezone) - if random_uid: - sub_event['UID'] = new_uid - # icalendar round-trip converts `TZID=a b` to `TZID="a b"` investigate, file bug XXX - for prop in ['DTSTART', 'DTEND', 'DUE', 'EXDATE', 'RDATE', 'RECURRENCE-ID', 'DUE']: - if isinstance(sub_event.get(prop), list): - items = sub_event.get(prop) - else: - items = [sub_event.get(prop)] - - for item in items: - if not (hasattr(item, 'dt') or hasattr(item, 'dts')): - continue - # if prop is a list, all items have the same parameters - datetime_ = item.dts[0].dt if hasattr(item, 'dts') else item.dt - - if not hasattr(datetime_, 'tzinfo'): - continue - - # check for datetimes' timezones which are not understood by - # icalendar - if datetime_.tzinfo is None and 'TZID' in item.params and \ - item.params['TZID'] not in missing_tz: - logger.warning( - 'Cannot find timezone `{}` in .ics file, using default timezone. ' - 'This can lead to erroneous time shifts'.format(item.params['TZID']) - ) - missing_tz.add(item.params['TZID']) - elif datetime_.tzinfo and datetime_.tzinfo != pytz.UTC and \ - datetime_.tzinfo not in needed_tz: - needed_tz.add(datetime_.tzinfo) - - for tzid in needed_tz: - if str(tzid) in tzs: - calendar.add_component(tzs[str(tzid)]) - else: - logger.warning( - 'Cannot find timezone `{}` in .ics file, this could be a bug, ' - 'please report this issue at http://github.com/pimutils/khal/.'.format(tzid)) - for sub_event in events: - calendar.add_component(sub_event) - return calendar.to_ical().decode('utf-8') - - RESET = '\x1b[0m' ansi_reset = re.compile(r'\x1b\[0m') @@ -272,238 +112,6 @@ def get_month_abbr_len(): return max(len(month_abbr[i]) for i in range(1, 13)) + 1 -def expand(vevent, href=''): - """ - Constructs a list of start and end dates for all recurring instances of the - event defined in vevent. - - It considers RRULE as well as RDATE and EXDATE properties. In case of - unsupported recursion rules an UnsupportedRecurrence exception is thrown. - - If the vevent contains a RECURRENCE-ID property, no expansion is done, - the function still returns a tuple of start and end (date)times. - - :param vevent: vevent to be expanded - :type vevent: icalendar.cal.Event - :param href: the href of the vevent, used for more informative logging and - nothing else - :type href: str - :returns: list of start and end (date)times of the expanded event - :rtype: list(tuple(datetime, datetime)) - """ - # we do this now and than never care about the "real" end time again - if 'DURATION' in vevent: - duration = vevent['DURATION'].dt - else: - duration = vevent['DTEND'].dt - vevent['DTSTART'].dt - - # if this vevent has a RECURRENCE_ID property, no expansion will be - # performed - expand = not bool(vevent.get('RECURRENCE-ID')) - - events_tz = getattr(vevent['DTSTART'].dt, 'tzinfo', None) - allday = not isinstance(vevent['DTSTART'].dt, dt.datetime) - - def sanitize_datetime(date): - if allday and isinstance(date, dt.datetime): - date = date.date() - if events_tz is not None: - date = events_tz.localize(date) - return date - - rrule_param = vevent.get('RRULE') - if expand and rrule_param is not None: - vevent = sanitize_rrule(vevent) - - # dst causes problem while expanding the rrule, therefore we transform - # everything to naive datetime objects and transform back after - # expanding - # See https://github.com/dateutil/dateutil/issues/102 - dtstart = vevent['DTSTART'].dt - if events_tz: - dtstart = dtstart.replace(tzinfo=None) - if events_tz and 'Z' not in rrule_param.to_ical().decode(): - logger.warning( - "In event {}, DTSTART has a timezone, but UNTIL does not. This " - "might lead to errenous repeating instances (like missing the " - "last intended instance or adding an extra one)." - "".format(href)) - elif not events_tz and 'Z' in rrule_param.to_ical().decode(): - logger.warning( - "In event {}, DTSTART has no timezone, but UNTIL has one. This " - "might lead to errenous repeating instances (like missing the " - "last intended instance or adding an extra one)." - "".format(href)) - - rrule = dateutil.rrule.rrulestr( - rrule_param.to_ical().decode(), - dtstart=dtstart, - ignoretz=True, - ) - - if rrule._until is None: - # rrule really doesn't like to calculate all recurrences until - # eternity, so we only do it until 2037, because a) I'm not sure - # if python can deal with larger datetime values yet and b) pytz - # doesn't know any larger transition times - rrule._until = dt.datetime(2037, 12, 31) - elif events_tz and 'Z' in rrule_param.to_ical().decode(): - rrule._until = pytz.UTC.localize( - rrule._until).astimezone(events_tz).replace(tzinfo=None) - - rrule = map(sanitize_datetime, rrule) - - logger.debug('calculating recurrence dates for {}, this might take some time.'.format(href)) - - # RRULE and RDATE may specify the same date twice, it is recommended by - # the RFC to consider this as only one instance - dtstartl = set(rrule) - if not dtstartl: - raise UnsupportedRecurrence() - else: - dtstartl = {vevent['DTSTART'].dt} - - def get_dates(vevent, key): - # TODO replace with get_all_properties - dates = vevent.get(key) - if dates is None: - return - if not isinstance(dates, list): - dates = [dates] - - dates = (leaf.dt for tree in dates for leaf in tree.dts) - dates = localize_strip_tz(dates, events_tz) - return map(sanitize_datetime, dates) - - # include explicitly specified recursion dates - if expand: - dtstartl.update(get_dates(vevent, 'RDATE') or ()) - - # remove excluded dates - if expand: - for date in get_dates(vevent, 'EXDATE') or (): - try: - dtstartl.remove(date) - except KeyError: - logger.warning( - 'In event {}, excluded instance starting at {} not found, ' - 'event might be invalid.'.format(href, date)) - - dtstartend = [(start, start + duration) for start in dtstartl] - # not necessary, but I prefer deterministic output - dtstartend.sort() - return dtstartend - - -def assert_only_one_uid(cal: icalendar.Calendar): - """assert the all VEVENTs in cal have the same UID""" - uids = set() - for item in cal.walk(): - if item.name == 'VEVENT': - uids.add(item['UID']) - if len(uids) > 1: - return False - else: - return True - - -def sanitize(vevent, default_timezone, href='', calendar=''): - """ - clean up vevents we do not understand - - :param vevent: the vevent that needs to be cleaned - :type vevent: icalendar.cal.Event - :param default_timezone: timezone to apply to start and/or end dates which - were supposed to be localized but which timezone was not understood - by icalendar - :type timezone: pytz.timezone - :param href: used for logging to inform user which .ics files are - problematic - :type href: str - :param calendar: used for logging to inform user which .ics files are - problematic - :type calendar: str - :returns: clean vevent - :rtype: icalendar.cal.Event - """ - # convert localized datetimes with timezone information we don't - # understand to the default timezone - # TODO do this for everything where a TZID can appear (RDATE, EXDATE) - for prop in ['DTSTART', 'DTEND', 'DUE', 'RECURRENCE-ID']: - if prop in vevent and invalid_timezone(vevent[prop]): - timezone = vevent[prop].params.get('TZID') - value = default_timezone.localize(vevent.pop(prop).dt) - vevent.add(prop, value) - logger.warning( - "{} localized in invalid or incomprehensible timezone `{}` in {}/{}. " - "This could lead to this event being wrongly displayed." - "".format(prop, timezone, calendar, href) - ) - - vdtstart = vevent.pop('DTSTART', None) - vdtend = vevent.pop('DTEND', None) - dtstart = getattr(vdtstart, 'dt', None) - dtend = getattr(vdtend, 'dt', None) - - # event with missing DTSTART - if dtstart is None: - raise ValueError('Event has no start time (DTSTART).') - dtstart, dtend = sanitize_timerange( - dtstart, dtend, duration=vevent.get('DURATION', None)) - - vevent.add('DTSTART', dtstart) - if dtend is not None: - vevent.add('DTEND', dtend) - return vevent - - -def sanitize_timerange(dtstart, dtend, duration=None): - '''return sensible dtstart and end for events that have an invalid or - missing DTEND, assuming the event just lasts one hour.''' - - if isinstance(dtstart, dt.datetime) and isinstance(dtend, dt.datetime): - if dtstart.tzinfo and not dtend.tzinfo: - logger.warning( - "Event end time has no timezone. " - "Assuming it's the same timezone as the start time" - ) - dtend = dtstart.tzinfo.localize(dtend) - if not dtstart.tzinfo and dtend.tzinfo: - logger.warning( - "Event start time has no timezone. " - "Assuming it's the same timezone as the end time" - ) - dtstart = dtend.tzinfo.localize(dtstart) - - if dtend is None and duration is None: - if isinstance(dtstart, dt.datetime): - dtstart = dtstart.date() - dtend = dtstart + dt.timedelta(days=1) - elif dtend is not None: - if dtend < dtstart: - raise ValueError('The event\'s end time (DTEND) is older than ' - 'the event\'s start time (DTSTART).') - elif dtend == dtstart: - logger.warning( - "Event start time and end time are the same. " - "Assuming the event's duration is one hour." - ) - dtend += dt.timedelta(hours=1) - - return dtstart, dtend - - -def sanitize_rrule(vevent): - """fix problems with RRULE:UNTIL""" - if 'rrule' in vevent and 'UNTIL' in vevent['rrule']: - until = vevent['rrule']['UNTIL'][0] - dtstart = vevent['dtstart'].dt - # DTSTART is date, UNTIL is datetime - if not isinstance(dtstart, dt.datetime) and isinstance(until, dt.datetime): - vevent['rrule']['until'] = until.date() - return vevent - - def localize_strip_tz(dates, timezone): """converts a list of dates to timezone, than removes tz info""" for one_date in dates: @@ -533,56 +141,6 @@ def to_naive_utc(dtime): return dtime_naive -def invalid_timezone(prop): - """check if an icalendar property has a timezone attached we don't understand""" - if hasattr(prop.dt, 'tzinfo') and prop.dt.tzinfo is None and 'TZID' in prop.params: - return True - else: - return False - - -def _get_all_properties(vevent, prop): - """Get all properties from a vevent, even if there are several entries - - example input: - EXDATE:1234,4567 - EXDATE:7890 - - returns: [1234, 4567, 7890] - - :type vevent: icalendar.cal.Event - :type prop: str - """ - if prop not in vevent: - return list() - if isinstance(vevent[prop], list): - rdates = [leaf.dt for tree in vevent[prop] for leaf in tree.dts] - else: - rdates = [vddd.dt for vddd in vevent[prop].dts] - return rdates - - -def delete_instance(vevent, instance): - """remove a recurrence instance from a VEVENT's RRDATE list or add it - to the EXDATE list - - :type vevent: icalendar.cal.Event - :type instance: datetime.datetime - """ - # TODO check where this instance is coming from and only call the - # appropriate function - if 'RRULE' in vevent: - exdates = _get_all_properties(vevent, 'EXDATE') - exdates += [instance] - vevent.pop('EXDATE') - vevent.add('EXDATE', exdates) - if 'RDATE' in vevent: - rdates = [one for one in _get_all_properties(vevent, 'RDATE') if one != instance] - vevent.pop('RDATE') - if rdates != []: - vevent.add('RDATE', rdates) - - def is_aware(dtime): """test if a datetime instance is timezone aware""" if dtime.tzinfo is not None and dtime.tzinfo.utcoffset(dtime) is not None: @@ -627,39 +185,5 @@ def relative_timedelta_str(day): ) -def sort_key(vevent): - """helper function to determine order of VEVENTS - so that recurrence-id events come after the corresponding rrule event, etc - :param vevent: icalendar.Event - :rtype: tuple(str, int) - """ - assert isinstance(vevent, icalendar.Event) - uid = str(vevent['UID']) - rec_id = vevent.get('RECURRENCE-ID') - if rec_id is None: - return uid, 0 - rrange = rec_id.params.get('RANGE') - if rrange == 'THISANDFUTURE': - return uid, to_unix_time(rec_id.dt) - else: - return uid, 1 - - def get_wrapped_text(widget): return widget.original_widget.get_edit_text() - - -def cal_from_ics(ics): - try: - cal = icalendar.Calendar.from_ical(ics) - except ValueError as error: - if (len(error.args) > 0 and isinstance(error.args[0], str) and - error.args[0].startswith('Offset must be less than 24 hours')): - logger.warning( - 'Invalid timezone offset encountered, ' - 'timezone information may be wrong: ' + str(error.args[0]) - ) - icalendar.vUTCOffset.ignore_exceptions = True - cal = icalendar.Calendar.from_ical(ics) - icalendar.vUTCOffset.ignore_exceptions = False - return cal diff --git a/tests/icalendar_test.py b/tests/icalendar_test.py new file mode 100644 index 000000000..5cf0f0edb --- /dev/null +++ b/tests/icalendar_test.py @@ -0,0 +1,73 @@ +import icalendar +import random +import textwrap + +from khal.icalendar import split_ics + +from .utils import (LOCALE_BERLIN, _get_text, normalize_component) + + +def _get_TZIDs(lines): + """from a list of strings, get all unique strings that start with TZID""" + return sorted((line for line in lines if line.startswith('TZID'))) + + +def test_normalize_component(): + assert normalize_component(textwrap.dedent(""" + BEGIN:VEVENT + DTSTART;TZID=Europe/Berlin;VALUE=DATE-TIME:20140409T093000 + END:VEVENT + """)) != normalize_component(textwrap.dedent(""" + BEGIN:VEVENT + DTSTART;TZID=Oyrope/Berlin;VALUE=DATE-TIME:20140409T093000 + END:VEVENT + """)) + + +def test_split_ics(): + cal = _get_text('cal_lots_of_timezones') + vevents = split_ics(cal) + + vevents0 = vevents[0].split('\r\n') + vevents1 = vevents[1].split('\r\n') + + part0 = _get_text('part0').split('\n') + part1 = _get_text('part1').split('\n') + + assert _get_TZIDs(vevents0) == _get_TZIDs(part0) + assert _get_TZIDs(vevents1) == _get_TZIDs(part1) + + assert sorted(vevents0) == sorted(part0) + assert sorted(vevents1) == sorted(part1) + + +def test_split_ics_random_uid(): + random.seed(123) + cal = _get_text('cal_lots_of_timezones') + vevents = split_ics(cal, random_uid=True) + + part0 = _get_text('part0').split('\n') + part1 = _get_text('part1').split('\n') + + for item in icalendar.Calendar.from_ical(vevents[0]).walk(): + if item.name == 'VEVENT': + assert item['UID'] == 'DRF0RGCY89VVDKIV9VPKA1FYEAU2GCFJIBS1' + for item in icalendar.Calendar.from_ical(vevents[1]).walk(): + if item.name == 'VEVENT': + assert item['UID'] == '4Q4CTV74N7UAZ618570X6CLF5QKVV9ZE3YVB' + + # after replacing the UIDs, everything should be as above + vevents0 = vevents[0].replace('DRF0RGCY89VVDKIV9VPKA1FYEAU2GCFJIBS1', '123').split('\r\n') + vevents1 = vevents[1].replace('4Q4CTV74N7UAZ618570X6CLF5QKVV9ZE3YVB', 'abcde').split('\r\n') + + assert _get_TZIDs(vevents0) == _get_TZIDs(part0) + assert _get_TZIDs(vevents1) == _get_TZIDs(part1) + + assert sorted(vevents0) == sorted(part0) + assert sorted(vevents1) == sorted(part1) + + +def test_split_ics_missing_timezone(): + """testing if we detect the missing timezone in splitting""" + cal = _get_text('event_dt_local_missing_tz') + split_ics(cal, random_uid=True, default_timezone=LOCALE_BERLIN['default_timezone']) diff --git a/tests/khalendar_test.py b/tests/khalendar_test.py index b671f5609..08effff55 100644 --- a/tests/khalendar_test.py +++ b/tests/khalendar_test.py @@ -8,6 +8,7 @@ import khal.utils import pytest from freezegun import freeze_time +from khal import icalendar as icalendar_helpers from khal.khalendar import CalendarCollection from khal.khalendar.backend import CouldNotCreateDbDir from khal.khalendar.event import Event @@ -247,7 +248,7 @@ def test_newevent(self, coll_vdirs): coll, vdirs = coll_vdirs bday = dt.datetime.combine(aday, dt.time.min) anend = bday + dt.timedelta(hours=1) - event = khal.utils.new_event( + event = icalendar_helpers.new_event( dtstart=bday, dtend=anend, summary="hi", timezone=utils.BERLIN, locale=LOCALE_BERLIN, ) diff --git a/tests/khalendar_utils_test.py b/tests/khalendar_utils_test.py index 12de87c4d..6d4663890 100644 --- a/tests/khalendar_utils_test.py +++ b/tests/khalendar_utils_test.py @@ -1,7 +1,7 @@ import datetime as dt import icalendar -import khal.utils as utils +from khal import icalendar as icalendar_helpers, utils import pytz from .utils import _get_text, _get_vevent_file @@ -311,7 +311,7 @@ class TestExpand(object): def test_expand_dt(self): vevent = _get_vevent(event_dt) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dtstartend_berlin assert [start.utcoffset() for start, _ in dtstart] == self.offset_berlin @@ -319,7 +319,7 @@ def test_expand_dt(self): def test_expand_dtb(self): vevent = _get_vevent(event_dtb) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dtstartend_berlin assert [start.utcoffset() for start, _ in dtstart] == self.offset_berlin @@ -327,38 +327,38 @@ def test_expand_dtb(self): def test_expand_dttz(self): vevent = _get_vevent(event_dttz) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dtstartend_utc assert [start.utcoffset() for start, _ in dtstart] == self.offset_utc assert [end.utcoffset() for _, end in dtstart] == self.offset_utc def test_expand_dtf(self): vevent = _get_vevent(event_dtf) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dtstartend_float assert [start.utcoffset() for start, _ in dtstart] == self.offset_none assert [end.utcoffset() for _, end in dtstart] == self.offset_none def test_expand_d(self): vevent = _get_vevent(event_d) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dstartend def test_expand_dtz(self): vevent = _get_vevent(event_dtz) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dstartend def test_expand_dtzb(self): vevent = _get_vevent(event_dtzb) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dstartend def test_expand_invalid_exdate(self): """testing if we can expand an event with EXDATEs that do not much its RRULE""" vevent = _get_vevent_file('event_invalid_exdate') - dtstartl = utils.expand(vevent, berlin) + dtstartl = icalendar_helpers.expand(vevent, berlin) # TODO test for logging message assert dtstartl == [ (new_york.localize(dt.datetime(2011, 11, 12, 15, 50)), @@ -397,7 +397,7 @@ class TestExpandNoRR(object): def test_expand_dt(self): vevent = _get_vevent(event_dt_norr) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dtstartend_berlin assert [start.utcoffset() for start, _ in dtstart] == self.offset_berlin @@ -405,7 +405,7 @@ def test_expand_dt(self): def test_expand_dtb(self): vevent = _get_vevent(event_dtb_norr) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dtstartend_berlin assert [start.utcoffset() for start, _ in dtstart] == self.offset_berlin @@ -413,21 +413,21 @@ def test_expand_dtb(self): def test_expand_dttz(self): vevent = _get_vevent(event_dttz_norr) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dtstartend_utc assert [start.utcoffset() for start, _ in dtstart] == self.offset_utc assert [end.utcoffset() for _, end in dtstart] == self.offset_utc def test_expand_dtf(self): vevent = _get_vevent(event_dtf_norr) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == self.dtstartend_float assert [start.utcoffset() for start, _ in dtstart] == self.offset_none assert [end.utcoffset() for _, end in dtstart] == self.offset_none def test_expand_d(self): vevent = _get_vevent(event_d_norr) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == [ (dt.date(2013, 3, 1,), dt.date(2013, 3, 2,)), @@ -437,7 +437,7 @@ def test_expand_dtr_exdatez(self): """a recurring event with an EXDATE in Zulu time while DTSTART is localized""" vevent = _get_vevent_file('event_dtr_exdatez') - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 3 def test_expand_rrule_exdate_z(self): @@ -445,8 +445,8 @@ def test_expand_rrule_exdate_z(self): exdate """ vevent = _get_vevent_file('event_dtr_no_tz_exdatez') - vevent = utils.sanitize(vevent, berlin, '', '') - dtstart = utils.expand(vevent, berlin) + vevent = icalendar_helpers.sanitize(vevent, berlin, '', '') + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 5 dtstarts = [start for start, end in dtstart] assert dtstarts == [ @@ -462,8 +462,8 @@ def test_expand_rrule_notz_until_z(self): exdate """ vevent = _get_vevent_file('event_dtr_notz_untilz') - vevent = utils.sanitize(vevent, new_york, '', '') - dtstart = utils.expand(vevent, new_york) + vevent = icalendar_helpers.sanitize(vevent, new_york, '', '') + dtstart = icalendar_helpers.expand(vevent, new_york) assert len(dtstart) == 7 dtstarts = [start for start, end in dtstart] assert dtstarts == [ @@ -580,7 +580,7 @@ class TestSpecial(object): def test_count(self): vevent = _get_vevent(vevent_count) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) starts = [start for start, _ in dtstart] assert len(starts) == 18 assert dtstart[0][0] == dt.datetime(2014, 2, 3, 7, 0) @@ -588,7 +588,7 @@ def test_count(self): def test_until_notz(self): vevent = _get_vevent(vevent_until_notz) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) starts = [start for start, _ in dtstart] assert len(starts) == 18 assert dtstart[0][0] == berlin.localize( @@ -598,7 +598,7 @@ def test_until_notz(self): def test_until_d_notz(self): vevent = _get_vevent(event_until_d_notz) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) starts = [start for start, _ in dtstart] assert len(starts) == 6 assert dtstart[0][0] == dt.date(2014, 1, 10) @@ -606,13 +606,13 @@ def test_until_d_notz(self): def test_latest_bug(self): vevent = _get_vevent(latest_bug) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart[0][0] == dt.date(2009, 10, 31) assert dtstart[-1][0] == dt.date(2037, 10, 31) def test_recurrence_id_with_timezone(self): vevent = _get_vevent(recurrence_id_with_timezone) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 1 assert dtstart[0][0] == berlin.localize( dt.datetime(2013, 11, 13, 19, 0)) @@ -620,7 +620,7 @@ def test_recurrence_id_with_timezone(self): def test_event_exdate_dt(self): """recurring event, one date excluded via EXCLUDE""" vevent = _get_vevent(event_exdate_dt) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 9 assert dtstart[0][0] == berlin.localize( dt.datetime(2014, 7, 2, 19, 0)) @@ -630,7 +630,7 @@ def test_event_exdate_dt(self): def test_event_exdates_dt(self): """recurring event, two dates excluded via EXCLUDE""" vevent = _get_vevent(event_exdates_dt) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 8 assert dtstart[0][0] == berlin.localize( dt.datetime(2014, 7, 2, 19, 0)) @@ -640,7 +640,7 @@ def test_event_exdates_dt(self): def test_event_exdatesl_dt(self): """recurring event, three dates exclude via two EXCLUDEs""" vevent = _get_vevent(event_exdatesl_dt) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 7 assert dtstart[0][0] == berlin.localize( dt.datetime(2014, 7, 2, 19, 0)) @@ -650,24 +650,24 @@ def test_event_exdatesl_dt(self): def test_event_exdates_remove(self): """check if we can remove one more instance""" vevent = _get_vevent(event_exdatesl_dt) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 7 exdate1 = pytz.UTC.localize(dt.datetime(2014, 7, 11, 17, 0)) - utils.delete_instance(vevent, exdate1) - dtstart = utils.expand(vevent, berlin) + icalendar_helpers.delete_instance(vevent, exdate1) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 6 exdate2 = berlin.localize(dt.datetime(2014, 7, 9, 19, 0)) - utils.delete_instance(vevent, exdate2) - dtstart = utils.expand(vevent, berlin) + icalendar_helpers.delete_instance(vevent, exdate2) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 5 def test_event_dt_rrule_invalid_until(self): """DTSTART and RRULE:UNTIL should be of the same type, but might not be""" vevent = _get_vevent(_get_text('event_dt_rrule_invalid_until')) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert dtstart == [(dt.date(2007, 12, 1), dt.date(2007, 12, 2)), (dt.date(2008, 1, 1), dt.date(2008, 1, 2)), (dt.date(2008, 2, 1), dt.date(2008, 2, 2))] @@ -676,7 +676,7 @@ def test_event_dt_rrule_invalid_until2(self): """same as above, but now dtstart is of type date and until is datetime """ vevent = _get_vevent(_get_text('event_dt_rrule_invalid_until2')) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 35 assert dtstart[0] == (berlin.localize(dt.datetime(2014, 4, 9, 9, 30)), berlin.localize(dt.datetime(2014, 4, 9, 10, 30))) @@ -708,25 +708,25 @@ class TestRDate(object): """Testing expanding of recurrence rules""" def test_simple_rdate(self): vevent = _get_vevent(simple_rdate) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 4 def test_rrule_and_rdate(self): vevent = _get_vevent(rrule_and_rdate) - dtstart = utils.expand(vevent, berlin) + dtstart = icalendar_helpers.expand(vevent, berlin) assert len(dtstart) == 7 def test_rrule_past(self): vevent = _get_vevent_file('event_r_past') assert vevent is not None - dtstarts = utils.expand(vevent, berlin) + dtstarts = icalendar_helpers.expand(vevent, berlin) assert len(dtstarts) == 73 assert dtstarts[0][0] == dt.date(1965, 4, 23) assert dtstarts[-1][0] == dt.date(2037, 4, 23) def test_rdate_date(self): vevent = _get_vevent_file('event_d_rdate') - dtstarts = utils.expand(vevent, berlin) + dtstarts = icalendar_helpers.expand(vevent, berlin) assert len(dtstarts) == 4 assert dtstarts == [(dt.date(2015, 8, 12), dt.date(2015, 8, 13)), (dt.date(2015, 8, 13), dt.date(2015, 8, 14)), @@ -770,24 +770,24 @@ class TestSanitize(object): def test_noend_date(self): vevent = _get_vevent(noend_date) - vevent = utils.sanitize(vevent, berlin, '', '') + vevent = icalendar_helpers.sanitize(vevent, berlin, '', '') assert vevent['DTSTART'].dt == dt.date(2014, 8, 29) assert vevent['DTEND'].dt == dt.date(2014, 8, 30) def test_noend_datetime(self): vevent = _get_vevent(noend_datetime) - vevent = utils.sanitize(vevent, berlin, '', '') + vevent = icalendar_helpers.sanitize(vevent, berlin, '', '') assert vevent['DTSTART'].dt == dt.date(2014, 8, 29) assert vevent['DTEND'].dt == dt.date(2014, 8, 30) def test_duration(self): vevent = _get_vevent_file('event_dtr_exdatez') - vevent = utils.sanitize(vevent, berlin, '', '') + vevent = icalendar_helpers.sanitize(vevent, berlin, '', '') def test_instant(self): vevent = _get_vevent(instant) assert vevent['DTEND'].dt - vevent['DTSTART'].dt == dt.timedelta() - vevent = utils.sanitize(vevent, berlin, '', '') + vevent = icalendar_helpers.sanitize(vevent, berlin, '', '') assert vevent['DTEND'].dt - vevent['DTSTART'].dt == dt.timedelta(hours=1) diff --git a/tests/parse_datetime_test.py b/tests/parse_datetime_test.py index 741399bd8..294117bb2 100644 --- a/tests/parse_datetime_test.py +++ b/tests/parse_datetime_test.py @@ -8,7 +8,7 @@ guessdatetimefstr, guessrangefstr, guesstimedeltafstr, timedelta2str, weekdaypstr) -from khal.utils import new_event +from khal.icalendar import new_event from .utils import (LOCALE_BERLIN, LOCALE_NEW_YORK, _replace_uid, normalize_component) diff --git a/tests/utils_test.py b/tests/utils_test.py index 45aef8956..98074ac24 100644 --- a/tests/utils_test.py +++ b/tests/utils_test.py @@ -1,82 +1,9 @@ """testing functions from the khal.utils""" import datetime as dt -import random -import textwrap - from freezegun import freeze_time -import icalendar - from khal import utils -from .utils import (LOCALE_BERLIN, _get_text, normalize_component) - - -def _get_TZIDs(lines): - """from a list of strings, get all unique strings that start with TZID""" - return sorted((line for line in lines if line.startswith('TZID'))) - - -def test_normalize_component(): - assert normalize_component(textwrap.dedent(""" - BEGIN:VEVENT - DTSTART;TZID=Europe/Berlin;VALUE=DATE-TIME:20140409T093000 - END:VEVENT - """)) != normalize_component(textwrap.dedent(""" - BEGIN:VEVENT - DTSTART;TZID=Oyrope/Berlin;VALUE=DATE-TIME:20140409T093000 - END:VEVENT - """)) - - -def test_split_ics(): - cal = _get_text('cal_lots_of_timezones') - vevents = utils.split_ics(cal) - - vevents0 = vevents[0].split('\r\n') - vevents1 = vevents[1].split('\r\n') - - part0 = _get_text('part0').split('\n') - part1 = _get_text('part1').split('\n') - - assert _get_TZIDs(vevents0) == _get_TZIDs(part0) - assert _get_TZIDs(vevents1) == _get_TZIDs(part1) - - assert sorted(vevents0) == sorted(part0) - assert sorted(vevents1) == sorted(part1) - - -def test_split_ics_random_uid(): - random.seed(123) - cal = _get_text('cal_lots_of_timezones') - vevents = utils.split_ics(cal, random_uid=True) - - part0 = _get_text('part0').split('\n') - part1 = _get_text('part1').split('\n') - - for item in icalendar.Calendar.from_ical(vevents[0]).walk(): - if item.name == 'VEVENT': - assert item['UID'] == 'DRF0RGCY89VVDKIV9VPKA1FYEAU2GCFJIBS1' - for item in icalendar.Calendar.from_ical(vevents[1]).walk(): - if item.name == 'VEVENT': - assert item['UID'] == '4Q4CTV74N7UAZ618570X6CLF5QKVV9ZE3YVB' - - # after replacing the UIDs, everything should be as above - vevents0 = vevents[0].replace('DRF0RGCY89VVDKIV9VPKA1FYEAU2GCFJIBS1', '123').split('\r\n') - vevents1 = vevents[1].replace('4Q4CTV74N7UAZ618570X6CLF5QKVV9ZE3YVB', 'abcde').split('\r\n') - - assert _get_TZIDs(vevents0) == _get_TZIDs(part0) - assert _get_TZIDs(vevents1) == _get_TZIDs(part1) - - assert sorted(vevents0) == sorted(part0) - assert sorted(vevents1) == sorted(part1) - - -def test_split_ics_missing_timezone(): - """testing if we detect the missing timezone in splitting""" - cal = _get_text('event_dt_local_missing_tz') - utils.split_ics(cal, random_uid=True, default_timezone=LOCALE_BERLIN['default_timezone']) - def test_relative_timedelta_str(): with freeze_time('2016-9-19'):