remindme_caldav/remindme_caldav.py

384 lines
16 KiB
Python

from pathlib import Path
from icalendar import Calendar, Event
import toml, argparse, os, sys, hashlib, json, pytz, os, time
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrulestr
import datetime as dt
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import humanfriendly
from pathlib import Path
import argparse, textwrap, logging
from alert_processor import AlertProcessor
def setup_logger(loglevel):
    """Setup basic logging.

    Configures the root logger to write to ``app.log`` (truncated on every
    start because of ``filemode='w'``) and sets its threshold.

    Args:
        loglevel: Name of a ``logging`` level such as ``"INFO"``; matching is
            case-insensitive.

    Raises:
        ValueError: If ``loglevel`` does not name an integer level constant of
            the ``logging`` module.
    """
    # Keep the caller's text in its own variable: the lookup result is None
    # for an unknown name, and the error message must quote what was typed.
    numeric_level = getattr(logging, loglevel.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid log level: %s' % loglevel)
    logging.basicConfig(filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger()
    logger.setLevel(numeric_level)
def parse_args():
    """Parse command line arguments.

    Returns:
        The ``argparse.Namespace`` holding ``config`` and ``loglevel``.

    Raises:
        ValueError: If ``--config`` was not supplied.
    """
    cli = argparse.ArgumentParser(
        description="A simple calendar alerting daemon written in Python"
    )
    cli.add_argument(
        '--config',
        type=str,
        help='Path to config file. Must be .toml',
    )
    cli.add_argument(
        '--loglevel',
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        default='INFO',
        help='Set the logging level',
    )
    parsed = cli.parse_args()
    if parsed.config is not None:
        return parsed
    raise ValueError("No config file provided")
def read_file(filename):
    """Return the text content of ``filename``.

    A missing file is reported on stdout and terminates the process with
    exit status 1; any other error propagates to the caller.
    """
    source = Path(filename)
    try:
        contents = source.read_text()
    except FileNotFoundError as e:
        print(f"Error: The specified file does not exist. {e}")
        sys.exit(1)  # Exit with error code
    return contents
def parse_toml(content):
    """Decode a TOML document given as a string.

    Any failure while decoding is printed to stdout and ends the process
    with exit status 1.
    """
    try:
        parsed = toml.loads(content)
    except Exception as e:
        # Two lines on stdout: the fixed banner, then the parser's message.
        print("Error: Failed to parse TOML file.", e, sep="\n")
        sys.exit(1)  # Exit with error code
    else:
        return parsed
def calculate_event_hash(event):
    """Return the MD5 hex digest of ``event`` serialised as key-sorted JSON.

    ``DateTimeEncoder`` is used for the serialisation, so datetime and
    timedelta values inside ``event`` are accepted.
    """
    serialized = json.dumps(event, sort_keys=True, cls=DateTimeEncoder)
    digest = hashlib.md5(serialized.encode())
    return digest.hexdigest()
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that renders datetime and timedelta values via ``str``."""

    # Types that are emitted as their ``str()`` form.
    _STRINGIFIED_TYPES = (dt.datetime, dt.timedelta)

    def default(self, o):
        """Return a serialisable stand-in for ``o``.

        Anything that is not a datetime/timedelta is handed to the base
        class, which raises ``TypeError`` for unsupported objects.
        """
        if not isinstance(o, self._STRINGIFIED_TYPES):
            return super().default(o)
        return str(o)
class FileChangeHandler(FileSystemEventHandler):
    """
    `FileChangeHandler` is a custom event handler for the
    `watchdog.observers.Observer` class that handles file system events such as
    file modifications, deletions and creations.

    The handler keeps the shared ``event_list`` (a list of event dictionaries
    as produced by ``CalendarParser.parse_calendar``) in step with the files
    on disk by mutating that list in place.
    """

    def __init__(self, event_list):
        """Store the shared event list and create the parser used for reloads."""
        self.calendar_parser = CalendarParser()  # Create an instance of CalendarParser
        self.event_list = event_list

    def on_modified(self, event):
        """Re-parse a modified file and merge the result into the event list."""
        print(f"File modified: {event.src_path}")
        if not event.is_directory:
            self._reload_from_file(event.src_path)

    def on_deleted(self, event):
        """Drop the event whose UID equals the deleted file's base name."""
        print(f"File deleted: {event.src_path}")
        if not event.is_directory:
            # The file is gone, so the UID can only be taken from its name
            # (file name without extension).
            uid = os.path.splitext(os.path.basename(event.src_path))[0]
            self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True)

    def on_created(self, event):
        """Parse a newly created file and merge the result into the event list."""
        print(f"File created: {event.src_path}")
        if not event.is_directory:
            self._reload_from_file(event.src_path)

    def _reload_from_file(self, src_path):
        """Read and parse ``src_path``; on success hand the event to handle_modified."""
        try:
            with open(src_path, 'r') as f:
                cal_str = f.read()
        except (OSError, UnicodeDecodeError) as e:
            print("Not a valid file: {}. Error: {}".format(src_path, e))
            return
        try:
            event_dict = self.calendar_parser.parse_calendar(cal_str)  # Use the instance to call parse_calendar method
        except Exception as i:
            # Boundary of the watcher thread: report and carry on so one bad
            # file cannot stop change detection.
            print("Failed to parse calendar event at: {}.\n Error:\n{}".format(src_path, i))
            return
        if event_dict is None:
            # The parser yields None when the file holds no VEVENT.
            print("Failed to parse calendar event at: {}.\n Error:\n{}".format(src_path, "no VEVENT found"))
            return
        self.handle_modified(old_event=None, event_dict=event_dict)

    def handle_modified(self, old_event, event_dict, remove=False):
        """Insert, replace or remove an entry of ``self.event_list`` by UID.

        Args:
            old_event: Unused; kept so existing callers keep working.
            event_dict: Event dictionary; only ``"uid"`` is needed for removal.
            remove: When true, delete the entry with the same UID instead of
                inserting/replacing it.
        """
        uid = event_dict["uid"]
        index = next(
            (i for i, existing in enumerate(self.event_list) if existing["uid"] == uid),
            None,
        )

        if remove:
            if index is not None:
                print("Event with UID {} has been deleted".format(uid))
                del self.event_list[index]
            return

        if index is None:
            self.event_list.append(event_dict)
            return

        # Compare against the hash stored at parse time. Re-hashing the whole
        # dictionary here would also hash its "hash" entry and therefore
        # never match the stored value.
        new_hash = event_dict.get("hash")
        if new_hash is None:
            new_hash = calculate_event_hash(event_dict)
        if new_hash != self.event_list[index]["hash"]:
            print("Event with UID {} has been modified".format(uid))
            self.event_list[index] = event_dict
        else:
            print("Event with UID {} hasn't been modified".format(uid))
class RecurringEventGenerator:
    """Expand an event's RRULE into the occurrences that are still to come.

    ``rrule`` is expected to offer ``get(name)`` returning a list of values
    (or ``None``) and ``to_ical()`` returning bytes, as used below.
    """

    # Number of upcoming occurrences materialised for a rule that has neither
    # COUNT nor UNTIL and therefore never ends.
    INFINITE_RULE_HORIZON = 10

    def __init__(self, dtstart, rrule):
        """Remember the start and rule; default result is the single start date."""
        self.dtstart = dtstart
        self.rrule = rrule
        self.recur_info = {
            "recur_dates": [dtstart],
            "infinite_recur": False,
            "recur_freq": None,
            "recur_interval": None,
            "n_recur_dates_left": 1
        }

    def generate(self):
        """Return the recurrence summary dictionary.

        Without a rule the defaults from ``__init__`` are returned unchanged.
        Otherwise ``recur_dates`` holds every occurrence that is not before
        "now"; for a never-ending rule only the next
        ``INFINITE_RULE_HORIZON`` occurrences are listed and
        ``n_recur_dates_left`` is the string ``"inf"``.
        """
        if self.rrule is None:
            return self.recur_info

        rule_str = "RRULE:{}".format(self.rrule.to_ical().decode('utf-8'))
        freq = (self.rrule.get('FREQ') or [None])[0]
        # INTERVAL is optional in an RRULE; a missing value means 1.
        interval = (self.rrule.get('INTERVAL') or [1])[0]
        # Only a rule with neither COUNT nor UNTIL goes on forever.
        infinite_recur = (
            self.rrule.get("COUNT") is None and self.rrule.get('UNTIL') is None
        )
        # NOTE(review): this stamps the local wall-clock time as UTC — confirm
        # that dtstart is produced under the same convention.
        current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)

        ruleset = rrulestr(rule_str, dtstart=self.dtstart)
        recur_dates = []
        # Walk the rule lazily so an endless rule can be cut off after enough
        # future dates, whatever its frequency or BY* parts.
        for occurrence in ruleset:
            if occurrence < current_date:
                continue
            recur_dates.append(occurrence)
            if infinite_recur and len(recur_dates) >= self.INFINITE_RULE_HORIZON:
                break

        self.recur_info["recur_dates"] = recur_dates
        self.recur_info["infinite_recur"] = infinite_recur
        self.recur_info["recur_freq"] = freq
        self.recur_info["recur_interval"] = interval
        self.recur_info["n_recur_dates_left"] = "inf" if infinite_recur else len(recur_dates)
        return self.recur_info
