Compare commits

...

17 Commits

Author SHA1 Message Date
Sam 025d0a353b Update remindme_caldav service to use log directory
- Changed ExecStart command to include a new log directory option (--logdir) instead of a single log file (--logfile).
2024-02-16 13:11:15 +00:00
Sam 1fa0bfb310 Update Makefile and README.md
- Updated install command in Makefile to create /opt/remindme_caldav/logs.
- Added information about modifying the config file before installation, ensuring calendar syncing from a CalDav server, and setting up the logging directory in README.md.
- Added section about logging in README.md, including details on log levels, log locations, status location, and alert history location.
2024-02-16 13:06:43 +00:00
Sam a5583d981b Update log directory check in setup_log_location function
* Updated the condition to check if the log directory exists by using Path(logdir).is_dir() instead of Path.is_dir(logdir).
2024-02-16 12:36:05 +00:00
Sam 64516db02a Update logging and file handling
* Added setup_log_location function to handle log directory creation and validation.
* Updated setup_logging function to accept a log location instead of a log file name.
* Updated parse_args function to handle log directory argument and validate the presence of a config file.
* Updated read_file function to raise an error if the specified file does not exist.
* Updated parse_toml function to raise an error if the parsed TOML content is None.
* Added get_calendar_dir function to validate the existence of the calendar directory.
* Added parse_calendar_files function to find and return a list of .ics files in the specified directory.
* Added construct_initial_event_dict function to parse each .ics file into an event dictionary and store them in a list.
* Updated FileChangeHandler class to log warnings instead of errors for invalid or unparseable files.
* Updated CalendarParser class to log warnings if no event dates are found for an event, or if the calendar component is missing.
* Added daemon function to monitor events and update status files at regular intervals.
* Updated main function to use setup_log_location, parse_args, get_calendar_dir, and construct_initial_event_dict functions.
2024-02-16 11:57:51 +00:00
Sam 1176ad9b67 Update systemd service file for remindme_caldav
- Modify the standard output and error to syslog.
- Add SyslogIdentifier for easier identification in logs.
2024-02-16 10:21:21 +00:00
mrsu ae7965c60b Update installation directory in Makefile 2024-02-16 00:53:19 +00:00
mrsu f604dcbae3 Update logging and add file detection for calendar files
- Updated setup_logging function to accept a logfile argument with default value "log"
- Changed basicConfig to use the provided logfile argument
- Updated parse_args to set default logfile value to "log" in current directory
- Added while loop to detect calendar files in sync location before proceeding with main logic
- Updated main function to call setup_logging with args.logfile and added file detection logic
2024-02-16 00:48:38 +00:00
mrsu d892cdbd4b Update logging and error handling in remindme_caldav.py
- Updated the logging configuration to use a custom format and set the log level based on command line arguments.
- Added more detailed error messages for file not found, failed TOML parsing, and failed calendar parsing.
- Improved exception handling during event processing and added more informative error messages.
2024-02-16 00:34:53 +00:00
mrsu a2a508fd31 Update Makefile and service file for installing remindme_caldav
- Create new directories in /etc/
- Update the install command to include the new directories creation
2024-02-15 22:54:56 +00:00
Sam aeb1eddbbb Update uninstall process to remove additional files and directories
- Updated the `uninstall` target in Makefile to also remove the `/etc/remindme_caldav` directory.
2024-02-15 20:03:59 +00:00
Sam 22e15fc49d Update install process and add configuration file management
- Added mkdir command to create /etc/remindme_caldav directory.
- Copied config.toml to the new directory.
- Updated Makefile install instructions to include these changes.
- Updated comment in config.toml to provide a link to the readme for an example configuration.
- Removed placeholder values from all sections of the config file.
- Added check to ensure there are .ics files in the destination location before parsing them.
- If no files are found, log a message and exit with an error code.
2024-02-15 19:56:40 +00:00
Sam 4deeda0964 Update log file path and add logging to service
- Updated the `parse_args()` function to include a new argument `--logfile`, which allows users to specify a custom log file. The default value is "none".
- Modified the `main()` function to check if the user provided a log file using the `--logfile` argument. If a log file is specified, it adds a FileHandler to the logger and sets the logging level based on the `--loglevel` argument.
- Updated the `remindme_caldav.service` file to include the `--logfile` argument in the `ExecStart` command. This ensures that the log file is used when the service starts.
2024-02-15 18:46:33 +00:00
Sam 89994c1f0a Update logging and error handling
* Move from basic logging to more detailed logging with a log file.
* Improve error handling in several functions.
* Modify the way log messages are displayed.
2024-02-15 18:12:13 +00:00
Sam ef00f112f3 Update error handling and logging in remindme_caldav.py
- Replace RuntimeError with logging.error.
- Add sys.exit(1) to terminate the program when a critical error occurs.
2024-02-15 17:48:18 +00:00
Sam 30dc31344c Update Makefile and Python scripts
- Updated the install command to use python3-venv instead of python3.11-venv.
- Added alert_processor.py to the list of files copied to /opt/remindme_caldav/.
- Changed the activation of the virtual environment and pip installation commands.
- Updated logging to not use f-strings
2024-02-15 17:02:22 +00:00
Sam 33244c1994 Fix formatting of Makefile 2024-02-15 15:39:55 +00:00
Sam 8fe759673d Update Makefile, README and service file for remindme_caldav
- The Makefile now includes installation instructions for Debian/Ubuntu based systems.
- The README has been updated with more detailed descriptions of the script's purpose, how it works, and how to use it.
- The service file has been added to manage the remindme_caldav daemon on a systemd-based system.
2024-02-15 15:21:37 +00:00
6 changed files with 286 additions and 173 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
config_local.toml config_local.toml
status status
alert_history alert_history
log
__pycache__ __pycache__

