Compare commits

...

2 Commits

Author SHA1 Message Date
Sam a5583d981b Update log directory check in setup_log_location function
* Updated the condition to check if the log directory exists by using Path(logdir).is_dir() instead of Path.is_dir(logdir).
2024-02-16 12:36:05 +00:00
Sam 64516db02a Update logging and file handling
* Added setup_log_location function to handle log directory creation and validation.
* Updated setup_logging function to accept a log location instead of a log file name.
* Updated parse_args function to handle log directory argument and validate the presence of a config file.
* Updated read_file function to raise an error if the specified file does not exist.
* Updated parse_toml function to raise an error if the parsed TOML content is None.
* Added get_calendar_dir function to validate the existence of the calendar directory.
* Added parse_calendar_files function to find and return a list of .ics files in the specified directory.
* Added construct_initial_event_dict function to parse each .ics file into an event dictionary and store them in a list.
* Updated FileChangeHandler class to log warnings instead of errors for invalid or unparseable files.
* Updated CalendarParser class to log warnings if no event dates are found for an event, or if the calendar component is missing.
* Added daemon function to monitor events and update status files at regular intervals.
* Updated main function to use setup_log_location, parse_args, get_calendar_dir, and construct_initial_event_dict functions.
2024-02-16 11:57:51 +00:00
2 changed files with 107 additions and 83 deletions

View File

@ -1,6 +1,6 @@
install: install:
sudo apt-get update && sudo apt install python3-venv -y sudo apt-get update && sudo apt install python3-venv -y
sudo mkdir /opt/remindme_caldav /etc/remindme_caldav sudo mkdir -p /opt/remindme_caldav/status /etc/remindme_caldav
python3 -m venv /opt/remindme_caldav/.venv python3 -m venv /opt/remindme_caldav/.venv
cp remindme_caldav.py alert_processor.py /opt/remindme_caldav/ cp remindme_caldav.py alert_processor.py /opt/remindme_caldav/
. /opt/remindme_caldav/.venv/bin/activate && pip3 install -r requirements.txt . /opt/remindme_caldav/.venv/bin/activate && pip3 install -r requirements.txt

View File

