remindme_caldav/remindme_caldav.py

268 lines
12 KiB
Python

from icalendar import Calendar, Event
import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrulestr
import datetime as dt
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import email_alert, xmpp_alert
# Parse args
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
args = parser.parse_args()

# A config file is mandatory: stop early when it is missing or is not a file.
if not args.config:
    print("Error: No config file provided.")
    sys.exit(1)  # Exit with error code
elif not os.path.isfile(args.config):
    print("Error: The specified config file does not exist.")
    sys.exit(1)  # Exit with error code
else:
    print("Config file path: ", args.config)

# Get config
try:
    with open(args.config, 'r') as f:
        config = toml.load(f)
except Exception as e:
    print("Error: Failed to parse TOML file.")
    print(e)
    sys.exit(1)  # Exit with error code

# The calendar directory is required; report a missing key with a readable
# message instead of an unhandled KeyError traceback.
try:
    cal_dir = config["app"]["calendar_dir"]
except (KeyError, TypeError):
    print("Error: The config file must define 'calendar_dir' in the [app] table.")
    sys.exit(1)  # Exit with error code

# Check if the path is a directory
if not os.path.isdir(cal_dir):
    print("The provided path to .ics files does not exist: '{}'".format(cal_dir))
    # sys.exit rather than the bare exit() helper, which is injected by the
    # `site` module and is not guaranteed to exist in every interpreter setup.
    sys.exit(1)

