119 lines
5.0 KiB
Python
119 lines
5.0 KiB
Python
import smtplib
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
import ssl
|
|
import humanfriendly
|
|
import slixmpp
|
|
|
|
class AlertProcessor:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
|
|
def _create_email(self, event, next_alert, next_event):
|
|
# Set up the SMTP server details
|
|
smtp_server = self.config["email"]["smtp_server"]
|
|
port = self.config["email"]["port"]
|
|
sender_email = self.config["email"]["username"]
|
|
receiver_email = self.config["email"]["recipient"]
|
|
password = self.config["email"]["password"]
|
|
|
|
# Event details
|
|
event_name, event_description, event_location, event_date, human_readable_time = self._get_event_details(event, next_alert, next_event)
|
|
|
|
# Create a multipart message and set headers
|
|
message = MIMEMultipart()
|
|
message["From"] = sender_email
|
|
message["To"] = receiver_email
|
|
message["Subject"] = "Event Alert: '{}' @ {}".format(event_name, event_date)
|
|
|
|
# Add body to email
|
|
body = self._create_email_body(event_name, event_date, event_description, event_location, human_readable_time)
|
|
message.attach(MIMEText(body, "plain"))
|
|
text = message.as_string()
|
|
|
|
return smtp_server, port, sender_email, receiver_email, password, text
|
|
|
|
def _get_event_details(self, event, next_alert, next_event):
|
|
# Event details
|
|
event_name = event["summary"]
|
|
event_description = event["description"]
|
|
event_location = event["location"]
|
|
event_date = next_event
|
|
event_delta = next_event - next_alert
|
|
total_seconds = event_delta.total_seconds()
|
|
human_readable_time = humanfriendly.format_timespan(total_seconds)
|
|
|
|
return event_name, event_description, event_location, event_date, human_readable_time
|
|
|
|
def _create_email_body(self, event_name, event_date, event_description, event_location, human_readable_time):
|
|
body = """\
|
|
Hi,
|
|
This is an event alert from remindme_caldav.
|
|
|
|
Event details:
|
|
---------------------------------
|
|
|
|
Event name: {}
|
|
Date/time: {}
|
|
Description: {}
|
|
Location: {}
|
|
Time until event: {}
|
|
|
|
---------------------------------
|
|
""".format(event_name, event_date, event_description, event_location, human_readable_time)
|
|
return body
|
|
|
|
def send_email(self, event, next_alert, next_event):
|
|
try:
|
|
smtp_server, port, sender_email, receiver_email, password, text = self._create_email(event, next_alert, next_event)
|
|
|
|
# Create a secure SSL context and connect to the server
|
|
context = ssl.create_default_context()
|
|
with smtplib.SMTP(smtp_server, port) as server: # Use 'with' statement for automatic cleanup
|
|
server.ehlo() # Can be omitted
|
|
server.starttls(context=context) # Secure the connection
|
|
server.login(sender_email, password)
|
|
|
|
# Send email here
|
|
server.sendmail(sender_email, receiver_email, text)
|
|
return print("Message sent via email")
|
|
except Exception as e:
|
|
# Print any error messages to stdout
|
|
print("An error occured when sending alert via email, please check your config. Message: {}".format(e))
|
|
|
|
def _create_xmpp_bot(self, event, next_alert, next_event):
|
|
class SendMsgBot(slixmpp.ClientXMPP):
|
|
def __init__(self, jid, password, recipient, message):
|
|
super().__init__(jid, password)
|
|
self.recipient = recipient
|
|
self.msg = message
|
|
self.add_event_handler("session_start", self.start)
|
|
|
|
def start(self, event):
|
|
self.send_presence()
|
|
self.get_roster()
|
|
self.send_message(mto=self.recipient, mbody=self.msg, mtype='chat')
|
|
self.disconnect()
|
|
|
|
jid = self.config["xmpp"]["jid"]
|
|
password = self.config["xmpp"]["password"] # replace with your password
|
|
recipient = self.config["xmpp"]["recipient"]
|
|
|
|
event_name, event_description, event_location, event_date, human_readable_time = self._get_event_details(event, next_alert, next_event)
|
|
|
|
message = self._create_email_body(event_name, event_date, event_description, event_location, human_readable_time)
|
|
|
|
return SendMsgBot(jid, password, recipient, message)
|
|
|
|
def send_xmpp(self, event, next_alert, next_event):
|
|
try:
|
|
bot = self._create_xmpp_bot(event, next_alert, next_event)
|
|
|
|
bot.register_plugin('xep_0030') # Service Discovery
|
|
bot.register_plugin('xep_0199') # XMPP Ping
|
|
bot.connect()
|
|
bot.process(forever=False)
|
|
return print("Message sent via XMPP")
|
|
except Exception as e:
|
|
print("An error occured when sending alert via XMPP, please check your config. Message: {}".format(e))
|