icinga-channels/TelegramBridge.py

259 lines
9.8 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import re
import sys
from json import JSONDecodeError
from types import SimpleNamespace
import requests
class Request:
def __init__(self, id: int, method: str, params: dict = None) -> None:
self.id = id
self.method = method
self.params = params
class Response:
def __init__(self, id: int, error: str = None, result: object = None) -> None:
self.id = id
self.error = error
self.result = result
def __str__(self) -> str:
try:
return json.dumps(
Response.__remove_empty_properties(self.__dict__.copy()),
default=lambda o: o.__dict__,
sort_keys=True,
indent=None
)
except BaseException as _:
return self.__repr__()
@staticmethod
def __remove_empty_properties(dictionary) -> dict:
for k, v in list(dictionary.items()):
if v is None:
del dictionary[k]
elif isinstance(v, dict):
Response.__remove_empty_properties(v)
return dictionary
class TelegramBridge:
NOTIFICATION_TEMPLATE: str = '''
<b>{severity} | {title} | #{id}</b>
<a href="{url}">🔗 {url_title}</a>
<pre>{text}</pre>
'''
# as taken from
# https://github.com/Icinga/icinga-notifications/blob/114c1be12dbe6ff8d3b9280fb67e02812100d101/internal/event/severity.go#L25-L33
SEVERITY_ICON: dict[str, str] = {
'ok': '🟢 OK',
'debug': '⚪ DEBUG',
'info': '🔵 INFO',
'notice': '⚫ NOTICE',
'warning': '🟠 WARNING',
'err': '🔴 ERROR',
'crit': '🔴 CRITICAL',
'alert': '🟣 ALERT',
'emerg': '🟣 EMERGENCY'
}
def __init__(self: TelegramBridge) -> None:
self.token = None
self.chat_id = None
self.base_url = None
self.log("starting")
@staticmethod
def log(message: str) -> None:
sys.stderr.write("[telegramBridge] {message}{linesep}".format(message=message, linesep=os.linesep))
sys.stdout.flush()
@staticmethod
def return_and_continue(response: Response) -> None:
if response.error:
# removing result in case of an error
response.result = None
TelegramBridge.log("returning an error: {}".format(response.error))
TelegramBridge.log("returning response: {response}{linesep}".format(response=response, linesep=os.linesep))
sys.stdout.write("{response}{linesep}".format(response=response, linesep=os.linesep))
sys.stdout.flush()
@staticmethod
def return_and_exit(response: Response) -> None:
TelegramBridge.return_and_continue(response)
if response.error:
exit(1)
else:
exit(0)
def send_message(self, text: str) -> None:
url = self.base_url + 'sendMessage'
data = {
'chat_id': self.chat_id,
'text': text,
'parse_mode': 'HTML',
'disable_web_page_preview': True
}
requests.post(url, json=data)
def run(self) -> None:
self.log("waiting for requests")
for line in sys.stdin:
parsed_id = None
raw = None
# parse request identifier
id_match = re.match(r'.*"id":(\d+)', line)
if id_match:
parsed_id = id_match.group(1)
# parse request
try:
raw = json.loads(line.strip())
except JSONDecodeError as err:
self.return_and_exit(Response(id=parsed_id, error="failed decoding input: {}".format(err.msg)))
# map request
if raw:
request = Request(**raw)
self.log("received request: {}".format(vars(request)))
self.handle_request(request)
else:
self.log("dropped input as it couldn't be mapped to a request")
self.log("exiting")
def handle_request(self, request: Request) -> None:
match request.method:
case 'GetInfo':
# return information to Icinga Notifications about this channel
info = SimpleNamespace()
info.name = 'TelegramBridge'
info.version = '0.1.0'
info.author = 'Icinga GmbH'
info.config_attrs = [
{
"name": "bot_token",
"type": "secret",
"label": {
"de_DE": "Telegram Token",
"en_US": "Telegram Token"
},
"help": {
"de_DE": "Authentifizierungstoken von Telegram.",
"en_US": "Authentication token from Telegram."
},
"required": True,
"min": None,
"max": None
},
{
"name": "chat_identifier",
"type": "number",
"label": {
"de_DE": "Chat Identifikator",
"en_US": "Chat Identifier"
},
"help": {
"de_DE": "Chat Identifikator, welcher den gewünschten Telegram Chat referenziert, "
"in welchem die Benachrichtigungen landen sollen.",
"en_US": "Chat identifier that references the desired Telegram chat, which should receive "
"the notifications."
},
"required": True,
"min": -1,
"max": None
}
]
self.return_and_exit(Response(id=request.id, result=info))
case 'SetConfig':
missing_fields = []
invalid_fields = []
if not request.params:
self.return_and_exit(Response(id=request.id, error="invalid config (missing parameters)"))
conf = request.params
# check for required fields
if not 'bot_token' in conf:
missing_fields.append('bot_token')
if not 'chat_identifier' in conf:
missing_fields.append('chat_identifier')
if len(missing_fields) > 0:
# return an error as at least one required field seems missing
self.return_and_exit(Response(id=request.id, error="invalid config (missing required field(s) '{}'"
")".format(', '.join(missing_fields))))
# validate fields
if len(conf['bot_token']) < 44 or len(conf['bot_token']) > 46:
invalid_fields.append('bot_token')
if conf['chat_identifier'][0:1] == '-':
invalid_fields.append('channel_identifier')
else:
try:
test = int(conf['chat_identifier'])
except ValueError as _:
invalid_fields.append('channel_identifier')
if len(invalid_fields) > 0:
# return an error as at least one field seems invalid
self.return_and_exit(Response(id=request.id, error="invalid config (invalid field(s) '{}'"
")".format(', '.join(invalid_fields))))
# set configuration to current instance
self.token = conf['bot_token']
self.chat_id = conf['chat_identifier']
self.base_url = 'https://api.telegram.org/bot{token}/'.format(token=self.token)
# all good
self.return_and_continue(Response(id=request.id))
case 'SendNotification':
# check for required payload
if not request.params:
self.return_and_exit(Response(id=request.id, error="invalid notification (missing parameters)"))
notification = request.params
# check if notification payload contains a linked object
if not 'object' in notification:
self.return_and_exit(Response(id=request.id, error="notification is missing its linked object."))
# send notification to linked telegram chat
title = ''
if 'service' in notification['object']['tags']:
title = "{service} on {host}".format(
service=notification['object']['tags']['service'],
host=notification['object']['tags']['host']
)
else:
title = notification['object']['tags']['host']
self.send_message(text=TelegramBridge.NOTIFICATION_TEMPLATE.format(
severity=TelegramBridge.SEVERITY_ICON[notification['incident']['severity']],
title=title,
id=notification['incident']['id'],
url=notification['incident']['url'],
url_title="View Incident".format(notification['incident']['id']),
text=notification['event']['message']
))
self.return_and_continue(Response(id=request.id))
case _:
self.log("request method unknown: {}".format(request.method))
self.return_and_exit(Response(id=request.id, error="invalid request (method unknown)"))
# run entrypoint
if __name__ == '__main__':
app = TelegramBridge()
app.run()