Update logging and error handling in remindme_caldav.py

- Updated the logging configuration to use a custom format and set the log level based on command line arguments.
- Added more detailed error messages for file not found, failed TOML parsing, and failed calendar parsing.
- Improved exception handling during event processing and added more informative error messages.
This commit is contained in:
mrsu 2024-02-16 00:34:53 +00:00
parent a2a508fd31
commit d892cdbd4b
2 changed files with 51 additions and 43 deletions

View File

@ -1,6 +1,7 @@
# Modify to your requirements. See readme for example. # Modify to your requirements. See readme for example.
[app] [app]
calendar_dir = calendar_dir =
[email] [email]
smtp_server = smtp_server =
port = port =

View File

@ -12,15 +12,19 @@ from pathlib import Path
import argparse, textwrap, logging import argparse, textwrap, logging
from alert_processor import AlertProcessor from alert_processor import AlertProcessor
log_format='[%(levelname)s] %(asctime)s %(message)s'
logging.basicConfig(format=log_format)
logger = logging.getLogger() logger = logging.getLogger()
def setup_logging():
log_format='[%(levelname)s] %(asctime)s %(message)s'
logging.basicConfig(format=log_format, level=logging.INFO)
def parse_args(): def parse_args():
"""Parse command line arguments.""" """Parse command line arguments."""
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
parser.add_argument('--logfile', type=str, help='Path to logfile file. Default logfile', default = "none") parser.add_argument('--logfile', type=str, help='Path to logfile file. Default logfile', default = "none")
parser.add_argument('--loglevel', help="Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
return parser.parse_args() return parser.parse_args()
def read_file(filename): def read_file(filename):
@ -28,15 +32,14 @@ def read_file(filename):
return Path(filename).read_text() return Path(filename).read_text()
except FileNotFoundError: except FileNotFoundError:
logger.error("Error: The specified file does not exist.") logger.error("Error: The specified file does not exist.")
sys.exit(1) raise FileNotFoundError("Error: The specified file does not exist.")
def parse_toml(content): def parse_toml(content):
try: try:
config = toml.loads(content) return toml.loads(content)
return config
except Exception: except Exception:
logger.error("Error: Failed to parse TOML file.") logger.error("Error: Failed to parse TOML file.")
sys.exit(1) raise RuntimeError("Error: Failed to parse TOML file.")
def calculate_event_hash(event): def calculate_event_hash(event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest() return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
@ -66,20 +69,17 @@ class FileChangeHandler(FileSystemEventHandler):
except Exception as e: except Exception as e:
logger.error(f"Not a valid file: {event.src_path}. Error: {e}") logger.error(f"Not a valid file: {event.src_path}. Error: {e}")
return return
try: try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as e: except Exception as e:
logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return return
self.handle_modified(old_event=None, event_dict=event_dict) self.handle_modified(old_event=None, event_dict=event_dict)
def on_deleted(self, event): def on_deleted(self, event):
logger.info(f"File deleted: {event.src_path}") logger.info(f"File deleted: {event.src_path}")
if not event.is_directory: if not event.is_directory:
uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True) self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True)
def on_created(self, event): def on_created(self, event):
@ -91,13 +91,11 @@ class FileChangeHandler(FileSystemEventHandler):
except Exception as e: except Exception as e:
logger.error(f"Not a valid file: {event.src_path}. Error: {e}") logger.error(f"Not a valid file: {event.src_path}. Error: {e}")
return return
try: try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as e: except Exception as e:
logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return return
self.handle_modified(old_event=None, event_dict=event_dict) self.handle_modified(old_event=None, event_dict=event_dict)
def handle_modified(self, old_event, event_dict, remove=False): def handle_modified(self, old_event, event_dict, remove=False):
@ -105,7 +103,6 @@ class FileChangeHandler(FileSystemEventHandler):
for i, old_event in enumerate(self.event_list): for i, old_event in enumerate(self.event_list):
if old_event["uid"] == event_dict["uid"]: if old_event["uid"] == event_dict["uid"]:
old_hash = old_event["hash"] old_hash = old_event["hash"]
new_hash = calculate_event_hash(event_dict) new_hash = calculate_event_hash(event_dict)
if new_hash != old_hash: if new_hash != old_hash:
logger.info(f"Event with UID {old_event['uid']} has been modified or deleted") logger.info(f"Event with UID {old_event['uid']} has been modified or deleted")
@ -240,7 +237,7 @@ class CalendarParser:
try: try:
return Calendar.from_ical(cal_str) return Calendar.from_ical(cal_str)
except Exception: except Exception:
raise RuntimeError("Error parsing calendar.") raise RuntimeError("Error parsing icalendar.")
def process_event(self, event): def process_event(self, event):
""" """
@ -337,20 +334,24 @@ def process_alert(current_time, next_alert, next_event, event, config):
processor = AlertProcessor(config) processor = AlertProcessor(config)
processor.send_email(event, next_alert, next_event) processor.send_email(event, next_alert, next_event)
except Exception as e: except Exception as e:
raise RuntimeError("Error sending alert for event.") raise RuntimeError(f"Error sending alert for event. {e}")
#processor.send_xmpp(event, next_alert, next_event)
with open("alert_history", 'a') as f: with open("alert_history", 'a') as f:
f.write(str(event)) f.write(str(event))
return return
def main(): def main():
# Parse args and config setup_logging()
logger = logging.getLogger() # Assign a default value to logger
# Parse args
args = parse_args() args = parse_args()
if args.logfile != "none":
file_handler = logging.FileHandler(args.logfile, mode='a') # Set log level
formatter = logging.Formatter(log_format) if args.loglevel is not None:
file_handler.setFormatter(formatter) numeric_level = getattr(logging, args.loglevel.upper(), None) # Convert string to integer
logger.addHandler(file_handler) if isinstance(numeric_level, int):
logger = logging.getLogger()
logger.setLevel(numeric_level) # Set the log level
if args.config is None: if args.config is None:
logger.error("No config file provided. Please use --config path_to_config.toml") logger.error("No config file provided. Please use --config path_to_config.toml")
@ -358,48 +359,52 @@ def main():
config_file = read_file(args.config) config_file = read_file(args.config)
config = parse_toml(config_file) config = parse_toml(config_file)
if config is None:
# Write logs to logfile logging.error("Invalid config")
sys.exit(1)
# Get calendar dir # Get calendar dir
cal_dir = Path(config["app"]["calendar_dir"]) cal_dir = Path(config["app"]["calendar_dir"])
if not cal_dir.is_dir(): if not cal_dir.is_dir():
print(f"The provided path to .ics files does not exist: '{cal_dir}'") logger.error(f"The provided path to .ics files does not exist: '{cal_dir}'")
sys.exit(1) # Exit with error code sys.exit(1)
#Parse calendar events # Parse calendar events
calendar_parser = CalendarParser() calendar_parser = CalendarParser()
files = list(cal_dir.glob('*.ics')) files = list(cal_dir.glob('*.ics'))
if len(files) == 0: if len(files) == 0:
logger.info("No calendar files in destination location. Did you sync with the caldav server?") logger.error("No calendar files in destination location. Did you sync with the caldav server?")
sys.exit(1) # Exit with error code sys.exit(1)
event_list = [] # List to hold dictionaries for each event event_list = []
for file in files: for file in files:
with open(file, 'r') as f: with open(file, 'r') as f:
cal_str = f.read() cal_str = f.read()
try: try:
event_dict = calendar_parser.parse_calendar(cal_str) event_dict = calendar_parser.parse_calendar(cal_str)
except Exception: except Exception as e:
logger.error(f"Error parsing event, skipping. {file}") logger.warning(f"Error parsing event, skipping. {file}. Error message {e}")
continue continue
event_list.append(event_dict) event_list.append(event_dict)
#Start file handler to detect changes to calendar dir # Start file handler to detect changes to calendar dir
observer = Observer() observer = Observer()
handler = FileChangeHandler(event_list) # Pass event_list here handler = FileChangeHandler(event_list)
observer.schedule(handler, cal_dir, recursive=True) observer.schedule(handler, cal_dir, recursive=True)
observer.start() observer.start()
#Start main loop # Start main loop
try: try:
while True: while True:
with open("status", 'w') as f: with open("status", 'w') as f:
#Refresh the status file f.write("") # Refresh the status file
f.write("")
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC) current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
for event in event_list: for event in event_list:
next_alert, next_event = get_next_alert(event, current_time) try:
next_alert, next_event = get_next_alert(event, current_time)
except RuntimeError as e:
logger.warning(f"Error getting next alert for {event['summary']}, skipping event. Error message {e}")
continue
if next_alert == None: if next_alert == None:
continue continue
event_delta = next_alert-current_time event_delta = next_alert-current_time
@ -415,10 +420,12 @@ def main():
Alert history: {event["alert_history"]}\n""" Alert history: {event["alert_history"]}\n"""
monitor_status = textwrap.dedent(monitor_status) monitor_status = textwrap.dedent(monitor_status)
with open("status", 'a') as f: with open("status", 'a') as f:
# Write the output to the file f.write(monitor_status) # Write the output to the file
f.write(monitor_status)
f.write("\n") f.write("\n")
process_alert(current_time, next_alert, next_event, event, config) try:
process_alert(current_time, next_alert, next_event, event, config)
except RuntimeError as e:
logger.warning(f"Error processing alert for event {event['summary']}. Error message: {e}")
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
observer.stop() observer.stop()