class CalendarParser:
    """Turn iCalendar text into the event dictionaries used by the daemon."""

    def parse_calendar(self, cal_str):
        """Parse ``cal_str`` and describe its first VEVENT.

        Returns:
            A dictionary with the keys ``uid``, ``dtstart``, ``summary``,
            ``description``, ``location``, ``event_dates``, ``recur_info``,
            ``valarms``, ``alert_history`` and ``hash``; ``None`` when the
            calendar contains no VEVENT. Only the first VEVENT is used.

        Raises:
            ValueError: If the VEVENT has no DTSTART.
        """
        # Parse the calendar
        cal = self.parse_icalendar(cal_str)
        vevents = cal.walk('vevent')
        if not vevents:
            return None
        event = vevents[0]

        raw = self.process_event(event)
        if not hasattr(raw["dtstart"], "dt"):
            # process_event falls back to "" when DTSTART is absent.
            raise ValueError("VEVENT {} has no DTSTART".format(raw["uid"]))
        dtstart = self.dtstart_to_datetime(raw["dtstart"].dt)
        generator = RecurringEventGenerator(dtstart, raw["rrule"])
        recur_info = generator.generate()
        event_dates = self.remove_exdates(raw["exdate"], recur_info["recur_dates"])
        valarms = self.process_valarm(event)
        event_dict = {
            "uid": str(raw["uid"]),
            "dtstart": dtstart,
            "summary": raw["summary"],
            "description": raw["description"],
            "location": raw["location"],
            "event_dates": event_dates,
            "recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format(
                recur_info["recur_freq"],
                recur_info["recur_interval"],
                recur_info["n_recur_dates_left"]
            ),
            "valarms": valarms,
            "alert_history": []
        }
        # The hash is taken before the "hash" key itself is added.
        event_dict["hash"] = calculate_event_hash(event_dict)
        return event_dict

    def parse_icalendar(self, cal_str):
        """Return the ``Calendar`` object parsed from ``cal_str``."""
        return Calendar.from_ical(cal_str)

    def process_event(self, event):
        """Collect the properties of interest, using defaults for missing ones."""
        defaults = {
            "uid": None,
            "dtstart": "",
            "exdate": [],
            "summary": None,
            "description": None,
            "location": None,
            "rrule": None
        }
        return {name: event.get(name, default) for name, default in defaults.items()}

    def dtstart_to_datetime(self, dtstart):
        """Return ``dtstart`` as a datetime whose tzinfo is set to UTC.

        A plain date becomes midnight of that day. The tzinfo is replaced,
        not converted, so the clock reading stays the same.
        """
        # Ensure dates are always as datetime
        if isinstance(dtstart, dt.datetime):
            return dtstart.replace(tzinfo=pytz.UTC)
        return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)

    def remove_exdates(self, exdates, recur_dates):
        """Return ``recur_dates`` without the dates listed in ``exdates``.

        ``exdates`` is either one EXDATE property or a list of them; every
        entry of each property's ``dts`` is honoured, and date-only values
        are normalised the same way as DTSTART.
        """
        if exdates is None or exdates == []:
            return recur_dates
        if not isinstance(exdates, list):
            exdates = [exdates]
        excluded = {
            self.dtstart_to_datetime(entry.dt)
            for prop in exdates
            for entry in prop.dts
        }
        return [date for date in recur_dates if date not in excluded]

    def process_valarm(self, event):
        """Return the relative trigger offsets (timedeltas) of the event's alarms.

        Alarms without a TRIGGER are skipped. Alarms with an absolute
        (non-timedelta) trigger are skipped with a notice, because the alert
        computation adds each trigger to an occurrence date.
        """
        valarms = []
        for alarm in event.walk("valarm"):
            trigger = alarm.get("trigger")
            if trigger is None:
                continue
            if not isinstance(trigger.dt, dt.timedelta):
                print("Skipping alarm with absolute trigger: {}".format(trigger.dt))
                continue
            valarms.append(trigger.dt)
        return valarms
