remindme_caldav/remindme_caldav.py

463 lines
19 KiB
Python
Raw Normal View History

from pathlib import Path
from icalendar import Calendar, Event
2024-02-12 01:21:50 +00:00
import toml, argparse, os, sys, hashlib, json, pytz, os, time
2024-02-04 02:53:30 +00:00
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrulestr
2024-02-04 02:53:30 +00:00
import datetime as dt
2024-02-08 15:32:49 +00:00
import time
2024-02-04 02:53:30 +00:00
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import humanfriendly
from pathlib import Path
import argparse, textwrap, logging
from alert_processor import AlertProcessor
2024-02-11 19:43:33 +00:00
# Module-level logger; handlers and level are configured later by setup_logging().
logger = logging.getLogger()
def setup_log_location(logdir):
    """Validate the log directory and build paths for the three output files.

    Args:
        logdir: Path to an existing directory (str or Path).

    Returns:
        Tuple of (log_location, status_location, alert_history_location).

    Raises:
        FileNotFoundError: If logdir does not exist.
    """
    # Bug fix: the original called the unbound method as Path.is_dir(logdir)
    # with a str receiver, which raises AttributeError instead of checking
    # the path. Wrap the argument in Path() first.
    if not Path(logdir).is_dir():
        raise FileNotFoundError(f"Log dir '{logdir}' does not exist. Be sure to create it first.")
    log_location = os.path.join(logdir, "log")
    status_location = os.path.join(logdir, "status")
    alert_history_location = os.path.join(logdir, "alert_history")
    return log_location, status_location, alert_history_location
def setup_logging(log_location):
    """Point the root logger at log_location with INFO level and a timestamped format."""
    logging.basicConfig(
        filename=log_location,
        format='[%(levelname)s] %(asctime)s %(message)s',
        level=logging.INFO,
    )
def parse_args():
    """Parse command line arguments.

    Exits with status 1 when --config is missing, since the daemon cannot
    run without a configuration file.
    """
    arg_parser = argparse.ArgumentParser(
        description="A simple calendar alerting daemon written in Python")
    arg_parser.add_argument('--config', type=str,
                            help='Path to config file. Must be .toml')
    arg_parser.add_argument('--logdir', type=str, default="logs",
                            help='Path to logfile directory')
    arg_parser.add_argument('--loglevel', type=str,
                            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                            help="Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)")
    parsed = arg_parser.parse_args()
    if parsed.config is None:
        logger.error("No config file provided. Please use --config path_to_config.toml")
        sys.exit(1)
    return parsed
def read_file(filename):
    """Return the full text content of *filename*.

    Raises:
        FileNotFoundError: If the file does not exist. The message includes
            the offending path and the original error is chained.
    """
    try:
        return Path(filename).read_text()
    except FileNotFoundError as e:
        # Fix: the original re-raised a generic message, hiding which file
        # was missing and dropping the original exception context.
        raise FileNotFoundError(
            f"Error: The specified file does not exist: '{filename}'") from e
def parse_toml(content):
    """Parse a TOML string into a config mapping.

    Args:
        content: Raw text of the config file.

    Returns:
        The parsed config mapping.

    Raises:
        RuntimeError: If the TOML cannot be parsed (original error chained).
    """
    try:
        config = toml.loads(content)
    except Exception as e:
        # Fix: chain the cause so the TOML error (line/column) is not lost.
        raise RuntimeError("Error: Failed to parse TOML file.") from e
    if config is None:
        # NOTE(review): toml.loads normally returns a dict, not None; this
        # guard is retained for backward compatibility — confirm it is needed.
        logging.error("Invalid config")
        sys.exit(1)
    return config
def get_calendar_dir(config):
    """Return the calendar directory from config, exiting if it does not exist."""
    calendar_path = Path(config["app"]["calendar_dir"])
    if calendar_path.is_dir():
        return calendar_path
    logger.error(f"The provided path to .ics files does not exist: '{calendar_path}'")
    sys.exit(1)
