From e4c42de74c599b3b26217dd22b359b3d394273d3 Mon Sep 17 00:00:00 2001
From: mrsu
Date: Sun, 11 Feb 2024 19:43:33 +0000
Subject: [PATCH] created CalendarParser class

---
 remindme_caldav.py | 372 +++++++++++++++++++++------------------------
 1 file changed, 176 insertions(+), 196 deletions(-)

diff --git a/remindme_caldav.py b/remindme_caldav.py
index 71823a7..49c9d8a 100644
--- a/remindme_caldav.py
+++ b/remindme_caldav.py
@@ -13,6 +13,18 @@ import humanfriendly
 from pathlib import Path
 import argparse
 import textwrap
+import logging
+
+def setup_logger(loglevel):
+    """Setup basic logging."""
+    loglevel = getattr(logging, loglevel.upper(), None)
+
+    if not isinstance(loglevel, int):
+        raise ValueError('Invalid log level: %s' % loglevel)
+
+    logging.basicConfig(filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger()
+    logger.setLevel(loglevel)
 
 def parse_args():
     """Parse command line arguments."""
@@ -39,6 +51,9 @@ def parse_toml(content):
         print(e)
         sys.exit(1) # Exit with error code
 
+def calculate_event_hash(event):
+    return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
+
 class DateTimeEncoder(json.JSONEncoder):
     def default(self, o):
         if isinstance(o, (dt.datetime, dt.timedelta)):
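Moving the hash helper to module level lets any caller fingerprint an event dict with the same MD5-over-JSON scheme the file watcher uses to spot real changes. A minimal sketch of the idea, assuming the patched module is importable; the event dict below is an invented fragment, not the full structure the parser builds:

import datetime as dt
from remindme_caldav import calculate_event_hash

event = {"uid": "abc-123", "summary": "Dentist", "dtstart": dt.datetime(2024, 3, 1, 9, 0)}
old_hash = calculate_event_hash(event)
event["summary"] = "Dentist (rescheduled)"
assert calculate_event_hash(event) != old_hash  # any field change yields a new digest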
""" + def __init__(self, event_list): + self.calendar_parser = CalendarParser() # Create an instance of CalendarParser + self.event_list = event_list + def on_modified(self, event): print(f"File modified: {event.src_path}") if not event.is_directory: @@ -84,7 +81,7 @@ class FileChangeHandler(FileSystemEventHandler): return try: - event_dict = calendar_parser(cal_str) + event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method except Exception as i: print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) return @@ -110,7 +107,7 @@ class FileChangeHandler(FileSystemEventHandler): return try: - event_dict = calendar_parser(cal_str) + event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method except Exception as i: print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) return @@ -119,175 +116,111 @@ class FileChangeHandler(FileSystemEventHandler): def handle_modified(self, old_event, event_dict, remove=False): if not remove: - for i, old_event in enumerate(event_list): + for i, old_event in enumerate(self.event_list): if old_event["uid"] == event_dict["uid"]: old_hash = old_event["hash"] - new_hash = self.calculate_event_hash(event_dict) + new_hash = calculate_event_hash(event_dict) if new_hash != old_hash: print("Event with UID {} has been modified or deleted".format(old_event["uid"])) - event_list[i] = event_dict + self.event_list[i] = event_dict else: print("Event with UID {} hasn't been modified".format(old_event["uid"])) break else: - event_list.append(event_dict) + self.event_list.append(event_dict) else: # If remove is True, remove the event from the list - for i, old_event in enumerate(event_list): + for i, old_event in enumerate(self.event_list): if old_event["uid"] == event_dict["uid"]: print("Event with UID {} has been deleted".format(old_event["uid"])) - del event_list[i] + del self.event_list[i] break - def calculate_event_hash(self, event): - return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest() -def generate_recurring_event_dates(dtstart, rrule): - """ - Generate recurring event dates based on a start date and an RRULE. - - This function takes in a start date (`dtstart`) and an RRULE (`rrule`), - which is used to generate future dates according to the rules specified by - the RRULE. - - If no count rule is present, it generates a date array from `dtstart` to - the current datetime, then adds an arbitrary number of dates into the - future (here it's 10). This is done because generating a date array using - the rrulestr function without a count rule will return a very large number - of elements. - - The function returns a dictionary containing information about the - recurring event: - - `recur_dates`: A list of future dates generated by the RRULE. - - `infinite_recur`: Boolean indicating whether the recurrence is infinite. - - `recur_freq`: The frequency of the recurrence (e.g., 'DAILY'). - - `recur_interval`: The interval between each occurrence of the event. - - `n_recur_dates_left`: The number of future dates left. - - Parameters: - dtstart (datetime): The start date of the recurring event. - rrule (rrule): The RRULE object specifying the recurrence rules. - - Returns: - dict: A dictionary containing information about the recurring event - dates, frequency, interval and count. 
- """ - recur_info = { +class RecurringEventGenerator: + def __init__(self, dtstart, rrule): + self.dtstart = dtstart + self.rrule = rrule + self.recur_info = { "recur_dates": [dtstart], "infinite_recur": False, "recur_freq": None, "recur_interval": None, "n_recur_dates_left": None - } - if rrule is None: - return recur_info - rule_str = "RRULE:{}".format(rrule.to_ical().decode('utf-8')) - start_date = dtstart - infinite_recur = False - freq = rrule.get('FREQ')[0] - count = rrule.get("COUNT") - interval = rrule.get('INTERVAL')[0] - current_date = dt.datetime.now().replace(tzinfo=pytz.UTC) - - if count is None:# or until is not None: - - delta = None - - if freq == "DAILY": - delta = relativedelta(days=interval) - elif freq == "MONTHLY": - delta = relativedelta(months=interval) - elif freq == "YEARLY": - delta = relativedelta(years=interval) - - count = 0 - origin_date = start_date - while origin_date < current_date: - count += interval - origin_date += delta*interval - - rule_str += ";COUNT={}".format(count+10) - infinite_recur = True - - ruleset = rrulestr(rule_str, dtstart=start_date) - - recur_dates = [None] - n_recur = None - dates = list(ruleset) # Generate future dates according to the rules - recur_dates = [i for i in dates if i >= current_date] - n_recur = "inf" if infinite_recur is True else len(recur_dates) - - recur_info["recur_dates"] = recur_dates - recur_info["infinite_recur"] = infinite_recur - recur_info["recur_freq"] = freq - recur_info["recur_interval"] = interval - recur_info["n_recur_dates_left"] = n_recur - return recur_info - -def calendar_parser(cal_str): - # Parse the calendar - cal = Calendar.from_ical(cal_str) - - # Iterate over each event in the calendar - for event in cal.walk('vevent'): - - event_info = { - "uid": None, - "dtstart": "", - "exdate": [], - "summary": None, - "description": None, - "location": None, - "rrule": None } - # Catch errors for missing components - for info in event_info: - try: - event_info[info] = event[info] - except Exception: - pass - - uid = str(event_info["uid"]) - dtstart = event_info["dtstart"].dt - exdates = event_info["exdate"] - if exdates is not []: - if isinstance(exdates, list): - exdates = [i.dts.dt.replace(tzinfo=pytz.UTC) for i in exdates] - else: - exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)] - summary = event["summary"] - description = event_info["description"] - location = event_info["location"] - rrule = event_info["rrule"] - - # Ensure dates are always as datetime - if isinstance(dtstart, dt.datetime): - dtstart = dtstart.replace(tzinfo=pytz.UTC) - else: - dtstart = dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC) - - # Get recurring events if they exist - recur_info = generate_recurring_event_dates(dtstart, rrule) - event_dates = recur_info["recur_dates"] + + def generate(self): + if self.rrule is None: + return self.recur_info + + rule_str = "RRULE:{}".format(self.rrule.to_ical().decode('utf-8')) + start_date = self.dtstart + infinite_recur = False + freq = self.rrule.get('FREQ')[0] + count = self.rrule.get("COUNT") + interval = self.rrule.get('INTERVAL')[0] + current_date = dt.datetime.now().replace(tzinfo=pytz.UTC) + + if count is None: + delta = None + + if freq == "DAILY": + delta = relativedelta(days=interval) + elif freq == "MONTHLY": + delta = relativedelta(months=interval) + elif freq == "YEARLY": + delta = relativedelta(years=interval) + + count = 0 + origin_date = start_date + while origin_date < current_date: + count += interval + origin_date += delta*interval + + rule_str 
+= ";COUNT={}".format(count+10) + infinite_recur = True + + ruleset = rrulestr(rule_str, dtstart=start_date) + + recur_dates = [None] + n_recur = None + dates = list(ruleset) # Generate future dates according to the rules + recur_dates = [i for i in dates if i >= current_date] + n_recur = "inf" if infinite_recur is True else len(recur_dates) + + self.recur_info["recur_dates"] = recur_dates + self.recur_info["infinite_recur"] = infinite_recur + self.recur_info["recur_freq"] = freq + self.recur_info["recur_interval"] = interval + self.recur_info["n_recur_dates_left"] = n_recur + + return self.recur_info - # Remove exdates - event_dates = [i for i in event_dates if i not in exdates] - - valarms = [] - for subcomponent in event.walk("valarm"): - valarm = Event.from_ical(subcomponent.to_ical()) - timedelta = valarm["trigger"].dt - valarms.append(timedelta) +class CalendarParser: + def parse_calendar(self, cal_str): + # Parse the calendar + cal = self.parse_icalendar(cal_str) + + # Iterate over each event in the calendar + for event in cal.walk('vevent'): + event_dict = self.process_event(event) + dtstart = self.dtstart_to_datetime(event_dict["dtstart"].dt) + + generator = RecurringEventGenerator(dtstart, event_dict["rrule"]) + recur_info = generator.generate() + event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"]) - event_dict = { - "uid": uid, + valarms = self.process_valarm(event) + + event_dict = { + "uid": str(event_dict["uid"]), "dtstart": dtstart, - "summary": summary, - "description": description, - "location": location, + "summary": event_dict["summary"], + "description": event_dict["description"], + "location": event_dict["location"], "event_dates": event_dates, "recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format( recur_info["recur_freq"], @@ -296,17 +229,66 @@ def calendar_parser(cal_str): ), "valarms": valarms, "alert_history": [] - } + } + new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary + event_dict["hash"] = new_hash # Store the hash in the event dictionary + return event_dict + + def parse_icalendar(self, cal_str): + return Calendar.from_ical(cal_str) + + def process_event(self, event): + # Catch errors for missing components + event_info = { + "uid": None, + "dtstart": "", + "exdate": [], + "summary": None, + "description": None, + "location": None, + "rrule": None + } + + for info in event_info: + try: + event_info[info] = event[info] + except Exception: + pass + + return event_info + + def dtstart_to_datetime(self, dtstart): + # Ensure dates are always as datetime + if isinstance(dtstart, dt.datetime): + return dtstart.replace(tzinfo=pytz.UTC) + else: + return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC) - handler = FileChangeHandler() # Create an instance of the FileChangeHandler class - new_hash = handler.calculate_event_hash(event_dict) # Calculate the hash of the event dictionary - event_dict["hash"] = new_hash # Store the hash in the event dictionary - return event_dict + def remove_exdates(self, exdates, recur_dates): + if exdates != []: + if isinstance(exdates, list): + exdates = [i.dts[0].dt.replace(tzinfo=pytz.UTC) for i in exdates] + else: + exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)] + return [i for i in recur_dates if i not in exdates] + else: + return recur_dates + + def process_valarm(self, event): + valarms = [] + for subcomponent in event.walk("valarm"): + valarm = Event.from_ical(subcomponent.to_ical()) + timedelta = 
valarm["trigger"].dt + valarms.append(timedelta) + return valarms def get_next_alert(event, current_time): + """ + This function returns the next alert that should be processed based on the current time. + """ event_dates = event["event_dates"] valarm_deltas = event["valarms"] - if event_dates == [] or current_time > event_dates[-1]: + if event_dates == [] or event_dates is None or current_time > event_dates[-1]: return None, None next_event = [i for i in event_dates if i >= current_time][0] next_alert_list = [next_event + i for i in valarm_deltas] @@ -314,6 +296,9 @@ def get_next_alert(event, current_time): return next_alert - dt.timedelta(seconds=5), next_event def process_alert(current_time, next_alert, next_event, event, config): + """ + This function processes a given alert and passes it to a messaging client. + """ if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15): if len(event["alert_history"]) == 0: print("First alert for '{}' detected".format(event["summary"])) @@ -330,24 +315,35 @@ def process_alert(current_time, next_alert, next_event, event, config): f.write(str(event)) # write expects a str not dict return -def create_event_list(files): +def main(): + # Parse args and config + args = parse_args() + content = read_file(args.config) + config = parse_toml(content) + + # Get calendar dir + cal_dir = Path(config["app"]["calendar_dir"]) + if not cal_dir.is_dir(): + print(f"The provided path to .ics files does not exist: '{cal_dir}'") + sys.exit(1) # Exit with error code + + #Parse calendar events + calendar_parser = CalendarParser() + files = list(cal_dir.glob('*.ics')) event_list = [] # List to hold dictionaries for each event for file in files: with open(file, 'r') as f: cal_str = f.read() - event_dict = calendar_parser(cal_str) + event_dict = calendar_parser.parse_calendar(cal_str) event_list.append(event_dict) - return event_list -def start_observer(cal_dir): + #Start file handler to detect changes to calendar dir observer = Observer() - handler = FileChangeHandler() + handler = FileChangeHandler(event_list) # Pass event_list here observer.schedule(handler, cal_dir, recursive=True) observer.start() - return observer -def main_loop(event_list, config, cal_dir): - observer = start_observer(cal_dir) + #Start main loop try: while True: with open("status", 'w') as f: @@ -380,21 +376,5 @@ def main_loop(event_list, config, cal_dir): observer.stop() observer.join() -# Main script execution starts here -def main(): - args = parse_args() - print(args) - content = read_file(args.config) - config = parse_toml(content) - - cal_dir = Path(config["app"]["calendar_dir"]) - if not cal_dir.is_dir(): - print(f"The provided path to .ics files does not exist: '{cal_dir}'") - sys.exit(1) # Exit with error code - - files = list(cal_dir.glob('*.ics')) - event_list = create_event_list(files) - main_loop(event_list, config, cal_dir) - if __name__ == "__main__": main()