remindme_caldav/remindme_caldav.py

431 lines
17 KiB
Python
Raw Normal View History

from pathlib import Path
from icalendar import Calendar, Event
2024-02-12 01:21:50 +00:00
import toml, argparse, os, sys, hashlib, json, pytz, os, time
2024-02-04 02:53:30 +00:00
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrulestr
2024-02-04 02:53:30 +00:00
import datetime as dt
2024-02-08 15:32:49 +00:00
import time
2024-02-04 02:53:30 +00:00
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import humanfriendly
from pathlib import Path
import argparse, textwrap, logging
from alert_processor import AlertProcessor
2024-02-11 19:43:33 +00:00
# Layout shared by every log handler: "[LEVEL] timestamp message".
log_format = '[%(levelname)s] %(asctime)s %(message)s'

# Configure the root logger at import time with INFO as the threshold, then
# expose it as the module-wide logger.
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger()
def parse_args(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse falls back to ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with the attributes ``config``, ``logfile`` and
        ``loglevel``. ``loglevel`` is normalised to upper case, so ``info``
        and ``INFO`` are both accepted and both yield ``"INFO"``.
    """
    parser = argparse.ArgumentParser(
        description="A simple calendar alerting daemon written in Python"
    )
    parser.add_argument(
        '--config',
        type=str,
        help='Path to config file. Must be .toml',
    )
    parser.add_argument(
        '--logfile',
        type=str,
        default="none",
        help='Path to a logfile. Default: "none"',
    )
    parser.add_argument(
        '--loglevel',
        # Upper-casing before the choices check makes the option
        # case-insensitive instead of special-casing a lone 'info'.
        type=str.upper,
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        default='INFO',
        help='Set the logging level',
    )
    return parser.parse_args(argv)
def read_file(filename):
    """Return the text content of ``filename``.

    If the file does not exist, an error is logged and the process exits
    with status 1.
    """
    path = Path(filename)
    try:
        content = path.read_text()
    except FileNotFoundError:
        logger.error("Error: The specified file does not exist.")
        sys.exit(1)
    return content
def parse_toml(content):
    """Parse ``content`` with ``toml.loads`` and return the result.

    If parsing raises, an error is logged and the process exits with
    status 1.
    """
    try:
        parsed = toml.loads(content)
    except Exception:
        logger.error("Error: Failed to parse TOML file.")
        sys.exit(1)
    return parsed
2024-02-11 19:43:33 +00:00
def calculate_event_hash(event):
    """Return the MD5 hex digest of ``event`` serialised as key-sorted JSON.

    Serialisation goes through ``DateTimeEncoder`` so that datetime and
    timedelta values inside ``event`` can be encoded.
    """
    serialized = json.dumps(event, sort_keys=True, cls=DateTimeEncoder)
    digest = hashlib.md5(serialized.encode())
    return digest.hexdigest()
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that renders datetime and timedelta values as strings."""

    def default(self, o):
        """Return ``str(o)`` for datetime/timedelta, else defer to the base class."""
        if not isinstance(o, (dt.datetime, dt.timedelta)):
            # Anything else is handed to the base class implementation.
            return super().default(o)
        return str(o)
class FileChangeHandler(FileSystemEventHandler):
    """
    `FileChangeHandler` is a custom event handler for the
    `watchdog.observers.Observer` class that handles file system events such as
    file modifications, deletions and creations, and keeps a list of parsed
    calendar events in sync with them.

    Args:
        event_list: List of event dictionaries (each with at least the keys
            "uid" and "hash"). The list is mutated in place.
    """

    def __init__(self, event_list):
        self.calendar_parser = CalendarParser()
        self.event_list = event_list

    def _load_event(self, path):
        """Read and parse the calendar file at `path`.

        Returns the parsed event dictionary, or None (after logging the
        reason) when the file cannot be read or parsed.
        """
        try:
            with open(path, 'r') as f:
                cal_str = f.read()
        except Exception as e:
            logger.error(f"Not a valid file: {path}. Error: {e}")
            return None
        try:
            event_dict = self.calendar_parser.parse_calendar(cal_str)
        except Exception as e:
            logger.error(f"Failed to parse calendar event at: {path}. Error: {e}")
            return None
        if event_dict is None:
            # Guard against a parser that yields nothing for this file.
            logger.error(f"Failed to parse calendar event at: {path}. Error: no event found")
            return None
        return event_dict

    def on_modified(self, event):
        """Re-parse a modified file and update the matching event."""
        logger.info(f"File modified: {event.src_path}")
        if event.is_directory:
            return
        event_dict = self._load_event(event.src_path)
        if event_dict is not None:
            self.handle_modified(old_event=None, event_dict=event_dict)

    def on_deleted(self, event):
        """Remove the event belonging to a deleted file."""
        logger.info(f"File deleted: {event.src_path}")
        if event.is_directory:
            return
        # The UID is taken from the file name without its extension.
        # NOTE(review): assumes files are named "<uid>.<ext>" - confirm.
        uid = os.path.splitext(os.path.basename(event.src_path))[0]
        self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True)

    def on_created(self, event):
        """Parse a newly created file and add (or update) its event."""
        logger.info(f"File created: {event.src_path}")
        if event.is_directory:
            return
        event_dict = self._load_event(event.src_path)
        if event_dict is not None:
            self.handle_modified(old_event=None, event_dict=event_dict)

    def handle_modified(self, old_event, event_dict, remove=False):
        """Apply an add, update or removal to `self.event_list`.

        Args:
            old_event: Unused; kept so existing callers keep working.
            event_dict: The new event dictionary. For removals only the
                "uid" key is read.
            remove: When True, delete the event with a matching UID instead
                of adding or updating it.
        """
        uid = event_dict["uid"]
        index = next(
            (i for i, known in enumerate(self.event_list) if known["uid"] == uid),
            None,
        )

        if remove:
            if index is not None:
                logger.info(f"Event with UID {self.event_list[index]['uid']} has been deleted")
                del self.event_list[index]
            return

        if index is None:
            self.event_list.append(event_dict)
            return

        # Compare the hash stored with the freshly parsed event against the
        # stored one. Re-hashing `event_dict` as-is would include its own
        # "hash" entry and therefore never match, which would replace the
        # event (and drop its alert history) on every file event.
        new_hash = event_dict.get("hash")
        if new_hash is None:
            new_hash = calculate_event_hash(
                {key: value for key, value in event_dict.items() if key != "hash"}
            )
        if new_hash != self.event_list[index]["hash"]:
            logger.info(f"Event with UID {self.event_list[index]['uid']} has been modified or deleted")
            self.event_list[index] = event_dict
