Compare commits

..

No commits in common. "1b225d08fdf7d24cc3a29bcf1ba59119397278b1" and "c41a9ae7cff42b58081c6b80617186b394e625c1" have entirely different histories.

5 changed files with 40 additions and 276 deletions

3
.gitignore vendored
View File

@ -1,3 +0,0 @@
config_local.toml
status
alert_history

View File

@ -1,16 +0,0 @@
# Modify to your requirements
[app]
calendar_dir = "FULL_PATH_TO_.ICS_CALENDAR_FILES"
[email]
smtp_server = "SMTP.PROVIDER.DOMAIN"
port = 587
username = "YOUR_USERNAME"
password = "YOUR_PASSWORD"
recipient = "RECIPIENT_EMAIL_ADDRESS"
[xmpp]
jid = 'YOUR_USERNAME@SERVER_INSTANCE.DOMAIN'
password = 'YOUR_PASSWORD'
recipient = 'RECIPIENT_USERNAME@SERVER_INSTANCE.DOMAIN'
[notify-send]

View File

@ -1,51 +0,0 @@
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import ssl
import toml
def send_email(event_summary, next_alert, next_event, config):
with open('config.toml', 'r') as f:
config = toml.load(f)
# Set up the SMTP server details
smtp_server = config["email"]["smtp_server"]
port = config["email"]["port"]
sender_email = config["email"]["username"]
receiver_email = config["email"]["recipient"]
password = config["email"]["password"]
# Event details
event_name = event_summary
event_date = next_event
event_delta = next_event - next_alert
# Create a multipart message and set headers
message = MIMEMultipart()
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = "Event Alert: '{}' @ {}".format(event_name, event_date)
# Add body to email
body = """\
Hi,
This is an alert for the event named '{}' which will occur on {}.
This event will start in '{}'
""".format(event_name, event_date, event_delta)
message.attach(MIMEText(body, "plain"))
text = message.as_string()
try:
# Create a secure SSL context and connect to the server
context = ssl.create_default_context()
with smtplib.SMTP(smtp_server, port) as server: # Use 'with' statement for automatic cleanup
server.ehlo() # Can be omitted
server.starttls(context=context) # Secure the connection
server.login(sender_email, password)
# Send email here
server.sendmail(sender_email, receiver_email, text)
except Exception as e:
# Print any error messages to stdout
print(e)
return print("Message sent via email")

View File

