from icalendar import Calendar, Event
import toml
import argparse
import os
import sys
import hashlib
import json
import pytz
import glob
import time
import datetime as dt
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrulestr
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import email_alert
#import xmpp_alert

# Parse args
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
args = parser.parse_args()

if not args.config:
    print("Error: No config file provided.")
    sys.exit(1)  # Exit with error code
elif not os.path.isfile(args.config):
    print("Error: The specified config file does not exist.")
    sys.exit(1)  # Exit with error code
else:
    print("Config file path: ", args.config)

# Get config
try:
    with open(args.config, 'r') as f:
        config = toml.load(f)
except Exception as e:
    print("Error: Failed to parse TOML file.")
    print(e)
    sys.exit(1)  # Exit with error code

cal_dir = config["app"]["calendar_dir"]

# Get all .ics files from the calendar directory
files = glob.glob(os.path.join(cal_dir, '*.ics'))


class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serialises datetime and timedelta objects as strings."""
    def default(self, o):
        if isinstance(o, (dt.datetime, dt.timedelta)):
            return str(o)  # Convert the datetime or timedelta object to a string
        return super().default(o)


class FileChangeHandler(FileSystemEventHandler):
    """Keeps event_list in sync with the .ics files in the calendar directory."""

    def on_modified(self, event):
        print(f"File modified: {event.src_path}")
        if not event.is_directory:  # Only handle files, not directories
            print(str(dt.datetime.now()), "Sync detected, updating events")
            with open(event.src_path, 'r') as f:
                cal_str = f.read()
            # Parse the calendar
            try:
                event_dict = calendar_parser(cal_str)
                new_hash = self.calculate_event_hash(event_dict)
            except Exception as e:
                print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path, e))
                return
            for i, old_event in enumerate(event_list):
                if old_event["uid"] == event_dict["uid"]:
                    old_hash = old_event["hash"]
                    if new_hash != old_hash:
                        # Hashes differ: the event has been modified
                        print("Event with UID {} has been modified".format(old_event["uid"]))
                        # Update the event in the list
                        event_list[i] = event_dict
                    else:
                        # Hashes match: the event hasn't been modified
                        print("Event with UID {} hasn't been modified".format(old_event["uid"]))
                    break
            else:
                # No matching event found in the list, add the new event
                event_list.append(event_dict)

    def on_deleted(self, event):
        print(f"File deleted: {event.src_path}")
        if not event.is_directory:  # Only handle files, not directories
            print(str(dt.datetime.now()), "Sync detected, updating events")
            uid = os.path.splitext(os.path.basename(event.src_path))[0]  # UID is the file name without extension
            for i, old_event in enumerate(event_list):
                if old_event["uid"] == uid:  # If the same event is found
                    print("Event with UID {} has been deleted".format(old_event["uid"]))
                    # Remove the event from the list
                    del event_list[i]
                    break

    def on_created(self, event):
        print(f"File created: {event.src_path}")
        if not event.is_directory:  # Only handle files, not directories
            print(str(dt.datetime.now()), "Sync detected, updating events")
            with open(event.src_path, 'r') as f:
                cal_str = f.read()
            try:
                event_dict = calendar_parser(cal_str)
                new_hash = self.calculate_event_hash(event_dict)
            except Exception as e:
                print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path, e))
                return
            for old_event in event_list:
                if old_event["uid"] == event_dict["uid"]:  # If the same event is found
                    print("Event with UID {} already exists".format(old_event["uid"]))
                    break
            else:
                # No matching event found in the list, add the new event
                event_list.append(event_dict)

    def calculate_event_hash(self, event):
        # Hash the JSON representation of the event so changes can be detected cheaply
        return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()


def calculate_recur_dates(dtstart, vrecur):
    rule_str = "RRULE:{}".format(vrecur.to_ical().decode('utf-8'))
    start_date = dtstart
    if vrecur.get("COUNT") is None:
        # If no COUNT is given, derive one from FREQ and INTERVAL so we don't generate
        # an unbounded number of dates: count the occurrences up to now, plus a buffer.
        freq = vrecur.get('FREQ')[0]
        interval = vrecur.get('INTERVAL')[0] if vrecur.get('INTERVAL') else 1  # INTERVAL defaults to 1 when absent
        delta = None
        if freq == "DAILY":
            delta = relativedelta(days=interval)
        elif freq == "WEEKLY":
            delta = relativedelta(weeks=interval)
        elif freq == "MONTHLY":
            delta = relativedelta(months=interval)
        elif freq == "YEARLY":
            delta = relativedelta(years=interval)
        count = 0
        current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)
        origin_date = start_date
        while origin_date < current_date:
            count += interval
            origin_date += delta * interval
        rule_str += ";COUNT={}".format(count + 10)
    ruleset = rrulestr(rule_str, dtstart=start_date)
    # Generate future dates according to the rules
    dates = list(ruleset)
    return [d for d in dates if d > start_date]


def calendar_parser(cal_str):
    # Parse the calendar; each .ics file is expected to contain a single VEVENT
    cal = Calendar.from_ical(cal_str)
    for component in cal.walk():
        if component.name == "VEVENT":
            # If it's a VEVENT, create a new event dictionary
            uid = component.get("UID")
            dtstart = component.get("DTSTART").dt
            # Ensure dates are always handled as datetimes
            dtstart = dtstart if isinstance(dtstart, dt.datetime) else dt.datetime.combine(dtstart, dt.time.min)
            dtstart = dtstart.replace(tzinfo=pytz.UTC)
            summary = component.get("SUMMARY")
            vrecur = component.get("RRULE")
            recur_dates = [None]
            if vrecur is not None:
                recur_dates = calculate_recur_dates(dtstart, vrecur)
            valarm_list = []  # List to hold all VALARM triggers for this event
            for subcomponent in component.walk():  # Find all VALARMs for this VEVENT
                if subcomponent.name == "VALARM":
                    valarm = Event.from_ical(subcomponent.to_ical())
                    trigger_delta = valarm.get("TRIGGER").dt
                    valarm_list.append(trigger_delta)  # Add this VALARM trigger to the list
            event_dict = {"uid": str(uid), "dtstart": dtstart, "summary": summary,
                          "recur_dates": recur_dates, "valarm": valarm_list, "alert_history": []}
            handler = FileChangeHandler()  # Create an instance of the FileChangeHandler class
            new_hash = handler.calculate_event_hash(event_dict)  # Calculate the hash of the event dictionary
            event_dict["hash"] = new_hash  # Store the hash in the event dictionary
            return event_dict


def get_next_alert(event, current_time):
    recur_dates = event["recur_dates"]
    valarm_deltas = event["valarm"]
    dtstart = event["dtstart"]
    next_event = None
    if not recur_dates or recur_dates[0] is None:
        # Non-recurring event (or no future recurrences): the only occurrence is dtstart
        next_event = dtstart
    elif current_time <= dtstart:
        next_event = dtstart
    else:
        next_event = recur_dates[0]
    if current_time > next_event:
        return None, next_event
    # VALARM triggers are (usually negative) timedeltas relative to the occurrence
    next_alert_list = [next_event + i for i in valarm_deltas if next_event + i >= current_time]
    if next_alert_list == []:
        next_alert = next_event
    else:
        next_alert = min(next_alert_list)
    return next_alert - dt.timedelta(seconds=5), next_event
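
# Rough worked example of get_next_alert (hypothetical values, not taken from any real
# calendar): for a non-recurring event with dtstart = 2024-05-01 09:00:00 UTC and a
# single VALARM trigger of -15 minutes, next_event is 09:00:00 and next_alert is
# 08:45:00 minus the 5-second margin, i.e. 08:44:55, so the main loop below starts
# alerting just before the VALARM time.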

# Create the initial event_list using calendar_parser
event_list = []  # List to hold a dictionary for each event
for file in files:
    with open(file, 'r') as f:
        cal_str = f.read()
    event_dict = calendar_parser(cal_str)
    event_list.append(event_dict)

# Watch the calendar directory so syncs are picked up while the daemon runs
observer = Observer()
handler = FileChangeHandler()
observer.schedule(handler, cal_dir, recursive=True)
observer.start()

try:
    while True:
        with open("status", 'w') as f:  # Refresh the status file
            f.write("")
        current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
        for event in event_list:
            next_alert, next_event = get_next_alert(event, current_time)
            if next_alert is None:
                continue
            monitor_status = ("Current time: {}\nMonitoring: {}\nEvent date: {}\n"
                              "Next alert on: {}\nAlert history: {}\n").format(
                current_time, event["summary"], next_event, next_alert, event["alert_history"])
            with open("status", 'a') as f:  # Write the output to the status file
                f.write(monitor_status)
                f.write("\n")
            # Only alert within a short window after the alert time so stale alerts aren't re-sent
            if next_alert <= current_time < next_alert + dt.timedelta(seconds=15):
                if len(event["alert_history"]) == 0:
                    # Record the first alert seen for this event
                    print("First alert for '{}' detected".format(event["summary"]))
                    event["alert_history"] = [{"timestamp_alert_triggered": current_time,
                                               "alert_definition_time": next_alert}]
                elif next_alert in [i["alert_definition_time"] for i in event["alert_history"]]:
                    # This alert has already been handled
                    continue
                else:
                    print("Posting alert for {}!".format(event["summary"]))
                    event["alert_history"].append({"timestamp_alert_triggered": current_time,
                                                   "alert_definition_time": next_alert})
                    #xmpp_alert.send_xmpp(event["summary"], next_alert, next_event, args.config)
                    email_alert.send_email(event["summary"], next_alert, next_event, args.config)
                    with open("alert_history", 'a') as f:
                        f.write(json.dumps(event, cls=DateTimeEncoder) + "\n")
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
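
# A minimal example of the config file this daemon expects (layout assumed from the keys
# read above; only [app].calendar_dir is used in this file, and email_alert presumably
# reads its own settings from the same path passed via --config):
#
#   [app]
#   calendar_dir = "/home/user/.calendars/personal"   # hypothetical path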