def get_next_alert(event, current_time):
    """
    Work out which alert of ``event`` is due next at ``current_time``.

    Returns a pair ``(alert_time, occurrence)``: ``occurrence`` is the first
    event date not before ``current_time`` and ``alert_time`` is five seconds
    before the earliest alarm of that occurrence that has not passed yet (or
    before the occurrence itself when no such alarm exists). ``(None, None)``
    is returned when there are no dates or the last date has passed.
    """
    event_dates = event["event_dates"]
    valarm_deltas = event["valarms"]

    if event_dates is None or event_dates == [] or event_dates[-1] < current_time:
        return None, None

    next_event = next(date for date in event_dates if date >= current_time)

    alarm_times = [next_event + delta for delta in valarm_deltas]
    pending = [moment for moment in alarm_times if moment >= current_time]
    next_alert = min(pending) if pending else next_event

    return next_alert - dt.timedelta(seconds=5), next_event
def process_alert(current_time, next_alert, next_event, event, config):
    """
    Send the alert for ``event`` when ``current_time`` lies inside the
    15-second window that starts at ``next_alert``.

    An alert time already present in the event's ``alert_history`` is not
    sent again. A sent alert is recorded in that history, e-mailed through
    ``AlertProcessor`` and the event is appended to the ``alert_history``
    file.
    """
    window_end = next_alert + dt.timedelta(seconds=15)
    if not (next_alert <= current_time < window_end):
        return

    print(current_time, next_alert, window_end)
    print(bool(current_time >= next_alert))
    print(bool(next_alert < window_end))

    history = event["alert_history"]
    record = {"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}
    if len(history) == 0:
        print("First alert for '{}' detected".format(event["summary"]))
        event["alert_history"] = [record]
    elif any(next_alert == past["alert_defintition_time"] for past in history):
        # This alert time was handled on an earlier pass.
        return
    else:
        print("Posting alert for {}!".format(event["summary"]))
        history.append(record)

    #xmpp_alert.send_xmpp(event["summary"], next_alert, next_event, args.config)
    processor = AlertProcessor(config)
    processor.send_email(event, next_alert, next_event)
    #processor.send_xmpp(event, next_alert, next_event)
    with open("alert_history", 'a') as f:
        f.write(str(event))  # write expects a str not dict
    return
def main():
    """Run the alerting daemon.

    Reads the TOML config named on the command line, loads every ``*.ics``
    file of ``config["app"]["calendar_dir"]``, watches that directory for
    changes and then, once per second, rewrites the ``status`` file and
    sends the alerts that are due. Runs until interrupted with Ctrl-C.
    """
    # Parse args and config
    args = parse_args()
    setup_logger(args.loglevel)  # honour --loglevel, which was parsed but unused
    content = read_file(args.config)
    config = parse_toml(content)

    # Get calendar dir
    cal_dir = Path(config["app"]["calendar_dir"])
    if not cal_dir.is_dir():
        print(f"The provided path to .ics files does not exist: '{cal_dir}'")
        sys.exit(1)  # Exit with error code

    # Parse calendar events
    calendar_parser = CalendarParser()
    event_list = []  # List to hold dictionaries for each event
    for file in cal_dir.glob('*.ics'):
        try:
            with open(file, 'r') as f:
                cal_str = f.read()
            event_dict = calendar_parser.parse_calendar(cal_str)
        except Exception as e:
            # One unreadable or malformed file must not keep the daemon from
            # starting; report it and go on with the rest.
            print("Failed to parse calendar event at: {}.\n Error:\n{}".format(file, e))
            continue
        if event_dict is None:
            # The parser returned nothing usable for this file.
            continue
        event_list.append(event_dict)

    # Start file handler to detect changes to calendar dir
    observer = Observer()
    handler = FileChangeHandler(event_list)  # Pass event_list here
    observer.schedule(handler, str(cal_dir), recursive=True)
    observer.start()

    # Start main loop
    try:
        while True:
            current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
            status_entries = []
            # Iterate over a snapshot: the handler mutates event_list from
            # the observer's side while this loop runs.
            for event in list(event_list):
                next_alert, next_event = get_next_alert(event, current_time)
                if next_alert is None:
                    continue
                total_seconds = (next_alert - current_time).total_seconds()
                human_readable_time = humanfriendly.format_timespan(total_seconds)
                status_entries.append(
                    f"Current time: {current_time}\n"
                    f"Monitoring: {event['summary']}\n"
                    f"Event date: {next_event}\n"
                    f"Recur Dates: {[str(i) for i in event['event_dates']]}\n"
                    f"Next alert on: {next_alert} in {human_readable_time}\n"
                    f"Recur info: {event['recur_info']}\n"
                    f"Alert history: {event['alert_history']}\n"
                    "\n"
                )
                process_alert(current_time, next_alert, next_event, event, config)
            # Refresh the status file once per pass
            with open("status", 'w') as f:
                f.write("".join(status_entries))
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the watcher however the loop ended.
        observer.stop()
        observer.join()


if __name__ == "__main__":
    main()