Compare commits

...

2 Commits

Author SHA1 Message Date
mrsu 1b225d08fd modify datetime 2024-02-08 15:32:49 +00:00
mrsu eb6b8db0ea alerting and filehandling
Added both xmpp and email alerting
Proper FileHandling to manage modified, deleted and new events
Argument parsing and added toml config
2024-02-06 19:02:44 +00:00
5 changed files with 276 additions and 40 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
config_local.toml
status
alert_history

16
config.toml Normal file
View File

@ -0,0 +1,16 @@
# Modify to your requirements
[app]
calendar_dir = "FULL_PATH_TO_.ICS_CALENDAR_FILES"
[email]
smtp_server = "SMTP.PROVIDER.DOMAIN"
port = 587
username = "YOUR_USERNAME"
password = "YOUR_PASSWORD"
recipient = "RECIPIENT_EMAIL_ADDRESS"
[xmpp]
jid = 'YOUR_USERNAME@SERVER_INSTANCE.DOMAIN'
password = 'YOUR_PASSWORD'
recipient = 'RECIPIENT_USERNAME@SERVER_INSTANCE.DOMAIN'
[notify-send]

51
email_alert.py Normal file
View File

@ -0,0 +1,51 @@
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import ssl
import toml
def send_email(event_summary, next_alert, next_event, config):
with open('config.toml', 'r') as f:
config = toml.load(f)
# Set up the SMTP server details
smtp_server = config["email"]["smtp_server"]
port = config["email"]["port"]
sender_email = config["email"]["username"]
receiver_email = config["email"]["recipient"]
password = config["email"]["password"]
# Event details
event_name = event_summary
event_date = next_event
event_delta = next_event - next_alert
# Create a multipart message and set headers
message = MIMEMultipart()
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = "Event Alert: '{}' @ {}".format(event_name, event_date)
# Add body to email
body = """\
Hi,
This is an alert for the event named '{}' which will occur on {}.
This event will start in '{}'
""".format(event_name, event_date, event_delta)
message.attach(MIMEText(body, "plain"))
text = message.as_string()
try:
# Create a secure SSL context and connect to the server
context = ssl.create_default_context()
with smtplib.SMTP(smtp_server, port) as server: # Use 'with' statement for automatic cleanup
server.ehlo() # Can be omitted
server.starttls(context=context) # Secure the connection
server.login(sender_email, password)
# Send email here
server.sendmail(sender_email, receiver_email, text)
except Exception as e:
# Print any error messages to stdout
print(e)
return print("Message sent via email")

View File

