Complete refactor of script to make it more pythonic

This commit is contained in:
mrsu 2024-02-10 23:53:15 +00:00
parent f6b1b2d5ee
commit f239b4fb23
1 changed files with 95 additions and 65 deletions

View File

@ -1,3 +1,4 @@
from pathlib import Path
from icalendar import Calendar, Event from icalendar import Calendar, Event
import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
@ -9,38 +10,34 @@ from watchdog.events import FileSystemEventHandler
import email_alert, xmpp_alert import email_alert, xmpp_alert
from pprint import pprint from pprint import pprint
import humanfriendly import humanfriendly
from pathlib import Path
import argparse
import textwrap
# Parse args def parse_args():
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") """Parse command line arguments."""
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
args = parser.parse_args() parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level')
args = parser.parse_args()
if args.config is None:
raise ValueError("No config file provided")
return args
if not args.config: def read_file(filename):
print("Error: No config file provided.") try:
sys.exit(1) # Exit with error code return Path(filename).read_text()
elif not os.path.isfile(args.config): except FileNotFoundError as e:
print("Error: The specified config file does not exist.") print(f"Error: The specified file does not exist. {e}")
sys.exit(1) # Exit with error code sys.exit(1) # Exit with error code
else:
print("Config file path: ", args.config)
# Get config def parse_toml(content):
try: try:
with open(args.config, 'r') as f: return toml.loads(content)
config = toml.load(f) except Exception as e:
except Exception as e: print("Error: Failed to parse TOML file.")
print("Error: Failed to parse TOML file.") print(e)
print(e) sys.exit(1) # Exit with error code
sys.exit(1) # Exit with error code
cal_dir = config["app"]["calendar_dir"]
# Check if the path is a directory
if not os.path.isdir(cal_dir):
print("The provided path to .ics files does not exist: '{}'".format(cal_dir))
exit(1)
# Get all .ics files from your directory
files = glob.glob(os.path.join(cal_dir, '*.ics'))
class DateTimeEncoder(json.JSONEncoder): class DateTimeEncoder(json.JSONEncoder):
def default(self, o): def default(self, o):
@ -316,7 +313,7 @@ def get_next_alert(event, current_time):
next_alert = min(next_alert_list) next_alert = min(next_alert_list)
return next_alert - dt.timedelta(seconds=5), next_event return next_alert - dt.timedelta(seconds=5), next_event
def process_alert(current_time, next_alert, event): def process_alert(current_time, next_alert, next_event, event, config):
if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15): if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15):
if len(event["alert_history"]) == 0: if len(event["alert_history"]) == 0:
print("First alert for '{}' detected".format(event["summary"])) print("First alert for '{}' detected".format(event["summary"]))
@ -333,38 +330,71 @@ def process_alert(current_time, next_alert, event):
f.write(str(event)) # write expects a str not dict f.write(str(event)) # write expects a str not dict
return return
# Create initial event_list using calendar_parser def create_event_list(files):
event_list = [] # List to hold dictionaries for each event event_list = [] # List to hold dictionaries for each event
for file in files: for file in files:
with open(file, 'r') as f: with open(file, 'r') as f:
cal_str = f.read() cal_str = f.read()
event_dict = calendar_parser(cal_str) event_dict = calendar_parser(cal_str)
event_list.append(event_dict) event_list.append(event_dict)
return event_list
observer = Observer() def start_observer(cal_dir):
handler = FileChangeHandler() observer = Observer()
observer.schedule(handler, cal_dir, recursive=True) handler = FileChangeHandler()
observer.start() observer.schedule(handler, cal_dir, recursive=True)
try: observer.start()
while True: return observer
with open("status", 'w') as f:
#Refresh the status file def main_loop(event_list, config, cal_dir):
f.write("") observer = start_observer(cal_dir)
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC) try:
for event in event_list: while True:
next_alert, next_event = get_next_alert(event, current_time) with open("status", 'w') as f:
if next_alert == None: #Refresh the status file
continue f.write("")
event_delta = next_alert-current_time current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
total_seconds = event_delta.total_seconds() for event in event_list:
human_readable_time = humanfriendly.format_timespan(total_seconds) next_alert, next_event = get_next_alert(event, current_time)
monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nRecur Dates: {}\nNext alert on: {} in {}\nRecur info: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, [str(i) for i in event["event_dates"]], next_alert, human_readable_time, event["recur_info"], event["alert_history"]) if next_alert == None:
with open("status", 'a') as f: continue
# Write the output to the file event_delta = next_alert-current_time
f.write(monitor_status) total_seconds = event_delta.total_seconds()
f.write("\n") human_readable_time = humanfriendly.format_timespan(total_seconds)
process_alert(current_time, next_alert, event) monitor_status = f"""\
time.sleep(1) Current time: {current_time}
except KeyboardInterrupt: Monitoring: {event["summary"]}
observer.stop() Event date: {next_event}
observer.join() Recur Dates: {[str(i) for i in event["event_dates"]]}
Next alert on: {next_alert} in {human_readable_time}
Recur info: {event["recur_info"]}
Alert history: {event["alert_history"]}\n"""
monitor_status = textwrap.dedent(monitor_status)
with open("status", 'a') as f:
# Write the output to the file
f.write(monitor_status)
f.write("\n")
process_alert(current_time, next_alert, next_event, event, config)
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
# Main script execution starts here
def main():
args = parse_args()
print(args)
content = read_file(args.config)
config = parse_toml(content)
cal_dir = Path(config["app"]["calendar_dir"])
if not cal_dir.is_dir():
print(f"The provided path to .ics files does not exist: '{cal_dir}'")
sys.exit(1) # Exit with error code
files = list(cal_dir.glob('*.ics'))
event_list = create_event_list(files)
main_loop(event_list, config, cal_dir)
if __name__ == "__main__":
main()