inital commit
This commit is contained in:
commit
5c4db272a7
|
@ -0,0 +1,111 @@
|
||||||
|
from icalendar import Calendar, Event, vRecur
|
||||||
|
from dateutil.relativedelta import relativedelta
|
||||||
|
from dateutil.rrule import rruleset, rrulestr
|
||||||
|
import datetime as dt
|
||||||
|
from datetime import date, time
|
||||||
|
import glob
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from watchdog.observers import Observer
|
||||||
|
from watchdog.events import FileSystemEventHandler
|
||||||
|
|
||||||
|
# Get all .ics files from your directory
|
||||||
|
files = glob.glob(os.path.join('/home/mrsu/.local/share/caldav/personal-calendar/default', '*.ics'))
|
||||||
|
|
||||||
|
def calculate_recur_dates(dtstart, vrecur):
|
||||||
|
rule_str = "RRULE:{}".format(vrecur.to_ical().decode('utf-8'))
|
||||||
|
start_date = dtstart
|
||||||
|
if vrecur.get("COUNT") is None:
|
||||||
|
# If it doesn't, calculate an end date based on FREQ and INTERVAL
|
||||||
|
freq = vrecur.get('FREQ')[0] # Get the first frequency (e.g., 'DAILY', 'WEEKLY', etc.)
|
||||||
|
interval = vrecur.get('INTERVAL')[0]
|
||||||
|
|
||||||
|
delta = None
|
||||||
|
|
||||||
|
if freq == "DAILY":
|
||||||
|
delta = relativedelta(days=interval)
|
||||||
|
elif freq == "MONTHLY":
|
||||||
|
delta = relativedelta(months=interval)
|
||||||
|
elif freq == "YEARLY":
|
||||||
|
delta = relativedelta(years=interval)
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
current_date = dt.datetime.today().date()
|
||||||
|
origin_date = start_date.date() if isinstance(start_date, dt.datetime) else start_date
|
||||||
|
while origin_date < current_date:
|
||||||
|
count += interval
|
||||||
|
origin_date += delta*interval
|
||||||
|
|
||||||
|
rule_str += ";COUNT={}".format(count+10)
|
||||||
|
else:
|
||||||
|
None
|
||||||
|
# If 'COUNT' exists, set it to a high number so that the rrulestr method will stop generating dates after the specified count
|
||||||
|
#rule_str += ";UNTIL=21001231T000000Z" # This sets an end date far in the future
|
||||||
|
|
||||||
|
ruleset = rrulestr(rule_str, dtstart=start_date)
|
||||||
|
|
||||||
|
# Generate future dates according to the rules
|
||||||
|
dates = list(ruleset)
|
||||||
|
return [d for d in dates][1:] # Remove first date as the same as start_date
|
||||||
|
|
||||||
|
def calendar_parser(cal_str):
|
||||||
|
# Parse the calendar
|
||||||
|
cal = Calendar.from_ical(cal_str)
|
||||||
|
|
||||||
|
for component in cal.walk():
|
||||||
|
if component.name == "VEVENT": # If it's a VEVENT, create a new event dictionary
|
||||||
|
uid = component.get("UID")
|
||||||
|
dtstart = component.get("DTSTART").dt
|
||||||
|
summary = component.get("SUMMARY")
|
||||||
|
vrecur = component.get("RRULE")
|
||||||
|
recur_dates = [None]
|
||||||
|
if vrecur is not None:
|
||||||
|
recur_dates = calculate_recur_dates(dtstart, vrecur)
|
||||||
|
|
||||||
|
valarm_list = [] # List to hold all VALARM for this event
|
||||||
|
for subcomponent in component.walk(): # Find all VALARMs for this VEVENT
|
||||||
|
if subcomponent.name == "VALARM":
|
||||||
|
valarm = Event.from_ical(subcomponent.to_ical())
|
||||||
|
timedelta = valarm.get("TRIGGER").dt
|
||||||
|
valarm_list.append(timedelta) # Add this VALARM to the list
|
||||||
|
|
||||||
|
return {"uid": str(uid), "dtstart": dtstart, "summary": summary, "recur_dates": recur_dates, "valarm": valarm_list, "alert_triggered": [False*len(valarm_list)]}
|
||||||
|
|
||||||
|
event_list = [] # List to hold dictionaries for each event
|
||||||
|
for file in files:
|
||||||
|
with open(file, 'r') as f:
|
||||||
|
cal_str = f.read()
|
||||||
|
event_dict = calendar_parser(cal_str)
|
||||||
|
event_list.append(event_dict)
|
||||||
|
|
||||||
|
class FileChangeHandler(FileSystemEventHandler):
|
||||||
|
def on_modified(self, event):
|
||||||
|
if not event.is_directory: # If it's a file and not a directory
|
||||||
|
print("Updating events")
|
||||||
|
with open(event.src_path, 'r') as f:
|
||||||
|
cal_str = f.read()
|
||||||
|
|
||||||
|
# Parse the calendar
|
||||||
|
event_dict = calendar_parser(cal_str)
|
||||||
|
event_list.append(event_dict)
|
||||||
|
|
||||||
|
observer = Observer()
|
||||||
|
handler = FileChangeHandler()
|
||||||
|
observer.schedule(handler, '/home/mrsu/.local/share/caldav/personal-calendar/default', recursive=True)
|
||||||
|
observer.start()
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
current_time = dt.datetime.now()
|
||||||
|
for event in event_list:
|
||||||
|
None
|
||||||
|
#event["valarm"] = [a if isinstance(a, dt.datetime) else dt.datetime.combine(a, dt.datetime.min.time()) for a in event["valarm"]]
|
||||||
|
#print(event["summary"], event["freq"], current_time, [str(i.replace(tzinfo=None)) for i in event['valarm']], event["dtstart"])
|
||||||
|
print(event["summary"], str(event["dtstart"]), [str(i) for i in event["recur_dates"]])#, [str(alarm_time) for alarm_time in event['valarm']])
|
||||||
|
# if any(abs(current_time - alarm_time.total_seconds() <= 1 for alarm_time in event['valarm']) and event["alert_triggered"][0] is False:
|
||||||
|
# print("Alert for event with UID:", event["uid"], "Summary:", event["summary"])
|
||||||
|
# event["alert_triggered"][0] = True
|
||||||
|
|
||||||
|
time.sleep(1) # Wait for a second before checking again
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
observer.stop()
|
||||||
|
observer.join()
|
Loading…
Reference in New Issue