@ -1,23 +1,126 @@
from icalendar import Calendar, Event, vRecur from icalendar import Calendar, Event
import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
from dateutil.rrule import rruleset, rrulestr from dateutil.rrule import rrulestr
import datetime as dt import datetime as dt
from datetime import date, time
import glob
import os
import time import time
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
import email_alert
#import xmpp_alert
# Parse args
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
args = parser.parse_args()
if not args.config:
print("Error: No config file provided.")
sys.exit(1) # Exit with error code
elif not os.path.isfile(args.config):
print("Error: The specified config file does not exist.")
sys.exit(1) # Exit with error code
else:
print("Config file path: ", args.config)
# Get config
try:
with open(args.config, 'r') as f:
config = toml.load(f)
except Exception as e:
print("Error: Failed to parse TOML file.")
print(e)
sys.exit(1) # Exit with error code
cal_dir = config["app"]["calendar_dir"]
# Get all .ics files from your directory # Get all .ics files from your directory
files = glob.glob(os.path.join('/home/mrsu/.local/share/caldav/personal-calendar/default', '*.ics')) files = glob.glob(os.path.join(cal_dir, '*.ics'))
class DateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, (dt.datetime, dt.timedelta)):
return str(o) # Convert the datetime or timedelta object to a string
return super().default(o)
class FileChangeHandler(FileSystemEventHandler):
def on_modified(self, event):
print(f"File modified: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory
print(str(dt.datetime.now()), "Sync detected, updating events")
with open(event.src_path, 'r') as f:
cal_str = f.read()
# Parse the calendar
try:
event_dict = calendar_parser(cal_str)
new_hash = self.calculate_event_hash(event_dict)
except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return
for i, old_event in enumerate(event_list):
if old_event["uid"] == event_dict["uid"]:
old_hash = old_event["hash"]
if new_hash != old_hash: # If the hashes don't match, it means that the event has been modified or deleted
print("Event with UID {} has been modified or deleted".format(old_event["uid"]))
# Update the event in the list
event_list[i] = event_dict
else: # If the hashes match, it means that the event hasn't been modified
print("Event with UID {} hasn't been modified".format(old_event["uid"]))
break
else: # If no matching event is found in the list, add the new event to the list
event_list.append(event_dict)
def on_deleted(self, event):
print(f"File deleted: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory
print(str(dt.datetime.now()), "Sync detected, updating events")
for i, old_event in enumerate(event_list):
uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
if old_event["uid"] == uid: # If the same event is found
print("Event with UID {} has been deleted".format(old_event["uid"]))
# Remove the event from the list
del event_list[i]
break
def on_created(self, event):
print(f"File created: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory
print(str(dt.datetime.now()), "Sync detected, updating events")
with open(event.src_path, 'r') as f:
cal_str = f.read()
try:
event_dict = calendar_parser(cal_str)
new_hash = self.calculate_event_hash(event_dict)
except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return
for old_event in event_list:
if old_event["uid"] == event_dict["uid"]: # If the same event is found
print("Event with UID {} already exists".format(old_event["uid"]))
break
else: # If no matching event is found in the list, add the new event to the list
event_list.append(event_dict)
def calculate_event_hash(self, event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
def calculate_recur_dates(dtstart, vrecur): def calculate_recur_dates(dtstart, vrecur):
rule_str = "RRULE:{}".format(vrecur.to_ical().decode('utf-8')) rule_str = "RRULE:{}".format(vrecur.to_ical().decode('utf-8'))
start_date = dtstart start_date = dtstart
if vrecur.get("COUNT") is None: if vrecur.get("COUNT") is None:
# If it doesn't, calculate an end date based on FREQ and INTERVAL # If no COUNT, calculate an end date based on FREQ and INTERVAL to prevent generating too many dates
freq = vrecur.get('FREQ')[0] # Get the first frequency (e.g., 'DAILY', 'WEEKLY', etc.) freq = vrecur.get('FREQ')[0]
interval = vrecur.get('INTERVAL')[0] interval = vrecur.get('INTERVAL')[0]
delta = None delta = None
@ -30,23 +133,19 @@ def calculate_recur_dates(dtstart, vrecur):
delta = relativedelta(years=interval) delta = relativedelta(years=interval)
count = 0 count = 0
current_date = dt.datetime.today().date() current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)
origin_date = start_date.date() if isinstance(start_date, dt.datetime) else start_date origin_date = start_date
while origin_date < current_date: while origin_date < current_date:
count += interval count += interval
origin_date += delta*interval origin_date += delta*interval
rule_str += ";COUNT={}".format(count+10) rule_str += ";COUNT={}".format(count+10)
else:
None
# If 'COUNT' exists, set it to a high number so that the rrulestr method will stop generating dates after the specified count
#rule_str += ";UNTIL=21001231T000000Z" # This sets an end date far in the future
ruleset = rrulestr(rule_str, dtstart=start_date) ruleset = rrulestr(rule_str, dtstart=start_date)
# Generate future dates according to the rules # Generate future dates according to the rules
dates = list(ruleset) dates = list(ruleset)
return [d for d in dates][1:] # Remove first date as the same as start_date return [d for d in dates if d > start_date]
def calendar_parser(cal_str): def calendar_parser(cal_str):
# Parse the calendar # Parse the calendar
@ -55,7 +154,9 @@ def calendar_parser(cal_str):
for component in cal.walk(): for component in cal.walk():
if component.name == "VEVENT": # If it's a VEVENT, create a new event dictionary if component.name == "VEVENT": # If it's a VEVENT, create a new event dictionary
uid = component.get("UID") uid = component.get("UID")
dtstart = component.get("DTSTART").dt dtstart = component.get("DTSTART").dt
dtstart = dtstart if isinstance(dtstart, dt.datetime) else dt.datetime.combine(dtstart, dt.time.min) # Ensure dates are always as datetime
dtstart = dtstart.replace(tzinfo=pytz.UTC)
summary = component.get("SUMMARY") summary = component.get("SUMMARY")
vrecur = component.get("RRULE") vrecur = component.get("RRULE")
recur_dates = [None] recur_dates = [None]
@ -68,9 +169,33 @@ def calendar_parser(cal_str):
valarm = Event.from_ical(subcomponent.to_ical()) valarm = Event.from_ical(subcomponent.to_ical())
timedelta = valarm.get("TRIGGER").dt timedelta = valarm.get("TRIGGER").dt
valarm_list.append(timedelta) # Add this VALARM to the list valarm_list.append(timedelta) # Add this VALARM to the list
event_dict = {"uid": str(uid), "dtstart": dtstart, "summary": summary, "recur_dates": recur_dates, "valarm": valarm_list, "alert_history": []}
return {"uid": str(uid), "dtstart": dtstart, "summary": summary, "recur_dates": recur_dates, "valarm": valarm_list, "alert_triggered": [False*len(valarm_list)]} handler = FileChangeHandler() # Create an instance of the FileChangeHandler class
new_hash = handler.calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
event_dict["hash"] = new_hash # Store the hash in the event dictionary
return event_dict
def get_next_alert(event, current_time):
recur_dates = event["recur_dates"]
valarm_deltas = event["valarm"]
dtstart = event["dtstart"]
next_event = None
if recur_dates[0] is None:
next_event = dtstart
elif current_time <= dtstart:
next_event = dtstart
else:
next_event = recur_dates[0]
if current_time > next_event:
return None, next_event
next_alert_list = [next_event + i for i in valarm_deltas if next_event + i >= current_time]
if next_alert_list == []:
next_alert = next_event
else:
next_alert = min(next_alert_list)
return next_alert - dt.timedelta(seconds=5), next_event
# Create initial event_list using calendar_parser
event_list = [] # List to hold dictionaries for each event event_list = [] # List to hold dictionaries for each event
for file in files: for file in files:
with open(file, 'r') as f: with open(file, 'r') as f:
@ -78,34 +203,39 @@ for file in files:
event_dict = calendar_parser(cal_str) event_dict = calendar_parser(cal_str)
event_list.append(event_dict) event_list.append(event_dict)
class FileChangeHandler(FileSystemEventHandler):
def on_modified(self, event):
if not event.is_directory: # If it's a file and not a directory
print("Updating events")
with open(event.src_path, 'r') as f:
cal_str = f.read()
# Parse the calendar
event_dict = calendar_parser(cal_str)
event_list.append(event_dict)
observer = Observer() observer = Observer()
handler = FileChangeHandler() handler = FileChangeHandler()
observer.schedule(handler, '/home/mrsu/.local/share/caldav/personal-calendar/default', recursive=True) observer.schedule(handler, cal_dir, recursive=True)
observer.start() observer.start()
try: try:
while True: while True:
current_time = dt.datetime.now() with open("status", 'w') as f:
#Refresh the status file
f.write("")
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
for event in event_list: for event in event_list:
None next_alert, next_event = get_next_alert(event, current_time)
#event["valarm"] = [a if isinstance(a, dt.datetime) else dt.datetime.combine(a, dt.datetime.min.time()) for a in event["valarm"]] if next_alert == None:
#print(event["summary"], event["freq"], current_time, [str(i.replace(tzinfo=None)) for i in event['valarm']], event["dtstart"]) continue
print(event["summary"], str(event["dtstart"]), [str(i) for i in event["recur_dates"]])#, [str(alarm_time) for alarm_time in event['valarm']]) monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nNext alert on: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, next_alert, event["alert_history"])
# if any(abs(current_time - alarm_time.total_seconds() <= 1 for alarm_time in event['valarm']) and event["alert_triggered"][0] is False: with open("status", 'a') as f:
# print("Alert for event with UID:", event["uid"], "Summary:", event["summary"]) # Write the output to the file
# event["alert_triggered"][0] = True f.write(monitor_status)
f.write("\n")
time.sleep(1) # Wait for a second before checking again if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15):
if len(event["alert_history"]) == 0:
print("First alert for '{}' detected".format(event["summary"]))
event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}]
elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]:
continue
else:
print("Posting alert for {}!".format(event["summary"]))
event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})
#xmpp_alert.send_xmpp(event["summary"], next_alert, next_event, args.config)
email_alert.send_email(event["summary"], next_alert, next_event, args.config)
with open("alert_history", 'a') as f:
f.write(event)
time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
observer.stop() observer.stop()
observer.join() observer.join()

36
xmpp_alert.py Normal file
View File

@ -0,0 +1,36 @@
import slixmpp
import toml
class SendMsgBot(slixmpp.ClientXMPP):
def __init__(self, jid, password, recipient, message):
super().__init__(jid, password)
self.recipient = recipient
self.msg = message
self.add_event_handler("session_start", self.start)
def start(self, event):
self.send_presence()
self.get_roster()
self.send_message(mto=self.recipient, mbody=self.msg, mtype='chat')
self.disconnect()
def send_xmpp(event_summary, next_alert, next_event, config):
with open('config.toml', 'r') as f:
config = toml.load(f)
jid = config["xmpp"]["jid"]
password = config["xmpp"]["password"] # replace with your password
recipient = config["xmpp"]["recipient"]
message = """\
Hi,
This is an alert for the event named '{}' which will occur on {}.
This event will start in '{}'
""".format(event_summary, next_event, next_alert)
bot = SendMsgBot(jid, password, recipient, message)
bot.register_plugin('xep_0030') # Service Discovery
bot.register_plugin('xep_0199') # XMPP Ping
bot.connect()
bot.process(forever=False)
return print("Message sent via XMPP")