from pathlib import Path from icalendar import Calendar, Event import toml, argparse, os, sys, hashlib, json, pytz, os, time from dateutil.relativedelta import relativedelta from dateutil.rrule import rrulestr import datetime as dt import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import humanfriendly from pathlib import Path import argparse, textwrap, logging from alert_processor import AlertProcessor logger = logging.getLogger() def setup_log_location(logdir): if not Path(logdir).is_dir(): raise FileNotFoundError(f"Log dir '{logdir}' does not exist. Be sure to create it first.") log_location = os.path.join(logdir, "log") status_location = os.path.join(logdir, "status") alert_history_location = os.path.join(logdir, "alert_history") return log_location, status_location, alert_history_location def setup_logging(log_location): log_format='[%(levelname)s] %(asctime)s %(message)s' logging.basicConfig(filename = log_location, format=log_format, level=logging.INFO) def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') parser.add_argument('--logdir', type=str, help='Path to logfile directory', default = "logs") parser.add_argument('--loglevel', help="Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']) args = parser.parse_args() if args.config is None: logger.error("No config file provided. Please use --config path_to_config.toml") sys.exit(1) return args def read_file(filename): try: return Path(filename).read_text() except FileNotFoundError: raise FileNotFoundError("Error: The specified file does not exist.") def parse_toml(content): try: config = toml.loads(content) if config is None: logging.error("Invalid config") sys.exit(1) return config except Exception: raise RuntimeError("Error: Failed to parse TOML file.") def get_calendar_dir(config): cal_dir = Path(config["app"]["calendar_dir"]) if not cal_dir.is_dir(): logger.error(f"The provided path to .ics files does not exist: '{cal_dir}'") sys.exit(1) return cal_dir def parse_calendar_files(cal_dir): files = [] no_files_detected = True logger.info(f"Looking for calendar files in {cal_dir}...") while no_files_detected is True: files = list(cal_dir.glob('*.ics')) if len(files) != 0: logger.info("Calendar files detected in sync location!") no_files_detected = False return files def construct_initial_event_dict(cal_dir): files = parse_calendar_files(cal_dir) calendar_parser = CalendarParser() event_list = [] for file in files: with open(file, 'r') as f: cal_str = f.read() try: event_dict = calendar_parser.parse_calendar(cal_str) except Exception as e: logger.warning(f"Error parsing event, skipping. {file}. Error message {e}") continue event_list.append(event_dict) return event_list def calculate_event_hash(event): return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest() class DateTimeEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, (dt.datetime, dt.timedelta)): return str(o) # Convert the datetime or timedelta object to a string return super().default(o) class FileChangeHandler(FileSystemEventHandler): """ `FileChangeHandler` is a custom event handler for the `watchdog.observers.Observer` class that handles file system events such as file modifications, deletions and creations. """ def __init__(self, event_list): self.calendar_parser = CalendarParser() self.event_list = event_list def on_modified(self, event): logger.info(f"File modified: {event.src_path}") if not event.is_directory: try: with open(event.src_path, 'r') as f: cal_str = f.read() except Exception as e: logger.error(f"Not a valid file: {event.src_path}. Error: {e}") return try: event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method except Exception as e: logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") return self.handle_modified(old_event=None, event_dict=event_dict) def on_deleted(self, event): logger.info(f"File deleted: {event.src_path}") if not event.is_directory: uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True) def on_created(self, event): logger.info(f"File created: {event.src_path}") if not event.is_directory: try: with open(event.src_path, 'r') as f: cal_str = f.read() except Exception as e: logger.warning(f"Not a valid file: {event.src_path}. Error: {e}") return try: event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method except Exception as e: logger.warning(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") return self.handle_modified(old_event=None, event_dict=event_dict) def handle_modified(self, old_event, event_dict, remove=False): if not remove: for i, old_event in enumerate(self.event_list): if old_event["uid"] == event_dict["uid"]: old_hash = old_event["hash"] new_hash = calculate_event_hash(event_dict) if new_hash != old_hash: logger.info(f"Event with UID {old_event['uid']} has been modified or deleted") self.event_list[i] = event_dict break else: self.event_list.append(event_dict) else: # If remove is True, remove the event from the list for i, old_event in enumerate(self.event_list): if old_event["uid"] == event_dict["uid"]: logger.info(f"Event with UID {old_event['uid']} has been deleted") del self.event_list[i] break class RecurringEventGenerator: """ A class to generate recurring events based on a start date and a recurrence rule. """ def __init__(self, dtstart, rrule): self.dtstart = dtstart self.rrule = rrule self.recur_info = { "recur_dates": [dtstart], "infinite_recur": False, "recur_freq": None, "recur_interval": None, "n_recur_dates_left": 1 } def generate(self): """ """ if self.rrule is None: return self.recur_info rule_str = "RRULE:{}".format(self.rrule.to_ical().decode('utf-8')) start_date = self.dtstart infinite_recur = False freq = self.rrule.get('FREQ')[0] count = self.rrule.get("COUNT") interval = self.rrule.get('INTERVAL')[0] until = self.rrule.get('UNTIL') current_date = dt.datetime.now().replace(tzinfo=pytz.UTC) if count is None or until is not None: # If there is no COUNT value in RRULE, we need to manually calculate # the dates else rrulestr method will return a very large number of # values. Here we iterate from the start_date to the current_date based # on the interval, then add an arbitrary number of days to that (here # it's 10). delta = None if freq == "DAILY": delta = relativedelta(days=interval) elif freq == "MONTHLY": delta = relativedelta(months=interval) elif freq == "YEARLY": delta = relativedelta(years=interval) count = 0 origin_date = start_date while origin_date < current_date: count += interval origin_date += delta*interval rule_str += ";COUNT={}".format(count+10) infinite_recur = True ruleset = rrulestr(rule_str, dtstart=start_date) recur_dates = [None] n_recur = None dates = list(ruleset) # Generate future dates according to the rules recur_dates = [i for i in dates if i >= current_date] n_recur = "inf" if infinite_recur is True else len(recur_dates) self.recur_info["recur_dates"] = recur_dates self.recur_info["infinite_recur"] = infinite_recur self.recur_info["recur_freq"] = freq self.recur_info["recur_interval"] = interval self.recur_info["n_recur_dates_left"] = n_recur return self.recur_info class CalendarParser: def parse_calendar(self, cal_str): """ Parse a calendar string and process each event. """ # Parse the calendar cal = self.parse_icalendar(cal_str) # Iterate over each event in the calendar for event in cal.walk('vevent'): event_dict = self.process_event(event) dtstart = self.dtstart_to_datetime(event_dict["dtstart"].dt) generator = RecurringEventGenerator(dtstart, event_dict["rrule"]) recur_info = generator.generate() event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"]) if len(event_dates) == 0: logging.warning(f"No event dates for event: '{event['summary']}'") valarms = self.process_valarm(event) event_dict = { "uid": str(event_dict["uid"]), "dtstart": dtstart, "summary": event_dict["summary"], "description": event_dict["description"], "location": event_dict["location"], "event_dates": event_dates, "recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format( recur_info["recur_freq"], recur_info["recur_interval"], recur_info["n_recur_dates_left"] ), "valarms": valarms, "alert_history": [] } try: new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary except Exception: raise RuntimeError("Error calculating event hash") event_dict["hash"] = new_hash # Store the hash in the event dictionary return event_dict def parse_icalendar(self, cal_str): """ Parse a calendar string into an iCalendar object. """ try: return Calendar.from_ical(cal_str) except Exception: raise RuntimeError("Error parsing icalendar.") def process_event(self, event): """ Process an event from a parsed calendar and extract relevant information. """ event_info = { "uid": None, "dtstart": "", "exdate": [], "summary": None, "description": None, "location": None, "rrule": None } # Catch errors for missing components for info in event_info: try: event_info[info] = event[info] except Exception: logging.info(f"CalDav componant '{info}' missing for event {event['summary']}") return event_info def dtstart_to_datetime(self, dtstart): """ Convert a date or datetime object into a datetime object with UTC timezone. """ # Ensure dates are always as datetime try: if isinstance(dtstart, dt.datetime): return dtstart.replace(tzinfo=pytz.UTC) else: return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC) except Exception: raise RuntimeError("Error converting dtstart to datetime.") def remove_exdates(self, exdates, recur_dates): """ Remove dates from a list of recurring event dates that are in the exdate list. """ if exdates != []: try: if isinstance(exdates, list): exdates = [i.dts[0].dt.replace(tzinfo=pytz.UTC) for i in exdates] else: exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)] return [i for i in recur_dates if i not in exdates] except Exception: raise RuntimeError("Error processing exdates.") else: return recur_dates def process_valarm(self, event): """ Process VALARM components from an iCalendar event and extract trigger times. """ valarms = [] for subcomponent in event.walk("valarm"): valarm = Event.from_ical(subcomponent.to_ical()) timedelta = valarm["trigger"].dt valarms.append(timedelta) if len(valarms) == 0: logging.info(f"No reminders for event: {event['summary']}") return valarms def get_next_alert(event, current_time): """ Returns the next alert that should be processed based on the current time. """ event_dates = event["event_dates"] valarm_deltas = event["valarms"] if event_dates == [] or event_dates is None or current_time > event_dates[-1]: return None, None next_event = [i for i in event_dates if i >= current_time][0] next_alert_list = [next_event + i for i in valarm_deltas if next_event + i >= current_time] if len(next_alert_list) == 0: next_alert = next_event else: next_alert = min(next_alert_list) return next_alert - dt.timedelta(seconds=5), next_event def process_alert(current_time, next_alert, next_event, event, config, alert_history_location): """ Processes a given alert and passes it to a messaging client. """ if current_time >= next_alert and current_time < next_alert + dt.timedelta(seconds=15): if len(event["alert_history"]) == 0: logger.info(f"First alert for '{event['summary']}' detected") event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}] elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]: return else: logger.info(f"Posting alert for {event['summary']}!") event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}) try: processor = AlertProcessor(config) processor.send_email(event, next_alert, next_event) except Exception as e: raise RuntimeError(f"Error sending alert for event. {e}") with open(alert_history_location, 'a') as f: f.write(str(event)) return def daemon(status_location, alert_history_location, config, event_list): with open(status_location, 'w') as f: f.write("") # Refresh the status file current_time = dt.datetime.now().replace(tzinfo=pytz.UTC) for event in event_list: try: next_alert, next_event = get_next_alert(event, current_time) except RuntimeError as e: logger.warning(f"Error getting next alert for {event['summary']}, skipping event. Error message {e}") continue if next_alert == None: continue event_delta = next_alert-current_time total_seconds = event_delta.total_seconds() human_readable_time = humanfriendly.format_timespan(total_seconds) monitor_status = f"""\ Current time: {current_time} Monitoring: {event["summary"]} Event date: {next_event} Recur Dates: {[str(i) for i in event["event_dates"]]} Next alert on: {next_alert} in {human_readable_time} Recur info: {event["recur_info"]} Alert history: {event["alert_history"]}\n""" monitor_status = textwrap.dedent(monitor_status) with open(status_location, 'a') as f: f.write(monitor_status) # Write the output to the file f.write("\n") try: process_alert(current_time, next_alert, next_event, event, config, alert_history_location) except RuntimeError as e: logger.warning(f"Error processing alert for event {event['summary']}. Error message: {e}") return def main(): # Parse args and initiate logging args = parse_args() log_location, status_location, alert_history_location = setup_log_location(args.logdir) setup_logging(log_location) logger = logging.getLogger() # Redefine log level if args passed if args.loglevel is not None: numeric_level = getattr(logging, args.loglevel.upper(), None) # Convert string to integer if isinstance(numeric_level, int): logger = logging.getLogger() logger.setLevel(numeric_level) # Set the log level # Setup initial event_list config_file = read_file(args.config) config = parse_toml(config_file) cal_dir = get_calendar_dir(config) event_list = construct_initial_event_dict(cal_dir) # Start file handler to detect changes to calendar dir observer = Observer() handler = FileChangeHandler(event_list) observer.schedule(handler, cal_dir, recursive=True) observer.start() # Start main loop try: while True: daemon(status_location, alert_history_location, config, event_list) time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join() if __name__ == "__main__": main()