2024-02-11 19:43:33 +00:00
class RecurringEventGenerator:
    """
    A class to generate recurring events based on a start date and a recurrence rule.

    Args:
        dtstart: Datetime of the first occurrence. It must be timezone-aware
            when `rrule` is given, because occurrences are compared against
            a timezone-aware "now".
        rrule: Recurrence rule object offering `get()` and `to_ical()`, or
            None for an event that does not repeat.
        lookahead: Maximum number of upcoming occurrences collected for
            rules that are not bounded by COUNT.
    """

    def __init__(self, dtstart, rrule, lookahead=10):
        self.dtstart = dtstart
        self.rrule = rrule
        self.lookahead = lookahead
        # Defaults describe a single, non-recurring event.
        self.recur_info = {
            "recur_dates": [dtstart],
            "infinite_recur": False,
            "recur_freq": None,
            "recur_interval": None,
            "n_recur_dates_left": 1
        }

    @staticmethod
    def _first(values, default=None):
        """Return the first item of `values`, or `default` if it is None/empty."""
        return values[0] if values else default

    def generate(self):
        """
        Compute the upcoming occurrences of the event.

        Returns:
            The `recur_info` dict with the keys "recur_dates" (occurrences
            at or after the current time), "infinite_recur",
            "recur_freq", "recur_interval" and "n_recur_dates_left"
            (the string "inf" when the list was capped, otherwise the number
            of remaining dates). Without a rule the defaults are returned
            unchanged.
        """
        if self.rrule is None:
            return self.recur_info

        rule_str = "RRULE:{}".format(self.rrule.to_ical().decode('utf-8'))
        freq = self._first(self.rrule.get('FREQ'))
        # INTERVAL is optional in an RRULE; fall back to 1 instead of
        # indexing into None.
        interval = self._first(self.rrule.get('INTERVAL'), default=1)
        count = self.rrule.get("COUNT")
        until = self.rrule.get('UNTIL')
        # NOTE(review): this labels the local wall-clock time as UTC rather
        # than converting it - presumably intentional, confirm.
        current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)

        # Rules without COUNT may describe a very large (or endless) series,
        # so only a limited number of upcoming dates is collected for them.
        infinite_recur = count is None or until is not None

        ruleset = rrulestr(rule_str, dtstart=self.dtstart)
        if infinite_recur:
            # Walk the rule lazily: skip past occurrences and stop once
            # enough upcoming ones are collected. This works for every FREQ
            # and for rules yielding several occurrences per period.
            recur_dates = []
            for occurrence in ruleset:
                if len(recur_dates) >= self.lookahead:
                    break
                if occurrence >= current_date:
                    recur_dates.append(occurrence)
        else:
            recur_dates = [i for i in ruleset if i >= current_date]

        self.recur_info["recur_dates"] = recur_dates
        self.recur_info["infinite_recur"] = infinite_recur
        self.recur_info["recur_freq"] = freq
        self.recur_info["recur_interval"] = interval
        self.recur_info["n_recur_dates_left"] = "inf" if infinite_recur else len(recur_dates)
        return self.recur_info
