From f239b4fb232d376454258bacbc0dffdcbb4faee2 Mon Sep 17 00:00:00 2001 From: mrsu Date: Sat, 10 Feb 2024 23:53:15 +0000 Subject: [PATCH] Complete refactor of script to make it more pythonic --- remindme_caldav.py | 160 +++++++++++++++++++++++++++------------------ 1 file changed, 95 insertions(+), 65 deletions(-) diff --git a/remindme_caldav.py b/remindme_caldav.py index fc9b43e..71823a7 100644 --- a/remindme_caldav.py +++ b/remindme_caldav.py @@ -1,3 +1,4 @@ +from pathlib import Path from icalendar import Calendar, Event import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time from dateutil.relativedelta import relativedelta @@ -9,38 +10,34 @@ from watchdog.events import FileSystemEventHandler import email_alert, xmpp_alert from pprint import pprint import humanfriendly +from pathlib import Path +import argparse +import textwrap -# Parse args -parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") -parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') -args = parser.parse_args() +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") + parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') + parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level') + args = parser.parse_args() + if args.config is None: + raise ValueError("No config file provided") + return args -if not args.config: - print("Error: No config file provided.") - sys.exit(1) # Exit with error code -elif not os.path.isfile(args.config): - print("Error: The specified config file does not exist.") - sys.exit(1) # Exit with error code -else: - print("Config file path: ", args.config) - -# Get config -try: - with open(args.config, 'r') as f: - config = toml.load(f) -except Exception as e: - print("Error: Failed to parse TOML file.") - print(e) - sys.exit(1) # Exit with error code - -cal_dir = config["app"]["calendar_dir"] -# Check if the path is a directory -if not os.path.isdir(cal_dir): - print("The provided path to .ics files does not exist: '{}'".format(cal_dir)) - exit(1) - -# Get all .ics files from your directory -files = glob.glob(os.path.join(cal_dir, '*.ics')) +def read_file(filename): + try: + return Path(filename).read_text() + except FileNotFoundError as e: + print(f"Error: The specified file does not exist. {e}") + sys.exit(1) # Exit with error code + +def parse_toml(content): + try: + return toml.loads(content) + except Exception as e: + print("Error: Failed to parse TOML file.") + print(e) + sys.exit(1) # Exit with error code class DateTimeEncoder(json.JSONEncoder): def default(self, o): @@ -316,7 +313,7 @@ def get_next_alert(event, current_time): next_alert = min(next_alert_list) return next_alert - dt.timedelta(seconds=5), next_event -def process_alert(current_time, next_alert, event): +def process_alert(current_time, next_alert, next_event, event, config): if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15): if len(event["alert_history"]) == 0: print("First alert for '{}' detected".format(event["summary"])) @@ -333,38 +330,71 @@ def process_alert(current_time, next_alert, event): f.write(str(event)) # write expects a str not dict return -# Create initial event_list using calendar_parser -event_list = [] # List to hold dictionaries for each event -for file in files: - with open(file, 'r') as f: - cal_str = f.read() - event_dict = calendar_parser(cal_str) - event_list.append(event_dict) +def create_event_list(files): + event_list = [] # List to hold dictionaries for each event + for file in files: + with open(file, 'r') as f: + cal_str = f.read() + event_dict = calendar_parser(cal_str) + event_list.append(event_dict) + return event_list -observer = Observer() -handler = FileChangeHandler() -observer.schedule(handler, cal_dir, recursive=True) -observer.start() -try: - while True: - with open("status", 'w') as f: - #Refresh the status file - f.write("") - current_time = dt.datetime.now().replace(tzinfo=pytz.UTC) - for event in event_list: - next_alert, next_event = get_next_alert(event, current_time) - if next_alert == None: - continue - event_delta = next_alert-current_time - total_seconds = event_delta.total_seconds() - human_readable_time = humanfriendly.format_timespan(total_seconds) - monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nRecur Dates: {}\nNext alert on: {} in {}\nRecur info: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, [str(i) for i in event["event_dates"]], next_alert, human_readable_time, event["recur_info"], event["alert_history"]) - with open("status", 'a') as f: - # Write the output to the file - f.write(monitor_status) - f.write("\n") - process_alert(current_time, next_alert, event) - time.sleep(1) -except KeyboardInterrupt: - observer.stop() -observer.join() +def start_observer(cal_dir): + observer = Observer() + handler = FileChangeHandler() + observer.schedule(handler, cal_dir, recursive=True) + observer.start() + return observer + +def main_loop(event_list, config, cal_dir): + observer = start_observer(cal_dir) + try: + while True: + with open("status", 'w') as f: + #Refresh the status file + f.write("") + current_time = dt.datetime.now().replace(tzinfo=pytz.UTC) + for event in event_list: + next_alert, next_event = get_next_alert(event, current_time) + if next_alert == None: + continue + event_delta = next_alert-current_time + total_seconds = event_delta.total_seconds() + human_readable_time = humanfriendly.format_timespan(total_seconds) + monitor_status = f"""\ + Current time: {current_time} + Monitoring: {event["summary"]} + Event date: {next_event} + Recur Dates: {[str(i) for i in event["event_dates"]]} + Next alert on: {next_alert} in {human_readable_time} + Recur info: {event["recur_info"]} + Alert history: {event["alert_history"]}\n""" + monitor_status = textwrap.dedent(monitor_status) + with open("status", 'a') as f: + # Write the output to the file + f.write(monitor_status) + f.write("\n") + process_alert(current_time, next_alert, next_event, event, config) + time.sleep(1) + except KeyboardInterrupt: + observer.stop() + observer.join() + +# Main script execution starts here +def main(): + args = parse_args() + print(args) + content = read_file(args.config) + config = parse_toml(content) + + cal_dir = Path(config["app"]["calendar_dir"]) + if not cal_dir.is_dir(): + print(f"The provided path to .ics files does not exist: '{cal_dir}'") + sys.exit(1) # Exit with error code + + files = list(cal_dir.glob('*.ics')) + event_list = create_event_list(files) + main_loop(event_list, config, cal_dir) + +if __name__ == "__main__": + main()