from pathlib import Path from icalendar import Calendar, Event import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time from dateutil.relativedelta import relativedelta from dateutil.rrule import rrulestr import datetime as dt import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import email_alert, xmpp_alert from pprint import pprint import humanfriendly from pathlib import Path import argparse import textwrap import logging def setup_logger(loglevel): """Setup basic logging.""" loglevel = getattr(logging, loglevel.upper(), None) if not isinstance(loglevel, int): raise ValueError('Invalid log level: %s' % loglevel) logging.basicConfig(filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s') logger = logging.getLogger() logger.setLevel(loglevel) def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level') args = parser.parse_args() if args.config is None: raise ValueError("No config file provided") return args def read_file(filename): try: return Path(filename).read_text() except FileNotFoundError as e: print(f"Error: The specified file does not exist. {e}") sys.exit(1) # Exit with error code def parse_toml(content): try: return toml.loads(content) except Exception as e: print("Error: Failed to parse TOML file.") print(e) sys.exit(1) # Exit with error code def calculate_event_hash(event): return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest() class DateTimeEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, (dt.datetime, dt.timedelta)): return str(o) # Convert the datetime or timedelta object to a string return super().default(o) class FileChangeHandler(FileSystemEventHandler): """ `FileChangeHandler` is a custom event handler for the `watchdog.observers.Observer` class that handles file system events such as file modifications, deletions and creations. """ def __init__(self, event_list): self.calendar_parser = CalendarParser() # Create an instance of CalendarParser self.event_list = event_list def on_modified(self, event): print(f"File modified: {event.src_path}") if not event.is_directory: try: with open(event.src_path, 'r') as f: cal_str = f.read() except Exception as e: print("Not a valid file: {}. Error: {}".format(event.src_path, e)) return try: event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method except Exception as i: print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) return # Handle the modified event self.handle_modified(old_event=None, event_dict=event_dict) def on_deleted(self, event): print(f"File deleted: {event.src_path}") if not event.is_directory: uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True) def on_created(self, event): print(f"File created: {event.src_path}") if not event.is_directory: try: with open(event.src_path, 'r') as f: cal_str = f.read() except Exception as e: print("Not a valid file: {}. Error: {}".format(event.src_path, e)) return try: event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method except Exception as i: print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) return self.handle_modified(old_event=None, event_dict=event_dict) def handle_modified(self, old_event, event_dict, remove=False): if not remove: for i, old_event in enumerate(self.event_list): if old_event["uid"] == event_dict["uid"]: old_hash = old_event["hash"] new_hash = calculate_event_hash(event_dict) if new_hash != old_hash: print("Event with UID {} has been modified or deleted".format(old_event["uid"])) self.event_list[i] = event_dict else: print("Event with UID {} hasn't been modified".format(old_event["uid"])) break else: self.event_list.append(event_dict) else: # If remove is True, remove the event from the list for i, old_event in enumerate(self.event_list): if old_event["uid"] == event_dict["uid"]: print("Event with UID {} has been deleted".format(old_event["uid"])) del self.event_list[i] break class RecurringEventGenerator: def __init__(self, dtstart, rrule): self.dtstart = dtstart self.rrule = rrule self.recur_info = { "recur_dates": [dtstart], "infinite_recur": False, "recur_freq": None, "recur_interval": None, "n_recur_dates_left": None } def generate(self): if self.rrule is None: return self.recur_info rule_str = "RRULE:{}".format(self.rrule.to_ical().decode('utf-8')) start_date = self.dtstart infinite_recur = False freq = self.rrule.get('FREQ')[0] count = self.rrule.get("COUNT") interval = self.rrule.get('INTERVAL')[0] current_date = dt.datetime.now().replace(tzinfo=pytz.UTC) if count is None: delta = None if freq == "DAILY": delta = relativedelta(days=interval) elif freq == "MONTHLY": delta = relativedelta(months=interval) elif freq == "YEARLY": delta = relativedelta(years=interval) count = 0 origin_date = start_date while origin_date < current_date: count += interval origin_date += delta*interval rule_str += ";COUNT={}".format(count+10) infinite_recur = True ruleset = rrulestr(rule_str, dtstart=start_date) recur_dates = [None] n_recur = None dates = list(ruleset) # Generate future dates according to the rules recur_dates = [i for i in dates if i >= current_date] n_recur = "inf" if infinite_recur is True else len(recur_dates) self.recur_info["recur_dates"] = recur_dates self.recur_info["infinite_recur"] = infinite_recur self.recur_info["recur_freq"] = freq self.recur_info["recur_interval"] = interval self.recur_info["n_recur_dates_left"] = n_recur return self.recur_info class CalendarParser: def parse_calendar(self, cal_str): # Parse the calendar cal = self.parse_icalendar(cal_str) # Iterate over each event in the calendar for event in cal.walk('vevent'): event_dict = self.process_event(event) dtstart = self.dtstart_to_datetime(event_dict["dtstart"].dt) generator = RecurringEventGenerator(dtstart, event_dict["rrule"]) recur_info = generator.generate() event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"]) valarms = self.process_valarm(event) event_dict = { "uid": str(event_dict["uid"]), "dtstart": dtstart, "summary": event_dict["summary"], "description": event_dict["description"], "location": event_dict["location"], "event_dates": event_dates, "recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format( recur_info["recur_freq"], recur_info["recur_interval"], recur_info["n_recur_dates_left"] ), "valarms": valarms, "alert_history": [] } new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary event_dict["hash"] = new_hash # Store the hash in the event dictionary return event_dict def parse_icalendar(self, cal_str): return Calendar.from_ical(cal_str) def process_event(self, event): # Catch errors for missing components event_info = { "uid": None, "dtstart": "", "exdate": [], "summary": None, "description": None, "location": None, "rrule": None } for info in event_info: try: event_info[info] = event[info] except Exception: pass return event_info def dtstart_to_datetime(self, dtstart): # Ensure dates are always as datetime if isinstance(dtstart, dt.datetime): return dtstart.replace(tzinfo=pytz.UTC) else: return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC) def remove_exdates(self, exdates, recur_dates): if exdates != []: if isinstance(exdates, list): exdates = [i.dts[0].dt.replace(tzinfo=pytz.UTC) for i in exdates] else: exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)] return [i for i in recur_dates if i not in exdates] else: return recur_dates def process_valarm(self, event): valarms = [] for subcomponent in event.walk("valarm"): valarm = Event.from_ical(subcomponent.to_ical()) timedelta = valarm["trigger"].dt valarms.append(timedelta) return valarms def get_next_alert(event, current_time): """ This function returns the next alert that should be processed based on the current time. """ event_dates = event["event_dates"] valarm_deltas = event["valarms"] if event_dates == [] or event_dates is None or current_time > event_dates[-1]: return None, None next_event = [i for i in event_dates if i >= current_time][0] next_alert_list = [next_event + i for i in valarm_deltas] next_alert = min(next_alert_list) return next_alert - dt.timedelta(seconds=5), next_event def process_alert(current_time, next_alert, next_event, event, config): """ This function processes a given alert and passes it to a messaging client. """ if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15): if len(event["alert_history"]) == 0: print("First alert for '{}' detected".format(event["summary"])) event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}] elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]: return # continue is not needed here as it's the end of function else: print("Posting alert for {}!".format(event["summary"])) event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}) #xmpp_alert.send_xmpp(event["summary"], next_alert, next_event, args.config) email_alert.send_email(event, next_alert, next_event, config) xmpp_alert.send_xmpp(event, next_alert, next_event, config) with open("alert_history", 'a') as f: f.write(str(event)) # write expects a str not dict return def main(): # Parse args and config args = parse_args() content = read_file(args.config) config = parse_toml(content) # Get calendar dir cal_dir = Path(config["app"]["calendar_dir"]) if not cal_dir.is_dir(): print(f"The provided path to .ics files does not exist: '{cal_dir}'") sys.exit(1) # Exit with error code #Parse calendar events calendar_parser = CalendarParser() files = list(cal_dir.glob('*.ics')) event_list = [] # List to hold dictionaries for each event for file in files: with open(file, 'r') as f: cal_str = f.read() event_dict = calendar_parser.parse_calendar(cal_str) event_list.append(event_dict) #Start file handler to detect changes to calendar dir observer = Observer() handler = FileChangeHandler(event_list) # Pass event_list here observer.schedule(handler, cal_dir, recursive=True) observer.start() #Start main loop try: while True: with open("status", 'w') as f: #Refresh the status file f.write("") current_time = dt.datetime.now().replace(tzinfo=pytz.UTC) for event in event_list: next_alert, next_event = get_next_alert(event, current_time) if next_alert == None: continue event_delta = next_alert-current_time total_seconds = event_delta.total_seconds() human_readable_time = humanfriendly.format_timespan(total_seconds) monitor_status = f"""\ Current time: {current_time} Monitoring: {event["summary"]} Event date: {next_event} Recur Dates: {[str(i) for i in event["event_dates"]]} Next alert on: {next_alert} in {human_readable_time} Recur info: {event["recur_info"]} Alert history: {event["alert_history"]}\n""" monitor_status = textwrap.dedent(monitor_status) with open("status", 'a') as f: # Write the output to the file f.write(monitor_status) f.write("\n") process_alert(current_time, next_alert, next_event, event, config) time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join() if __name__ == "__main__": main()