Update logging and error handling

* Move from basic logging to more detailed logging with a log file.
* Improve error handling in several functions.
* Modify the way log messages are displayed.
This commit is contained in:
Sam 2024-02-15 18:12:13 +00:00
parent ef00f112f3
commit 89994c1f0a
3 changed files with 27 additions and 27 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
config_local.toml config_local.toml
status status
alert_history alert_history
log
__pycache__ __pycache__

View File

@ -12,25 +12,23 @@ from pathlib import Path
import argparse, textwrap, logging import argparse, textwrap, logging
from alert_processor import AlertProcessor from alert_processor import AlertProcessor
def setup_logger(loglevel): #Setup basic logging.
"""Setup basic logging.""" log_format='[%(levelname)s] %(asctime)s %(message)s'
loglevel = getattr(logging, loglevel.upper(), None) logging.basicConfig(format=log_format, level=logging.INFO)
if not isinstance(loglevel, int):
raise ValueError('Invalid log level: %s' % loglevel)
logging.basicConfig(filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(loglevel) file_handler = logging.FileHandler('log', mode='a')
formatter = logging.Formatter(log_format)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def parse_args(): def parse_args():
"""Parse command line arguments.""" """Parse command line arguments."""
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level') parser.add_argument('--loglevel', choices=['info', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level')
args = parser.parse_args() args = parser.parse_args()
if args.config is None: if args.config is None:
logging.error("No config file provided. Please use --config path_to_config.toml") logger.error("No config file provided. Please use --config path_to_config.toml")
sys.exit(1) sys.exit(1)
return args return args
@ -38,14 +36,14 @@ def read_file(filename):
try: try:
return Path(filename).read_text() return Path(filename).read_text()
except FileNotFoundError as e: except FileNotFoundError as e:
logging.error("Error: The specified file does not exist.") logger.error("Error: The specified file does not exist.")
sys.exit(1) sys.exit(1)
def parse_toml(content): def parse_toml(content):
try: try:
return toml.loads(content) return toml.loads(content)
except Exception as e: except Exception as e:
logging.error("Error: Failed to parse TOML file.") logger.error("Error: Failed to parse TOML file.")
sys.exit(1) sys.exit(1)
def calculate_event_hash(event): def calculate_event_hash(event):
@ -68,44 +66,44 @@ class FileChangeHandler(FileSystemEventHandler):
self.event_list = event_list self.event_list = event_list
def on_modified(self, event): def on_modified(self, event):
logging.debug(f"File modified: {event.src_path}") logger.info(f"File modified: {event.src_path}")
if not event.is_directory: if not event.is_directory:
try: try:
with open(event.src_path, 'r') as f: with open(event.src_path, 'r') as f:
cal_str = f.read() cal_str = f.read()
except Exception as e: except Exception as e:
logging.error(f"Not a valid file: {event.src_path}. Error: {e}") logger.error(f"Not a valid file: {event.src_path}. Error: {e}")
return return
try: try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as e: except Exception as e:
logging.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return return
self.handle_modified(old_event=None, event_dict=event_dict) self.handle_modified(old_event=None, event_dict=event_dict)
def on_deleted(self, event): def on_deleted(self, event):
logging.debug(f"File deleted: {event.src_path}") logger.info(f"File deleted: {event.src_path}")
if not event.is_directory: if not event.is_directory:
uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True) self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True)
def on_created(self, event): def on_created(self, event):
logging.debug(f"File created: {event.src_path}") logger.info(f"File created: {event.src_path}")
if not event.is_directory: if not event.is_directory:
try: try:
with open(event.src_path, 'r') as f: with open(event.src_path, 'r') as f:
cal_str = f.read() cal_str = f.read()
except Exception as e: except Exception as e:
logging.error(f"Not a valid file: {event.src_path}. Error: {e}") logger.error(f"Not a valid file: {event.src_path}. Error: {e}")
return return
try: try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as e: except Exception as e:
logging.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return return
self.handle_modified(old_event=None, event_dict=event_dict) self.handle_modified(old_event=None, event_dict=event_dict)
@ -118,7 +116,7 @@ class FileChangeHandler(FileSystemEventHandler):
new_hash = calculate_event_hash(event_dict) new_hash = calculate_event_hash(event_dict)
if new_hash != old_hash: if new_hash != old_hash:
logging.debug(f"Event with UID {old_event['uid']} has been modified or deleted") logger.info(f"Event with UID {old_event['uid']} has been modified or deleted")
self.event_list[i] = event_dict self.event_list[i] = event_dict
break break
else: else:
@ -126,7 +124,7 @@ class FileChangeHandler(FileSystemEventHandler):
else: # If remove is True, remove the event from the list else: # If remove is True, remove the event from the list
for i, old_event in enumerate(self.event_list): for i, old_event in enumerate(self.event_list):
if old_event["uid"] == event_dict["uid"]: if old_event["uid"] == event_dict["uid"]:
logging.debug(f"Event with UID {old_event['uid']} has been deleted") logger.info(f"Event with UID {old_event['uid']} has been deleted")
del self.event_list[i] del self.event_list[i]
break break
@ -336,12 +334,12 @@ def process_alert(current_time, next_alert, next_event, event, config):
""" """
if current_time >= next_alert and current_time < next_alert + dt.timedelta(seconds=15): if current_time >= next_alert and current_time < next_alert + dt.timedelta(seconds=15):
if len(event["alert_history"]) == 0: if len(event["alert_history"]) == 0:
logging.debug(f"First alert for '{event['summary']}' detected") logger.info(f"First alert for '{event['summary']}' detected")
event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}] event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}]
elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]: elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]:
return return
else: else:
logging.debug(f"Posting alert for {event['summary']}!") logger.info(f"Posting alert for {event['summary']}!")
event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}) event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})
try: try:
processor = AlertProcessor(config) processor = AlertProcessor(config)
@ -376,7 +374,7 @@ def main():
try: try:
event_dict = calendar_parser.parse_calendar(cal_str) event_dict = calendar_parser.parse_calendar(cal_str)
except Exception: except Exception:
logging.error(f"Error parsing event, skipping. {file}") logger.error(f"Error parsing event, skipping. {file}")
continue continue
event_list.append(event_dict) event_list.append(event_dict)

View File

@ -8,7 +8,8 @@ Type=simple
Restart=always Restart=always
RestartSec=1 RestartSec=1
User=root User=root
ExecStart=/opt/remindme_caldav/.venv/bin/python3 /opt/remindme_caldav/remindme_caldav.py --config /etc/remindme_caldav/config.toml ExecStart=/opt/remindme_caldav/.venv/bin/python3 -u /opt/remindme_caldav/remindme_caldav.py --config /etc/remindme_caldav/config.toml
Environment="PYTHONUNBUFFERED=1"
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target