2024-02-11 19:43:33 +00:00
class CalendarParser:
    """
    Convert the text of an iCalendar file into the event dictionary used by
    the rest of this module.
    """

    def parse_calendar(self, cal_str):
        """
        Parse a calendar string and build the event dictionary for its first
        VEVENT component.

        Args:
            cal_str: Content of an iCalendar file.

        Returns:
            dict with the keys "uid", "dtstart", "summary", "description",
            "location", "event_dates", "recur_info", "valarms",
            "alert_history" and "hash".

        Raises:
            RuntimeError: If the calendar cannot be parsed, contains no
                VEVENT, the event has no DTSTART, or hashing fails.
        """
        # Parse the calendar
        cal = self.parse_icalendar(cal_str)

        # Only the first VEVENT is turned into an event dictionary.
        event = next(iter(cal.walk('vevent')), None)
        if event is None:
            raise RuntimeError("No VEVENT found in calendar.")

        event_info = self.process_event(event)
        raw_start = getattr(event_info["dtstart"], "dt", None)
        if raw_start is None:
            raise RuntimeError("Event has no DTSTART.")
        dtstart = self.dtstart_to_datetime(raw_start)

        generator = RecurringEventGenerator(dtstart, event_info["rrule"])
        recur_info = generator.generate()
        event_dates = self.remove_exdates(event_info["exdate"], recur_info["recur_dates"])
        valarms = self.process_valarm(event)

        event_dict = {
            "uid": str(event_info["uid"]),
            "dtstart": dtstart,
            "summary": event_info["summary"],
            "description": event_info["description"],
            "location": event_info["location"],
            "event_dates": event_dates,
            "recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format(
                recur_info["recur_freq"],
                recur_info["recur_interval"],
                recur_info["n_recur_dates_left"]
            ),
            "valarms": valarms,
            "alert_history": []
        }
        try:
            # The hash is calculated before the "hash" key itself is added.
            new_hash = calculate_event_hash(event_dict)
        except Exception as e:
            raise RuntimeError("Error calculating event hash") from e
        event_dict["hash"] = new_hash
        return event_dict

    def parse_icalendar(self, cal_str):
        """
        Parse a calendar string into an iCalendar object.

        Raises:
            RuntimeError: If `Calendar.from_ical` fails.
        """
        try:
            return Calendar.from_ical(cal_str)
        except Exception as e:
            raise RuntimeError("Error parsing calendar.") from e

    def process_event(self, event):
        """
        Process an event from a parsed calendar and extract relevant information.

        Args:
            event: Mapping-like event component supporting `get()`.

        Returns:
            dict with the keys "uid", "dtstart", "exdate", "summary",
            "description", "location" and "rrule". Components missing from
            the event keep their default value.
        """
        defaults = {
            "uid": None,
            "dtstart": "",
            "exdate": [],
            "summary": None,
            "description": None,
            "location": None,
            "rrule": None
        }
        # Missing components simply fall back to their default.
        return {name: event.get(name, default) for name, default in defaults.items()}

    def dtstart_to_datetime(self, dtstart):
        """
        Convert a date or datetime object into a datetime object with UTC timezone.

        A plain date becomes midnight of that day. A datetime keeps its
        clock time and only has its tzinfo replaced.

        Raises:
            RuntimeError: If the value cannot be converted.
        """
        # NOTE(review): replace() relabels the time as UTC without shifting
        # it - presumably intentional, confirm.
        try:
            if isinstance(dtstart, dt.datetime):
                return dtstart.replace(tzinfo=pytz.UTC)
            return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)
        except Exception as e:
            raise RuntimeError("Error converting dtstart to datetime.") from e

    def remove_exdates(self, exdates, recur_dates):
        """
        Remove dates from a list of recurring event dates that are in the exdate list.

        Args:
            exdates: A single EXDATE property, a list of them, or an empty
                list. Each property exposes its values through `.dts`.
            recur_dates: List of occurrence datetimes.

        Returns:
            `recur_dates` without the excluded dates (the same list object
            when there is nothing to exclude).

        Raises:
            RuntimeError: If the exdates cannot be processed.
        """
        if exdates is None or exdates == []:
            return recur_dates
        try:
            properties = exdates if isinstance(exdates, list) else [exdates]
            # Every value of every EXDATE property is honoured (a property
            # may carry several dates), and date-only values are converted
            # the same way as DTSTART so they compare with the occurrences.
            excluded = {
                self.dtstart_to_datetime(entry.dt)
                for prop in properties
                for entry in prop.dts
            }
            return [i for i in recur_dates if i not in excluded]
        except Exception as e:
            raise RuntimeError("Error processing exdates.") from e

    def process_valarm(self, event):
        """
        Process VALARM components from an iCalendar event and extract trigger times.

        Returns:
            List with the `.dt` value of each alarm's TRIGGER. Alarms
            without a TRIGGER are skipped with a warning.
        """
        valarms = []
        for subcomponent in event.walk("valarm"):
            # NOTE(review): the serialise/parse round trip is kept from the
            # original implementation; it may be unnecessary - confirm.
            valarm = Event.from_ical(subcomponent.to_ical())
            trigger = valarm.get("trigger")
            if trigger is None:
                logger.warning("Skipping VALARM without TRIGGER.")
                continue
            valarms.append(trigger.dt)
        return valarms