def parse_calendar_files(cal_dir):
    """Block until at least one .ics file exists in cal_dir, then return them.

    Args:
        cal_dir: Path of the directory synced with the CalDAV server.

    Returns:
        List of Path objects, one per *.ics file found.
    """
    logger.info(f"Looking for calendar files in {cal_dir}...")
    while True:
        ics_files = list(cal_dir.glob('*.ics'))
        if ics_files:
            logger.info("Calendar files detected in sync location!")
            return ics_files
        # Bug fix: the original loop polled with no delay, pinning a CPU
        # core while waiting for the first sync to produce files.
        time.sleep(1)
def construct_initial_event_dict(cal_dir):
    """Parse every .ics file in cal_dir into the in-memory event list.

    Files that fail to parse are logged and skipped rather than aborting
    startup.
    """
    parser = CalendarParser()
    events = []
    for ics_file in parse_calendar_files(cal_dir):
        with open(ics_file, 'r') as f:
            cal_str = f.read()
        try:
            parsed = parser.parse_calendar(cal_str)
        except Exception as e:
            logger.warning(f"Error parsing event, skipping. {ics_file}. Error message {e}")
            continue
        events.append(parsed)
    return events
def calculate_event_hash(event):
    """Return a stable MD5 hex digest of the event dict, used only to detect changes."""
    serialized = json.dumps(event, sort_keys=True, cls=DateTimeEncoder)
    return hashlib.md5(serialized.encode()).hexdigest()
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serialises datetime and timedelta values as strings."""

    def default(self, o):
        # json cannot encode date/time types natively; fall back to str().
        if isinstance(o, (dt.datetime, dt.timedelta)):
            return str(o)
        return super().default(o)
class FileChangeHandler(FileSystemEventHandler):
    """
    `FileChangeHandler` is a custom event handler for the
    `watchdog.observers.Observer` class that handles file system events such as
    file modifications, deletions and creations.
    """

    def __init__(self, event_list):
        # One parser instance reused for every filesystem event.
        self.calendar_parser = CalendarParser()
        # Shared mutable list of event dicts; mutated in place so the
        # daemon loop observes updates without re-reading all files.
        self.event_list = event_list

    def on_modified(self, event):
        """Re-parse a modified .ics file and update the matching tracked event."""
        logger.info(f"File modified: {event.src_path}")
        if not event.is_directory:
            try:
                with open(event.src_path, 'r') as f:
                    cal_str = f.read()
            except Exception as e:
                logger.error(f"Not a valid file: {event.src_path}. Error: {e}")
                return
            try:
                event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
            except Exception as e:
                logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
                return
            self.handle_modified(old_event=None, event_dict=event_dict)

    def on_deleted(self, event):
        """Drop the tracked event whose UID matches the deleted file's basename."""
        logger.info(f"File deleted: {event.src_path}")
        if not event.is_directory:
            # assumes files are named <uid>.ics — TODO confirm against the
            # CalDAV sync tool's file naming scheme.
            uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
            self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True)

    def on_created(self, event):
        """Parse a newly created .ics file and add (or update) its event."""
        logger.info(f"File created: {event.src_path}")
        if not event.is_directory:
            try:
                with open(event.src_path, 'r') as f:
                    cal_str = f.read()
            except Exception as e:
                logger.warning(f"Not a valid file: {event.src_path}. Error: {e}")
                return
            try:
                event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
            except Exception as e:
                logger.warning(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
                return
            self.handle_modified(old_event=None, event_dict=event_dict)

    def handle_modified(self, old_event, event_dict, remove=False):
        """Insert, replace or delete event_dict in the shared event list.

        The `old_event` parameter is always passed as None by the callers
        above and is immediately rebound as the loop variable below.
        """
        if not remove:
            for i, old_event in enumerate(self.event_list):
                if old_event["uid"] == event_dict["uid"]:
                    old_hash = old_event["hash"]
                    new_hash = calculate_event_hash(event_dict)
                    # Replace only when the content hash actually changed.
                    if new_hash != old_hash:
                        logger.info(f"Event with UID {old_event['uid']} has been modified or deleted")
                        self.event_list[i] = event_dict
                    break
            else:
                # UID not tracked yet: a brand-new event.
                self.event_list.append(event_dict)
        else: # If remove is True, remove the event from the list
            for i, old_event in enumerate(self.event_list):
                if old_event["uid"] == event_dict["uid"]:
                    logger.info(f"Event with UID {old_event['uid']} has been deleted")
                    del self.event_list[i]
                    break
class RecurringEventGenerator:
    """
    A class to generate recurring events based on a start date and a recurrence rule.
    """

    def __init__(self, dtstart, rrule):
        """
        Args:
            dtstart: Timezone-aware datetime of the first occurrence.
            rrule: icalendar vRecur for the event, or None for one-off events.
        """
        self.dtstart = dtstart
        self.rrule = rrule
        # Defaults describe a one-off, non-recurring event.
        self.recur_info = {
            "recur_dates": [dtstart],
            "infinite_recur": False,
            "recur_freq": None,
            "recur_interval": None,
            "n_recur_dates_left": 1
        }

    def generate(self):
        """Expand the RRULE into the remaining concrete occurrence dates.

        Returns:
            The recur_info dict: future dates (>= now), frequency, interval,
            and whether the rule recurs without bound.

        Raises:
            RuntimeError: If an unbounded rule uses a frequency this
                estimator does not support.
        """
        if self.rrule is None:
            return self.recur_info

        rule_str = "RRULE:{}".format(self.rrule.to_ical().decode('utf-8'))
        start_date = self.dtstart
        infinite_recur = False
        freq = self.rrule.get('FREQ')[0]
        count = self.rrule.get("COUNT")
        # Bug fix: INTERVAL is optional in an RRULE; the original indexed
        # None when it was absent. Default to 1 (every occurrence).
        interval_field = self.rrule.get('INTERVAL')
        interval = interval_field[0] if interval_field else 1
        until = self.rrule.get('UNTIL')

        current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)

        if count is None or until is not None:
            # Without a COUNT, rrulestr would expand an unbounded number of
            # occurrences. Estimate how many occurrences have elapsed since
            # dtstart and cap the expansion at that plus a 10-event lookahead.
            # Bug fix: WEEKLY was missing from the original mapping, which
            # crashed any weekly rule without a COUNT.
            step = {
                "DAILY": relativedelta(days=interval),
                "WEEKLY": relativedelta(weeks=interval),
                "MONTHLY": relativedelta(months=interval),
                "YEARLY": relativedelta(years=interval),
            }.get(freq)
            if step is None:
                raise RuntimeError(f"Unsupported RRULE frequency: {freq}")
            elapsed_occurrences = 0
            origin_date = start_date
            while origin_date < current_date:
                elapsed_occurrences += 1
                origin_date += step
            rule_str += ";COUNT={}".format(elapsed_occurrences + 10)
            # NOTE(review): a rule with UNTIL is also flagged infinite here,
            # mirroring the original behavior — confirm this is intended.
            infinite_recur = True

        ruleset = rrulestr(rule_str, dtstart=start_date)
        dates = list(ruleset)  # Generate future dates according to the rules
        recur_dates = [d for d in dates if d >= current_date]
        n_recur = "inf" if infinite_recur else len(recur_dates)
        self.recur_info["recur_dates"] = recur_dates
        self.recur_info["infinite_recur"] = infinite_recur
        self.recur_info["recur_freq"] = freq
        self.recur_info["recur_interval"] = interval
        self.recur_info["n_recur_dates_left"] = n_recur
        return self.recur_info
