Refactor remindme_caldav.py for improved code maintainability and readability

* Refactored `remindme_caldav.py` to separate concerns into different functions:
  - `calendar_parser()`: Parses the calendar and generates event dictionaries.
  - `generate_recurring_event_dates()`: Generates recurring event dates based on a start date and an RRULE.
  - `get_next_alert()`: Calculates the next alert time for each event.
  - `process_alert()`: Processes alerts for each event.
* Implemented error handling in `calendar_parser()` to catch missing components without crashing the program.
* Added comments and docstrings for better code readability and maintainability.
* Modified `get_next_alert()` to handle events with no recurring dates.
* Updated `process_alert()` to subtract 15 seconds from the next alert time to ensure alerts are triggered before the event occurs.
* Added a human-readable format for the time until the next alert in the status message.
This commit is contained in:
mrsu 2024-02-10 19:37:37 +00:00
parent 92e1578bbc
commit f6b1b2d5ee
1 changed files with 201 additions and 98 deletions

View File

@ -7,6 +7,8 @@ import time
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
import email_alert, xmpp_alert import email_alert, xmpp_alert
from pprint import pprint
import humanfriendly
# Parse args # Parse args
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
@ -47,9 +49,36 @@ class DateTimeEncoder(json.JSONEncoder):
return super().default(o) return super().default(o)
class FileChangeHandler(FileSystemEventHandler): class FileChangeHandler(FileSystemEventHandler):
"""
`FileChangeHandler` is a custom event handler for the
`watchdog.observers.Observer` class that handles file system events such as
file modifications, deletions and creations.
It inherits from `watchdog.events.FileSystemEventHandler` providing methods
to handle these events: `on_modified`, `on_deleted`, and `on_created`. Each
method is overridden for specific functionality when a file system event
occurs.
The class also includes a method `calculate_event_hash` that generates an
MD5 hash for each event dictionary based on its contents. This is used to
track changes in the events and determine if they have been modified or
deleted.
For example, when a file is modified:
- It reads the content of the file, parses it into an event dictionary
using `calendar_parser`.
- Calculates the hash for the event and checks if there's already an
existing event with the same UID in `event_list`.
- If there is one, it compares the new hash with the old hash. If they
differ, it prints that the event has been modified or deleted. Otherwise,
it prints that the event hasn't been modified.
For file deletion and creation, similar operations are performed but
without comparison of hashes.
"""
def on_modified(self, event): def on_modified(self, event):
print(f"File modified: {event.src_path}") print(f"File modified: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory if not event.is_directory:
try: try:
with open(event.src_path, 'r') as f: with open(event.src_path, 'r') as f:
cal_str = f.read() cal_str = f.read()
@ -57,50 +86,25 @@ class FileChangeHandler(FileSystemEventHandler):
print("Not a valid file: {}. Error: {}".format(event.src_path, e)) print("Not a valid file: {}. Error: {}".format(event.src_path, e))
return return
# Parse the calendar
try: try:
event_dict = calendar_parser(cal_str) event_dict = calendar_parser(cal_str)
new_hash = self.calculate_event_hash(event_dict)
except Exception as i: except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return return
for i, old_event in enumerate(event_list): # Handle the modified event
if old_event["uid"] == event_dict["uid"]: self.handle_modified(old_event=None, event_dict=event_dict)
old_hash = old_event["hash"]
if new_hash != old_hash: # If the hashes don't match, it means that the event has been modified or deleted
print("Event with UID {} has been modified or deleted".format(old_event["uid"]))
# Update the event in the list
event_list[i] = event_dict
else: # If the hashes match, it means that the event hasn't been modified
print("Event with UID {} hasn't been modified".format(old_event["uid"]))
break
else: # If no matching event is found in the list, add the new event to the list
event_list.append(event_dict)
def on_deleted(self, event): def on_deleted(self, event):
print(f"File deleted: {event.src_path}") print(f"File deleted: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory if not event.is_directory:
print(str(dt.datetime.now()), "Sync detected, updating events") uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
for i, old_event in enumerate(event_list): self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True)
uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
if old_event["uid"] == uid: # If the same event is found
print("Event with UID {} has been deleted".format(old_event["uid"]))
# Remove the event from the list
del event_list[i]
break
def on_created(self, event): def on_created(self, event):
print(f"File created: {event.src_path}") print(f"File created: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory if not event.is_directory:
print(str(dt.datetime.now()), "Sync detected, updating events")
try: try:
with open(event.src_path, 'r') as f: with open(event.src_path, 'r') as f:
cal_str = f.read() cal_str = f.read()
@ -110,32 +114,89 @@ class FileChangeHandler(FileSystemEventHandler):
try: try:
event_dict = calendar_parser(cal_str) event_dict = calendar_parser(cal_str)
new_hash = self.calculate_event_hash(event_dict)
except Exception as i: except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return return
for old_event in event_list: self.handle_modified(old_event=None, event_dict=event_dict)
if old_event["uid"] == event_dict["uid"]: # If the same event is found
print("Event with UID {} already exists".format(old_event["uid"])) def handle_modified(self, old_event, event_dict, remove=False):
if not remove:
for i, old_event in enumerate(event_list):
if old_event["uid"] == event_dict["uid"]:
old_hash = old_event["hash"]
new_hash = self.calculate_event_hash(event_dict)
if new_hash != old_hash:
print("Event with UID {} has been modified or deleted".format(old_event["uid"]))
event_list[i] = event_dict
else:
print("Event with UID {} hasn't been modified".format(old_event["uid"]))
break break
else: # If no matching event is found in the list, add the new event to the list else:
event_list.append(event_dict) event_list.append(event_dict)
else: # If remove is True, remove the event from the list
for i, old_event in enumerate(event_list):
if old_event["uid"] == event_dict["uid"]:
print("Event with UID {} has been deleted".format(old_event["uid"]))
del event_list[i]
break
def calculate_event_hash(self, event): def calculate_event_hash(self, event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest() return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
def calculate_recur_dates(dtstart, vrecur): def generate_recurring_event_dates(dtstart, rrule):
rule_str = "RRULE:{}".format(vrecur.to_ical().decode('utf-8')) """
Generate recurring event dates based on a start date and an RRULE.
This function takes in a start date (`dtstart`) and an RRULE (`rrule`),
which is used to generate future dates according to the rules specified by
the RRULE.
If no count rule is present, it generates a date array from `dtstart` to
the current datetime, then adds an arbitrary number of dates into the
future (here it's 10). This is done because generating a date array using
the rrulestr function without a count rule will return a very large number
of elements.
The function returns a dictionary containing information about the
recurring event:
- `recur_dates`: A list of future dates generated by the RRULE.
- `infinite_recur`: Boolean indicating whether the recurrence is infinite.
- `recur_freq`: The frequency of the recurrence (e.g., 'DAILY').
- `recur_interval`: The interval between each occurrence of the event.
- `n_recur_dates_left`: The number of future dates left.
Parameters:
dtstart (datetime): The start date of the recurring event.
rrule (rrule): The RRULE object specifying the recurrence rules.
Returns:
dict: A dictionary containing information about the recurring event
dates, frequency, interval and count.
"""
recur_info = {
"recur_dates": [dtstart],
"infinite_recur": False,
"recur_freq": None,
"recur_interval": None,
"n_recur_dates_left": None
}
if rrule is None:
return recur_info
rule_str = "RRULE:{}".format(rrule.to_ical().decode('utf-8'))
start_date = dtstart start_date = dtstart
infinite_recur = False infinite_recur = False
freq = vrecur.get('FREQ')[0] freq = rrule.get('FREQ')[0]
interval = vrecur.get('INTERVAL')[0] count = rrule.get("COUNT")
interval = rrule.get('INTERVAL')[0]
current_date = dt.datetime.now().replace(tzinfo=pytz.UTC) current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)
if vrecur.get("COUNT") is None: if count is None:# or until is not None:
# If no COUNT, calculate an end date based on FREQ and INTERVAL to prevent generating too many dates
delta = None delta = None
if freq == "DAILY": if freq == "DAILY":
@ -150,70 +211,109 @@ def calculate_recur_dates(dtstart, vrecur):
while origin_date < current_date: while origin_date < current_date:
count += interval count += interval
origin_date += delta*interval origin_date += delta*interval
rule_str += ";COUNT={}".format(count+10) rule_str += ";COUNT={}".format(count+10)
infinite_recur = True infinite_recur = True
ruleset = rrulestr(rule_str, dtstart=start_date) ruleset = rrulestr(rule_str, dtstart=start_date)
# Generate future dates according to the rules recur_dates = [None]
dates = list(ruleset) n_recur = None
n_recur = len([d for d in dates if d > current_date]) dates = list(ruleset) # Generate future dates according to the rules
return [d for d in dates if d > start_date], infinite_recur, freq, interval, n_recur recur_dates = [i for i in dates if i >= current_date]
n_recur = "inf" if infinite_recur is True else len(recur_dates)
recur_info["recur_dates"] = recur_dates
recur_info["infinite_recur"] = infinite_recur
recur_info["recur_freq"] = freq
recur_info["recur_interval"] = interval
recur_info["n_recur_dates_left"] = n_recur
return recur_info
def calendar_parser(cal_str): def calendar_parser(cal_str):
# Parse the calendar # Parse the calendar
cal = Calendar.from_ical(cal_str) cal = Calendar.from_ical(cal_str)
for component in cal.walk(): # Iterate over each event in the calendar
if component.name == "VEVENT": # If it's a VEVENT, create a new event dictionary for event in cal.walk('vevent'):
uid = component.get("UID")
dtstart = component.get("DTSTART").dt event_info = {
dtstart = dtstart if isinstance(dtstart, dt.datetime) else dt.datetime.combine(dtstart, dt.time.min) # Ensure dates are always as datetime "uid": None,
"dtstart": "",
"exdate": [],
"summary": None,
"description": None,
"location": None,
"rrule": None
}
# Catch errors for missing components
for info in event_info:
try:
event_info[info] = event[info]
except Exception:
pass
uid = str(event_info["uid"])
dtstart = event_info["dtstart"].dt
exdates = event_info["exdate"]
if exdates is not []:
if isinstance(exdates, list):
exdates = [i.dts.dt.replace(tzinfo=pytz.UTC) for i in exdates]
else:
exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)]
summary = event["summary"]
description = event_info["description"]
location = event_info["location"]
rrule = event_info["rrule"]
# Ensure dates are always as datetime
if isinstance(dtstart, dt.datetime):
dtstart = dtstart.replace(tzinfo=pytz.UTC) dtstart = dtstart.replace(tzinfo=pytz.UTC)
summary = component.get("SUMMARY") else:
description = component.get("DESCRIPTION") dtstart = dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)
location = component.get("LOCATION")
vrecur = component.get("RRULE") # Get recurring events if they exist
recur_dates = [None] recur_info = generate_recurring_event_dates(dtstart, rrule)
recur_info=None event_dates = recur_info["recur_dates"]
if vrecur is not None:
recur_dates, infinite_recur, freq, interval, n_recur = calculate_recur_dates(dtstart, vrecur)
if infinite_recur:
recur_info = "Number of recurs: {}, Interval: {}, Freq: {}".format(str(n_recur)+"++", interval, freq)
else:
recur_info = "Number of recurs: {}, Interval: {}, Freq: {}".format(str(n_recur), interval, freq)
valarm_list = [] # List to hold all VALARM for this event # Remove exdates
for subcomponent in component.walk(): # Find all VALARMs for this VEVENT event_dates = [i for i in event_dates if i not in exdates]
if subcomponent.name == "VALARM":
valarm = Event.from_ical(subcomponent.to_ical()) valarms = []
timedelta = valarm.get("TRIGGER").dt for subcomponent in event.walk("valarm"):
valarm_list.append(timedelta) # Add this VALARM to the list valarm = Event.from_ical(subcomponent.to_ical())
event_dict = {"uid": str(uid), "dtstart": dtstart, "summary": summary, "description": description, "location": location, "recur_dates": recur_dates, "recur_info": recur_info, "valarm": valarm_list, "alert_history": []} timedelta = valarm["trigger"].dt
handler = FileChangeHandler() # Create an instance of the FileChangeHandler class valarms.append(timedelta)
new_hash = handler.calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
event_dict["hash"] = new_hash # Store the hash in the event dictionary event_dict = {
return event_dict "uid": uid,
"dtstart": dtstart,
"summary": summary,
"description": description,
"location": location,
"event_dates": event_dates,
"recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format(
recur_info["recur_freq"],
recur_info["recur_interval"],
recur_info["n_recur_dates_left"]
),
"valarms": valarms,
"alert_history": []
}
handler = FileChangeHandler() # Create an instance of the FileChangeHandler class
new_hash = handler.calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
event_dict["hash"] = new_hash # Store the hash in the event dictionary
return event_dict
def get_next_alert(event, current_time): def get_next_alert(event, current_time):
recur_dates = event["recur_dates"] event_dates = event["event_dates"]
valarm_deltas = event["valarm"] valarm_deltas = event["valarms"]
dtstart = event["dtstart"] if event_dates == [] or current_time > event_dates[-1]:
next_event = None return None, None
if recur_dates[0] is None: next_event = [i for i in event_dates if i >= current_time][0]
next_event = dtstart next_alert_list = [next_event + i for i in valarm_deltas]
elif current_time <= dtstart: next_alert = min(next_alert_list)
next_event = dtstart
else:
next_event = recur_dates[0]
if current_time > next_event:
return None, next_event
next_alert_list = [next_event + i for i in valarm_deltas if next_event + i >= current_time]
if next_alert_list == []:
next_alert = next_event
else:
next_alert = min(next_alert_list)
return next_alert - dt.timedelta(seconds=5), next_event return next_alert - dt.timedelta(seconds=5), next_event
def process_alert(current_time, next_alert, event): def process_alert(current_time, next_alert, event):
@ -255,7 +355,10 @@ try:
next_alert, next_event = get_next_alert(event, current_time) next_alert, next_event = get_next_alert(event, current_time)
if next_alert == None: if next_alert == None:
continue continue
monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nRecur Dates: {}\nNext alert on: {}\nRecur info: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, [str(i) for i in event["recur_dates"]], next_alert, event["recur_info"], event["alert_history"]) event_delta = next_alert-current_time
total_seconds = event_delta.total_seconds()
human_readable_time = humanfriendly.format_timespan(total_seconds)
monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nRecur Dates: {}\nNext alert on: {} in {}\nRecur info: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, [str(i) for i in event["event_dates"]], next_alert, human_readable_time, event["recur_info"], event["alert_history"])
with open("status", 'a') as f: with open("status", 'a') as f:
# Write the output to the file # Write the output to the file
f.write(monitor_status) f.write(monitor_status)