@ -14,33 +14,80 @@ from alert_processor import AlertProcessor
logger = logging.getLogger() logger = logging.getLogger()
def setup_logging(logfile = "log"): def setup_log_location(logdir):
if not Path(logdir).is_dir():
raise FileNotFoundError(f"Log dir '{logdir}' does not exist. Be sure to create it first.")
log_location = os.path.join(logdir, "log")
status_location = os.path.join(logdir, "status")
alert_history_location = os.path.join(logdir, "alert_history")
return log_location, status_location, alert_history_location
def setup_logging(log_location):
log_format='[%(levelname)s] %(asctime)s %(message)s' log_format='[%(levelname)s] %(asctime)s %(message)s'
logging.basicConfig(filename = logfile, format=log_format, level=logging.INFO) logging.basicConfig(filename = log_location, format=log_format, level=logging.INFO)
def parse_args(): def parse_args():
"""Parse command line arguments.""" """Parse command line arguments."""
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
parser.add_argument('--logfile', type=str, help='Path to logfile file. Defaults to "log" in current directory.', default = "log") parser.add_argument('--logdir', type=str, help='Path to logfile directory', default = "logs")
parser.add_argument('--loglevel', help="Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", parser.add_argument('--loglevel', help="Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']) type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
return parser.parse_args() args = parser.parse_args()
if args.config is None:
logger.error("No config file provided. Please use --config path_to_config.toml")
sys.exit(1)
return args
def read_file(filename): def read_file(filename):
try: try:
return Path(filename).read_text() return Path(filename).read_text()
except FileNotFoundError: except FileNotFoundError:
logger.error("Error: The specified file does not exist.")
raise FileNotFoundError("Error: The specified file does not exist.") raise FileNotFoundError("Error: The specified file does not exist.")
def parse_toml(content): def parse_toml(content):
try: try:
return toml.loads(content) config = toml.loads(content)
if config is None:
logging.error("Invalid config")
sys.exit(1)
return config
except Exception: except Exception:
logger.error("Error: Failed to parse TOML file.")
raise RuntimeError("Error: Failed to parse TOML file.") raise RuntimeError("Error: Failed to parse TOML file.")
def get_calendar_dir(config):
cal_dir = Path(config["app"]["calendar_dir"])
if not cal_dir.is_dir():
logger.error(f"The provided path to .ics files does not exist: '{cal_dir}'")
sys.exit(1)
return cal_dir
def parse_calendar_files(cal_dir):
files = []
no_files_detected = True
logger.info(f"Looking for calendar files in {cal_dir}...")
while no_files_detected is True:
files = list(cal_dir.glob('*.ics'))
if len(files) != 0:
logger.info("Calendar files detected in sync location!")
no_files_detected = False
return files
def construct_initial_event_dict(cal_dir):
files = parse_calendar_files(cal_dir)
calendar_parser = CalendarParser()
event_list = []
for file in files:
with open(file, 'r') as f:
cal_str = f.read()
try:
event_dict = calendar_parser.parse_calendar(cal_str)
except Exception as e:
logger.warning(f"Error parsing event, skipping. {file}. Error message {e}")
continue
event_list.append(event_dict)
return event_list
def calculate_event_hash(event): def calculate_event_hash(event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest() return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
@ -89,12 +136,12 @@ class FileChangeHandler(FileSystemEventHandler):
with open(event.src_path, 'r') as f: with open(event.src_path, 'r') as f:
cal_str = f.read() cal_str = f.read()
except Exception as e: except Exception as e:
logger.error(f"Not a valid file: {event.src_path}. Error: {e}") logger.warning(f"Not a valid file: {event.src_path}. Error: {e}")
return return
try: try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as e: except Exception as e:
logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") logger.warning(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return return
self.handle_modified(old_event=None, event_dict=event_dict) self.handle_modified(old_event=None, event_dict=event_dict)
@ -205,6 +252,8 @@ class CalendarParser:
generator = RecurringEventGenerator(dtstart, event_dict["rrule"]) generator = RecurringEventGenerator(dtstart, event_dict["rrule"])
recur_info = generator.generate() recur_info = generator.generate()
event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"]) event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"])
if len(event_dates) == 0:
logging.warning(f"No event dates for event: '{event['summary']}'")
valarms = self.process_valarm(event) valarms = self.process_valarm(event)
@ -258,7 +307,7 @@ class CalendarParser:
try: try:
event_info[info] = event[info] event_info[info] = event[info]
except Exception: except Exception:
pass logging.info(f"CalDav componant '{info}' missing for event {event['summary']}")
return event_info return event_info
def dtstart_to_datetime(self, dtstart): def dtstart_to_datetime(self, dtstart):
@ -299,6 +348,8 @@ class CalendarParser:
valarm = Event.from_ical(subcomponent.to_ical()) valarm = Event.from_ical(subcomponent.to_ical())
timedelta = valarm["trigger"].dt timedelta = valarm["trigger"].dt
valarms.append(timedelta) valarms.append(timedelta)
if len(valarms) == 0:
logging.info(f"No reminders for event: {event['summary']}")
return valarms return valarms
def get_next_alert(event, current_time): def get_next_alert(event, current_time):
@ -317,7 +368,7 @@ def get_next_alert(event, current_time):
next_alert = min(next_alert_list) next_alert = min(next_alert_list)
return next_alert - dt.timedelta(seconds=5), next_event return next_alert - dt.timedelta(seconds=5), next_event
def process_alert(current_time, next_alert, next_event, event, config): def process_alert(current_time, next_alert, next_event, event, config, alert_history_location):
""" """
Processes a given alert and passes it to a messaging client. Processes a given alert and passes it to a messaging client.
""" """
@ -335,60 +386,62 @@ def process_alert(current_time, next_alert, next_event, event, config):
processor.send_email(event, next_alert, next_event) processor.send_email(event, next_alert, next_event)
except Exception as e: except Exception as e:
raise RuntimeError(f"Error sending alert for event. {e}") raise RuntimeError(f"Error sending alert for event. {e}")
with open("alert_history", 'a') as f: with open(alert_history_location, 'a') as f:
f.write(str(event)) f.write(str(event))
return return
def daemon(status_location, alert_history_location, config, event_list):
with open(status_location, 'w') as f:
f.write("") # Refresh the status file
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
for event in event_list:
try:
next_alert, next_event = get_next_alert(event, current_time)
except RuntimeError as e:
logger.warning(f"Error getting next alert for {event['summary']}, skipping event. Error message {e}")
continue
if next_alert == None:
continue
event_delta = next_alert-current_time
total_seconds = event_delta.total_seconds()
human_readable_time = humanfriendly.format_timespan(total_seconds)
monitor_status = f"""\
Current time: {current_time}
Monitoring: {event["summary"]}
Event date: {next_event}
Recur Dates: {[str(i) for i in event["event_dates"]]}
Next alert on: {next_alert} in {human_readable_time}
Recur info: {event["recur_info"]}
Alert history: {event["alert_history"]}\n"""
monitor_status = textwrap.dedent(monitor_status)
with open(status_location, 'a') as f:
f.write(monitor_status) # Write the output to the file
f.write("\n")
try:
process_alert(current_time, next_alert, next_event, event, config, alert_history_location)
except RuntimeError as e:
logger.warning(f"Error processing alert for event {event['summary']}. Error message: {e}")
return
def main(): def main():
# Parse args # Parse args and initiate logging
args = parse_args() args = parse_args()
log_location, status_location, alert_history_location = setup_log_location(args.logdir)
setup_logging(log_location)
logger = logging.getLogger()
setup_logging(args.logfile) # Redefine log level if args passed
logger = logging.getLogger() # Assign a default value to logger
# Set log level
if args.loglevel is not None: if args.loglevel is not None:
numeric_level = getattr(logging, args.loglevel.upper(), None) # Convert string to integer numeric_level = getattr(logging, args.loglevel.upper(), None) # Convert string to integer
if isinstance(numeric_level, int): if isinstance(numeric_level, int):
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(numeric_level) # Set the log level logger.setLevel(numeric_level) # Set the log level
if args.config is None:
logger.error("No config file provided. Please use --config path_to_config.toml")
sys.exit(1)
# Setup initial event_list
config_file = read_file(args.config) config_file = read_file(args.config)
config = parse_toml(config_file) config = parse_toml(config_file)
if config is None: cal_dir = get_calendar_dir(config)
logging.error("Invalid config") event_list = construct_initial_event_dict(cal_dir)
sys.exit(1)
# Get calendar dir
cal_dir = Path(config["app"]["calendar_dir"])
if not cal_dir.is_dir():
logger.error(f"The provided path to .ics files does not exist: '{cal_dir}'")
sys.exit(1)
# Parse calendar events
no_files_detected = True
logger.info(f"Looking for calendar files in {cal_dir}...")
while no_files_detected is True:
files = list(cal_dir.glob('*.ics'))
if len(files) != 0:
logger.info("Calendar files detected in sync location!")
no_files_detected = False
calendar_parser = CalendarParser()
event_list = []
for file in files:
with open(file, 'r') as f:
cal_str = f.read()
try:
event_dict = calendar_parser.parse_calendar(cal_str)
except Exception as e:
logger.warning(f"Error parsing event, skipping. {file}. Error message {e}")
continue
event_list.append(event_dict)
# Start file handler to detect changes to calendar dir # Start file handler to detect changes to calendar dir
observer = Observer() observer = Observer()
@ -399,36 +452,7 @@ def main():
# Start main loop # Start main loop
try: try:
while True: while True:
with open("status", 'w') as f: daemon(status_location, alert_history_location, config, event_list)
f.write("") # Refresh the status file
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
for event in event_list:
try:
next_alert, next_event = get_next_alert(event, current_time)
except RuntimeError as e:
logger.warning(f"Error getting next alert for {event['summary']}, skipping event. Error message {e}")
continue
if next_alert == None:
continue
event_delta = next_alert-current_time
total_seconds = event_delta.total_seconds()
human_readable_time = humanfriendly.format_timespan(total_seconds)
monitor_status = f"""\
Current time: {current_time}
Monitoring: {event["summary"]}
Event date: {next_event}
Recur Dates: {[str(i) for i in event["event_dates"]]}
Next alert on: {next_alert} in {human_readable_time}
Recur info: {event["recur_info"]}
Alert history: {event["alert_history"]}\n"""
monitor_status = textwrap.dedent(monitor_status)
with open("status", 'a') as f:
f.write(monitor_status) # Write the output to the file
f.write("\n")
try:
process_alert(current_time, next_alert, next_event, event, config)
except RuntimeError as e:
logger.warning(f"Error processing alert for event {event['summary']}. Error message: {e}")
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
observer.stop() observer.stop()