Compare commits

No commits in common. "025d0a353b71e397c27ac0ec0c55585a3a69918d" and "740ac039dd7cb9ba3c7fc592e9487ea7b7f67dd0" have entirely different histories.

6 changed files with 172 additions and 285 deletions

.gitignore

@ -1,5 +1,4 @@
config_local.toml
status
alert_history
log
__pycache__

Makefile

@ -1,18 +0,0 @@
install:
sudo apt-get update && sudo apt install python3-venv -y
sudo mkdir -p /opt/remindme_caldav/logs /etc/remindme_caldav
python3 -m venv /opt/remindme_caldav/.venv
cp remindme_caldav.py alert_processor.py /opt/remindme_caldav/
. /opt/remindme_caldav/.venv/bin/activate && pip3 install -r requirements.txt
sudo cp remindme_caldav.service /etc/systemd/system/
sudo cp config.toml /etc/remindme_caldav/config.toml
sudo systemctl daemon-reload
sudo systemctl enable remindme_caldav.service
sudo systemctl start remindme_caldav.service
uninstall:
sudo systemctl stop remindme_caldav.service
sudo systemctl disable remindme_caldav.service
rm -rf /opt/remindme_caldav
rm -rf /etc/remindme_caldav
rm /etc/systemd/system/remindme_caldav.service

README.md

@ -1,96 +1,3 @@
# remindme_caldav
## A Calendar Alerting Daemon
## Purpose
This script is a simple calendar alerting daemon written in Python. It monitors
.ics files for changes and sends alerts based on the events' start times,
recurrence rules, and alert triggers defined within these files. The main
purpose of this script is to provide reminders or notifications about upcoming
events.
## How it Works
The script parses .ics files with the `icalendar` library and converts each
event into a Python dictionary for easier manipulation. It then
processes each event and calculates when the next alert should be triggered
based on the event's start time, recurrence rules, and alert triggers. If an
alert is due to trigger, it sends a notification via email or XMPP (an instant
messaging protocol).
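As a rough illustration of the parsing step, here is a minimal sketch (not the script's actual code path) that reads a hypothetical `example.ics`, pulls out the start time and VALARM triggers, and prints when an alert would fire; the real script also handles recurrence rules and excluded dates:
```python
from icalendar import Calendar
import datetime as dt

# Minimal sketch: parse one .ics file and compute naive alert times.
# "example.ics" is a placeholder path, not something the project ships.
with open("example.ics") as f:
    cal = Calendar.from_ical(f.read())

for event in cal.walk("vevent"):
    dtstart = event["dtstart"].dt          # event start (date or datetime)
    for alarm in event.walk("valarm"):
        trigger = alarm["trigger"].dt      # usually a negative timedelta
        if isinstance(trigger, dt.timedelta):
            print("alert due at:", dtstart + trigger)
```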
The script also monitors for changes in the directory containing .ics files
using the `watchdog` library. When a file is modified, created, or deleted, it
updates its internal list of events accordingly.
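The file-watching side looks roughly like the sketch below (the directory path is a placeholder for `calendar_dir`, and the handler just prints instead of re-parsing the event as the daemon does):
```python
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Minimal watchdog sketch: report changes to files in a directory.
class IcsChangeHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print("changed:", event.src_path)

observer = Observer()
observer.schedule(IcsChangeHandler(), "/path/to/your/ics/files", recursive=True)
observer.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
```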
## How to Use It
This script is intended to be used alongside a calendar syncing service such as
vdirsyncer, which can be scheduled with cron to sync regularly from the CalDAV
server, for example:
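A minimal crontab entry might look like this, assuming vdirsyncer is already configured and installed at the usual path (adjust the interval and path to taste):
```bash
# Sync calendars from the CalDAV server every 15 minutes (edit with `crontab -e`).
*/15 * * * * /usr/bin/vdirsyncer sync
```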
To use this script, you need Python 3 installed on your system. You can install
the required libraries by running:
```bash
pip install -r requirements.txt
```
You also need a .toml configuration file with the following structure:
```toml
[app]
calendar_dir = "/path/to/your/ics/files"
email_address = "your-email@example.com"
smtp_server = "smtp.example.com"
smtp_port = 587
smtp_username = "your-username"
smtp_password = "your-password"
...
```
The config file is passed to the script with the `--config` argument.
You can then run the script with:
```bash
python3 remindme_caldav.py --config /path/to/your/config.toml
```
## Installation
A Makefile and a systemd service file are also included for Debian/Ubuntu-based
systems. Make sure to modify the config file in the source directory before
installation, and ensure that the calendar_dir exists and is syncing from a
CalDAV server before running the script.
The Makefile does the following (a usage example follows the list):
- install: Installs python3-venv, creates a virtual environment in
/opt/remindme_caldav/.venv, installs dependencies from requirements.txt
into this virtual environment, copies the scripts to /opt/remindme_caldav/,
copies the config file to /etc/remindme_caldav, and installs, enables, and
starts the systemd service. It also creates the logging dir at
/opt/remindme_caldav/logs.
- uninstall: Stops and disables the systemd service, removes the installation
directory (/opt/remindme_caldav/) and the config directory (/etc/remindme_caldav),
and deletes the systemd service file.
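Typical usage, from the repository root and after editing config.toml (the recipes invoke sudo themselves):
```bash
make install     # set up /opt/remindme_caldav and start the service
make uninstall   # stop the service and remove the installed files
```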
## Logging
The script uses Python's built-in logging module to handle logging. The
setup_logging(log_location) function sets up basic configuration for the
logger, including the log file location and format.
Log levels are used to categorize logs based on their severity: DEBUG, INFO,
WARNING, ERROR, CRITICAL. By default, the log level is set to INFO. This can
be modified by passing a --loglevel argument when running the script.
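For example, to run with verbose logging using the flags defined by the script's argument parser (the paths are placeholders):
```bash
python3 remindme_caldav.py --config /path/to/your/config.toml \
    --logdir /path/to/logs --loglevel DEBUG
```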
The setup_log_location(logdir) function sets up the locations for three types
of logs: log, status, and alert_history.
- Log Location: This is where script logs are stored. These logs contain
information about the general operation of the script, such as when it
starts or stops, what files it is processing, errors, etc. The --logdir
argument specifies the directory where these log files will be created.
- Status Location: This file contains information about the current state of
each event being monitored by the script. It includes details such as the
current time, the name of the event, its recurrence dates, and when the
next alert will be triggered. The purpose of this file is to provide a
real-time status update on what the script is doing (an example entry is
shown after this list).
- Alert History Location: This file logs all alerts that have been sent out
by the script. It includes details such as the timestamp when the alert was
triggered, and the definition time of the alert. The purpose of this file
is to keep a record of all alerts that have been sent.
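For illustration, a single status entry looks roughly like this; the field values below are invented, but the layout follows the monitor_status template used by the script:
```
Current time: 2024-06-01 09:00:00+00:00
Monitoring: Team meeting
Event date: 2024-06-01 14:00:00+00:00
Recur Dates: ['2024-06-01 14:00:00+00:00']
Next alert on: 2024-06-01 13:44:55+00:00 in 4 hours, 44 minutes and 55 seconds
Recur info: {...}
Alert history: []
```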
A simple script to send alerts/reminders for caldav events.

config.toml

@ -1,15 +1,16 @@
# Modify to your requirements. See readme for example.
# Modify to your requirements
[app]
calendar_dir =
calendar_dir = "FULL_PATH_TO_.ICS_CALENDAR_FILES"
[email]
smtp_server =
port =
username =
password =
recipient =
smtp_server = "SMTP.PROVIDER.DOMAIN"
port = 587
username = "YOUR_USERNAME"
password = "YOUR_PASSWORD"
recipient = "RECIPIENT_EMAIL_ADDRESS"
[xmpp]
jid =
password =
recipient =
jid = 'YOUR_USERNAME@SERVER_INSTANCE.DOMAIN'
password = 'YOUR_PASSWORD'
recipient = 'RECIPIENT_USERNAME@SERVER_INSTANCE.DOMAIN'
[notify-send]

remindme_caldav.py

@ -12,81 +12,38 @@ from pathlib import Path
import argparse, textwrap, logging
from alert_processor import AlertProcessor
logger = logging.getLogger()
def setup_logger(loglevel):
"""Setup basic logging."""
loglevel = getattr(logging, loglevel.upper(), None)
def setup_log_location(logdir):
if not Path(logdir).is_dir():
raise FileNotFoundError(f"Log dir '{logdir}' does not exist. Be sure to create it first.")
log_location = os.path.join(logdir, "log")
status_location = os.path.join(logdir, "status")
alert_history_location = os.path.join(logdir, "alert_history")
return log_location, status_location, alert_history_location
if not isinstance(loglevel, int):
raise ValueError('Invalid log level: %s' % loglevel)
def setup_logging(log_location):
log_format='[%(levelname)s] %(asctime)s %(message)s'
logging.basicConfig(filename = log_location, format=log_format, level=logging.INFO)
logging.basicConfig(filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.setLevel(loglevel)
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
parser.add_argument('--logdir', type=str, help='Path to logfile directory', default = "logs")
parser.add_argument('--loglevel', help="Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level')
args = parser.parse_args()
if args.config is None:
logger.error("No config file provided. Please use --config path_to_config.toml")
sys.exit(1)
raise RuntimeError("No config file provided. Please use --config path_to_config.toml")
return args
def read_file(filename):
try:
return Path(filename).read_text()
except FileNotFoundError:
raise FileNotFoundError("Error: The specified file does not exist.")
except FileNotFoundError as e:
raise RuntimeError(f"Error: The specified file does not exist. {e}")
def parse_toml(content):
try:
config = toml.loads(content)
if config is None:
logging.error("Invalid config")
sys.exit(1)
return config
except Exception:
raise RuntimeError("Error: Failed to parse TOML file.")
def get_calendar_dir(config):
cal_dir = Path(config["app"]["calendar_dir"])
if not cal_dir.is_dir():
logger.error(f"The provided path to .ics files does not exist: '{cal_dir}'")
sys.exit(1)
return cal_dir
def parse_calendar_files(cal_dir):
files = []
no_files_detected = True
logger.info(f"Looking for calendar files in {cal_dir}...")
while no_files_detected is True:
files = list(cal_dir.glob('*.ics'))
if len(files) != 0:
logger.info("Calendar files detected in sync location!")
no_files_detected = False
return files
def construct_initial_event_dict(cal_dir):
files = parse_calendar_files(cal_dir)
calendar_parser = CalendarParser()
event_list = []
for file in files:
with open(file, 'r') as f:
cal_str = f.read()
try:
event_dict = calendar_parser.parse_calendar(cal_str)
except Exception as e:
logger.warning(f"Error parsing event, skipping. {file}. Error message {e}")
continue
event_list.append(event_dict)
return event_list
return toml.loads(content)
except Exception as e:
raise RuntimeError(f"Error: Failed to parse TOML file. {e}")
def calculate_event_hash(event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
@ -108,41 +65,46 @@ class FileChangeHandler(FileSystemEventHandler):
self.event_list = event_list
def on_modified(self, event):
logger.info(f"File modified: {event.src_path}")
logging.debug(f"File modified: {event.src_path}")
if not event.is_directory:
try:
with open(event.src_path, 'r') as f:
cal_str = f.read()
except Exception as e:
logger.error(f"Not a valid file: {event.src_path}. Error: {e}")
logging.error(f"Not a valid file: {event.src_path}. Error: {e}")
return
try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as e:
logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
logging.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return
self.handle_modified(old_event=None, event_dict=event_dict)
def on_deleted(self, event):
logger.info(f"File deleted: {event.src_path}")
logging.debug(f"File deleted: {event.src_path}")
if not event.is_directory:
uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True)
def on_created(self, event):
logger.info(f"File created: {event.src_path}")
logging.debug(f"File created: {event.src_path}")
if not event.is_directory:
try:
with open(event.src_path, 'r') as f:
cal_str = f.read()
except Exception as e:
logger.warning(f"Not a valid file: {event.src_path}. Error: {e}")
logging.error(f"Not a valid file: {event.src_path}. Error: {e}")
return
try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as e:
logger.warning(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
logging.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return
self.handle_modified(old_event=None, event_dict=event_dict)
def handle_modified(self, old_event, event_dict, remove=False):
@ -150,9 +112,10 @@ class FileChangeHandler(FileSystemEventHandler):
for i, old_event in enumerate(self.event_list):
if old_event["uid"] == event_dict["uid"]:
old_hash = old_event["hash"]
new_hash = calculate_event_hash(event_dict)
if new_hash != old_hash:
logger.info(f"Event with UID {old_event['uid']} has been modified or deleted")
logging.debug(f"Event with UID {old_event['uid']} has been modified or deleted")
self.event_list[i] = event_dict
break
else:
@ -160,7 +123,7 @@ class FileChangeHandler(FileSystemEventHandler):
else: # If remove is True, remove the event from the list
for i, old_event in enumerate(self.event_list):
if old_event["uid"] == event_dict["uid"]:
logger.info(f"Event with UID {old_event['uid']} has been deleted")
logging.debug(f"Event with UID {old_event['uid']} has been deleted")
del self.event_list[i]
break
@ -168,6 +131,15 @@ class FileChangeHandler(FileSystemEventHandler):
class RecurringEventGenerator:
"""
A class to generate recurring events based on a start date and a recurrence rule.
Attributes:
dtstart (datetime): The starting date of the event series.
rrule (rrule): The recurrence rule for the event series.
Methods:
__init__(self, dtstart, rrule): Initializes the class with a start date and a recurrence rule.
generate(self): Generates the recurring events based on the start date and recurrence rule.
Returns a dictionary containing information about the recurring events.
"""
def __init__(self, dtstart, rrule):
self.dtstart = dtstart
@ -240,6 +212,15 @@ class CalendarParser:
def parse_calendar(self, cal_str):
"""
Parse a calendar string and process each event.
Args:
cal_str (str): The iCalendar string to be parsed.
Returns:
dict: A dictionary containing information about the processed events.
Raises:
RuntimeError: If there are no dates returned for an event or if there is an error calculating the event hash.
"""
# Parse the calendar
cal = self.parse_icalendar(cal_str)
@ -252,8 +233,6 @@ class CalendarParser:
generator = RecurringEventGenerator(dtstart, event_dict["rrule"])
recur_info = generator.generate()
event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"])
if len(event_dates) == 0:
logging.warning(f"No event dates for event: '{event['summary']}'")
valarms = self.process_valarm(event)
@ -274,7 +253,7 @@ class CalendarParser:
}
try:
new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
except Exception:
except Exception as e:
raise RuntimeError("Error calculating event hash")
event_dict["hash"] = new_hash # Store the hash in the event dictionary
return event_dict
@ -282,15 +261,30 @@ class CalendarParser:
def parse_icalendar(self, cal_str):
"""
Parse a calendar string into an iCalendar object.
Args:
cal_str (str): The iCalendar string to be parsed.
Returns:
Calendar: An iCalendar object representing the parsed calendar.
Raises:
RuntimeError: If there is an error parsing the calendar.
"""
try:
return Calendar.from_ical(cal_str)
except Exception:
raise RuntimeError("Error parsing icalendar.")
except Exception as e:
raise RuntimeError(f"Error parsing calendar. Message from icalendar: {e}")
def process_event(self, event):
"""
Process an event from a parsed calendar and extract relevant information.
Args:
event (Event): An iCalendar event object to be processed.
Returns:
dict: A dictionary containing the extracted event information.
"""
event_info = {
"uid": None,
@ -307,12 +301,21 @@ class CalendarParser:
try:
event_info[info] = event[info]
except Exception:
logging.info(f"CalDav componant '{info}' missing for event {event['summary']}")
pass
return event_info
def dtstart_to_datetime(self, dtstart):
"""
Convert a date or datetime object into a datetime object with UTC timezone.
Args:
dtstart (date/datetime): The date or datetime to be converted.
Returns:
datetime: A datetime object representing the input date or datetime in UTC timezone.
Raises:
RuntimeError: If there is an error converting the input to a datetime object.
"""
# Ensure dates are always as datetime
try:
@ -320,12 +323,22 @@ class CalendarParser:
return dtstart.replace(tzinfo=pytz.UTC)
else:
return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)
except Exception:
raise RuntimeError("Error converting dtstart to datetime.")
except Exception as e:
raise RuntimeError(f"Error converting dtstart to datetime. Message: {e}")
def remove_exdates(self, exdates, recur_dates):
"""
Remove dates from a list of recurring event dates that are in the exdate list.
Args:
exdates (list): A list of datetime objects representing excluded dates.
recur_dates (list): A list of datetime objects representing recurring event dates.
Returns:
list: A list of datetime objects representing the remaining recurring event dates after removing the exdate dates.
Raises:
RuntimeError: If there is an error processing the exdates.
"""
if exdates != []:
try:
@ -334,27 +347,31 @@ class CalendarParser:
else:
exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)]
return [i for i in recur_dates if i not in exdates]
except Exception:
raise RuntimeError("Error processing exdates.")
except Exception as e:
raise RuntimeError(f"Error processing exdates. Message {e}")
else:
return recur_dates
def process_valarm(self, event):
"""
Process VALARM components from an iCalendar event and extract trigger times.
Args:
event (Event): An iCalendar event object to be processed.
Returns:
list: A list of datetime objects representing the extracted trigger times.
"""
valarms = []
for subcomponent in event.walk("valarm"):
valarm = Event.from_ical(subcomponent.to_ical())
timedelta = valarm["trigger"].dt
valarms.append(timedelta)
if len(valarms) == 0:
logging.info(f"No reminders for event: {event['summary']}")
return valarms
def get_next_alert(event, current_time):
"""
Returns the next alert that should be processed based on the current time.
This function returns the next alert that should be processed based on the current time.
"""
event_dates = event["event_dates"]
valarm_deltas = event["valarms"]
@ -368,91 +385,90 @@ def get_next_alert(event, current_time):
next_alert = min(next_alert_list)
return next_alert - dt.timedelta(seconds=5), next_event
def process_alert(current_time, next_alert, next_event, event, config, alert_history_location):
def process_alert(current_time, next_alert, next_event, event, config):
"""
Processes a given alert and passes it to a messaging client.
This function processes a given alert and passes it to a messaging client.
"""
if current_time >= next_alert and current_time < next_alert + dt.timedelta(seconds=15):
if len(event["alert_history"]) == 0:
logger.info(f"First alert for '{event['summary']}' detected")
logging.debug(f"First alert for '{event['summary']}' detected")
event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}]
elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]:
return
else:
logger.info(f"Posting alert for {event['summary']}!")
logging.debug(f"Posting alert for {event['summary']}!")
event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})
try:
processor = AlertProcessor(config)
processor.send_email(event, next_alert, next_event)
except Exception as e:
raise RuntimeError(f"Error sending alert for event. {e}")
with open(alert_history_location, 'a') as f:
raise RuntimeError(f"Error sending alert for event {event['summary']}. Message {e}")
#processor.send_xmpp(event, next_alert, next_event)
with open("alert_history", 'a') as f:
f.write(str(event))
return
def daemon(status_location, alert_history_location, config, event_list):
with open(status_location, 'w') as f:
f.write("") # Refresh the status file
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
for event in event_list:
try:
next_alert, next_event = get_next_alert(event, current_time)
except RuntimeError as e:
logger.warning(f"Error getting next alert for {event['summary']}, skipping event. Error message {e}")
continue
if next_alert == None:
continue
event_delta = next_alert-current_time
total_seconds = event_delta.total_seconds()
human_readable_time = humanfriendly.format_timespan(total_seconds)
monitor_status = f"""\
Current time: {current_time}
Monitoring: {event["summary"]}
Event date: {next_event}
Recur Dates: {[str(i) for i in event["event_dates"]]}
Next alert on: {next_alert} in {human_readable_time}
Recur info: {event["recur_info"]}
Alert history: {event["alert_history"]}\n"""
monitor_status = textwrap.dedent(monitor_status)
with open(status_location, 'a') as f:
f.write(monitor_status) # Write the output to the file
f.write("\n")
try:
process_alert(current_time, next_alert, next_event, event, config, alert_history_location)
except RuntimeError as e:
logger.warning(f"Error processing alert for event {event['summary']}. Error message: {e}")
return
def main():
# Parse args and initiate logging
# Parse args and config
args = parse_args()
log_location, status_location, alert_history_location = setup_log_location(args.logdir)
setup_logging(log_location)
logger = logging.getLogger()
content = read_file(args.config)
config = parse_toml(content)
# Redefine log level if args passed
if args.loglevel is not None:
numeric_level = getattr(logging, args.loglevel.upper(), None) # Convert string to integer
if isinstance(numeric_level, int):
logger = logging.getLogger()
logger.setLevel(numeric_level) # Set the log level
# Get calendar dir
cal_dir = Path(config["app"]["calendar_dir"])
if not cal_dir.is_dir():
print(f"The provided path to .ics files does not exist: '{cal_dir}'")
sys.exit(1) # Exit with error code
# Setup initial event_list
config_file = read_file(args.config)
config = parse_toml(config_file)
cal_dir = get_calendar_dir(config)
event_list = construct_initial_event_dict(cal_dir)
#Parse calendar events
calendar_parser = CalendarParser()
files = list(cal_dir.glob('*.ics'))
event_list = [] # List to hold dictionaries for each event
for file in files:
with open(file, 'r') as f:
cal_str = f.read()
try:
event_dict = calendar_parser.parse_calendar(cal_str)
except Exception:
logging.error(f"Error parsing event, skipping. {file}")
continue
event_list.append(event_dict)
# Start file handler to detect changes to calendar dir
#Start file handler to detect changes to calendar dir
observer = Observer()
handler = FileChangeHandler(event_list)
handler = FileChangeHandler(event_list) # Pass event_list here
observer.schedule(handler, cal_dir, recursive=True)
observer.start()
# Start main loop
#Start main loop
try:
while True:
daemon(status_location, alert_history_location, config, event_list)
with open("status", 'w') as f:
#Refresh the status file
f.write("")
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
for event in event_list:
next_alert, next_event = get_next_alert(event, current_time)
if next_alert == None:
continue
event_delta = next_alert-current_time
total_seconds = event_delta.total_seconds()
human_readable_time = humanfriendly.format_timespan(total_seconds)
monitor_status = f"""\
Current time: {current_time}
Monitoring: {event["summary"]}
Event date: {next_event}
Recur Dates: {[str(i) for i in event["event_dates"]]}
Next alert on: {next_alert} in {human_readable_time}
Recur info: {event["recur_info"]}
Alert history: {event["alert_history"]}\n"""
monitor_status = textwrap.dedent(monitor_status)
with open("status", 'a') as f:
# Write the output to the file
f.write(monitor_status)
f.write("\n")
process_alert(current_time, next_alert, next_event, event, config)
time.sleep(1)
except KeyboardInterrupt:
observer.stop()

remindme_caldav.service

@ -1,18 +0,0 @@
[Unit]
Description=Calendar Alerting Daemon
After=network.target
StartLimitIntervalSec=0
[Service]
Type=simple
RestartSec=1
User=root
ExecStart=/opt/remindme_caldav/.venv/bin/python3 -u /opt/remindme_caldav/remindme_caldav.py --config /etc/remindme_caldav/config.toml --logdir /opt/remindme_caldav/logs
Environment="PYTHONUNBUFFERED=1"
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=remindme_caldav
[Install]
WantedBy=multi-user.target