18
Makefile Normal file
View File

@ -0,0 +1,18 @@
install:
sudo apt-get update && sudo apt install python3-venv -y
sudo mkdir -p /opt/remindme_caldav/logs /etc/remindme_caldav
python3 -m venv /opt/remindme_caldav/.venv
cp remindme_caldav.py alert_processor.py /opt/remindme_caldav/
. /opt/remindme_caldav/.venv/bin/activate && pip3 install -r requirements.txt
sudo cp remindme_caldav.service /etc/systemd/system/
sudo cp config.toml /etc/remindme_caldav/config.toml
sudo systemctl daemon-reload
sudo systemctl enable remindme_caldav.service
sudo systemctl start remindme_caldav.service
uninstall:
sudo systemctl stop remindme_caldav.service
sudo systemctl disable remindme_caldav.service
rm -rf /opt/remindme_caldav
rm -rf /etc/remindme_caldav
rm /etc/systemd/system/remindme_caldav.service

View File

@ -1,3 +1,96 @@
# remindme_caldav # remindme_caldav
## A Calendar Alerting Daemon
## Purpose
This script is a simple calendar alerting daemon written in Python. It monitors
.ics files for changes and sends alerts based on the events' start times,
recurrence rules, and alert triggers defined within these files. The main
purpose of this script is to provide reminders or notifications about upcoming
events.
## How it Works
The script works by parsing .ics files using the `icalendar` library, which
converts them into a Python dictionary format for easier manipulation. It then
processes each event and calculates when the next alert should be triggered
based on the event's start time, recurrence rules, and alert triggers. If an
alert is due to trigger, it sends a notification via email or XMPP (an instant
messaging protocol).
The script also monitors for changes in the directory containing .ics files
using the `watchdog` library. When a file is modified, created, or deleted, it
updates its internal list of events accordingly.
## How to Use It
This script should be used with a calendar syncing service such as vdirsyncer.
vdirsyncer can be scheduled using cron to sync regularly from the CalDav
server.
To use this script, you need Python 3 installed on your system. You can install
the required libraries by running:
```bash
pip install -r requirements.txt
```
You also need a .toml configuration file with the following structure:
```toml
[app]
calendar_dir = "/path/to/your/ics/files"
email_address = "your-email@example.com"
smtp_server = "smtp.example.com"
smtp_port = 587
smtp_username = "your-username"
smtp_password = "your-password"
...
```
The config file is passed to the script with the `--config` argument.
You can then run the script with:
```bash
python3 remindme_caldav.py --config /path/to/your/config.toml
```
## Installation
A Makefile and systemd service file is also included for Debian/Ubuntu based
systems. Make sure to modify the config file in the source directory before
installation. Also, please ensure that the calendar_dir exists and syncing
from a CalDav server before running the script.
This Makefile does the following:
- install: Installs Python 3.11, creates a virtual environment in
/opt/remindme_caldav/.venv, installs dependencies from requirements.txt
into this virtual environment, copies the script to /opt/remindme_caldav/,
copies the config file to /etc/remindme_caldav and sets up the systemd
service file. It also sets up the logging dir in /opt/remindme_caldav/logs.
- uninstall: Stops and disables the systemd service, removes the installation
directory (/opt/remindme_caldav/), and deletes the systemd service file.
## Logging
The script uses Python's built-in logging module to handle logging. The
setup_logging(log_location) function sets up basic configuration for the
logger, including the log file location and format.
Log levels are used to categorize logs based on their severity: DEBUG, INFO,
WARNING, ERROR, CRITICAL. By default, the log level is set to INFO. This can
be modified by passing a --loglevel argument when running the script.
The setup_log_location(logdir) function sets up the locations for three types
of logs: log, status, and alert_history.
- Log Location: This is where script logs are stored. These logs contain
information about the general operation of the script, such as when it
starts or stops, what files it's processing, errors etc. The logdir argument
specifies the directory where these log files will be created.
- Status Location: This file contains information about the current state of
each event being monitored by the script. It includes details such as the
current time, the name of the event, its recurrence dates, and when the
next alert will be triggered. The purpose of this file is to provide a
real-time status update on what's happening in the script.
- Alert History Location: This file logs all alerts that have been sent out
by the script. It includes details such as the timestamp when the alert was
triggered, and the definition time of the alert. The purpose of this file
is to keep a record of all alerts that have been sent.
A simple script to send alerts/reminders for caldav events.

