remindme_caldav/remindme_caldav.py

241 lines
10 KiB
Python
Raw Normal View History

from icalendar import Calendar, Event
import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time
2024-02-04 02:53:30 +00:00
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrulestr
2024-02-04 02:53:30 +00:00
import datetime as dt
from datetime import time, timedelta, datetime
2024-02-04 02:53:30 +00:00
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import xmpp_alert, email_alert
# Parse args
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
args = parser.parse_args()
if not args.config:
print("Error: No config file provided.")
sys.exit(1) # Exit with error code
elif not os.path.isfile(args.config):
print("Error: The specified config file does not exist.")
sys.exit(1) # Exit with error code
else:
print("Config file path: ", args.config)
# Get config
try:
with open(args.config, 'r') as f:
config = toml.load(f)
except Exception as e:
print("Error: Failed to parse TOML file.")
print(e)
sys.exit(1) # Exit with error code
cal_dir = config["app"]["calendar_dir"]
2024-02-04 02:53:30 +00:00
# Get all .ics files from your directory
files = glob.glob(os.path.join(cal_dir, '*.ics'))
class DateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, (datetime, timedelta)):
return str(o) # Convert the datetime or timedelta object to a string
return super().default(o)
class FileChangeHandler(FileSystemEventHandler):
def on_modified(self, event):
print(f"File modified: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory
print(str(datetime.now()), "Sync detected, updating events")
with open(event.src_path, 'r') as f:
cal_str = f.read()
# Parse the calendar
try:
event_dict = calendar_parser(cal_str)
new_hash = self.calculate_event_hash(event_dict)
except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return
for i, old_event in enumerate(event_list):
if old_event["uid"] == event_dict["uid"]:
old_hash = old_event["hash"]
if new_hash != old_hash: # If the hashes don't match, it means that the event has been modified or deleted
print("Event with UID {} has been modified or deleted".format(old_event["uid"]))
# Update the event in the list
event_list[i] = event_dict
else: # If the hashes match, it means that the event hasn't been modified
print("Event with UID {} hasn't been modified".format(old_event["uid"]))
break
else: # If no matching event is found in the list, add the new event to the list
event_list.append(event_dict)
def on_deleted(self, event):
print(f"File deleted: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory
print(str(dt.datetime.now()), "Sync detected, updating events")
for i, old_event in enumerate(event_list):
uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
if old_event["uid"] == uid: # If the same event is found
print("Event with UID {} has been deleted".format(old_event["uid"]))
# Remove the event from the list
del event_list[i]
break
def on_created(self, event):
print(f"File created: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory
print(str(dt.datetime.now()), "Sync detected, updating events")
with open(event.src_path, 'r') as f:
cal_str = f.read()
try:
event_dict = calendar_parser(cal_str)
new_hash = self.calculate_event_hash(event_dict)
except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return
for old_event in event_list:
if old_event["uid"] == event_dict["uid"]: # If the same event is found
print("Event with UID {} already exists".format(old_event["uid"]))
break
else: # If no matching event is found in the list, add the new event to the list
event_list.append(event_dict)
def calculate_event_hash(self, event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
2024-02-04 02:53:30 +00:00
def calculate_recur_dates(dtstart, vrecur):
rule_str = "RRULE:{}".format(vrecur.to_ical().decode('utf-8'))
start_date = dtstart
if vrecur.get("COUNT") is None:
# If no COUNT, calculate an end date based on FREQ and INTERVAL to prevent generating too many dates
freq = vrecur.get('FREQ')[0]
2024-02-04 02:53:30 +00:00
interval = vrecur.get('INTERVAL')[0]
delta = None
if freq == "DAILY":
delta = relativedelta(days=interval)
elif freq == "MONTHLY":
delta = relativedelta(months=interval)
elif freq == "YEARLY":
delta = relativedelta(years=interval)
count = 0
current_date = datetime.now().replace(tzinfo=pytz.UTC)
origin_date = start_date
2024-02-04 02:53:30 +00:00
while origin_date < current_date:
count += interval
origin_date += delta*interval
rule_str += ";COUNT={}".format(count+10)
ruleset = rrulestr(rule_str, dtstart=start_date)
# Generate future dates according to the rules
dates = list(ruleset)
return [d for d in dates if d > start_date]
2024-02-04 02:53:30 +00:00
def calendar_parser(cal_str):
# Parse the calendar
cal = Calendar.from_ical(cal_str)
for component in cal.walk():
if component.name == "VEVENT": # If it's a VEVENT, create a new event dictionary
uid = component.get("UID")
dtstart = component.get("DTSTART").dt
dtstart = dtstart if isinstance(dtstart, dt.datetime) else datetime.combine(dtstart, dt.time.min) # Ensure dates are always as datetime
dtstart = dtstart.replace(tzinfo=pytz.UTC)
2024-02-04 02:53:30 +00:00
summary = component.get("SUMMARY")
vrecur = component.get("RRULE")
recur_dates = [None]
if vrecur is not None:
recur_dates = calculate_recur_dates(dtstart, vrecur)
valarm_list = [] # List to hold all VALARM for this event
for subcomponent in component.walk(): # Find all VALARMs for this VEVENT
if subcomponent.name == "VALARM":
valarm = Event.from_ical(subcomponent.to_ical())
timedelta = valarm.get("TRIGGER").dt
valarm_list.append(timedelta) # Add this VALARM to the list
event_dict = {"uid": str(uid), "dtstart": dtstart, "summary": summary, "recur_dates": recur_dates, "valarm": valarm_list, "alert_history": []}
handler = FileChangeHandler() # Create an instance of the FileChangeHandler class
new_hash = handler.calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
event_dict["hash"] = new_hash # Store the hash in the event dictionary
return event_dict
def get_next_alert(event, current_time):
recur_dates = event["recur_dates"]
valarm_deltas = event["valarm"]
dtstart = event["dtstart"]
next_event = None
if recur_dates[0] is None:
next_event = dtstart
elif current_time <= dtstart:
next_event = dtstart
else:
next_event = recur_dates[0]
if current_time > next_event:
return None, next_event
next_alert_list = [next_event + i for i in valarm_deltas if next_event + i >= current_time]
if next_alert_list == []:
next_alert = next_event
else:
next_alert = min(next_alert_list)
return next_alert - timedelta(seconds=5), next_event
2024-02-04 02:53:30 +00:00
# Create initial event_list using calendar_parser
2024-02-04 02:53:30 +00:00
event_list = [] # List to hold dictionaries for each event
for file in files:
with open(file, 'r') as f:
cal_str = f.read()
event_dict = calendar_parser(cal_str)
event_list.append(event_dict)
observer = Observer()
handler = FileChangeHandler()
observer.schedule(handler, cal_dir, recursive=True)
2024-02-04 02:53:30 +00:00
observer.start()
try:
while True:
with open("status", 'w') as f:
#Refresh the status file
f.write("")
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
2024-02-04 02:53:30 +00:00
for event in event_list:
next_alert, next_event = get_next_alert(event, current_time)
if next_alert == None:
continue
monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nNext alert on: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, next_alert, event["alert_history"])
with open("status", 'a') as f:
# Write the output to the file
f.write(monitor_status)
f.write("\n")
if current_time >= next_alert and next_alert < next_alert + timedelta(seconds=15):
if len(event["alert_history"]) == 0:
print("First alert for '{}' detected".format(event["summary"]))
event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}]
elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]:
continue
else:
print("Posting alert for {}!".format(event["summary"]))
event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})
xmpp_alert.send_xmpp(event["summary"], next_alert, next_event, args.config)
email_alert.send_email(event["summary"], next_alert, next_event, args.config)
with open("alert_history", 'a') as f:
f.write(event)
time.sleep(1)
2024-02-04 02:53:30 +00:00
except KeyboardInterrupt:
observer.stop()
observer.join()