def get_next_alert(event, current_time):
    """
    Returns the next alert that should be processed based on the current time.

    Args:
        event: Event dictionary with the keys "event_dates" (list of
            datetimes; the last entry is treated as the latest one) and
            "valarms" (list of alarm triggers).
        current_time: Datetime the alerts are evaluated against.

    Returns:
        Tuple `(next_alert, next_event)`. `next_event` is the first event
        date at or after `current_time`; `next_alert` is the earliest alarm
        time that is not yet in the past (or the event time itself when no
        such alarm exists), moved 5 seconds earlier. `(None, None)` when
        there are no event dates or all of them are in the past.
    """
    event_dates = event["event_dates"]
    if not event_dates or current_time > event_dates[-1]:
        return None, None

    next_event = next(i for i in event_dates if i >= current_time)

    upcoming_alerts = []
    for trigger in event["valarms"]:
        if isinstance(trigger, dt.timedelta):
            # Relative trigger: an offset from the event time.
            alert_time = next_event + trigger
        elif isinstance(trigger, dt.datetime):
            # Absolute trigger: already a point in time. Adding it to the
            # event time would raise TypeError. A naive value borrows the
            # tzinfo of `current_time` so both can be compared.
            alert_time = trigger
            if alert_time.tzinfo is None:
                alert_time = alert_time.replace(tzinfo=current_time.tzinfo)
        else:
            # Unknown trigger type; it cannot be turned into an alert time.
            continue
        if alert_time >= current_time:
            upcoming_alerts.append(alert_time)

    next_alert = min(upcoming_alerts, default=next_event)
    return next_alert - dt.timedelta(seconds=5), next_event
2024-02-04 02:53:30 +00:00
def process_alert(current_time, next_alert, next_event, event, config):
    """
    Processes a given alert and passes it to a messaging client.

    The alert is sent when `current_time` lies within the 15 seconds
    starting at `next_alert` and no alert with the same `next_alert` is
    already recorded in the event's "alert_history".

    Args:
        current_time: Datetime of the current loop iteration.
        next_alert: Datetime at which the alert is due.
        next_event: Datetime of the event occurrence the alert belongs to.
        event: Event dictionary; its "alert_history" list is extended.
        config: Configuration handed to `AlertProcessor`.

    Raises:
        RuntimeError: If sending the alert fails.
    """
    window_end = next_alert + dt.timedelta(seconds=15)
    if not (next_alert <= current_time < window_end):
        return

    history = event["alert_history"]
    if any(entry["alert_defintition_time"] == next_alert for entry in history):
        # This alert has already been handled.
        return

    if len(history) == 0:
        logger.info(f"First alert for '{event['summary']}' detected")
    else:
        logger.info(f"Posting alert for {event['summary']}!")
    history.append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})

    try:
        processor = AlertProcessor(config)
        processor.send_email(event, next_alert, next_event)
    except Exception as e:
        # Chain the original error so the cause is visible in the traceback.
        raise RuntimeError("Error sending alert for event.") from e
    # XMPP delivery is currently disabled:
    # processor.send_xmpp(event, next_alert, next_event)

    with open("alert_history", 'a') as f:
        # One record per line keeps consecutive entries apart.
        f.write(str(event) + "\n")
    return