View File

@ -1,16 +1,15 @@
# Modify to your requirements # Modify to your requirements. See readme for example.
[app] [app]
calendar_dir = "FULL_PATH_TO_.ICS_CALENDAR_FILES" calendar_dir =
[email] [email]
smtp_server = "SMTP.PROVIDER.DOMAIN" smtp_server =
port = 587 port =
username = "YOUR_USERNAME" username =
password = "YOUR_PASSWORD" password =
recipient = "RECIPIENT_EMAIL_ADDRESS" recipient =
[xmpp] [xmpp]
jid = 'YOUR_USERNAME@SERVER_INSTANCE.DOMAIN' jid =
password = 'YOUR_PASSWORD' password =
recipient = 'RECIPIENT_USERNAME@SERVER_INSTANCE.DOMAIN' recipient =
[notify-send]

View File

@ -12,38 +12,81 @@ from pathlib import Path
import argparse, textwrap, logging import argparse, textwrap, logging
from alert_processor import AlertProcessor from alert_processor import AlertProcessor
def setup_logger(loglevel): logger = logging.getLogger()
"""Setup basic logging."""
loglevel = getattr(logging, loglevel.upper(), None)
if not isinstance(loglevel, int): def setup_log_location(logdir):
raise ValueError('Invalid log level: %s' % loglevel) if not Path(logdir).is_dir():
raise FileNotFoundError(f"Log dir '{logdir}' does not exist. Be sure to create it first.")
log_location = os.path.join(logdir, "log")
status_location = os.path.join(logdir, "status")
alert_history_location = os.path.join(logdir, "alert_history")
return log_location, status_location, alert_history_location
logging.basicConfig(filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s') def setup_logging(log_location):
logger = logging.getLogger() log_format='[%(levelname)s] %(asctime)s %(message)s'
logger.setLevel(loglevel) logging.basicConfig(filename = log_location, format=log_format, level=logging.INFO)
def parse_args(): def parse_args():
"""Parse command line arguments.""" """Parse command line arguments."""
parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python") parser = argparse.ArgumentParser(description="A simple calendar alerting daemon written in Python")
parser.add_argument('--config', type=str, help='Path to config file. Must be .toml') parser.add_argument('--config', type=str, help='Path to config file. Must be .toml')
parser.add_argument('--loglevel', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='Set the logging level') parser.add_argument('--logdir', type=str, help='Path to logfile directory', default = "logs")
parser.add_argument('--loglevel', help="Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
args = parser.parse_args() args = parser.parse_args()
if args.config is None: if args.config is None:
raise RuntimeError("No config file provided. Please use --config path_to_config.toml") logger.error("No config file provided. Please use --config path_to_config.toml")
sys.exit(1)
return args return args
def read_file(filename): def read_file(filename):
try: try:
return Path(filename).read_text() return Path(filename).read_text()
except FileNotFoundError as e: except FileNotFoundError:
raise RuntimeError(f"Error: The specified file does not exist. {e}") raise FileNotFoundError("Error: The specified file does not exist.")
def parse_toml(content): def parse_toml(content):
try: try:
return toml.loads(content) config = toml.loads(content)
if config is None:
logging.error("Invalid config")
sys.exit(1)
return config
except Exception:
raise RuntimeError("Error: Failed to parse TOML file.")
def get_calendar_dir(config):
cal_dir = Path(config["app"]["calendar_dir"])
if not cal_dir.is_dir():
logger.error(f"The provided path to .ics files does not exist: '{cal_dir}'")
sys.exit(1)
return cal_dir
def parse_calendar_files(cal_dir):
files = []
no_files_detected = True
logger.info(f"Looking for calendar files in {cal_dir}...")
while no_files_detected is True:
files = list(cal_dir.glob('*.ics'))
if len(files) != 0:
logger.info("Calendar files detected in sync location!")
no_files_detected = False
return files
def construct_initial_event_dict(cal_dir):
files = parse_calendar_files(cal_dir)
calendar_parser = CalendarParser()
event_list = []
for file in files:
with open(file, 'r') as f:
cal_str = f.read()
try:
event_dict = calendar_parser.parse_calendar(cal_str)
except Exception as e: except Exception as e:
raise RuntimeError(f"Error: Failed to parse TOML file. {e}") logger.warning(f"Error parsing event, skipping. {file}. Error message {e}")
continue
event_list.append(event_dict)
return event_list
def calculate_event_hash(event): def calculate_event_hash(event):
return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest() return hashlib.md5(json.dumps(event, sort_keys=True, cls=DateTimeEncoder).encode()).hexdigest()
@ -65,46 +108,41 @@ class FileChangeHandler(FileSystemEventHandler):
self.event_list = event_list self.event_list = event_list
def on_modified(self, event): def on_modified(self, event):
logging.debug(f"File modified: {event.src_path}") logger.info(f"File modified: {event.src_path}")
if not event.is_directory: if not event.is_directory:
try: try:
with open(event.src_path, 'r') as f: with open(event.src_path, 'r') as f:
cal_str = f.read() cal_str = f.read()
except Exception as e: except Exception as e:
logging.error(f"Not a valid file: {event.src_path}. Error: {e}") logger.error(f"Not a valid file: {event.src_path}. Error: {e}")
return return
try: try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as e: except Exception as e:
logging.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") logger.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return return
self.handle_modified(old_event=None, event_dict=event_dict) self.handle_modified(old_event=None, event_dict=event_dict)
def on_deleted(self, event): def on_deleted(self, event):
logging.debug(f"File deleted: {event.src_path}") logger.info(f"File deleted: {event.src_path}")
if not event.is_directory: if not event.is_directory:
uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension uid = os.path.splitext(os.path.basename(event.src_path))[0] # Get the UID from the file path without extension
self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True) self.handle_modified(old_event=None, event_dict={"uid": uid}, remove=True)
def on_created(self, event): def on_created(self, event):
logging.debug(f"File created: {event.src_path}") logger.info(f"File created: {event.src_path}")
if not event.is_directory: if not event.is_directory:
try: try:
with open(event.src_path, 'r') as f: with open(event.src_path, 'r') as f:
cal_str = f.read() cal_str = f.read()
except Exception as e: except Exception as e:
logging.error(f"Not a valid file: {event.src_path}. Error: {e}") logger.warning(f"Not a valid file: {event.src_path}. Error: {e}")
return return
try: try:
event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method event_dict = self.calendar_parser.parse_calendar(cal_str) # Use the instance to call parse_calendar method
except Exception as e: except Exception as e:
logging.error(f"Failed to parse calendar event at: {event.src_path}. Error: {e}") logger.warning(f"Failed to parse calendar event at: {event.src_path}. Error: {e}")
return return
self.handle_modified(old_event=None, event_dict=event_dict) self.handle_modified(old_event=None, event_dict=event_dict)
def handle_modified(self, old_event, event_dict, remove=False): def handle_modified(self, old_event, event_dict, remove=False):
@ -112,10 +150,9 @@ class FileChangeHandler(FileSystemEventHandler):
for i, old_event in enumerate(self.event_list): for i, old_event in enumerate(self.event_list):
if old_event["uid"] == event_dict["uid"]: if old_event["uid"] == event_dict["uid"]:
old_hash = old_event["hash"] old_hash = old_event["hash"]
new_hash = calculate_event_hash(event_dict) new_hash = calculate_event_hash(event_dict)
if new_hash != old_hash: if new_hash != old_hash:
logging.debug(f"Event with UID {old_event['uid']} has been modified or deleted") logger.info(f"Event with UID {old_event['uid']} has been modified or deleted")
self.event_list[i] = event_dict self.event_list[i] = event_dict
break break
else: else:
@ -123,7 +160,7 @@ class FileChangeHandler(FileSystemEventHandler):
else: # If remove is True, remove the event from the list else: # If remove is True, remove the event from the list
for i, old_event in enumerate(self.event_list): for i, old_event in enumerate(self.event_list):
if old_event["uid"] == event_dict["uid"]: if old_event["uid"] == event_dict["uid"]:
logging.debug(f"Event with UID {old_event['uid']} has been deleted") logger.info(f"Event with UID {old_event['uid']} has been deleted")
del self.event_list[i] del self.event_list[i]
break break
@ -131,15 +168,6 @@ class FileChangeHandler(FileSystemEventHandler):
class RecurringEventGenerator: class RecurringEventGenerator:
""" """
A class to generate recurring events based on a start date and a recurrence rule. A class to generate recurring events based on a start date and a recurrence rule.
Attributes:
dtstart (datetime): The starting date of the event series.
rrule (rrule): The recurrence rule for the event series.
Methods:
__init__(self, dtstart, rrule): Initializes the class with a start date and a recurrence rule.
generate(self): Generates the recurring events based on the start date and recurrence rule.
Returns a dictionary containing information about the recurring events.
""" """
def __init__(self, dtstart, rrule): def __init__(self, dtstart, rrule):
self.dtstart = dtstart self.dtstart = dtstart
@ -212,15 +240,6 @@ class CalendarParser:
def parse_calendar(self, cal_str): def parse_calendar(self, cal_str):
""" """
Parse a calendar string and process each event. Parse a calendar string and process each event.
Args:
cal_str (str): The iCalendar string to be parsed.
Returns:
dict: A dictionary containing information about the processed events.
Raises:
RuntimeError: If there are no dates returned for an event or if there is an error calculating the event hash.
""" """
# Parse the calendar # Parse the calendar
cal = self.parse_icalendar(cal_str) cal = self.parse_icalendar(cal_str)
@ -233,6 +252,8 @@ class CalendarParser:
generator = RecurringEventGenerator(dtstart, event_dict["rrule"]) generator = RecurringEventGenerator(dtstart, event_dict["rrule"])
recur_info = generator.generate() recur_info = generator.generate()
event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"]) event_dates = self.remove_exdates(event_dict["exdate"], recur_info["recur_dates"])
if len(event_dates) == 0:
logging.warning(f"No event dates for event: '{event['summary']}'")
valarms = self.process_valarm(event) valarms = self.process_valarm(event)
@ -253,7 +274,7 @@ class CalendarParser:
} }
try: try:
new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary new_hash = calculate_event_hash(event_dict) # Calculate the hash of the event dictionary
except Exception as e: except Exception:
raise RuntimeError("Error calculating event hash") raise RuntimeError("Error calculating event hash")
event_dict["hash"] = new_hash # Store the hash in the event dictionary event_dict["hash"] = new_hash # Store the hash in the event dictionary
return event_dict return event_dict
@ -261,30 +282,15 @@ class CalendarParser:
def parse_icalendar(self, cal_str): def parse_icalendar(self, cal_str):
""" """
Parse a calendar string into an iCalendar object. Parse a calendar string into an iCalendar object.
Args:
cal_str (str): The iCalendar string to be parsed.
Returns:
Calendar: An iCalendar object representing the parsed calendar.
Raises:
RuntimeError: If there is an error parsing the calendar.
""" """
try: try:
return Calendar.from_ical(cal_str) return Calendar.from_ical(cal_str)
except Exception as e: except Exception:
raise RuntimeError(f"Error parsing calendar. Message from icalendar: {e}") raise RuntimeError("Error parsing icalendar.")
def process_event(self, event): def process_event(self, event):
""" """
Process an event from a parsed calendar and extract relevant information. Process an event from a parsed calendar and extract relevant information.
Args:
event (Event): An iCalendar event object to be processed.
Returns:
dict: A dictionary containing the extracted event information.
""" """
event_info = { event_info = {
"uid": None, "uid": None,
@ -301,21 +307,12 @@ class CalendarParser:
try: try:
event_info[info] = event[info] event_info[info] = event[info]
except Exception: except Exception:
pass logging.info(f"CalDav componant '{info}' missing for event {event['summary']}")
return event_info return event_info
def dtstart_to_datetime(self, dtstart): def dtstart_to_datetime(self, dtstart):
""" """
Convert a date or datetime object into a datetime object with UTC timezone. Convert a date or datetime object into a datetime object with UTC timezone.
Args:
dtstart (date/datetime): The date or datetime to be converted.
Returns:
datetime: A datetime object representing the input date or datetime in UTC timezone.
Raises:
RuntimeError: If there is an error converting the input to a datetime object.
""" """
# Ensure dates are always as datetime # Ensure dates are always as datetime
try: try:
@ -323,22 +320,12 @@ class CalendarParser:
return dtstart.replace(tzinfo=pytz.UTC) return dtstart.replace(tzinfo=pytz.UTC)
else: else:
return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC) return dt.datetime.combine(dtstart, dt.time.min).replace(tzinfo=pytz.UTC)
except Exception as e: except Exception:
raise RuntimeError(f"Error converting dtstart to datetime. Message: {e}") raise RuntimeError("Error converting dtstart to datetime.")
def remove_exdates(self, exdates, recur_dates): def remove_exdates(self, exdates, recur_dates):
""" """
Remove dates from a list of recurring event dates that are in the exdate list. Remove dates from a list of recurring event dates that are in the exdate list.
Args:
exdates (list): A list of datetime objects representing excluded dates.
recur_dates (list): A list of datetime objects representing recurring event dates.
Returns:
list: A list of datetime objects representing the remaining recurring event dates after removing the exdate dates.
Raises:
RuntimeError: If there is an error processing the exdates.
""" """
if exdates != []: if exdates != []:
try: try:
@ -347,31 +334,27 @@ class CalendarParser:
else: else:
exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)] exdates = [exdates.dts[0].dt.replace(tzinfo=pytz.UTC)]
return [i for i in recur_dates if i not in exdates] return [i for i in recur_dates if i not in exdates]
except Exception as e: except Exception:
raise RuntimeError(f"Error processing exdates. Message {e}") raise RuntimeError("Error processing exdates.")
else: else:
return recur_dates return recur_dates
def process_valarm(self, event): def process_valarm(self, event):
""" """
Process VALARM components from an iCalendar event and extract trigger times. Process VALARM components from an iCalendar event and extract trigger times.
Args:
event (Event): An iCalendar event object to be processed.
Returns:
list: A list of datetime objects representing the extracted trigger times.
""" """
valarms = [] valarms = []
for subcomponent in event.walk("valarm"): for subcomponent in event.walk("valarm"):
valarm = Event.from_ical(subcomponent.to_ical()) valarm = Event.from_ical(subcomponent.to_ical())
timedelta = valarm["trigger"].dt timedelta = valarm["trigger"].dt
valarms.append(timedelta) valarms.append(timedelta)
if len(valarms) == 0:
logging.info(f"No reminders for event: {event['summary']}")
return valarms return valarms
def get_next_alert(event, current_time): def get_next_alert(event, current_time):
""" """
This function returns the next alert that should be processed based on the current time. Returns the next alert that should be processed based on the current time.
""" """
event_dates = event["event_dates"] event_dates = event["event_dates"]
valarm_deltas = event["valarms"] valarm_deltas = event["valarms"]
@ -385,71 +368,38 @@ def get_next_alert(event, current_time):
next_alert = min(next_alert_list) next_alert = min(next_alert_list)
return next_alert - dt.timedelta(seconds=5), next_event return next_alert - dt.timedelta(seconds=5), next_event
def process_alert(current_time, next_alert, next_event, event, config): def process_alert(current_time, next_alert, next_event, event, config, alert_history_location):
""" """
This function processes a given alert and passes it to a messaging client. Processes a given alert and passes it to a messaging client.
""" """
if current_time >= next_alert and current_time < next_alert + dt.timedelta(seconds=15): if current_time >= next_alert and current_time < next_alert + dt.timedelta(seconds=15):
if len(event["alert_history"]) == 0: if len(event["alert_history"]) == 0:
logging.debug(f"First alert for '{event['summary']}' detected") logger.info(f"First alert for '{event['summary']}' detected")
event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}] event["alert_history"] = [{"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}]
elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]: elif next_alert in [i["alert_defintition_time"] for i in event["alert_history"]]:
return return
else: else:
logging.debug(f"Posting alert for {event['summary']}!") logger.info(f"Posting alert for {event['summary']}!")
event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert}) event["alert_history"].append({"timestamp_alert_triggered": current_time, "alert_defintition_time": next_alert})
try: try:
processor = AlertProcessor(config) processor = AlertProcessor(config)
processor.send_email(event, next_alert, next_event) processor.send_email(event, next_alert, next_event)
except Exception as e: except Exception as e:
raise RuntimeError(f"Error sending alert for event {event['summary']}. Message {e}") raise RuntimeError(f"Error sending alert for event. {e}")
#processor.send_xmpp(event, next_alert, next_event) with open(alert_history_location, 'a') as f:
with open("alert_history", 'a') as f:
f.write(str(event)) f.write(str(event))
return return
def main(): def daemon(status_location, alert_history_location, config, event_list):
# Parse args and config with open(status_location, 'w') as f:
args = parse_args() f.write("") # Refresh the status file
content = read_file(args.config)
config = parse_toml(content)
# Get calendar dir
cal_dir = Path(config["app"]["calendar_dir"])
if not cal_dir.is_dir():
print(f"The provided path to .ics files does not exist: '{cal_dir}'")
sys.exit(1) # Exit with error code
#Parse calendar events
calendar_parser = CalendarParser()
files = list(cal_dir.glob('*.ics'))
event_list = [] # List to hold dictionaries for each event
for file in files:
with open(file, 'r') as f:
cal_str = f.read()
try:
event_dict = calendar_parser.parse_calendar(cal_str)
except Exception:
logging.error(f"Error parsing event, skipping. {file}")
continue
event_list.append(event_dict)
#Start file handler to detect changes to calendar dir
observer = Observer()
handler = FileChangeHandler(event_list) # Pass event_list here
observer.schedule(handler, cal_dir, recursive=True)
observer.start()
#Start main loop
try:
while True:
with open("status", 'w') as f:
#Refresh the status file
f.write("")
current_time = dt.datetime.now().replace(tzinfo=pytz.UTC) current_time = dt.datetime.now().replace(tzinfo=pytz.UTC)
for event in event_list: for event in event_list:
try:
next_alert, next_event = get_next_alert(event, current_time) next_alert, next_event = get_next_alert(event, current_time)
except RuntimeError as e:
logger.warning(f"Error getting next alert for {event['summary']}, skipping event. Error message {e}")
continue
if next_alert == None: if next_alert == None:
continue continue
event_delta = next_alert-current_time event_delta = next_alert-current_time
@ -464,11 +414,45 @@ def main():
Recur info: {event["recur_info"]} Recur info: {event["recur_info"]}
Alert history: {event["alert_history"]}\n""" Alert history: {event["alert_history"]}\n"""
monitor_status = textwrap.dedent(monitor_status) monitor_status = textwrap.dedent(monitor_status)
with open("status", 'a') as f: with open(status_location, 'a') as f:
# Write the output to the file f.write(monitor_status) # Write the output to the file
f.write(monitor_status)
f.write("\n") f.write("\n")
process_alert(current_time, next_alert, next_event, event, config) try:
process_alert(current_time, next_alert, next_event, event, config, alert_history_location)
except RuntimeError as e:
logger.warning(f"Error processing alert for event {event['summary']}. Error message: {e}")
return
def main():
# Parse args and initiate logging
args = parse_args()
log_location, status_location, alert_history_location = setup_log_location(args.logdir)
setup_logging(log_location)
logger = logging.getLogger()
# Redefine log level if args passed
if args.loglevel is not None:
numeric_level = getattr(logging, args.loglevel.upper(), None) # Convert string to integer
if isinstance(numeric_level, int):
logger = logging.getLogger()
logger.setLevel(numeric_level) # Set the log level
# Setup initial event_list
config_file = read_file(args.config)
config = parse_toml(config_file)
cal_dir = get_calendar_dir(config)
event_list = construct_initial_event_dict(cal_dir)
# Start file handler to detect changes to calendar dir
observer = Observer()
handler = FileChangeHandler(event_list)
observer.schedule(handler, cal_dir, recursive=True)
observer.start()
# Start main loop
try:
while True:
daemon(status_location, alert_history_location, config, event_list)
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
observer.stop() observer.stop()

18
remindme_caldav.service Normal file
View File

@ -0,0 +1,18 @@
[Unit]
Description=Calendar Alerting Daemon
After=network.target
StartLimitIntervalSec=0
[Service]
Type=simple
RestartSec=1
User=root
ExecStart=/opt/remindme_caldav/.venv/bin/python3 -u /opt/remindme_caldav/remindme_caldav.py --config /etc/remindme_caldav/config.toml --logdir /opt/remindme_caldav/logs
Environment="PYTHONUNBUFFERED=1"
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=remindme_caldav
[Install]
WantedBy=multi-user.target