Improved logging, error handling and documentation

This commit is contained in:
mrsu 2024-02-12 23:57:58 +00:00
parent 6b2d3bef53
commit 740ac039dd
1 changed files with 142 additions and 47 deletions

View File

@ -30,23 +30,20 @@ def parse_args():
parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level') parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level')
args = parser.parse_args() args = parser.parse_args()
if args.config is None: if args.config is None:
raise ValueError("No config file provided") raise RuntimeError("No config file provided. Please use --config path_to_config.toml")
return args return args
def read_file(filename): def read_file(filename):
try: try:
return Path(filename).read_text() return Path(filename).read_text()
except FileNotFoundError as e: except FileNotFoundError as e:
print(f"Error: The specified file does not exist. {e}") raise RuntimeError(f"Error: The specified file does not exist. {e}")
sys.exit(1) # Exit with error code
def parse_toml(content): def parse_toml(content):
try: try:
return toml.loads(content) return toml.loads(content)
except Exception as e: except Exception as e:
print("Error: Failed to parse TOML file.") raise RuntimeError(f"Error: Failed to parse TOML file. {e}")
print(e)
sys.exit(1) # Exit with error code
def calculate_event_hash(event): def calculate_event_hash(event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest() return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
@ -64,49 +61,48 @@ class FileChangeHandler(FileSystemEventHandler):
file modifications, deletions and creations. file modifications, deletions and creations.
""" """
def __init__(self, event_list): def __init__(self, event_list):
self.calendar_parser = CalendarParser() # Create an instance of CalendarParser self.calendar_parser = CalendarParser()
self.event_list = event_list self.event_list = event_list
def on_modified(self, event): def on_modified(self, event):
print(f"File modified: {event.src_path}") logging.debug(f"File modified: {event.src_path}")
if not event.is_directory: if not event.is_directory:
try: try:
with open(event.src_path, 'r') as f: with open(event.src_path, 'r') as f:
cal_str = f.read() cal_str = f.read()
except Exception as e: except Exception as e:
print("Not a valid file: {}. Error: {}".format(event.src_path, e)) logging.error(f"Not a valid file: {event.src_path}. Error: {e}")
return return
try: try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as i: except Exception as e:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) logging.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return return
# Handle the modified event
self.handle_modified(old_event=None, event_dict=event_dict) self.handle_modified(old_event=None, event_dict=event_dict)
def on_deleted(self, event): def on_deleted(self, event):
print(f"File deleted: {event.src_path}") logging.debug(f"File deleted: {event.src_path}")
if not event.is_directory: if not event.is_directory:
uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True) self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True)
def on_created(self, event): def on_created(self, event):
print(f"File created: {event.src_path}") logging.debug(f"File created: {event.src_path}")
if not event.is_directory: if not event.is_directory:
try: try:
with open(event.src_path, 'r') as f: with open(event.src_path, 'r') as f:
cal_str = f.read() cal_str = f.read()
except Exception as e: except Exception as e:
print("Not a valid file: {}. Error: {}".format(event.src_path, e)) logging.error(f"Not a valid file: {event.src_path}. Error: {e}")
return return
try: try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as i: except Exception as e:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i)) logging.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return return
self.handle_modified(old_event=None, event_dict=event_dict) self.handle_modified(old_event=None, event_dict=event_dict)
@ -119,24 +115,32 @@ class FileChangeHandler(FileSystemEventHandler):
new_hash = calculate_event_hash(event_dict) new_hash = calculate_event_hash(event_dict)
if new_hash != old_hash: if new_hash != old_hash:
print("Event with UID {} has been modified or deleted".format(old_event["uid"])) logging.debug(f"Event with UID {old_event['uid']} has been modified or deleted")
self.event_list[i] = event_dict self.event_list[i] = event_dict
else:
print("Event with UID {} hasn't been modified".format(old_event["uid"]))
break break
else: else:
self.event_list.append(event_dict) self.event_list.append(event_dict)
else: # If remove is True, remove the event from the list else: # If remove is True, remove the event from the list
for i, old_event in enumerate(self.event_list): for i, old_event in enumerate(self.event_list):
if old_event["uid"] == event_dict["uid"]: if old_event["uid"] == event_dict["uid"]:
print("Event with UID {} has been deleted".format(old_event["uid"])) logging.debug(f"Event with UID {old_event['uid']} has been deleted")
del self.event_list[i] del self.event_list[i]
break break
class RecurringEventGenerator: class RecurringEventGenerator:
"""
A class to generate recurring events based on a start date and a recurrence rule.
Attributes:
dtstart (datetime): The starting date of the event series.
rrule (rrule): The recurrence rule for the event series.
Methods:
__init__(self, dtstart, rrule): Initializes the class with a start date and a recurrence rule.
generate(self): Generates the recurring events based on the start date and recurrence rule.
Returns a dictionary containing information about the recurring events.
"""
def __init__(self, dtstart, rrule): def __init__(self, dtstart, rrule):
self.dtstart = dtstart self.dtstart = dtstart
self.rrule = rrule self.rrule = rrule
@ -149,6 +153,8 @@ class RecurringEventGenerator:
} }
def generate(self): def generate(self):
"""
"""
if self.rrule is None: if self.rrule is None:
return self.recur_info return self.recur_info
@ -162,6 +168,12 @@ class RecurringEventGenerator:
current_date = dt.datetime.now().replace(tzinfo=pytz.UTC) current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)
if count is None or until is not None: if count is None or until is not None:
# If there is no COUNT value in RRULE, we need to manually calculate
# the dates else rrulestr method will return a very large number of
# values. Here we iterate from the start_date to the current_date based
# on the interval, then add an arbitrary number of days to that (here
# it's 10).
delta = None delta = None
if freq == "DAILY": if freq == "DAILY":
@ -198,6 +210,18 @@ class RecurringEventGenerator:
class CalendarParser: class CalendarParser:
def parse_calendar(self, cal_str): def parse_calendar(self, cal_str):
"""
Parse a calendar string and process each event.
Args:
cal_str (str): The iCalendar string to be parsed.
Returns:
dict: A dictionary containing information about the processed events.
Raises:
RuntimeError: If there are no dates returned for an event or if there is an error calculating the event hash.
"""
# Parse the calendar # Parse the calendar
cal = self.parse_icalendar(cal_str) cal = self.parse_icalendar(cal_str)
@ -209,6 +233,7 @@ class CalendarParser:
generator = RecurringEventGenerator(dtstart, event_dict["rrule"]) generator = RecurringEventGenerator(dtstart, event_dict["rrule"])
recur_info = generator.generate() recur_info = generator.generate()
event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"]) event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"])
valarms = self.process_valarm(event) valarms = self.process_valarm(event)
event_dict = { event_dict = {
@ -226,15 +251,41 @@ class CalendarParser:
"valarms": valarms, "valarms": valarms,
"alert_history": [] "alert_history": []
} }
try:
new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
except Exception as e:
raise RuntimeError("Error calculating event hash")
event_dict["hash"] = new_hash # Store the hash in the event dictionary event_dict["hash"] = new_hash # Store the hash in the event dictionary
return event_dict return event_dict
def parse_icalendar(self, cal_str): def parse_icalendar(self, cal_str):
"""
Parse a calendar string into an iCalendar object.
Args:
cal_str (str): The iCalendar string to be parsed.
Returns:
Calendar: An iCalendar object representing the parsed calendar.
Raises:
RuntimeError: If there is an error parsing the calendar.
"""
try:
return Calendar.from_ical(cal_str) return Calendar.from_ical(cal_str)
except Exception as e:
raise RuntimeError(f"Error parsing calendar. Message from icalendar: {e}")
def process_event(self, event): def process_event(self, event):
# Catch errors for missing components """
Process an event from a parsed calendar and extract relevant information.
Args:
event (Event): An iCalendar event object to be processed.
Returns:
dict: A dictionary containing the extracted event information.
"""
event_info = { event_info = {
"uid": None, "uid": None,
"dtstart": "", "dtstart": "",
@ -245,32 +296,72 @@ class CalendarParser:
"rrule": None "rrule": None
} }
# Catch errors for missing components
for info in event_info: for info in event_info:
try: try:
event_info[info] = event[info] event_info[info] = event[info]
except Exception: except Exception:
pass pass
return event_info return event_info
def dtstart_to_datetime(self, dtstart): def dtstart_to_datetime(self, dtstart):
"""
Convert a date or datetime object into a datetime object with UTC timezone.
Args:
dtstart (date/datetime): The date or datetime to be converted.
Returns:
datetime: A datetime object representing the input date or datetime in UTC timezone.
Raises:
RuntimeError: If there is an error converting the input to a datetime object.
"""
# Ensure dates are always as datetime # Ensure dates are always as datetime
try:
if isinstance(dtstart, dt.datetime): if isinstance(dtstart, dt.datetime):
return dtstart.replace(tzinfo=pytz.UTC) return dtstart.replace(tzinfo=pytz.UTC)
else: else:
return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC) return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)
except Exception as e:
raise RuntimeError(f"Error converting dtstart to datetime. Message: {e}")
def remove_exdates(self, exdates, recur_dates): def remove_exdates(self, exdates, recur_dates):
"""
Remove dates from a list of recurring event dates that are in the exdate list.
Args:
exdates (list): A list of datetime objects representing excluded dates.
recur_dates (list): A list of datetime objects representing recurring event dates.
Returns:
list: A list of datetime objects representing the remaining recurring event dates after removing the exdate dates.
Raises:
RuntimeError: If there is an error processing the exdates.
"""
if exdates != []: if exdates != []:
try:
if isinstance(exdates, list): if isinstance(exdates, list):
exdates = [i.dts[0].dt.replace(tzinfo=pytz.UTC) for i in exdates] exdates = [i.dts[0].dt.replace(tzinfo=pytz.UTC) for i in exdates]
else: else:
exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)] exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)]
return [i for i in recur_dates if i not in exdates] return [i for i in recur_dates if i not in exdates]
except Exception as e:
raise RuntimeError(f"Error processing exdates. Message {e}")
else: else:
return recur_dates return recur_dates
def process_valarm(self, event): def process_valarm(self, event):
"""
Process VALARM components from an iCalendar event and extract trigger times.
Args:
event (Event): An iCalendar event object to be processed.
Returns:
list: A list of datetime objects representing the extracted trigger times.
"""
valarms = [] valarms = []
for subcomponent in event.walk("valarm"): for subcomponent in event.walk("valarm"):
valarm = Event.from_ical(subcomponent.to_ical()) valarm = Event.from_ical(subcomponent.to_ical())
@ -299,23 +390,23 @@ def process_alert(current_time, next_alert, next_event, event, config):
This function processes a given alert and passes it to a messaging client. This function processes a given alert and passes it to a messaging client.
""" """
if current_time >= next_alert and current_time < next_alert + dt.timedelta(seconds=15): if current_time >= next_alert and current_time < next_alert + dt.timedelta(seconds=15):
print(current_time, next_alert, next_alert + dt.timedelta(seconds=15))
print(True if current_time >= next_alert else False)
print(True if next_alert < next_alert + dt.timedelta(seconds=15) else False)
if len(event["alert_history"]) == 0: if len(event["alert_history"]) == 0:
print("First alert for '{}' detected".format(event["summary"])) logging.debug(f"First alert for '{event['summary']}' detected")
event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}] event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}]
elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]: elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]:
return # continue is not needed here as it's the end of function return
else: else:
print("Posting alert for {}!".format(event["summary"])) logging.debug(f"Posting alert for {event['summary']}!")
event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}) event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})
#xmpp_alert.send_xmpp(event["summary"], next_alert, next_event, args.config) try:
processor = AlertProcessor(config) processor = AlertProcessor(config)
processor.send_email(event, next_alert, next_event) processor.send_email(event, next_alert, next_event)
except Exception as e:
raise RuntimeError(f"Error sending alert for event {event['summary']}. Message {e}")
#processor.send_xmpp(event, next_alert, next_event) #processor.send_xmpp(event, next_alert, next_event)
with open("alert_history", 'a') as f: with open("alert_history", 'a') as f:
f.write(str(event)) # write expects a str not dict f.write(str(event))
return return
def main(): def main():
@ -337,7 +428,11 @@ def main():
for file in files: for file in files:
with open(file, 'r') as f: with open(file, 'r') as f:
cal_str = f.read() cal_str = f.read()
try:
event_dict = calendar_parser.parse_calendar(cal_str) event_dict = calendar_parser.parse_calendar(cal_str)
except Exception:
logging.error(f"Error parsing event, skipping. {file}")
continue
event_list.append(event_dict) event_list.append(event_dict)
#Start file handler to detect changes to calendar dir #Start file handler to detect changes to calendar dir