From 5c4db272a797cd3be03328d637df9fd330d55fb5 Mon Sep 17 00:00:00 2001 From: mrsu Date: Sun, 4 Feb 2024 02:53:30 +0000 Subject: [PATCH] inital commit --- remindme_caldav.py | 111 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 remindme_caldav.py diff --git a/remindme_caldav.py b/remindme_caldav.py new file mode 100644 index 0000000..28986a4 --- /dev/null +++ b/remindme_caldav.py @@ -0,0 +1,111 @@ +from icalendar import Calendar, Event, vRecur +from dateutil.relativedelta import relativedelta +from dateutil.rrule import rruleset, rrulestr +import datetime as dt +from datetime import date, time +import glob +import os +import time +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler + +# Get all .ics files from your directory +files = glob.glob(os.path.join('/home/mrsu/.local/share/caldav/personal-calendar/default', '*.ics')) + +def calculate_recur_dates(dtstart, vrecur): + rule_str = "RRULE:{}".format(vrecur.to_ical().decode('utf-8')) + start_date = dtstart + if vrecur.get("COUNT") is None: + # If it doesn't, calculate an end date based on FREQ and INTERVAL + freq = vrecur.get('FREQ')[0] # Get the first frequency (e.g., 'DAILY', 'WEEKLY', etc.) + interval = vrecur.get('INTERVAL')[0] + + delta = None + + if freq == "DAILY": + delta = relativedelta(days=interval) + elif freq == "MONTHLY": + delta = relativedelta(months=interval) + elif freq == "YEARLY": + delta = relativedelta(years=interval) + + count = 0 + current_date = dt.datetime.today().date() + origin_date = start_date.date() if isinstance(start_date, dt.datetime) else start_date + while origin_date < current_date: + count += interval + origin_date += delta*interval + + rule_str += ";COUNT={}".format(count+10) + else: + None + # If 'COUNT' exists, set it to a high number so that the rrulestr method will stop generating dates after the specified count + #rule_str += ";UNTIL=21001231T000000Z" # This sets an end date far in the future + + ruleset = rrulestr(rule_str, dtstart=start_date) + + # Generate future dates according to the rules + dates = list(ruleset) + return [d for d in dates][1:] # Remove first date as the same as start_date + +def calendar_parser(cal_str): + # Parse the calendar + cal = Calendar.from_ical(cal_str) + + for component in cal.walk(): + if component.name == "VEVENT": # If it's a VEVENT, create a new event dictionary + uid = component.get("UID") + dtstart = component.get("DTSTART").dt + summary = component.get("SUMMARY") + vrecur = component.get("RRULE") + recur_dates = [None] + if vrecur is not None: + recur_dates = calculate_recur_dates(dtstart, vrecur) + + valarm_list = [] # List to hold all VALARM for this event + for subcomponent in component.walk(): # Find all VALARMs for this VEVENT + if subcomponent.name == "VALARM": + valarm = Event.from_ical(subcomponent.to_ical()) + timedelta = valarm.get("TRIGGER").dt + valarm_list.append(timedelta) # Add this VALARM to the list + + return {"uid": str(uid), "dtstart": dtstart, "summary": summary, "recur_dates": recur_dates, "valarm": valarm_list, "alert_triggered": [False*len(valarm_list)]} + +event_list = [] # List to hold dictionaries for each event +for file in files: + with open(file, 'r') as f: + cal_str = f.read() + event_dict = calendar_parser(cal_str) + event_list.append(event_dict) + +class FileChangeHandler(FileSystemEventHandler): + def on_modified(self, event): + if not event.is_directory: # If it's a file and not a directory + print("Updating events") + with open(event.src_path, 'r') as f: + cal_str = f.read() + + # Parse the calendar + event_dict = calendar_parser(cal_str) + event_list.append(event_dict) + +observer = Observer() +handler = FileChangeHandler() +observer.schedule(handler, '/home/mrsu/.local/share/caldav/personal-calendar/default', recursive=True) +observer.start() +try: + while True: + current_time = dt.datetime.now() + for event in event_list: + None + #event["valarm"] = [a if isinstance(a, dt.datetime) else dt.datetime.combine(a, dt.datetime.min.time()) for a in event["valarm"]] + #print(event["summary"], event["freq"], current_time, [str(i.replace(tzinfo=None)) for i in event['valarm']], event["dtstart"]) + print(event["summary"], str(event["dtstart"]), [str(i) for i in event["recur_dates"]])#, [str(alarm_time) for alarm_time in event['valarm']]) +# if any(abs(current_time - alarm_time.total_seconds() <= 1 for alarm_time in event['valarm']) and event["alert_triggered"][0] is False: +# print("Alert for event with UID:", event["uid"], "Summary:", event["summary"]) +# event["alert_triggered"][0] = True + + time.sleep(1) # Wait for a second before checking again +except KeyboardInterrupt: + observer.stop() +observer.join()