class CalendarParser:
    """Parses raw .ics calendar text into the plain-dict event format used
    throughout this module."""

    def parse_calendar(self, cal_str):
        """
        Parse a calendar string and process each event.

        Returns a single event dict (uid, dtstart, summary, description,
        location, event_dates, recur_info, valarms, alert_history, hash).
        NOTE(review): the dict is built per VEVENT inside the loop but only
        one dict is returned — presumably each synced .ics file holds exactly
        one VEVENT; confirm against the sync layout.
        """
        # Parse the calendar
        cal = self.parse_icalendar(cal_str)
        # Iterate over each event in the calendar
        for event in cal.walk('vevent'):
            event_dict = self.process_event(event)
            dtstart = self.dtstart_to_datetime(event_dict["dtstart"].dt)
            generator = RecurringEventGenerator(dtstart, event_dict["rrule"])
            recur_info = generator.generate()
            # Drop occurrences excluded via EXDATE.
            event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"])
            if len(event_dates) == 0:
                logging.warning(f"No event dates for event: '{event['summary']}'")
            valarms = self.process_valarm(event)
            event_dict = {
                "uid": str(event_dict["uid"]),
                "dtstart": dtstart,
                "summary": event_dict["summary"],
                "description": event_dict["description"],
                "location": event_dict["location"],
                "event_dates": event_dates,
                "recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format(
                    recur_info["recur_freq"],
                    recur_info["recur_interval"],
                    recur_info["n_recur_dates_left"]
                ),
                "valarms": valarms,
                "alert_history": []
            }
            try:
                new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
            except Exception:
                raise RuntimeError("Error calculating event hash")
            event_dict["hash"] = new_hash # Store the hash in the event dictionary
            return event_dict

    def parse_icalendar(self, cal_str):
        """
        Parse a calendar string into an iCalendar object.

        Raises:
            RuntimeError: If the text is not a valid iCalendar document.
        """
        try:
            return Calendar.from_ical(cal_str)
        except Exception:
            raise RuntimeError("Error parsing icalendar.")

    def process_event(self, event):
        """
        Process an event from a parsed calendar and extract relevant information.

        Missing components are logged at INFO and keep their defaults below.
        """
        event_info = {
            "uid": None,
            "dtstart": "",
            "exdate": [],
            "summary": None,
            "description": None,
            "location": None,
            "rrule": None
        }
        # Catch errors for missing components
        for info in event_info:
            try:
                event_info[info] = event[info]
            except Exception:
                logging.info(f"CalDav componant '{info}' missing for event {event['summary']}")
        return event_info

    def dtstart_to_datetime(self, dtstart):
        """
        Convert a date or datetime object into a datetime object with UTC timezone.

        Raises:
            RuntimeError: If the value cannot be converted.
        """
        # Ensure dates are always as datetime
        try:
            if isinstance(dtstart, dt.datetime):
                # NOTE(review): replace() stamps UTC without converting —
                # assumes dtstart is already UTC or naive; confirm.
                return dtstart.replace(tzinfo=pytz.UTC)
            else:
                # All-day events carry a bare date; pin them to midnight UTC.
                return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)
        except Exception:
            raise RuntimeError("Error converting dtstart to datetime.")

    def remove_exdates(self, exdates, recur_dates):
        """
        Remove dates from a list of recurring event dates that are in the exdate list.

        Raises:
            RuntimeError: If the EXDATE values cannot be processed.
        """
        if exdates != []:
            try:
                # icalendar yields either a single vDDDLists or a list of them.
                if isinstance(exdates, list):
                    exdates = [i.dts[0].dt.replace(tzinfo=pytz.UTC) for i in exdates]
                else:
                    exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)]
                return [i for i in recur_dates if i not in exdates]
            except Exception:
                raise RuntimeError("Error processing exdates.")
        else:
            return recur_dates

    def process_valarm(self, event):
        """
        Process VALARM components from an iCalendar event and extract trigger times.

        Returns a list of TRIGGER offsets (timedeltas relative to the event).
        """
        valarms = []
        for subcomponent in event.walk("valarm"):
            valarm = Event.from_ical(subcomponent.to_ical())
            timedelta = valarm["trigger"].dt
            valarms.append(timedelta)
        if len(valarms) == 0:
            logging.info(f"No reminders for event: {event['summary']}")
        return valarms