# Get all .ics files from your directory (top level of cal_dir only)
files = glob.glob(os.path.join(cal_dir, '*.ics'))
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that renders datetime and timedelta values via ``str()``."""

    def default(self, o):
        """Return ``str(o)`` for datetime/timedelta objects.

        Every other type is handed to ``json.JSONEncoder.default``, which
        raises ``TypeError`` for values it cannot serialise.
        """
        if not isinstance(o, (dt.datetime, dt.timedelta)):
            return super().default(o)
        # Convert the datetime or timedelta object to a string
        return str(o)
class FileChangeHandler(FileSystemEventHandler):
    """Keep the module-level ``event_list`` in sync with calendar file changes.

    Each callback receives a watchdog file-system event, re-reads or drops the
    affected calendar file and updates ``event_list`` in place. Events in that
    list are the dictionaries produced by ``calendar_parser``; they are matched
    by their ``"uid"`` entry and compared by their ``"hash"`` entry.
    """

    def _load_event(self, path):
        """Read and parse the calendar file at *path*.

        Returns the event dictionary from ``calendar_parser``, or ``None``
        (after printing the reason) when the file cannot be read, cannot be
        parsed, or contains no event.
        """
        try:
            # iCalendar data is UTF-8 by default (RFC 5545), so do not depend
            # on the locale's preferred encoding.
            with open(path, 'r', encoding='utf-8') as f:
                cal_str = f.read()
        except Exception as e:
            print("Not a valid file: {}. Error: {}".format(path, e))
            return None
        try:
            event_dict = calendar_parser(cal_str)
        except Exception as e:
            print("Failed to parse calendar event at: {}.\n Error:\n{}".format(path, e))
            return None
        if event_dict is None:
            # The parser found nothing usable; never let None reach event_list.
            print("Failed to parse calendar event at: {}.\n Error:\n{}".format(path, "no VEVENT found"))
            return None
        return event_dict

    def on_modified(self, event):
        """Replace the stored event when the modified file's content changed."""
        print(f"File modified: {event.src_path}")
        if event.is_directory:  # Only files carry calendar data
            return
        event_dict = self._load_event(event.src_path)
        if event_dict is None:
            return
        new_hash = self.calculate_event_hash(event_dict)
        for i, old_event in enumerate(event_list):
            if old_event["uid"] != event_dict["uid"]:
                continue
            if new_hash != old_event["hash"]:  # If the hashes don't match, it means that the event has been modified or deleted
                print("Event with UID {} has been modified or deleted".format(old_event["uid"]))
                # Update the event in the list
                event_list[i] = event_dict
            else:  # If the hashes match, it means that the event hasn't been modified
                print("Event with UID {} hasn't been modified".format(old_event["uid"]))
            break
        else:  # If no matching event is found in the list, add the new event to the list
            event_list.append(event_dict)

    def on_deleted(self, event):
        """Drop the event whose UID matches the deleted file's base name."""
        print(f"File deleted: {event.src_path}")
        if event.is_directory:  # Only files carry calendar data
            return
        print(str(dt.datetime.now()), "Sync detected, updating events")
        # The file is gone, so the UID is taken from the file name without
        # extension. NOTE(review): assumes files are named "<UID>.ics" - confirm.
        uid = os.path.splitext(os.path.basename(event.src_path))[0]
        for i, old_event in enumerate(event_list):
            if old_event["uid"] == uid:  # If the same event is found
                print("Event with UID {} has been deleted".format(old_event["uid"]))
                # Remove the event from the list; safe because we stop iterating right away
                del event_list[i]
                break

    def on_created(self, event):
        """Add the event from a newly created file unless its UID is already known."""
        print(f"File created: {event.src_path}")
        if event.is_directory:  # Only files carry calendar data
            return
        print(str(dt.datetime.now()), "Sync detected, updating events")
        event_dict = self._load_event(event.src_path)
        if event_dict is None:
            return
        for old_event in event_list:
            if old_event["uid"] == event_dict["uid"]:  # If the same event is found
                print("Event with UID {} already exists".format(old_event["uid"]))
                break
        else:  # If no matching event is found in the list, add the new event to the list
            event_list.append(event_dict)

    def calculate_event_hash(self, event):
        """Return the MD5 hex digest of *event* serialised as sorted-key JSON.

        A ``"hash"`` entry already present on the dictionary is left out of
        the digest. ``calendar_parser`` stamps that entry after hashing, so
        hashing a stamped and an unstamped copy of the same event must give
        the same value; otherwise a re-parsed event never matches the stored
        hash and every modification check reports a change.
        """
        payload = event
        if isinstance(event, dict):
            payload = {key: value for key, value in event.items() if key != "hash"}
        return hashlib.md5(json.dumps(payload, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
def calculate_recur_dates(dtstart, vrecur, lookahead=10):
    """Expand a recurrence rule into concrete occurrence dates.

    Parameters:
        dtstart: timezone-aware datetime of the first occurrence.
        vrecur: the event's RRULE value; must provide ``to_ical()`` and a
            dict-style ``get()`` returning a list per rule part.
        lookahead: for rules without COUNT and UNTIL, how many occurrences
            after "now" to generate before stopping.

    Returns:
        A tuple ``(dates, infinite_recur, freq, interval, n_recur)`` where
        ``dates`` are the generated occurrences strictly after ``dtstart``,
        ``infinite_recur`` tells whether the rule is unbounded, ``freq`` and
        ``interval`` echo the rule parts (``interval`` defaults to 1) and
        ``n_recur`` counts the generated occurrences that lie after "now".
    """
    rule_str = "RRULE:{}".format(vrecur.to_ical().decode('utf-8'))
    freq = vrecur.get('FREQ')[0]
    # INTERVAL is optional in an RRULE and defaults to 1 when omitted.
    interval_values = vrecur.get('INTERVAL')
    interval = interval_values[0] if interval_values else 1
    # "Now" is stamped the same way the rest of this file does it.
    current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)

    # A rule is only unbounded when it has neither COUNT nor UNTIL; the two
    # must not be combined, so no COUNT is appended to the rule text.
    infinite_recur = vrecur.get("COUNT") is None and vrecur.get("UNTIL") is None

    rule = rrulestr(rule_str, dtstart=dtstart)
    if infinite_recur:
        # Walk the rule lazily and stop once enough future occurrences were
        # seen. This works for every FREQ and BY* combination without
        # estimating how many occurrences have already elapsed.
        dates = []
        upcoming = 0
        for occurrence in rule:
            dates.append(occurrence)
            if occurrence > current_date:
                upcoming += 1
                if upcoming >= lookahead:
                    break
    else:
        # Bounded rule: generate every occurrence.
        dates = list(rule)

    n_recur = sum(1 for d in dates if d > current_date)
    return [d for d in dates if d > dtstart], infinite_recur, freq, interval, n_recur
def calendar_parser(cal_str):
    """Parse iCalendar text and return a dictionary for its first VEVENT.

    The returned dictionary has the keys ``uid``, ``dtstart``, ``summary``,
    ``description``, ``location``, ``recur_dates`` (``[None]`` when the event
    has no RRULE), ``recur_info``, ``valarm`` (list of timedelta offsets
    relative to the event start), ``alert_history`` (empty list) and ``hash``.

    Raises:
        ValueError: when the data contains no VEVENT or the VEVENT has no
            DTSTART. Errors raised by the icalendar parser propagate as-is.

    NOTE(review): the timezone of DTSTART is replaced by UTC rather than
    converted; other code in this file stamps ``datetime.now()`` the same way,
    so times look like they are compared as wall-clock values - confirm.
    """
    # Parse the calendar
    cal = Calendar.from_ical(cal_str)
    for component in cal.walk():
        if component.name != "VEVENT":
            continue

        uid = component.get("UID")
        dtstart_prop = component.get("DTSTART")
        if dtstart_prop is None:
            raise ValueError("VEVENT {} has no DTSTART".format(uid))
        dtstart = dtstart_prop.dt
        if not isinstance(dtstart, dt.datetime):
            # Ensure dates are always as datetime (all-day events start at midnight)
            dtstart = dt.datetime.combine(dtstart, dt.time.min)
        dtstart = dtstart.replace(tzinfo=pytz.UTC)
        summary = component.get("SUMMARY")
        description = component.get("DESCRIPTION")
        location = component.get("LOCATION")

        vrecur = component.get("RRULE")
        recur_dates = [None]
        recur_info = None
        if vrecur is not None:
            recur_dates, infinite_recur, freq, interval, n_recur = calculate_recur_dates(dtstart, vrecur)
            # Unbounded rules only list a window of dates, hence the "++" marker.
            shown_count = str(n_recur) + "++" if infinite_recur else str(n_recur)
            recur_info = "Number of recurs: {}, Interval: {}, Freq: {}".format(shown_count, interval, freq)

        valarm_list = []  # List to hold all VALARM offsets for this event
        for subcomponent in component.walk():  # Find all VALARMs for this VEVENT
            if subcomponent.name != "VALARM":
                continue
            # The alarm is re-parsed on its own, as the original implementation did.
            valarm = Event.from_ical(subcomponent.to_ical())
            trigger_prop = valarm.get("TRIGGER")
            if trigger_prop is None:
                print("Ignoring VALARM without TRIGGER in event {}".format(uid))
                continue
            trigger = trigger_prop.dt
            if isinstance(trigger, dt.datetime):
                # Absolute trigger: store it as an offset from the event start
                # so consumers can always add it to an event datetime.
                trigger = trigger.replace(tzinfo=pytz.UTC) - dtstart
            if not isinstance(trigger, dt.timedelta):
                print("Ignoring VALARM with unsupported TRIGGER in event {}".format(uid))
                continue
            valarm_list.append(trigger)  # Add this VALARM to the list

        event_dict = {"uid": str(uid), "dtstart": dtstart, "summary": summary, "description": description, "location": location, "recur_dates": recur_dates, "recur_info": recur_info, "valarm": valarm_list, "alert_history": []}
        handler = FileChangeHandler()  # Only needed for its hashing helper
        new_hash = handler.calculate_event_hash(event_dict)  # Hash taken before the "hash" key is added
        event_dict["hash"] = new_hash  # Store the hash in the event dictionary
        return event_dict

    # Nothing usable in the file: fail loudly instead of returning None.
    raise ValueError("No VEVENT component found in calendar data")
def get_next_alert(event, current_time):
    """Work out when the next alert for *event* is due.

    Parameters:
        event: event dictionary with ``dtstart`` (datetime), ``recur_dates``
            (list of datetimes in ascending order, ``[None]`` or empty for a
            one-off event) and ``valarm`` (list of timedelta offsets).
        current_time: datetime to evaluate against; must be comparable with
            the event's datetimes.

    Returns:
        ``(next_alert, next_event)``. ``next_event`` is the occurrence being
        tracked: ``dtstart`` while it has not passed, otherwise the first
        recurrence at or after ``current_time`` (or the last known recurrence
        when all are past). ``next_alert`` is ``None`` when that occurrence is
        already over; otherwise it is the earliest pending alarm time, or the
        occurrence itself when no alarm is pending, minus 5 seconds.
    """
    dtstart = event["dtstart"]
    valarm_deltas = event["valarm"]
    # One-off events carry [None]; an exhausted rule may carry an empty list.
    recur_dates = [d for d in event["recur_dates"] if d is not None]

    if not recur_dates or current_time <= dtstart:
        next_event = dtstart
    else:
        # Skip occurrences that are already over instead of always taking the
        # first recurrence, which would silence the event after its second run.
        next_event = next((d for d in recur_dates if d >= current_time), recur_dates[-1])

    if current_time > next_event:
        return None, next_event

    pending_alerts = [next_event + delta for delta in valarm_deltas if next_event + delta >= current_time]
    next_alert = min(pending_alerts) if pending_alerts else next_event
    # Fire slightly early so the polling loop does not miss the exact second.
    return next_alert - dt.timedelta(seconds=5), next_event
def process_alert(current_time, next_alert, event):
    """Send the alert for *event* if it is due and has not been sent yet.

    Parameters:
        current_time: datetime of the current polling tick.
        next_alert: datetime at which the alert becomes due.
        event: event dictionary; its ``alert_history`` list is extended with
            a record of every alert that is sent.

    Nothing happens when ``current_time`` is outside the 15-second window
    starting at ``next_alert`` or when ``next_alert`` is already recorded in
    the history. Otherwise the alert is recorded, sent by e-mail and XMPP, and
    the event is appended to the ``alert_history`` file. Always returns None.
    """
    # Only fire inside a short window after the alert became due. The window
    # end is compared against the current time; comparing next_alert with
    # itself plus 15 seconds would always be true.
    window_end = next_alert + dt.timedelta(seconds=15)
    if not (next_alert <= current_time < window_end):
        return

    history = event["alert_history"]
    if any(entry["alert_defintition_time"] == next_alert for entry in history):
        return  # This alert was already sent

    if not history:
        print("First alert for '{}' detected".format(event["summary"]))
    else:
        print("Posting alert for {}!".format(event["summary"]))
    # Record before sending, as before, so an alert is never posted twice.
    history.append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})

    # Resolve the occurrence locally instead of reading a loop variable that
    # happens to exist at module level; same inputs give the same occurrence.
    next_event = get_next_alert(event, current_time)[1]
    email_alert.send_email(event, next_alert, next_event, config)
    xmpp_alert.send_xmpp(event, next_alert, next_event, config)
    with open("alert_history", 'a') as f:
        f.write(str(event))  # write expects a str not dict
    return
