Compare commits

..

2 Commits

Author SHA1 Message Date
mrsu e4c42de74c created CalendarParser class 2024-02-11 19:43:33 +00:00
mrsu f239b4fb23 Complete refactor of script to make it more pythonic 2024-02-10 23:53:15 +00:00
1 changed files with 245 additions and 235 deletions

View File

@ -1,3 +1,4 @@
from pathlib import Path
from icalendar import Calendar, Event from icalendar import Calendar, Event
import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
@ -9,38 +10,49 @@ from watchdog.events import FileSystemEventHandler
import email_alert, xmpp_alert import email_alert, xmpp_alert
from pprint import pprint from pprint import pprint
import humanfriendly import humanfriendly
from pathlib import Path
import argparse
import textwrap
import logging
# Parse args def setup_logger(loglevel):
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") """Setup basic logging."""
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') loglevel = getattr(logging, loglevel.upper(), None)
args = parser.parse_args()
if not isinstance(loglevel, int):
raise ValueError('Invalid log level: %s' % loglevel)
logging.basicConfig(filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.setLevel(loglevel)
if not args.config: def parse_args():
print("Error: No config file provided.") """Parse command line arguments."""
sys.exit(1) # Exit with error code parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
elif not os.path.isfile(args.config): parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
print("Error: The specified config file does not exist.") parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level')
sys.exit(1) # Exit with error code args = parser.parse_args()
else: if args.config is None:
print("Config file path: ", args.config) raise ValueError("No config file provided")
return args
# Get config def read_file(filename):
try: try:
with open(args.config, 'r') as f: return Path(filename).read_text()
config = toml.load(f) except FileNotFoundError as e:
except Exception as e: print(f"Error: The specified file does not exist. {e}")
print("Error: Failed to parse TOML file.") sys.exit(1) # Exit with error code
print(e)
sys.exit(1) # Exit with error code def parse_toml(content):
try:
return toml.loads(content)
except Exception as e:
print("Error: Failed to parse TOML file.")
print(e)
sys.exit(1) # Exit with error code
cal_dir = config["app"]["calendar_dir"] def calculate_event_hash(event):
# Check if the path is a directory return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
if not os.path.isdir(cal_dir):
print("The provided path to .ics files does not exist: '{}'".format(cal_dir))
exit(1)
# Get all .ics files from your directory
files = glob.glob(os.path.join(cal_dir, '*.ics'))
class DateTimeEncoder(json.JSONEncoder): class DateTimeEncoder(json.JSONEncoder):
def default(self, o): def default(self, o):
@ -53,29 +65,11 @@ class FileChangeHandler(FileSystemEventHandler):
`FileChangeHandler` is a custom event handler for the `FileChangeHandler` is a custom event handler for the
`watchdog.observers.Observer` class that handles file system events such as `watchdog.observers.Observer` class that handles file system events such as
file modifications, deletions and creations. file modifications, deletions and creations.
It inherits from `watchdog.events.FileSystemEventHandler` providing methods
to handle these events: `on_modified`, `on_deleted`, and `on_created`. Each
method is overridden for specific functionality when a file system event
occurs.
The class also includes a method `calculate_event_hash` that generates an
MD5 hash for each event dictionary based on its contents. This is used to
track changes in the events and determine if they have been modified or
deleted.
For example, when a file is modified:
- It reads the content of the file, parses it into an event dictionary
using `calendar_parser`.
- Calculates the hash for the event and checks if there's already an
existing event with the same UID in `event_list`.
- If there is one, it compares the new hash with the old hash. If they
differ, it prints that the event has been modified or deleted. Otherwise,
it prints that the event hasn't been modified.
For file deletion and creation, similar operations are performed but
without comparison of hashes.
""" """
def __init__(self, event_list):
self.calendar_parser = CalendarParser() # Create an instance of CalendarParser
self.event_list = event_list
def on_modified(self, event): def on_modified(self, event):
print(f"File modified: {event.src_path}") print(f"File modified: {event.src_path}")
if not event.is_directory: if not event.is_directory:
@ -87,7 +81,7 @@ class FileChangeHandler(FileSystemEventHandler):
return return
try: try:
event_dict = calendar_parser(cal_str) event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as i: except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return return
@ -113,7 +107,7 @@ class FileChangeHandler(FileSystemEventHandler):
return return
try: try:
event_dict = calendar_parser(cal_str) event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as i: except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return return
@ -122,175 +116,111 @@ class FileChangeHandler(FileSystemEventHandler):
def handle_modified(self, old_event, event_dict, remove=False): def handle_modified(self, old_event, event_dict, remove=False):
if not remove: if not remove:
for i, old_event in enumerate(event_list): for i, old_event in enumerate(self.event_list):
if old_event["uid"] == event_dict["uid"]: if old_event["uid"] == event_dict["uid"]:
old_hash = old_event["hash"] old_hash = old_event["hash"]
new_hash = self.calculate_event_hash(event_dict) new_hash = calculate_event_hash(event_dict)
if new_hash != old_hash: if new_hash != old_hash:
print("Event with UID {} has been modified or deleted".format(old_event["uid"])) print("Event with UID {} has been modified or deleted".format(old_event["uid"]))
event_list[i] = event_dict self.event_list[i] = event_dict
else: else:
print("Event with UID {} hasn't been modified".format(old_event["uid"])) print("Event with UID {} hasn't been modified".format(old_event["uid"]))
break break
else: else:
event_list.append(event_dict) self.event_list.append(event_dict)
else: # If remove is True, remove the event from the list else: # If remove is True, remove the event from the list
for i, old_event in enumerate(event_list): for i, old_event in enumerate(self.event_list):
if old_event["uid"] == event_dict["uid"]: if old_event["uid"] == event_dict["uid"]:
print("Event with UID {} has been deleted".format(old_event["uid"])) print("Event with UID {} has been deleted".format(old_event["uid"]))
del event_list[i] del self.event_list[i]
break break
def calculate_event_hash(self, event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
def generate_recurring_event_dates(dtstart, rrule): class RecurringEventGenerator:
""" def __init__(self, dtstart, rrule):
Generate recurring event dates based on a start date and an RRULE. self.dtstart = dtstart
self.rrule = rrule
This function takes in a start date (`dtstart`) and an RRULE (`rrule`), self.recur_info = {
which is used to generate future dates according to the rules specified by
the RRULE.
If no count rule is present, it generates a date array from `dtstart` to
the current datetime, then adds an arbitrary number of dates into the
future (here it's 10). This is done because generating a date array using
the rrulestr function without a count rule will return a very large number
of elements.
The function returns a dictionary containing information about the
recurring event:
- `recur_dates`: A list of future dates generated by the RRULE.
- `infinite_recur`: Boolean indicating whether the recurrence is infinite.
- `recur_freq`: The frequency of the recurrence (e.g., 'DAILY').
- `recur_interval`: The interval between each occurrence of the event.
- `n_recur_dates_left`: The number of future dates left.
Parameters:
dtstart (datetime): The start date of the recurring event.
rrule (rrule): The RRULE object specifying the recurrence rules.
Returns:
dict: A dictionary containing information about the recurring event
dates, frequency, interval and count.
"""
recur_info = {
"recur_dates": [dtstart], "recur_dates": [dtstart],
"infinite_recur": False, "infinite_recur": False,
"recur_freq": None, "recur_freq": None,
"recur_interval": None, "recur_interval": None,
"n_recur_dates_left": None "n_recur_dates_left": None
}
if rrule is None:
return recur_info
rule_str = "RRULE:{}".format(rrule.to_ical().decode('utf-8'))
start_date = dtstart
infinite_recur = False
freq = rrule.get('FREQ')[0]
count = rrule.get("COUNT")
interval = rrule.get('INTERVAL')[0]
current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)
if count is None:# or until is not None:
delta = None
if freq == "DAILY":
delta = relativedelta(days=interval)
elif freq == "MONTHLY":
delta = relativedelta(months=interval)
elif freq == "YEARLY":
delta = relativedelta(years=interval)
count = 0
origin_date = start_date
while origin_date < current_date:
count += interval
origin_date += delta*interval
rule_str += ";COUNT={}".format(count+10)
infinite_recur = True
ruleset = rrulestr(rule_str, dtstart=start_date)
recur_dates = [None]
n_recur = None
dates = list(ruleset) # Generate future dates according to the rules
recur_dates = [i for i in dates if i >= current_date]
n_recur = "inf" if infinite_recur is True else len(recur_dates)
recur_info["recur_dates"] = recur_dates
recur_info["infinite_recur"] = infinite_recur
recur_info["recur_freq"] = freq
recur_info["recur_interval"] = interval
recur_info["n_recur_dates_left"] = n_recur
return recur_info
def calendar_parser(cal_str):
# Parse the calendar
cal = Calendar.from_ical(cal_str)
# Iterate over each event in the calendar
for event in cal.walk('vevent'):
event_info = {
"uid": None,
"dtstart": "",
"exdate": [],
"summary": None,
"description": None,
"location": None,
"rrule": None
} }
# Catch errors for missing components
for info in event_info: def generate(self):
try: if self.rrule is None:
event_info[info] = event[info] return self.recur_info
except Exception:
pass rule_str = "RRULE:{}".format(self.rrule.to_ical().decode('utf-8'))
start_date = self.dtstart
uid = str(event_info["uid"]) infinite_recur = False
dtstart = event_info["dtstart"].dt freq = self.rrule.get('FREQ')[0]
exdates = event_info["exdate"] count = self.rrule.get("COUNT")
if exdates is not []: interval = self.rrule.get('INTERVAL')[0]
if isinstance(exdates, list): current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)
exdates = [i.dts.dt.replace(tzinfo=pytz.UTC) for i in exdates]
else: if count is None:
exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)] delta = None
summary = event["summary"]
description = event_info["description"] if freq == "DAILY":
location = event_info["location"] delta = relativedelta(days=interval)
rrule = event_info["rrule"] elif freq == "MONTHLY":
delta = relativedelta(months=interval)
# Ensure dates are always as datetime elif freq == "YEARLY":
if isinstance(dtstart, dt.datetime): delta = relativedelta(years=interval)
dtstart = dtstart.replace(tzinfo=pytz.UTC)
else: count = 0
dtstart = dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC) origin_date = start_date
while origin_date < current_date:
# Get recurring events if they exist count += interval
recur_info = generate_recurring_event_dates(dtstart, rrule) origin_date += delta*interval
event_dates = recur_info["recur_dates"]
rule_str += ";COUNT={}".format(count+10)
infinite_recur = True
ruleset = rrulestr(rule_str, dtstart=start_date)
recur_dates = [None]
n_recur = None
dates = list(ruleset) # Generate future dates according to the rules
recur_dates = [i for i in dates if i >= current_date]
n_recur = "inf" if infinite_recur is True else len(recur_dates)
self.recur_info["recur_dates"] = recur_dates
self.recur_info["infinite_recur"] = infinite_recur
self.recur_info["recur_freq"] = freq
self.recur_info["recur_interval"] = interval
self.recur_info["n_recur_dates_left"] = n_recur
return self.recur_info
# Remove exdates class CalendarParser:
event_dates = [i for i in event_dates if i not in exdates] def parse_calendar(self, cal_str):
# Parse the calendar
valarms = [] cal = self.parse_icalendar(cal_str)
for subcomponent in event.walk("valarm"):
valarm = Event.from_ical(subcomponent.to_ical()) # Iterate over each event in the calendar
timedelta = valarm["trigger"].dt for event in cal.walk('vevent'):
valarms.append(timedelta) event_dict = self.process_event(event)
dtstart = self.dtstart_to_datetime(event_dict["dtstart"].dt)
generator = RecurringEventGenerator(dtstart, event_dict["rrule"])
recur_info = generator.generate()
event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"])
event_dict = { valarms = self.process_valarm(event)
"uid": uid,
event_dict = {
"uid": str(event_dict["uid"]),
"dtstart": dtstart, "dtstart": dtstart,
"summary": summary, "summary": event_dict["summary"],
"description": description, "description": event_dict["description"],
"location": location, "location": event_dict["location"],
"event_dates": event_dates, "event_dates": event_dates,
"recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format( "recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format(
recur_info["recur_freq"], recur_info["recur_freq"],
@ -299,24 +229,76 @@ def calendar_parser(cal_str):
), ),
"valarms": valarms, "valarms": valarms,
"alert_history": [] "alert_history": []
} }
new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
event_dict["hash"] = new_hash # Store the hash in the event dictionary
return event_dict
def parse_icalendar(self, cal_str):
return Calendar.from_ical(cal_str)
def process_event(self, event):
# Catch errors for missing components
event_info = {
"uid": None,
"dtstart": "",
"exdate": [],
"summary": None,
"description": None,
"location": None,
"rrule": None
}
for info in event_info:
try:
event_info[info] = event[info]
except Exception:
pass
return event_info
def dtstart_to_datetime(self, dtstart):
# Ensure dates are always as datetime
if isinstance(dtstart, dt.datetime):
return dtstart.replace(tzinfo=pytz.UTC)
else:
return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)
handler = FileChangeHandler() # Create an instance of the FileChangeHandler class def remove_exdates(self, exdates, recur_dates):
new_hash = handler.calculate_event_hash(event_dict) # Calculate the hash of the event dictionary if exdates != []:
event_dict["hash"] = new_hash # Store the hash in the event dictionary if isinstance(exdates, list):
return event_dict exdates = [i.dts[0].dt.replace(tzinfo=pytz.UTC) for i in exdates]
else:
exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)]
return [i for i in recur_dates if i not in exdates]
else:
return recur_dates
def process_valarm(self, event):
valarms = []
for subcomponent in event.walk("valarm"):
valarm = Event.from_ical(subcomponent.to_ical())
timedelta = valarm["trigger"].dt
valarms.append(timedelta)
return valarms
def get_next_alert(event, current_time): def get_next_alert(event, current_time):
"""
This function returns the next alert that should be processed based on the current time.
"""
event_dates = event["event_dates"] event_dates = event["event_dates"]
valarm_deltas = event["valarms"] valarm_deltas = event["valarms"]
if event_dates == [] or current_time > event_dates[-1]: if event_dates == [] or event_dates is None or current_time > event_dates[-1]:
return None, None return None, None
next_event = [i for i in event_dates if i >= current_time][0] next_event = [i for i in event_dates if i >= current_time][0]
next_alert_list = [next_event + i for i in valarm_deltas] next_alert_list = [next_event + i for i in valarm_deltas]
next_alert = min(next_alert_list) next_alert = min(next_alert_list)
return next_alert - dt.timedelta(seconds=5), next_event return next_alert - dt.timedelta(seconds=5), next_event
def process_alert(current_time, next_alert, event): def process_alert(current_time, next_alert, next_event, event, config):
"""
This function processes a given alert and passes it to a messaging client.
"""
if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15): if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15):
if len(event["alert_history"]) == 0: if len(event["alert_history"]) == 0:
print("First alert for '{}' detected".format(event["summary"])) print("First alert for '{}' detected".format(event["summary"]))
@ -333,38 +315,66 @@ def process_alert(current_time, next_alert, event):
f.write(str(event)) # write expects a str not dict f.write(str(event)) # write expects a str not dict
return return
# Create initial event_list using calendar_parser def main():
event_list = [] # List to hold dictionaries for each event # Parse args and config
for file in files: args = parse_args()
with open(file, 'r') as f: content = read_file(args.config)
cal_str = f.read() config = parse_toml(content)
event_dict = calendar_parser(cal_str)
event_list.append(event_dict) # Get calendar dir
cal_dir = Path(config["app"]["calendar_dir"])
if not cal_dir.is_dir():
print(f"The provided path to .ics files does not exist: '{cal_dir}'")
sys.exit(1) # Exit with error code
observer = Observer() #Parse calendar events
handler = FileChangeHandler() calendar_parser = CalendarParser()
observer.schedule(handler, cal_dir, recursive=True) files = list(cal_dir.glob('*.ics'))
observer.start() event_list = [] # List to hold dictionaries for each event
try: for file in files:
while True: with open(file, 'r') as f:
with open("status", 'w') as f: cal_str = f.read()
#Refresh the status file event_dict = calendar_parser.parse_calendar(cal_str)
f.write("") event_list.append(event_dict)
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
for event in event_list: #Start file handler to detect changes to calendar dir
next_alert, next_event = get_next_alert(event, current_time) observer = Observer()
if next_alert == None: handler = FileChangeHandler(event_list) # Pass event_list here
continue observer.schedule(handler, cal_dir, recursive=True)
event_delta = next_alert-current_time observer.start()
total_seconds = event_delta.total_seconds()
human_readable_time = humanfriendly.format_timespan(total_seconds) #Start main loop
monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nRecur Dates: {}\nNext alert on: {} in {}\nRecur info: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, [str(i) for i in event["event_dates"]], next_alert, human_readable_time, event["recur_info"], event["alert_history"]) try:
with open("status", 'a') as f: while True:
# Write the output to the file with open("status", 'w') as f:
f.write(monitor_status) #Refresh the status file
f.write("\n") f.write("")
process_alert(current_time, next_alert, event) current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
time.sleep(1) for event in event_list:
except KeyboardInterrupt: next_alert, next_event = get_next_alert(event, current_time)
observer.stop() if next_alert == None:
observer.join() continue
event_delta = next_alert-current_time
total_seconds = event_delta.total_seconds()
human_readable_time = humanfriendly.format_timespan(total_seconds)
monitor_status = f"""\
Current time: {current_time}
Monitoring: {event["summary"]}
Event date: {next_event}
Recur Dates: {[str(i) for i in event["event_dates"]]}
Next alert on: {next_alert} in {human_readable_time}
Recur info: {event["recur_info"]}
Alert history: {event["alert_history"]}\n"""
monitor_status = textwrap.dedent(monitor_status)
with open("status", 'a') as f:
# Write the output to the file
f.write(monitor_status)
f.write("\n")
process_alert(current_time, next_alert, next_event, event, config)
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == "__main__":
main()