#!/usr/bin/env python3 from __future__ import annotations import json import os import re import sys from json import JSONDecodeError from types import SimpleNamespace import requests class Request: def __init__(self, id: int, method: str, params: dict = None) -> None: self.id = id self.method = method self.params = params class Response: def __init__(self, id: int, error: str = None, result: object = None) -> None: self.id = id self.error = error self.result = result def __str__(self) -> str: try: return json.dumps( Response.__remove_empty_properties(self.__dict__.copy()), default=lambda o: o.__dict__, sort_keys=True, indent=None ) except BaseException as _: return self.__repr__() @staticmethod def __remove_empty_properties(dictionary) -> dict: for k, v in list(dictionary.items()): if v is None: del dictionary[k] elif isinstance(v, dict): Response.__remove_empty_properties(v) return dictionary class TelegramBridge: NOTIFICATION_TEMPLATE: str = ''' {severity} | {title} | #{id} 🔗 {url_title}
{text}
''' # as taken from # https://github.com/Icinga/icinga-notifications/blob/114c1be12dbe6ff8d3b9280fb67e02812100d101/internal/event/severity.go#L25-L33 SEVERITY_ICON: dict[str, str] = { 'ok': '🟢 OK', 'debug': '⚪ DEBUG', 'info': '🔵 INFO', 'notice': '⚫ NOTICE', 'warning': '🟠 WARNING', 'err': '🔴 ERROR', 'crit': '🔴 CRITICAL', 'alert': '🟣 ALERT', 'emerg': '🟣 EMERGENCY' } def __init__(self: TelegramBridge) -> None: self.token = None self.chat_id = None self.base_url = None self.log("starting") @staticmethod def log(message: str) -> None: sys.stderr.write("[telegramBridge] {message}{linesep}".format(message=message, linesep=os.linesep)) sys.stdout.flush() @staticmethod def return_and_continue(response: Response) -> None: if response.error: # removing result in case of an error response.result = None TelegramBridge.log("returning an error: {}".format(response.error)) TelegramBridge.log("returning response: {response}{linesep}".format(response=response, linesep=os.linesep)) sys.stdout.write("{response}{linesep}".format(response=response, linesep=os.linesep)) sys.stdout.flush() @staticmethod def return_and_exit(response: Response) -> None: TelegramBridge.return_and_continue(response) if response.error: exit(1) else: exit(0) def send_message(self, text: str) -> None: url = self.base_url + 'sendMessage' data = { 'chat_id': self.chat_id, 'text': text, 'parse_mode': 'HTML', 'disable_web_page_preview': True } requests.post(url, json=data) def run(self) -> None: self.log("waiting for requests") for line in sys.stdin: parsed_id = None raw = None # parse request identifier id_match = re.match(r'.*"id":(\d+)', line) if id_match: parsed_id = id_match.group(1) # parse request try: raw = json.loads(line.strip()) except JSONDecodeError as err: self.return_and_exit(Response(id=parsed_id, error="failed decoding input: {}".format(err.msg))) # map request if raw: request = Request(**raw) self.log("received request: {}".format(vars(request))) self.handle_request(request) else: self.log("dropped input as it couldn't be mapped to a request") self.log("exiting") def handle_request(self, request: Request) -> None: match request.method: case 'GetInfo': # return information to Icinga Notifications about this channel info = SimpleNamespace() info.name = 'TelegramBridge' info.version = '0.1.0' info.author = 'Icinga GmbH' info.config_attrs = [ { "name": "bot_token", "type": "secret", "label": { "de_DE": "Telegram Token", "en_US": "Telegram Token" }, "help": { "de_DE": "Authentifizierungstoken von Telegram.", "en_US": "Authentication token from Telegram." }, "required": True, "min": None, "max": None }, { "name": "chat_identifier", "type": "number", "label": { "de_DE": "Chat Identifikator", "en_US": "Chat Identifier" }, "help": { "de_DE": "Chat Identifikator, welcher den gewünschten Telegram Chat referenziert, " "in welchem die Benachrichtigungen landen sollen.", "en_US": "Chat identifier that references the desired Telegram chat, which should receive " "the notifications." }, "required": True, "min": -1, "max": None } ] self.return_and_exit(Response(id=request.id, result=info)) case 'SetConfig': missing_fields = [] invalid_fields = [] if not request.params: self.return_and_exit(Response(id=request.id, error="invalid config (missing parameters)")) conf = request.params # check for required fields if not 'bot_token' in conf: missing_fields.append('bot_token') if not 'chat_identifier' in conf: missing_fields.append('chat_identifier') if len(missing_fields) > 0: # return an error as at least one required field seems missing self.return_and_exit(Response(id=request.id, error="invalid config (missing required field(s) '{}'" ")".format(', '.join(missing_fields)))) # validate fields if len(conf['bot_token']) < 44 or len(conf['bot_token']) > 46: invalid_fields.append('bot_token') if conf['chat_identifier'][0:1] == '-': invalid_fields.append('channel_identifier') else: try: test = int(conf['chat_identifier']) except ValueError as _: invalid_fields.append('channel_identifier') if len(invalid_fields) > 0: # return an error as at least one field seems invalid self.return_and_exit(Response(id=request.id, error="invalid config (invalid field(s) '{}'" ")".format(', '.join(invalid_fields)))) # set configuration to current instance self.token = conf['bot_token'] self.chat_id = conf['chat_identifier'] self.base_url = 'https://api.telegram.org/bot{token}/'.format(token=self.token) # all good self.return_and_continue(Response(id=request.id)) case 'SendNotification': # check for required payload if not request.params: self.return_and_exit(Response(id=request.id, error="invalid notification (missing parameters)")) notification = request.params # check if notification payload contains a linked object if not 'object' in notification: self.return_and_exit(Response(id=request.id, error="notification is missing its linked object.")) # send notification to linked telegram chat title = '' if 'service' in notification['object']['tags']: title = "{service} on {host}".format( service=notification['object']['tags']['service'], host=notification['object']['tags']['host'] ) else: title = notification['object']['tags']['host'] self.send_message(text=TelegramBridge.NOTIFICATION_TEMPLATE.format( severity=TelegramBridge.SEVERITY_ICON[notification['incident']['severity']], title=title, id=notification['incident']['id'], url=notification['incident']['url'], url_title="View Incident".format(notification['incident']['id']), text=notification['event']['message'] )) self.return_and_continue(Response(id=request.id)) case _: self.log("request method unknown: {}".format(request.method)) self.return_and_exit(Response(id=request.id, error="invalid request (method unknown)")) # run entrypoint if __name__ == '__main__': app = TelegramBridge() app.run()