2024-02-11 19:43:33 +00:00
def _load_events(calendar_parser, files):
    """Parse every calendar file, logging and skipping the ones that fail."""
    event_list = []
    for file in files:
        try:
            with open(file, 'r') as f:
                cal_str = f.read()
            event_dict = calendar_parser.parse_calendar(cal_str)
        except Exception:
            logger.error(f"Error parsing event, skipping. {file}")
            continue
        if event_dict is None:
            # Never store an empty result; the main loop indexes into events.
            logger.error(f"Error parsing event, skipping. {file}")
            continue
        event_list.append(event_dict)
    return event_list


def _format_status(current_time, event, next_alert, next_event):
    """Render the status-file entry (followed by a blank line) for one event."""
    total_seconds = (next_alert - current_time).total_seconds()
    human_readable_time = humanfriendly.format_timespan(total_seconds)
    lines = [
        f"Current time: {current_time}",
        f"Monitoring: {event['summary']}",
        f"Event date: {next_event}",
        f"Recur Dates: {[str(i) for i in event['event_dates']]}",
        f"Next alert on: {next_alert} in {human_readable_time}",
        f"Recur info: {event['recur_info']}",
        f"Alert history: {event['alert_history']}",
    ]
    return "\n".join(lines) + "\n\n"


def main():
    """
    Run the alerting daemon.

    Reads the command line and the TOML config, parses all `*.ics` files in
    the configured calendar directory, watches that directory for changes
    and then checks once per second whether an alert is due. The files
    "status" and "alert_history" are written relative to the current
    working directory.
    """
    # Parse args and config
    args = parse_args()
    # Honour --loglevel; upper() also accepts a lower-case value.
    logger.setLevel(getattr(logging, args.loglevel.upper()))

    # Write logs to logfile
    if args.logfile != "none":
        file_handler = logging.FileHandler(args.logfile, mode='a')
        formatter = logging.Formatter(log_format)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    if args.config is None:
        logger.error("No config file provided. Please use --config path_to_config.toml")
        sys.exit(1)
    config_file = read_file(args.config)
    config = parse_toml(config_file)

    # Get calendar dir
    try:
        cal_dir = Path(config["app"]["calendar_dir"])
    except KeyError:
        logger.error("Config is missing the 'calendar_dir' setting in the [app] section.")
        sys.exit(1)
    if not cal_dir.is_dir():
        logger.error(f"The provided path to .ics files does not exist: '{cal_dir}'")
        sys.exit(1)  # Exit with error code

    # Parse calendar events
    calendar_parser = CalendarParser()
    files = list(cal_dir.glob('*.ics'))
    if len(files) == 0:
        logger.error("No calendar files in destination location. Did you sync with the caldav server?")
        sys.exit(1)  # Exit with error code
    event_list = _load_events(calendar_parser, files)

    # Start file handler to detect changes to calendar dir. The handler
    # receives the same list object and updates it in place.
    observer = Observer()
    handler = FileChangeHandler(event_list)
    observer.schedule(handler, str(cal_dir), recursive=True)
    observer.start()

    # Start main loop
    try:
        while True:
            # NOTE(review): this labels the local wall-clock time as UTC
            # rather than converting it - presumably intentional, confirm.
            current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)

            # Work on a snapshot, since the handler may change the list
            # while this loop is running.
            due = []
            for event in list(event_list):
                next_alert, next_event = get_next_alert(event, current_time)
                if next_alert is None:
                    continue
                due.append((event, next_alert, next_event))

            # Refresh the status file in a single write per iteration.
            with open("status", 'w') as f:
                f.write("".join(
                    _format_status(current_time, event, next_alert, next_event)
                    for event, next_alert, next_event in due
                ))

            for event, next_alert, next_event in due:
                process_alert(current_time, next_alert, next_event, event, config)
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Always shut the watcher down, also when the loop ends on an error.
        observer.stop()
        observer.join()
# Start the daemon only when this file is executed as a script.
if __name__ == "__main__":
    main()