# Create initial event_list using calendar_parser
event_list = []  # List to hold dictionaries for each event
for file in files:
    # A single unreadable or malformed calendar must not stop the daemon from
    # starting, so report it and carry on with the remaining files.
    try:
        with open(file, 'r', encoding='utf-8') as f:
            cal_str = f.read()
        event_dict = calendar_parser(cal_str)
    except Exception as e:
        print("Failed to parse calendar event at: {}.\n Error:\n{}".format(file, e))
        continue
    if event_dict is None:  # No event found in this file
        continue
    event_list.append(event_dict)

# Watch the calendar directory so event_list follows later changes
observer = Observer()
handler = FileChangeHandler()
observer.schedule(handler, cal_dir, recursive=True)
observer.start()

# NOTE: this loop deliberately stays at module level with unchanged variable
# names, because functions in this file read module-level state such as
# event_list.
try:
    while True:
        current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
        status_entries = []
        # Iterate over a snapshot: the handler callbacks may add or remove
        # entries in event_list while this loop is running.
        for event in list(event_list):
            next_alert, next_event = get_next_alert(event, current_time)
            if next_alert is None:
                continue
            monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nRecur Dates: {}\nNext alert on: {}\nRecur info: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, [str(i) for i in event["recur_dates"]], next_alert, event["recur_info"], event["alert_history"])
            status_entries.append(monitor_status + "\n")
            process_alert(current_time, next_alert, event)
        # Refresh the status file in one write per tick
        with open("status", 'w') as f:
            f.writelines(status_entries)
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    # Always shut the watcher down, whatever ended the loop
    observer.stop()
    observer.join()