Compare commits
No commits in common. "e4c42de74c599b3b26217dd22b359b3d394273d3" and "f6b1b2d5ee66678dc946c3841340917623467b65" have entirely different histories.
e4c42de74c
...
f6b1b2d5ee
|
@ -1,4 +1,3 @@
|
||||||
from pathlib import Path
|
|
||||||
from icalendar import Calendar, Event
|
from icalendar import Calendar, Event
|
||||||
import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time
|
import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time
|
||||||
from dateutil.relativedelta import relativedelta
|
from dateutil.relativedelta import relativedelta
|
||||||
|
@ -10,49 +9,38 @@ from watchdog.events import FileSystemEventHandler
|
||||||
import email_alert, xmpp_alert
|
import email_alert, xmpp_alert
|
||||||
from pprint import pprint
|
from pprint import pprint
|
||||||
import humanfriendly
|
import humanfriendly
|
||||||
from pathlib import Path
|
|
||||||
import argparse
|
|
||||||
import textwrap
|
|
||||||
import logging
|
|
||||||
|
|
||||||
def setup_logger(loglevel):
|
# Parse args
|
||||||
"""Setup basic logging."""
|
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
|
||||||
loglevel = getattr(logging, loglevel.upper(), None)
|
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
|
||||||
|
args = parser.parse_args()
|
||||||
if not isinstance(loglevel, int):
|
|
||||||
raise ValueError('Invalid log level: %s' % loglevel)
|
|
||||||
|
|
||||||
logging.basicConfig(filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
|
|
||||||
logger = logging.getLogger()
|
|
||||||
logger.setLevel(loglevel)
|
|
||||||
|
|
||||||
def parse_args():
|
if not args.config:
|
||||||
"""Parse command line arguments."""
|
print("Error: No config file provided.")
|
||||||
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
|
sys.exit(1) # Exit with error code
|
||||||
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
|
elif not os.path.isfile(args.config):
|
||||||
parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level')
|
print("Error: The specified config file does not exist.")
|
||||||
args = parser.parse_args()
|
sys.exit(1) # Exit with error code
|
||||||
if args.config is None:
|
else:
|
||||||
raise ValueError("No config file provided")
|
print("Config file path: ", args.config)
|
||||||
return args
|
|
||||||
|
|
||||||
def read_file(filename):
|
# Get config
|
||||||
try:
|
try:
|
||||||
return Path(filename).read_text()
|
with open(args.config, 'r') as f:
|
||||||
except FileNotFoundError as e:
|
config = toml.load(f)
|
||||||
print(f"Error: The specified file does not exist. {e}")
|
except Exception as e:
|
||||||
sys.exit(1) # Exit with error code
|
print("Error: Failed to parse TOML file.")
|
||||||
|
print(e)
|
||||||
def parse_toml(content):
|
sys.exit(1) # Exit with error code
|
||||||
try:
|
|
||||||
return toml.loads(content)
|
|
||||||
except Exception as e:
|
|
||||||
print("Error: Failed to parse TOML file.")
|
|
||||||
print(e)
|
|
||||||
sys.exit(1) # Exit with error code
|
|
||||||
|
|
||||||
def calculate_event_hash(event):
|
cal_dir = config["app"]["calendar_dir"]
|
||||||
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
|
# Check if the path is a directory
|
||||||
|
if not os.path.isdir(cal_dir):
|
||||||
|
print("The provided path to .ics files does not exist: '{}'".format(cal_dir))
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
# Get all .ics files from your directory
|
||||||
|
files = glob.glob(os.path.join(cal_dir, '*.ics'))
|
||||||
|
|
||||||
class DateTimeEncoder(json.JSONEncoder):
|
class DateTimeEncoder(json.JSONEncoder):
|
||||||
def default(self, o):
|
def default(self, o):
|
||||||
|
@ -65,11 +53,29 @@ class FileChangeHandler(FileSystemEventHandler):
|
||||||
`FileChangeHandler` is a custom event handler for the
|
`FileChangeHandler` is a custom event handler for the
|
||||||
`watchdog.observers.Observer` class that handles file system events such as
|
`watchdog.observers.Observer` class that handles file system events such as
|
||||||
file modifications, deletions and creations.
|
file modifications, deletions and creations.
|
||||||
"""
|
|
||||||
def __init__(self, event_list):
|
|
||||||
self.calendar_parser = CalendarParser() # Create an instance of CalendarParser
|
|
||||||
self.event_list = event_list
|
|
||||||
|
|
||||||
|
It inherits from `watchdog.events.FileSystemEventHandler` providing methods
|
||||||
|
to handle these events: `on_modified`, `on_deleted`, and `on_created`. Each
|
||||||
|
method is overridden for specific functionality when a file system event
|
||||||
|
occurs.
|
||||||
|
|
||||||
|
The class also includes a method `calculate_event_hash` that generates an
|
||||||
|
MD5 hash for each event dictionary based on its contents. This is used to
|
||||||
|
track changes in the events and determine if they have been modified or
|
||||||
|
deleted.
|
||||||
|
|
||||||
|
For example, when a file is modified:
|
||||||
|
- It reads the content of the file, parses it into an event dictionary
|
||||||
|
using `calendar_parser`.
|
||||||
|
- Calculates the hash for the event and checks if there's already an
|
||||||
|
existing event with the same UID in `event_list`.
|
||||||
|
- If there is one, it compares the new hash with the old hash. If they
|
||||||
|
differ, it prints that the event has been modified or deleted. Otherwise,
|
||||||
|
it prints that the event hasn't been modified.
|
||||||
|
|
||||||
|
For file deletion and creation, similar operations are performed but
|
||||||
|
without comparison of hashes.
|
||||||
|
"""
|
||||||
def on_modified(self, event):
|
def on_modified(self, event):
|
||||||
print(f"File modified: {event.src_path}")
|
print(f"File modified: {event.src_path}")
|
||||||
if not event.is_directory:
|
if not event.is_directory:
|
||||||
|
@ -81,7 +87,7 @@ class FileChangeHandler(FileSystemEventHandler):
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
|
event_dict = calendar_parser(cal_str)
|
||||||
except Exception as i:
|
except Exception as i:
|
||||||
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
|
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
|
||||||
return
|
return
|
||||||
|
@ -107,7 +113,7 @@ class FileChangeHandler(FileSystemEventHandler):
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
|
event_dict = calendar_parser(cal_str)
|
||||||
except Exception as i:
|
except Exception as i:
|
||||||
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
|
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
|
||||||
return
|
return
|
||||||
|
@ -116,111 +122,175 @@ class FileChangeHandler(FileSystemEventHandler):
|
||||||
|
|
||||||
def handle_modified(self, old_event, event_dict, remove=False):
|
def handle_modified(self, old_event, event_dict, remove=False):
|
||||||
if not remove:
|
if not remove:
|
||||||
for i, old_event in enumerate(self.event_list):
|
for i, old_event in enumerate(event_list):
|
||||||
if old_event["uid"] == event_dict["uid"]:
|
if old_event["uid"] == event_dict["uid"]:
|
||||||
old_hash = old_event["hash"]
|
old_hash = old_event["hash"]
|
||||||
|
|
||||||
new_hash = calculate_event_hash(event_dict)
|
new_hash = self.calculate_event_hash(event_dict)
|
||||||
if new_hash != old_hash:
|
if new_hash != old_hash:
|
||||||
print("Event with UID {} has been modified or deleted".format(old_event["uid"]))
|
print("Event with UID {} has been modified or deleted".format(old_event["uid"]))
|
||||||
|
|
||||||
self.event_list[i] = event_dict
|
event_list[i] = event_dict
|
||||||
else:
|
else:
|
||||||
print("Event with UID {} hasn't been modified".format(old_event["uid"]))
|
print("Event with UID {} hasn't been modified".format(old_event["uid"]))
|
||||||
|
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
self.event_list.append(event_dict)
|
event_list.append(event_dict)
|
||||||
else: # If remove is True, remove the event from the list
|
else: # If remove is True, remove the event from the list
|
||||||
for i, old_event in enumerate(self.event_list):
|
for i, old_event in enumerate(event_list):
|
||||||
if old_event["uid"] == event_dict["uid"]:
|
if old_event["uid"] == event_dict["uid"]:
|
||||||
print("Event with UID {} has been deleted".format(old_event["uid"]))
|
print("Event with UID {} has been deleted".format(old_event["uid"]))
|
||||||
|
|
||||||
del self.event_list[i]
|
del event_list[i]
|
||||||
break
|
break
|
||||||
|
|
||||||
|
def calculate_event_hash(self, event):
|
||||||
|
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
|
||||||
|
|
||||||
class RecurringEventGenerator:
|
def generate_recurring_event_dates(dtstart, rrule):
|
||||||
def __init__(self, dtstart, rrule):
|
"""
|
||||||
self.dtstart = dtstart
|
Generate recurring event dates based on a start date and an RRULE.
|
||||||
self.rrule = rrule
|
|
||||||
self.recur_info = {
|
This function takes in a start date (`dtstart`) and an RRULE (`rrule`),
|
||||||
|
which is used to generate future dates according to the rules specified by
|
||||||
|
the RRULE.
|
||||||
|
|
||||||
|
If no count rule is present, it generates a date array from `dtstart` to
|
||||||
|
the current datetime, then adds an arbitrary number of dates into the
|
||||||
|
future (here it's 10). This is done because generating a date array using
|
||||||
|
the rrulestr function without a count rule will return a very large number
|
||||||
|
of elements.
|
||||||
|
|
||||||
|
The function returns a dictionary containing information about the
|
||||||
|
recurring event:
|
||||||
|
- `recur_dates`: A list of future dates generated by the RRULE.
|
||||||
|
- `infinite_recur`: Boolean indicating whether the recurrence is infinite.
|
||||||
|
- `recur_freq`: The frequency of the recurrence (e.g., 'DAILY').
|
||||||
|
- `recur_interval`: The interval between each occurrence of the event.
|
||||||
|
- `n_recur_dates_left`: The number of future dates left.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
dtstart (datetime): The start date of the recurring event.
|
||||||
|
rrule (rrule): The RRULE object specifying the recurrence rules.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: A dictionary containing information about the recurring event
|
||||||
|
dates, frequency, interval and count.
|
||||||
|
"""
|
||||||
|
recur_info = {
|
||||||
"recur_dates": [dtstart],
|
"recur_dates": [dtstart],
|
||||||
"infinite_recur": False,
|
"infinite_recur": False,
|
||||||
"recur_freq": None,
|
"recur_freq": None,
|
||||||
"recur_interval": None,
|
"recur_interval": None,
|
||||||
"n_recur_dates_left": None
|
"n_recur_dates_left": None
|
||||||
|
}
|
||||||
|
if rrule is None:
|
||||||
|
return recur_info
|
||||||
|
rule_str = "RRULE:{}".format(rrule.to_ical().decode('utf-8'))
|
||||||
|
start_date = dtstart
|
||||||
|
infinite_recur = False
|
||||||
|
freq = rrule.get('FREQ')[0]
|
||||||
|
count = rrule.get("COUNT")
|
||||||
|
interval = rrule.get('INTERVAL')[0]
|
||||||
|
current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)
|
||||||
|
|
||||||
|
if count is None:# or until is not None:
|
||||||
|
|
||||||
|
delta = None
|
||||||
|
|
||||||
|
if freq == "DAILY":
|
||||||
|
delta = relativedelta(days=interval)
|
||||||
|
elif freq == "MONTHLY":
|
||||||
|
delta = relativedelta(months=interval)
|
||||||
|
elif freq == "YEARLY":
|
||||||
|
delta = relativedelta(years=interval)
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
origin_date = start_date
|
||||||
|
while origin_date < current_date:
|
||||||
|
count += interval
|
||||||
|
origin_date += delta*interval
|
||||||
|
|
||||||
|
rule_str += ";COUNT={}".format(count+10)
|
||||||
|
infinite_recur = True
|
||||||
|
|
||||||
|
ruleset = rrulestr(rule_str, dtstart=start_date)
|
||||||
|
|
||||||
|
recur_dates = [None]
|
||||||
|
n_recur = None
|
||||||
|
dates = list(ruleset) # Generate future dates according to the rules
|
||||||
|
recur_dates = [i for i in dates if i >= current_date]
|
||||||
|
n_recur = "inf" if infinite_recur is True else len(recur_dates)
|
||||||
|
|
||||||
|
recur_info["recur_dates"] = recur_dates
|
||||||
|
recur_info["infinite_recur"] = infinite_recur
|
||||||
|
recur_info["recur_freq"] = freq
|
||||||
|
recur_info["recur_interval"] = interval
|
||||||
|
recur_info["n_recur_dates_left"] = n_recur
|
||||||
|
return recur_info
|
||||||
|
|
||||||
|
def calendar_parser(cal_str):
|
||||||
|
# Parse the calendar
|
||||||
|
cal = Calendar.from_ical(cal_str)
|
||||||
|
|
||||||
|
# Iterate over each event in the calendar
|
||||||
|
for event in cal.walk('vevent'):
|
||||||
|
|
||||||
|
event_info = {
|
||||||
|
"uid": None,
|
||||||
|
"dtstart": "",
|
||||||
|
"exdate": [],
|
||||||
|
"summary": None,
|
||||||
|
"description": None,
|
||||||
|
"location": None,
|
||||||
|
"rrule": None
|
||||||
}
|
}
|
||||||
|
# Catch errors for missing components
|
||||||
def generate(self):
|
for info in event_info:
|
||||||
if self.rrule is None:
|
try:
|
||||||
return self.recur_info
|
event_info[info] = event[info]
|
||||||
|
except Exception:
|
||||||
rule_str = "RRULE:{}".format(self.rrule.to_ical().decode('utf-8'))
|
pass
|
||||||
start_date = self.dtstart
|
|
||||||
infinite_recur = False
|
uid = str(event_info["uid"])
|
||||||
freq = self.rrule.get('FREQ')[0]
|
dtstart = event_info["dtstart"].dt
|
||||||
count = self.rrule.get("COUNT")
|
exdates = event_info["exdate"]
|
||||||
interval = self.rrule.get('INTERVAL')[0]
|
if exdates is not []:
|
||||||
current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)
|
if isinstance(exdates, list):
|
||||||
|
exdates = [i.dts.dt.replace(tzinfo=pytz.UTC) for i in exdates]
|
||||||
if count is None:
|
else:
|
||||||
delta = None
|
exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)]
|
||||||
|
summary = event["summary"]
|
||||||
if freq == "DAILY":
|
description = event_info["description"]
|
||||||
delta = relativedelta(days=interval)
|
location = event_info["location"]
|
||||||
elif freq == "MONTHLY":
|
rrule = event_info["rrule"]
|
||||||
delta = relativedelta(months=interval)
|
|
||||||
elif freq == "YEARLY":
|
# Ensure dates are always as datetime
|
||||||
delta = relativedelta(years=interval)
|
if isinstance(dtstart, dt.datetime):
|
||||||
|
dtstart = dtstart.replace(tzinfo=pytz.UTC)
|
||||||
count = 0
|
else:
|
||||||
origin_date = start_date
|
dtstart = dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)
|
||||||
while origin_date < current_date:
|
|
||||||
count += interval
|
# Get recurring events if they exist
|
||||||
origin_date += delta*interval
|
recur_info = generate_recurring_event_dates(dtstart, rrule)
|
||||||
|
event_dates = recur_info["recur_dates"]
|
||||||
rule_str += ";COUNT={}".format(count+10)
|
|
||||||
infinite_recur = True
|
|
||||||
|
|
||||||
ruleset = rrulestr(rule_str, dtstart=start_date)
|
|
||||||
|
|
||||||
recur_dates = [None]
|
|
||||||
n_recur = None
|
|
||||||
dates = list(ruleset) # Generate future dates according to the rules
|
|
||||||
recur_dates = [i for i in dates if i >= current_date]
|
|
||||||
n_recur = "inf" if infinite_recur is True else len(recur_dates)
|
|
||||||
|
|
||||||
self.recur_info["recur_dates"] = recur_dates
|
|
||||||
self.recur_info["infinite_recur"] = infinite_recur
|
|
||||||
self.recur_info["recur_freq"] = freq
|
|
||||||
self.recur_info["recur_interval"] = interval
|
|
||||||
self.recur_info["n_recur_dates_left"] = n_recur
|
|
||||||
|
|
||||||
return self.recur_info
|
|
||||||
|
|
||||||
class CalendarParser:
|
# Remove exdates
|
||||||
def parse_calendar(self, cal_str):
|
event_dates = [i for i in event_dates if i not in exdates]
|
||||||
# Parse the calendar
|
|
||||||
cal = self.parse_icalendar(cal_str)
|
valarms = []
|
||||||
|
for subcomponent in event.walk("valarm"):
|
||||||
# Iterate over each event in the calendar
|
valarm = Event.from_ical(subcomponent.to_ical())
|
||||||
for event in cal.walk('vevent'):
|
timedelta = valarm["trigger"].dt
|
||||||
event_dict = self.process_event(event)
|
valarms.append(timedelta)
|
||||||
dtstart = self.dtstart_to_datetime(event_dict["dtstart"].dt)
|
|
||||||
|
|
||||||
generator = RecurringEventGenerator(dtstart, event_dict["rrule"])
|
|
||||||
recur_info = generator.generate()
|
|
||||||
event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"])
|
|
||||||
|
|
||||||
valarms = self.process_valarm(event)
|
event_dict = {
|
||||||
|
"uid": uid,
|
||||||
event_dict = {
|
|
||||||
"uid": str(event_dict["uid"]),
|
|
||||||
"dtstart": dtstart,
|
"dtstart": dtstart,
|
||||||
"summary": event_dict["summary"],
|
"summary": summary,
|
||||||
"description": event_dict["description"],
|
"description": description,
|
||||||
"location": event_dict["location"],
|
"location": location,
|
||||||
"event_dates": event_dates,
|
"event_dates": event_dates,
|
||||||
"recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format(
|
"recur_info": "Recur freq: {}, Recur interval: {}, N dates left: {}".format(
|
||||||
recur_info["recur_freq"],
|
recur_info["recur_freq"],
|
||||||
|
@ -229,76 +299,24 @@ class CalendarParser:
|
||||||
),
|
),
|
||||||
"valarms": valarms,
|
"valarms": valarms,
|
||||||
"alert_history": []
|
"alert_history": []
|
||||||
}
|
}
|
||||||
new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
|
|
||||||
event_dict["hash"] = new_hash # Store the hash in the event dictionary
|
|
||||||
return event_dict
|
|
||||||
|
|
||||||
def parse_icalendar(self, cal_str):
|
|
||||||
return Calendar.from_ical(cal_str)
|
|
||||||
|
|
||||||
def process_event(self, event):
|
|
||||||
# Catch errors for missing components
|
|
||||||
event_info = {
|
|
||||||
"uid": None,
|
|
||||||
"dtstart": "",
|
|
||||||
"exdate": [],
|
|
||||||
"summary": None,
|
|
||||||
"description": None,
|
|
||||||
"location": None,
|
|
||||||
"rrule": None
|
|
||||||
}
|
|
||||||
|
|
||||||
for info in event_info:
|
|
||||||
try:
|
|
||||||
event_info[info] = event[info]
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return event_info
|
|
||||||
|
|
||||||
def dtstart_to_datetime(self, dtstart):
|
|
||||||
# Ensure dates are always as datetime
|
|
||||||
if isinstance(dtstart, dt.datetime):
|
|
||||||
return dtstart.replace(tzinfo=pytz.UTC)
|
|
||||||
else:
|
|
||||||
return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)
|
|
||||||
|
|
||||||
def remove_exdates(self, exdates, recur_dates):
|
handler = FileChangeHandler() # Create an instance of the FileChangeHandler class
|
||||||
if exdates != []:
|
new_hash = handler.calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
|
||||||
if isinstance(exdates, list):
|
event_dict["hash"] = new_hash # Store the hash in the event dictionary
|
||||||
exdates = [i.dts[0].dt.replace(tzinfo=pytz.UTC) for i in exdates]
|
return event_dict
|
||||||
else:
|
|
||||||
exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)]
|
|
||||||
return [i for i in recur_dates if i not in exdates]
|
|
||||||
else:
|
|
||||||
return recur_dates
|
|
||||||
|
|
||||||
def process_valarm(self, event):
|
|
||||||
valarms = []
|
|
||||||
for subcomponent in event.walk("valarm"):
|
|
||||||
valarm = Event.from_ical(subcomponent.to_ical())
|
|
||||||
timedelta = valarm["trigger"].dt
|
|
||||||
valarms.append(timedelta)
|
|
||||||
return valarms
|
|
||||||
|
|
||||||
def get_next_alert(event, current_time):
|
def get_next_alert(event, current_time):
|
||||||
"""
|
|
||||||
This function returns the next alert that should be processed based on the current time.
|
|
||||||
"""
|
|
||||||
event_dates = event["event_dates"]
|
event_dates = event["event_dates"]
|
||||||
valarm_deltas = event["valarms"]
|
valarm_deltas = event["valarms"]
|
||||||
if event_dates == [] or event_dates is None or current_time > event_dates[-1]:
|
if event_dates == [] or current_time > event_dates[-1]:
|
||||||
return None, None
|
return None, None
|
||||||
next_event = [i for i in event_dates if i >= current_time][0]
|
next_event = [i for i in event_dates if i >= current_time][0]
|
||||||
next_alert_list = [next_event + i for i in valarm_deltas]
|
next_alert_list = [next_event + i for i in valarm_deltas]
|
||||||
next_alert = min(next_alert_list)
|
next_alert = min(next_alert_list)
|
||||||
return next_alert - dt.timedelta(seconds=5), next_event
|
return next_alert - dt.timedelta(seconds=5), next_event
|
||||||
|
|
||||||
def process_alert(current_time, next_alert, next_event, event, config):
|
def process_alert(current_time, next_alert, event):
|
||||||
"""
|
|
||||||
This function processes a given alert and passes it to a messaging client.
|
|
||||||
"""
|
|
||||||
if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15):
|
if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15):
|
||||||
if len(event["alert_history"]) == 0:
|
if len(event["alert_history"]) == 0:
|
||||||
print("First alert for '{}' detected".format(event["summary"]))
|
print("First alert for '{}' detected".format(event["summary"]))
|
||||||
|
@ -315,66 +333,38 @@ def process_alert(current_time, next_alert, next_event, event, config):
|
||||||
f.write(str(event)) # write expects a str not dict
|
f.write(str(event)) # write expects a str not dict
|
||||||
return
|
return
|
||||||
|
|
||||||
def main():
|
# Create initial event_list using calendar_parser
|
||||||
# Parse args and config
|
event_list = [] # List to hold dictionaries for each event
|
||||||
args = parse_args()
|
for file in files:
|
||||||
content = read_file(args.config)
|
with open(file, 'r') as f:
|
||||||
config = parse_toml(content)
|
cal_str = f.read()
|
||||||
|
event_dict = calendar_parser(cal_str)
|
||||||
# Get calendar dir
|
event_list.append(event_dict)
|
||||||
cal_dir = Path(config["app"]["calendar_dir"])
|
|
||||||
if not cal_dir.is_dir():
|
|
||||||
print(f"The provided path to .ics files does not exist: '{cal_dir}'")
|
|
||||||
sys.exit(1) # Exit with error code
|
|
||||||
|
|
||||||
#Parse calendar events
|
observer = Observer()
|
||||||
calendar_parser = CalendarParser()
|
handler = FileChangeHandler()
|
||||||
files = list(cal_dir.glob('*.ics'))
|
observer.schedule(handler, cal_dir, recursive=True)
|
||||||
event_list = [] # List to hold dictionaries for each event
|
observer.start()
|
||||||
for file in files:
|
try:
|
||||||
with open(file, 'r') as f:
|
while True:
|
||||||
cal_str = f.read()
|
with open("status", 'w') as f:
|
||||||
event_dict = calendar_parser.parse_calendar(cal_str)
|
#Refresh the status file
|
||||||
event_list.append(event_dict)
|
f.write("")
|
||||||
|
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
|
||||||
#Start file handler to detect changes to calendar dir
|
for event in event_list:
|
||||||
observer = Observer()
|
next_alert, next_event = get_next_alert(event, current_time)
|
||||||
handler = FileChangeHandler(event_list) # Pass event_list here
|
if next_alert == None:
|
||||||
observer.schedule(handler, cal_dir, recursive=True)
|
continue
|
||||||
observer.start()
|
event_delta = next_alert-current_time
|
||||||
|
total_seconds = event_delta.total_seconds()
|
||||||
#Start main loop
|
human_readable_time = humanfriendly.format_timespan(total_seconds)
|
||||||
try:
|
monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nRecur Dates: {}\nNext alert on: {} in {}\nRecur info: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, [str(i) for i in event["event_dates"]], next_alert, human_readable_time, event["recur_info"], event["alert_history"])
|
||||||
while True:
|
with open("status", 'a') as f:
|
||||||
with open("status", 'w') as f:
|
# Write the output to the file
|
||||||
#Refresh the status file
|
f.write(monitor_status)
|
||||||
f.write("")
|
f.write("\n")
|
||||||
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
|
process_alert(current_time, next_alert, event)
|
||||||
for event in event_list:
|
time.sleep(1)
|
||||||
next_alert, next_event = get_next_alert(event, current_time)
|
except KeyboardInterrupt:
|
||||||
if next_alert == None:
|
observer.stop()
|
||||||
continue
|
observer.join()
|
||||||
event_delta = next_alert-current_time
|
|
||||||
total_seconds = event_delta.total_seconds()
|
|
||||||
human_readable_time = humanfriendly.format_timespan(total_seconds)
|
|
||||||
monitor_status = f"""\
|
|
||||||
Current time: {current_time}
|
|
||||||
Monitoring: {event["summary"]}
|
|
||||||
Event date: {next_event}
|
|
||||||
Recur Dates: {[str(i) for i in event["event_dates"]]}
|
|
||||||
Next alert on: {next_alert} in {human_readable_time}
|
|
||||||
Recur info: {event["recur_info"]}
|
|
||||||
Alert history: {event["alert_history"]}\n"""
|
|
||||||
monitor_status = textwrap.dedent(monitor_status)
|
|
||||||
with open("status", 'a') as f:
|
|
||||||
# Write the output to the file
|
|
||||||
f.write(monitor_status)
|
|
||||||
f.write("\n")
|
|
||||||
process_alert(current_time, next_alert, next_event, event, config)
|
|
||||||
time.sleep(1)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
observer.stop()
|
|
||||||
observer.join()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
Loading…
Reference in New Issue