@ -1,126 +1,23 @@
from icalendar import Calendar, Event
import toml, argparse, os, sys, hashlib, json, pytz, glob, os, time
from icalendar import Calendar, Event, vRecur
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrulestr
from dateutil.rrule import rruleset, rrulestr
import datetime as dt
from datetime import date, time
import glob
import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import email_alert
#import xmpp_alert
# Parse args
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
args = parser.parse_args()
if not args.config:
print("Error: No config file provided.")
sys.exit(1) # Exit with error code
elif not os.path.isfile(args.config):
print("Error: The specified config file does not exist.")
sys.exit(1) # Exit with error code
else:
print("Config file path: ", args.config)
# Get config
try:
with open(args.config, 'r') as f:
config = toml.load(f)
except Exception as e:
print("Error: Failed to parse TOML file.")
print(e)
sys.exit(1) # Exit with error code
cal_dir = config["app"]["calendar_dir"]
# Get all .ics files from your directory
files = glob.glob(os.path.join(cal_dir, '*.ics'))
class DateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, (dt.datetime, dt.timedelta)):
return str(o) # Convert the datetime or timedelta object to a string
return super().default(o)
class FileChangeHandler(FileSystemEventHandler):
def on_modified(self, event):
print(f"File modified: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory
print(str(dt.datetime.now()), "Sync detected, updating events")
with open(event.src_path, 'r') as f:
cal_str = f.read()
# Parse the calendar
try:
event_dict = calendar_parser(cal_str)
new_hash = self.calculate_event_hash(event_dict)
except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return
for i, old_event in enumerate(event_list):
if old_event["uid"] == event_dict["uid"]:
old_hash = old_event["hash"]
if new_hash != old_hash: # If the hashes don't match, it means that the event has been modified or deleted
print("Event with UID {} has been modified or deleted".format(old_event["uid"]))
# Update the event in the list
event_list[i] = event_dict
else: # If the hashes match, it means that the event hasn't been modified
print("Event with UID {} hasn't been modified".format(old_event["uid"]))
break
else: # If no matching event is found in the list, add the new event to the list
event_list.append(event_dict)
def on_deleted(self, event):
print(f"File deleted: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory
print(str(dt.datetime.now()), "Sync detected, updating events")
for i, old_event in enumerate(event_list):
uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
if old_event["uid"] == uid: # If the same event is found
print("Event with UID {} has been deleted".format(old_event["uid"]))
# Remove the event from the list
del event_list[i]
break
def on_created(self, event):
print(f"File created: {event.src_path}")
if not event.is_directory: # If it's a file and not a directory
print(str(dt.datetime.now()), "Sync detected, updating events")
with open(event.src_path, 'r') as f:
cal_str = f.read()
try:
event_dict = calendar_parser(cal_str)
new_hash = self.calculate_event_hash(event_dict)
except Exception as i:
print("Failed to parse calendar event at: {}.\n Error:\n{}".format(event.src_path,i))
return
for old_event in event_list:
if old_event["uid"] == event_dict["uid"]: # If the same event is found
print("Event with UID {} already exists".format(old_event["uid"]))
break
else: # If no matching event is found in the list, add the new event to the list
event_list.append(event_dict)
def calculate_event_hash(self, event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
files = glob.glob(os.path.join('/home/mrsu/.local/share/caldav/personal-calendar/default', '*.ics'))
def calculate_recur_dates(dtstart, vrecur):
rule_str = "RRULE:{}".format(vrecur.to_ical().decode('utf-8'))
start_date = dtstart
if vrecur.get("COUNT") is None:
# If no COUNT, calculate an end date based on FREQ and INTERVAL to prevent generating too many dates
freq = vrecur.get('FREQ')[0]
# If it doesn't, calculate an end date based on FREQ and INTERVAL
freq = vrecur.get('FREQ')[0] # Get the first frequency (e.g., 'DAILY', 'WEEKLY', etc.)
interval = vrecur.get('INTERVAL')[0]
delta = None
@ -133,19 +30,23 @@ def calculate_recur_dates(dtstart, vrecur):
delta = relativedelta(years=interval)
count = 0
current_date = dt.datetime.now().replace(tzinfo=pytz.UTC)
origin_date = start_date
current_date = dt.datetime.today().date()
origin_date = start_date.date() if isinstance(start_date, dt.datetime) else start_date
while origin_date < current_date:
count += interval
origin_date += delta*interval
rule_str += ";COUNT={}".format(count+10)
else:
None
# If 'COUNT' exists, set it to a high number so that the rrulestr method will stop generating dates after the specified count
#rule_str += ";UNTIL=21001231T000000Z" # This sets an end date far in the future
ruleset = rrulestr(rule_str, dtstart=start_date)
# Generate future dates according to the rules
dates = list(ruleset)
return [d for d in dates if d > start_date]
return [d for d in dates][1:] # Remove first date as the same as start_date
def calendar_parser(cal_str):
# Parse the calendar
@ -155,8 +56,6 @@ def calendar_parser(cal_str):
if component.name == "VEVENT": # If it's a VEVENT, create a new event dictionary
uid = component.get("UID")
dtstart = component.get("DTSTART").dt
dtstart = dtstart if isinstance(dtstart, dt.datetime) else dt.datetime.combine(dtstart, dt.time.min) # Ensure dates are always as datetime
dtstart = dtstart.replace(tzinfo=pytz.UTC)
summary = component.get("SUMMARY")
vrecur = component.get("RRULE")
recur_dates = [None]
@ -169,33 +68,9 @@ def calendar_parser(cal_str):
valarm = Event.from_ical(subcomponent.to_ical())
timedelta = valarm.get("TRIGGER").dt
valarm_list.append(timedelta) # Add this VALARM to the list
event_dict = {"uid": str(uid), "dtstart": dtstart, "summary": summary, "recur_dates": recur_dates, "valarm": valarm_list, "alert_history": []}
handler = FileChangeHandler() # Create an instance of the FileChangeHandler class
new_hash = handler.calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
event_dict["hash"] = new_hash # Store the hash in the event dictionary
return event_dict
def get_next_alert(event, current_time):
recur_dates = event["recur_dates"]
valarm_deltas = event["valarm"]
dtstart = event["dtstart"]
next_event = None
if recur_dates[0] is None:
next_event = dtstart
elif current_time <= dtstart:
next_event = dtstart
else:
next_event = recur_dates[0]
if current_time > next_event:
return None, next_event
next_alert_list = [next_event + i for i in valarm_deltas if next_event + i >= current_time]
if next_alert_list == []:
next_alert = next_event
else:
next_alert = min(next_alert_list)
return next_alert - dt.timedelta(seconds=5), next_event
return {"uid": str(uid), "dtstart": dtstart, "summary": summary, "recur_dates": recur_dates, "valarm": valarm_list, "alert_triggered": [False*len(valarm_list)]}
# Create initial event_list using calendar_parser
event_list = [] # List to hold dictionaries for each event
for file in files:
with open(file, 'r') as f:
@ -203,39 +78,34 @@ for file in files:
event_dict = calendar_parser(cal_str)
event_list.append(event_dict)
class FileChangeHandler(FileSystemEventHandler):
def on_modified(self, event):
if not event.is_directory: # If it's a file and not a directory
print("Updating events")
with open(event.src_path, 'r') as f:
cal_str = f.read()
# Parse the calendar
event_dict = calendar_parser(cal_str)
event_list.append(event_dict)
observer = Observer()
handler = FileChangeHandler()
observer.schedule(handler, cal_dir, recursive=True)
observer.schedule(handler, '/home/mrsu/.local/share/caldav/personal-calendar/default', recursive=True)
observer.start()
try:
while True:
with open("status", 'w') as f:
#Refresh the status file
f.write("")
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
current_time = dt.datetime.now()
for event in event_list:
next_alert, next_event = get_next_alert(event, current_time)
if next_alert == None:
continue
monitor_status = "Current time: {}\nMonitoring: {}\nEvent date: {}\nNext alert on: {}\nAlert history: {}\n".format(current_time, event["summary"], next_event, next_alert, event["alert_history"])
with open("status", 'a') as f:
# Write the output to the file
f.write(monitor_status)
f.write("\n")
if current_time >= next_alert and next_alert < next_alert + dt.timedelta(seconds=15):
if len(event["alert_history"]) == 0:
print("First alert for '{}' detected".format(event["summary"]))
event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}]
elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]:
continue
else:
print("Posting alert for {}!".format(event["summary"]))
event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})
#xmpp_alert.send_xmpp(event["summary"], next_alert, next_event, args.config)
email_alert.send_email(event["summary"], next_alert, next_event, args.config)
with open("alert_history", 'a') as f:
f.write(event)
time.sleep(1)
None
#event["valarm"] = [a if isinstance(a, dt.datetime) else dt.datetime.combine(a, dt.datetime.min.time()) for a in event["valarm"]]
#print(event["summary"], event["freq"], current_time, [str(i.replace(tzinfo=None)) for i in event['valarm']], event["dtstart"])
print(event["summary"], str(event["dtstart"]), [str(i) for i in event["recur_dates"]])#, [str(alarm_time) for alarm_time in event['valarm']])
# if any(abs(current_time - alarm_time.total_seconds() <= 1 for alarm_time in event['valarm']) and event["alert_triggered"][0] is False:
# print("Alert for event with UID:", event["uid"], "Summary:", event["summary"])
# event["alert_triggered"][0] = True
time.sleep(1) # Wait for a second before checking again
except KeyboardInterrupt:
observer.stop()
observer.join()

View File

@ -1,36 +0,0 @@
import slixmpp
import toml
class SendMsgBot(slixmpp.ClientXMPP):
def __init__(self, jid, password, recipient, message):
super().__init__(jid, password)
self.recipient = recipient
self.msg = message
self.add_event_handler("session_start", self.start)
def start(self, event):
self.send_presence()
self.get_roster()
self.send_message(mto=self.recipient, mbody=self.msg, mtype='chat')
self.disconnect()
def send_xmpp(event_summary, next_alert, next_event, config):
with open('config.toml', 'r') as f:
config = toml.load(f)
jid = config["xmpp"]["jid"]
password = config["xmpp"]["password"] # replace with your password
recipient = config["xmpp"]["recipient"]
message = """\
Hi,
This is an alert for the event named '{}' which will occur on {}.
This event will start in '{}'
""".format(event_summary, next_event, next_alert)
bot = SendMsgBot(jid, password, recipient, message)
bot.register_plugin('xep_0030') # Service Discovery
bot.register_plugin('xep_0199') # XMPP Ping
bot.connect()
bot.process(forever=False)
return print("Message sent via XMPP")