diff --git a/README.md b/README.md index 79bcd9a..15a84cf 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,4 @@ # Icinga Notifications Test Channels +Channels must be renamed without extension when in use. + diff --git a/TelegramBridge.py b/TelegramBridge.py new file mode 100755 index 0000000..3adf165 --- /dev/null +++ b/TelegramBridge.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import os +import re +import sys +from json import JSONDecodeError +from types import SimpleNamespace + +import requests + + +class Request: + def __init__(self, id: int, method: str, params: dict = None) -> None: + self.id = id + self.method = method + self.params = params + + +class Response: + def __init__(self, id: int, error: str = None, result: object = None) -> None: + self.id = id + self.error = error + self.result = result + + def __str__(self) -> str: + try: + return json.dumps( + Response.__remove_empty_properties(self.__dict__.copy()), + default=lambda o: o.__dict__, + sort_keys=True, + indent=None + ) + except BaseException as _: + return self.__repr__() + + @staticmethod + def __remove_empty_properties(dictionary) -> dict: + for k, v in list(dictionary.items()): + if v is None: + del dictionary[k] + elif isinstance(v, dict): + Response.__remove_empty_properties(v) + return dictionary + + +class TelegramBridge: + NOTIFICATION_TEMPLATE: str = ''' +{severity} | {title} | #{id} +🔗 {url_title} +
{text}
+''' + # as taken from + # https://github.com/Icinga/icinga-notifications/blob/114c1be12dbe6ff8d3b9280fb67e02812100d101/internal/event/severity.go#L25-L33 + SEVERITY_ICON: dict[str, str] = { + 'ok': '🟢 OK', + 'debug': '⚪ DEBUG', + 'info': '🔵 INFO', + 'notice': '⚫ NOTICE', + 'warning': '🟠 WARNING', + 'err': '🔴 ERROR', + 'crit': '🔴 CRITICAL', + 'alert': '🟣 ALERT', + 'emerg': '🟣 EMERGENCY' + } + + def __init__(self: TelegramBridge) -> None: + self.token = None + self.chat_id = None + self.base_url = None + self.log("starting") + + @staticmethod + def log(message: str) -> None: + sys.stderr.write("[telegramBridge] {message}{linesep}".format(message=message, linesep=os.linesep)) + sys.stdout.flush() + + @staticmethod + def return_and_continue(response: Response) -> None: + if response.error: + # removing result in case of an error + response.result = None + TelegramBridge.log("returning an error: {}".format(response.error)) + + TelegramBridge.log("returning response: {response}{linesep}".format(response=response, linesep=os.linesep)) + sys.stdout.write("{response}{linesep}".format(response=response, linesep=os.linesep)) + sys.stdout.flush() + + @staticmethod + def return_and_exit(response: Response) -> None: + TelegramBridge.return_and_continue(response) + + if response.error: + exit(1) + else: + exit(0) + + def send_message(self, text: str) -> None: + url = self.base_url + 'sendMessage' + data = { + 'chat_id': self.chat_id, + 'text': text, + 'parse_mode': 'HTML', + 'disable_web_page_preview': True + } + requests.post(url, json=data) + + def run(self) -> None: + self.log("waiting for requests") + for line in sys.stdin: + parsed_id = None + raw = None + + # parse request identifier + id_match = re.match(r'.*"id":(\d+)', line) + if id_match: + parsed_id = id_match.group(1) + + # parse request + try: + raw = json.loads(line.strip()) + except JSONDecodeError as err: + self.return_and_exit(Response(id=parsed_id, error="failed decoding input: {}".format(err.msg))) + + # map request + if raw: + request = Request(**raw) + self.log("received request: {}".format(vars(request))) + self.handle_request(request) + else: + self.log("dropped input as it couldn't be mapped to a request") + + self.log("exiting") + + def handle_request(self, request: Request) -> None: + match request.method: + case 'GetInfo': + # return information to Icinga Notifications about this channel + info = SimpleNamespace() + info.name = 'TelegramBridge' + info.version = '0.1.0' + info.author = 'Icinga GmbH' + info.config_attrs = [ + { + "name": "bot_token", + "type": "secret", + "label": { + "de_DE": "Telegram Token", + "en_US": "Telegram Token" + }, + "help": { + "de_DE": "Authentifizierungstoken von Telegram.", + "en_US": "Authentication token from Telegram." + }, + "required": True, + "min": None, + "max": None + }, + { + "name": "chat_identifier", + "type": "number", + "label": { + "de_DE": "Chat Identifikator", + "en_US": "Chat Identifier" + }, + "help": { + "de_DE": "Chat Identifikator, welcher den gewünschten Telegram Chat referenziert, " + "in welchem die Benachrichtigungen landen sollen.", + "en_US": "Chat identifier that references the desired Telegram chat, which should receive " + "the notifications." + }, + "required": True, + "min": -1, + "max": None + } + ] + + self.return_and_exit(Response(id=request.id, result=info)) + case 'SetConfig': + missing_fields = [] + invalid_fields = [] + + if not request.params: + self.return_and_exit(Response(id=request.id, error="invalid config (missing parameters)")) + conf = request.params + + # check for required fields + if not 'bot_token' in conf: + missing_fields.append('bot_token') + if not 'chat_identifier' in conf: + missing_fields.append('chat_identifier') + + if len(missing_fields) > 0: + # return an error as at least one required field seems missing + self.return_and_exit(Response(id=request.id, error="invalid config (missing required field(s) '{}'" + ")".format(', '.join(missing_fields)))) + # validate fields + if len(conf['bot_token']) < 44 or len(conf['bot_token']) > 46: + invalid_fields.append('bot_token') + if conf['chat_identifier'][0:1] == '-': + invalid_fields.append('channel_identifier') + else: + try: + test = int(conf['chat_identifier']) + except ValueError as _: + invalid_fields.append('channel_identifier') + + if len(invalid_fields) > 0: + # return an error as at least one field seems invalid + self.return_and_exit(Response(id=request.id, error="invalid config (invalid field(s) '{}'" + ")".format(', '.join(invalid_fields)))) + + # set configuration to current instance + self.token = conf['bot_token'] + self.chat_id = conf['chat_identifier'] + self.base_url = 'https://api.telegram.org/bot{token}/'.format(token=self.token) + + # all good + self.return_and_continue(Response(id=request.id)) + case 'SendNotification': + # check for required payload + if not request.params: + self.return_and_exit(Response(id=request.id, error="invalid notification (missing parameters)")) + notification = request.params + + # check if notification payload contains a linked object + if not 'object' in notification: + self.return_and_exit(Response(id=request.id, error="notification is missing its linked object.")) + + # send notification to linked telegram chat + title = '' + if 'service' in notification['object']['tags']: + title = "{service} on {host}".format( + service=notification['object']['tags']['service'], + host=notification['object']['tags']['host'] + ) + else: + title = notification['object']['tags']['host'] + self.send_message(text=TelegramBridge.NOTIFICATION_TEMPLATE.format( + severity=TelegramBridge.SEVERITY_ICON[notification['incident']['severity']], + title=title, + id=notification['incident']['id'], + url=notification['incident']['url'], + url_title="View Incident".format(notification['incident']['id']), + text=notification['event']['message'] + )) + + self.return_and_continue(Response(id=request.id)) + case _: + self.log("request method unknown: {}".format(request.method)) + self.return_and_exit(Response(id=request.id, error="invalid request (method unknown)")) + + +# run entrypoint +if __name__ == '__main__': + app = TelegramBridge() + app.run()