From d892cdbd4b0befa87875ef46fde3617e124b0866 Mon Sep 17 00:00:00 2001 From: mrsu Date: Fri, 16 Feb 2024 00:34:53 +0000 Subject: [PATCH] Update logging and error handling in remindme_caldav.py - Updated the logging configuration to use a custom format and set the log level based on command line arguments. - Added more detailed error messages for file not found, failed TOML parsing, and failed calendar parsing. - Improved exception handling during event processing and added more informative error messages. --- config.toml | 1 + remindme_caldav.py | 93 +++++++++++++++++++++++++--------------------- 2 files changed, 51 insertions(+), 43 deletions(-) diff --git a/config.toml b/config.toml index 5de5292..79766d1 100644 --- a/config.toml +++ b/config.toml @@ -1,6 +1,7 @@ # Modify to your requirements. See readme for example. [app] calendar_dir = + [email] smtp_server = port = diff --git a/remindme_caldav.py b/remindme_caldav.py index a4f7689..c0ea9b1 100644 --- a/remindme_caldav.py +++ b/remindme_caldav.py @@ -12,15 +12,19 @@ from pathlib import Path import argparse, textwrap, logging from alert_processor import AlertProcessor -log_format='[%(levelname)s] %(asctime)s %(message)s' -logging.basicConfig(format=log_format) logger = logging.getLogger() +def setup_logging(): + log_format='[%(levelname)s] %(asctime)s %(message)s' + logging.basicConfig(format=log_format, level=logging.INFO) + def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') parser.add_argument('--logfile', type=str, help='Path to logfile file. Default logfile', default = "none") + parser.add_argument('--loglevel', help="Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", + type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']) return parser.parse_args() def read_file(filename): @@ -28,15 +32,14 @@ def read_file(filename): return Path(filename).read_text() except FileNotFoundError: logger.error("Error: The specified file does not exist.") - sys.exit(1) + raise FileNotFoundError("Error: The specified file does not exist.") def parse_toml(content): try: - config = toml.loads(content) - return config + return toml.loads(content) except Exception: logger.error("Error: Failed to parse TOML file.") - sys.exit(1) + raise RuntimeError("Error: Failed to parse TOML file.") def calculate_event_hash(event): return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest() @@ -66,20 +69,17 @@ class FileChangeHandler(FileSystemEventHandler): except Exception as e: logger.error(f"Not a valid file: {event.src_path}. Error: {e}") return - try: event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method except Exception as e: logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") - return - + return self.handle_modified(old_event=None, event_dict=event_dict) def on_deleted(self, event): logger.info(f"File deleted: {event.src_path}") if not event.is_directory: uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension - self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True) def on_created(self, event): @@ -91,13 +91,11 @@ class FileChangeHandler(FileSystemEventHandler): except Exception as e: logger.error(f"Not a valid file: {event.src_path}. Error: {e}") return - try: event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method except Exception as e: logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") - return - + return self.handle_modified(old_event=None, event_dict=event_dict) def handle_modified(self, old_event, event_dict, remove=False): @@ -105,7 +103,6 @@ class FileChangeHandler(FileSystemEventHandler): for i, old_event in enumerate(self.event_list): if old_event["uid"] == event_dict["uid"]: old_hash = old_event["hash"] - new_hash = calculate_event_hash(event_dict) if new_hash != old_hash: logger.info(f"Event with UID {old_event['uid']} has been modified or deleted") @@ -240,7 +237,7 @@ class CalendarParser: try: return Calendar.from_ical(cal_str) except Exception: - raise RuntimeError("Error parsing calendar.") + raise RuntimeError("Error parsing icalendar.") def process_event(self, event): """ @@ -337,69 +334,77 @@ def process_alert(current_time, next_alert, next_event, event, config): processor = AlertProcessor(config) processor.send_email(event, next_alert, next_event) except Exception as e: - raise RuntimeError("Error sending alert for event.") - #processor.send_xmpp(event, next_alert, next_event) + raise RuntimeError(f"Error sending alert for event. {e}") with open("alert_history", 'a') as f: f.write(str(event)) return def main(): - # Parse args and config - args = parse_args() - if args.logfile != "none": - file_handler = logging.FileHandler(args.logfile, mode='a') - formatter = logging.Formatter(log_format) - file_handler.setFormatter(formatter) - logger.addHandler(file_handler) + setup_logging() + logger = logging.getLogger() # Assign a default value to logger + # Parse args + args = parse_args() + + # Set log level + if args.loglevel is not None: + numeric_level = getattr(logging, args.loglevel.upper(), None) # Convert string to integer + if isinstance(numeric_level, int): + logger = logging.getLogger() + logger.setLevel(numeric_level) # Set the log level + if args.config is None: logger.error("No config file provided. Please use --config path_to_config.toml") sys.exit(1) config_file = read_file(args.config) config = parse_toml(config_file) + if config is None: + logging.error("Invalid config") + sys.exit(1) - # Write logs to logfile - # Get calendar dir cal_dir = Path(config["app"]["calendar_dir"]) if not cal_dir.is_dir(): - print(f"The provided path to .ics files does not exist: '{cal_dir}'") - sys.exit(1) # Exit with error code + logger.error(f"The provided path to .ics files does not exist: '{cal_dir}'") + sys.exit(1) - #Parse calendar events + # Parse calendar events calendar_parser = CalendarParser() files = list(cal_dir.glob('*.ics')) if len(files) == 0: - logger.info("No calendar files in destination location. Did you sync with the caldav server?") - sys.exit(1) # Exit with error code + logger.error("No calendar files in destination location. Did you sync with the caldav server?") + sys.exit(1) - event_list = [] # List to hold dictionaries for each event + event_list = [] for file in files: with open(file, 'r') as f: cal_str = f.read() try: event_dict = calendar_parser.parse_calendar(cal_str) - except Exception: - logger.error(f"Error parsing event, skipping. {file}") + except Exception as e: + logger.warning(f"Error parsing event, skipping. {file}. Error message {e}") continue event_list.append(event_dict) - #Start file handler to detect changes to calendar dir + # Start file handler to detect changes to calendar dir observer = Observer() - handler = FileChangeHandler(event_list) # Pass event_list here + handler = FileChangeHandler(event_list) observer.schedule(handler, cal_dir, recursive=True) observer.start() - #Start main loop + # Start main loop try: while True: with open("status", 'w') as f: - #Refresh the status file - f.write("") + f.write("") # Refresh the status file current_time = dt.datetime.now().replace(tzinfo=pytz.UTC) for event in event_list: - next_alert, next_event = get_next_alert(event, current_time) + try: + next_alert, next_event = get_next_alert(event, current_time) + except RuntimeError as e: + logger.warning(f"Error getting next alert for {event['summary']}, skipping event. Error message {e}") + continue if next_alert == None: continue event_delta = next_alert-current_time @@ -415,10 +420,12 @@ def main(): Alert history: {event["alert_history"]}\n""" monitor_status = textwrap.dedent(monitor_status) with open("status", 'a') as f: - # Write the output to the file - f.write(monitor_status) + f.write(monitor_status) # Write the output to the file f.write("\n") - process_alert(current_time, next_alert, next_event, event, config) + try: + process_alert(current_time, next_alert, next_event, event, config) + except RuntimeError as e: + logger.warning(f"Error processing alert for event {event['summary']}. Error message: {e}") time.sleep(1) except KeyboardInterrupt: observer.stop()