def get_next_alert(event, current_time):
    """
    Returns the next alert that should be processed based on the current time.

    Returns a (next_alert, next_event) pair, or (None, None) when the event
    has no upcoming dates.
    """
    dates = event["event_dates"]
    alarm_offsets = event["valarms"]

    # Nothing scheduled, or the event is entirely in the past.
    if not dates or current_time > dates[-1]:
        return None, None

    future_dates = [d for d in dates if d >= current_time]
    upcoming = future_dates[0]
    candidates = [upcoming + offset for offset in alarm_offsets
                  if upcoming + offset >= current_time]
    chosen = min(candidates) if candidates else upcoming

    # Fire 5 s early so the daemon's polling loop cannot miss the window.
    return chosen - dt.timedelta(seconds=5), upcoming
def process_alert(current_time, next_alert, next_event, event, config, alert_history_location):
    """
    Processes a given alert and passes it to a messaging client.

    Mutates event["alert_history"] in place and appends fired alerts to the
    on-disk history file.

    Raises:
        RuntimeError: If sending the alert email fails.
    """
    # Only fire inside a 15-second window after the alert time, so a stale
    # alert is never delivered long after it was due.
    if current_time >= next_alert and current_time < next_alert + dt.timedelta(seconds=15):
        if len(event["alert_history"]) == 0:
            # First alert seen for this event is recorded without emailing —
            # presumably to suppress a burst of alerts when the daemon starts
            # up; confirm intent.
            logger.info(f"First alert for '{event['summary']}' detected")
            event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}]
        elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]:
            # This alert time was already handled; nothing to do.
            # NOTE(review): "alert_defintition_time" is misspelled but used
            # consistently and persisted; renaming would invalidate state.
            return
        else:
            logger.info(f"Posting alert for {event['summary']}!")
            event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})
            try:
                processor = AlertProcessor(config)
                processor.send_email(event, next_alert, next_event)
            except Exception as e:
                raise RuntimeError(f"Error sending alert for event. {e}")
            # Append the full event dict to the on-disk alert history.
            with open(alert_history_location, 'a') as f:
                f.write(str(event))
    return
def daemon(status_location, alert_history_location, config, event_list):
    """Run one monitoring pass: rewrite the status file and fire due alerts.

    Args:
        status_location: Path of the human-readable status file, truncated
            and rebuilt every pass.
        alert_history_location: Path of the append-only alert history file.
        config: Parsed TOML config, passed through to the alert processor.
        event_list: Shared list of event dicts (alert history mutated in place).
    """
    with open(status_location, 'w') as f:
        f.write("")  # Refresh the status file
    current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
    for event in event_list:
        try:
            next_alert, next_event = get_next_alert(event, current_time)
        except RuntimeError as e:
            logger.warning(f"Error getting next alert for {event['summary']}, skipping event. Error message {e}")
            continue
        # Fix: comparison against None should use identity, not equality.
        if next_alert is None:
            continue
        event_delta = next_alert - current_time
        total_seconds = event_delta.total_seconds()
        human_readable_time = humanfriendly.format_timespan(total_seconds)
        monitor_status = f"""\
        Current time: {current_time}
        Monitoring: {event["summary"]}
        Event date: {next_event}
        Recur Dates: {[str(i) for i in event["event_dates"]]}
        Next alert on: {next_alert} in {human_readable_time}
        Recur info: {event["recur_info"]}
        Alert history: {event["alert_history"]}\n"""
        monitor_status = textwrap.dedent(monitor_status)
        with open(status_location, 'a') as f:
            f.write(monitor_status)  # Write the output to the file
            f.write("\n")
        try:
            process_alert(current_time, next_alert, next_event, event, config, alert_history_location)
        except RuntimeError as e:
            logger.warning(f"Error processing alert for event {event['summary']}. Error message: {e}")
    return
def main():
    """Entry point: wire up logging, config, the file watcher, and the main loop."""
    cli_args = parse_args()
    log_location, status_location, alert_history_location = setup_log_location(cli_args.logdir)
    setup_logging(log_location)
    logger = logging.getLogger()
    # Honour a --loglevel override when one was supplied.
    if cli_args.loglevel is not None:
        numeric_level = getattr(logging, cli_args.loglevel.upper(), None)  # Convert string to integer
        if isinstance(numeric_level, int):
            logger = logging.getLogger()
            logger.setLevel(numeric_level)
    # Build the initial in-memory event list from the synced calendar files.
    config = parse_toml(read_file(cli_args.config))
    cal_dir = get_calendar_dir(config)
    event_list = construct_initial_event_dict(cal_dir)
    # Watch the calendar directory so file changes are picked up live.
    observer = Observer()
    observer.schedule(FileChangeHandler(event_list), cal_dir, recursive=True)
    observer.start()
    # Poll once per second until interrupted.
    try:
        while True:
            daemon(status_location, alert_history_location, config, event_list)
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        observer.join()


if __name__ == "__main__":
    main()