259 lines
9.8 KiB
Python
259 lines
9.8 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
from __future__ import annotations

import html
import json
import os
import re
import sys
from json import JSONDecodeError
from types import SimpleNamespace

import requests
|
||
|
|
|
||
|
|
|
||
|
|
class Request:
    """A single request read from the input stream.

    Instances are built directly from the decoded JSON object, so the
    constructor's keyword names mirror the keys of that object.
    """

    def __init__(self, id: int, method: str, params: dict = None) -> None:
        # Attribute order is kept as id, method, params so that ``vars()``
        # output (used for logging) stays the same.
        self.id, self.method, self.params = id, method, params
|
||
|
|
|
||
|
|
|
||
|
|
class Response:
|
||
|
|
def __init__(self, id: int, error: str = None, result: object = None) -> None:
|
||
|
|
self.id = id
|
||
|
|
self.error = error
|
||
|
|
self.result = result
|
||
|
|
|
||
|
|
def __str__(self) -> str:
|
||
|
|
try:
|
||
|
|
return json.dumps(
|
||
|
|
Response.__remove_empty_properties(self.__dict__.copy()),
|
||
|
|
default=lambda o: o.__dict__,
|
||
|
|
sort_keys=True,
|
||
|
|
indent=None
|
||
|
|
)
|
||
|
|
except BaseException as _:
|
||
|
|
return self.__repr__()
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def __remove_empty_properties(dictionary) -> dict:
|
||
|
|
for k, v in list(dictionary.items()):
|
||
|
|
if v is None:
|
||
|
|
del dictionary[k]
|
||
|
|
elif isinstance(v, dict):
|
||
|
|
Response.__remove_empty_properties(v)
|
||
|
|
return dictionary
|
||
|
|
|
||
|
|
|
||
|
|
class TelegramBridge:
|
||
|
|
NOTIFICATION_TEMPLATE: str = '''
|
||
|
|
<b>{severity} | {title} | #{id}</b>
|
||
|
|
<a href="{url}">🔗 {url_title}</a>
|
||
|
|
<pre>{text}</pre>
|
||
|
|
'''
|
||
|
|
# as taken from
|
||
|
|
# https://github.com/Icinga/icinga-notifications/blob/114c1be12dbe6ff8d3b9280fb67e02812100d101/internal/event/severity.go#L25-L33
|
||
|
|
SEVERITY_ICON: dict[str, str] = {
|
||
|
|
'ok': '🟢 OK',
|
||
|
|
'debug': '⚪ DEBUG',
|
||
|
|
'info': '🔵 INFO',
|
||
|
|
'notice': '⚫ NOTICE',
|
||
|
|
'warning': '🟠 WARNING',
|
||
|
|
'err': '🔴 ERROR',
|
||
|
|
'crit': '🔴 CRITICAL',
|
||
|
|
'alert': '🟣 ALERT',
|
||
|
|
'emerg': '🟣 EMERGENCY'
|
||
|
|
}
|
||
|
|
|
||
|
|
def __init__(self: TelegramBridge) -> None:
|
||
|
|
self.token = None
|
||
|
|
self.chat_id = None
|
||
|
|
self.base_url = None
|
||
|
|
self.log("starting")
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def log(message: str) -> None:
|
||
|
|
sys.stderr.write("[telegramBridge] {message}{linesep}".format(message=message, linesep=os.linesep))
|
||
|
|
sys.stdout.flush()
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def return_and_continue(response: Response) -> None:
|
||
|
|
if response.error:
|
||
|
|
# removing result in case of an error
|
||
|
|
response.result = None
|
||
|
|
TelegramBridge.log("returning an error: {}".format(response.error))
|
||
|
|
|
||
|
|
TelegramBridge.log("returning response: {response}{linesep}".format(response=response, linesep=os.linesep))
|
||
|
|
sys.stdout.write("{response}{linesep}".format(response=response, linesep=os.linesep))
|
||
|
|
sys.stdout.flush()
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def return_and_exit(response: Response) -> None:
|
||
|
|
TelegramBridge.return_and_continue(response)
|
||
|
|
|
||
|
|
if response.error:
|
||
|
|
exit(1)
|
||
|
|
else:
|
||
|
|
exit(0)
|
||
|
|
|
||
|
|
def send_message(self, text: str) -> None:
|
||
|
|
url = self.base_url + 'sendMessage'
|
||
|
|
data = {
|
||
|
|
'chat_id': self.chat_id,
|
||
|
|
'text': text,
|
||
|
|
'parse_mode': 'HTML',
|
||
|
|
'disable_web_page_preview': True
|
||
|
|
}
|
||
|
|
requests.post(url, json=data)
|
||
|
|
|
||
|
|
def run(self) -> None:
|
||
|
|
self.log("waiting for requests")
|
||
|
|
for line in sys.stdin:
|
||
|
|
parsed_id = None
|
||
|
|
raw = None
|
||
|
|
|
||
|
|
# parse request identifier
|
||
|
|
id_match = re.match(r'.*"id":(\d+)', line)
|
||
|
|
if id_match:
|
||
|
|
parsed_id = id_match.group(1)
|
||
|
|
|
||
|
|
# parse request
|
||
|
|
try:
|
||
|
|
raw = json.loads(line.strip())
|
||
|
|
except JSONDecodeError as err:
|
||
|
|
self.return_and_exit(Response(id=parsed_id, error="failed decoding input: {}".format(err.msg)))
|
||
|
|
|
||
|
|
# map request
|
||
|
|
if raw:
|
||
|
|
request = Request(**raw)
|
||
|
|
self.log("received request: {}".format(vars(request)))
|
||
|
|
self.handle_request(request)
|
||
|
|
else:
|
||
|
|
self.log("dropped input as it couldn't be mapped to a request")
|
||
|
|
|
||
|
|
self.log("exiting")
|
||
|
|
|
||
|
|
def handle_request(self, request: Request) -> None:
|
||
|
|
match request.method:
|
||
|
|
case 'GetInfo':
|
||
|
|
# return information to Icinga Notifications about this channel
|
||
|
|
info = SimpleNamespace()
|
||
|
|
info.name = 'TelegramBridge'
|
||
|
|
info.version = '0.1.0'
|
||
|
|
info.author = 'Icinga GmbH'
|
||
|
|
info.config_attrs = [
|
||
|
|
{
|
||
|
|
"name": "bot_token",
|
||
|
|
"type": "secret",
|
||
|
|
"label": {
|
||
|
|
"de_DE": "Telegram Token",
|
||
|
|
"en_US": "Telegram Token"
|
||
|
|
},
|
||
|
|
"help": {
|
||
|
|
"de_DE": "Authentifizierungstoken von Telegram.",
|
||
|
|
"en_US": "Authentication token from Telegram."
|
||
|
|
},
|
||
|
|
"required": True,
|
||
|
|
"min": None,
|
||
|
|
"max": None
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"name": "chat_identifier",
|
||
|
|
"type": "number",
|
||
|
|
"label": {
|
||
|
|
"de_DE": "Chat Identifikator",
|
||
|
|
"en_US": "Chat Identifier"
|
||
|
|
},
|
||
|
|
"help": {
|
||
|
|
"de_DE": "Chat Identifikator, welcher den gewünschten Telegram Chat referenziert, "
|
||
|
|
"in welchem die Benachrichtigungen landen sollen.",
|
||
|
|
"en_US": "Chat identifier that references the desired Telegram chat, which should receive "
|
||
|
|
"the notifications."
|
||
|
|
},
|
||
|
|
"required": True,
|
||
|
|
"min": -1,
|
||
|
|
"max": None
|
||
|
|
}
|
||
|
|
]
|
||
|
|
|
||
|
|
self.return_and_exit(Response(id=request.id, result=info))
|
||
|
|
case 'SetConfig':
|
||
|
|
missing_fields = []
|
||
|
|
invalid_fields = []
|
||
|
|
|
||
|
|
if not request.params:
|
||
|
|
self.return_and_exit(Response(id=request.id, error="invalid config (missing parameters)"))
|
||
|
|
conf = request.params
|
||
|
|
|
||
|
|
# check for required fields
|
||
|
|
if not 'bot_token' in conf:
|
||
|
|
missing_fields.append('bot_token')
|
||
|
|
if not 'chat_identifier' in conf:
|
||
|
|
missing_fields.append('chat_identifier')
|
||
|
|
|
||
|
|
if len(missing_fields) > 0:
|
||
|
|
# return an error as at least one required field seems missing
|
||
|
|
self.return_and_exit(Response(id=request.id, error="invalid config (missing required field(s) '{}'"
|
||
|
|
")".format(', '.join(missing_fields))))
|
||
|
|
# validate fields
|
||
|
|
if len(conf['bot_token']) < 44 or len(conf['bot_token']) > 46:
|
||
|
|
invalid_fields.append('bot_token')
|
||
|
|
if conf['chat_identifier'][0:1] == '-':
|
||
|
|
invalid_fields.append('channel_identifier')
|
||
|
|
else:
|
||
|
|
try:
|
||
|
|
test = int(conf['chat_identifier'])
|
||
|
|
except ValueError as _:
|
||
|
|
invalid_fields.append('channel_identifier')
|
||
|
|
|
||
|
|
if len(invalid_fields) > 0:
|
||
|
|
# return an error as at least one field seems invalid
|
||
|
|
self.return_and_exit(Response(id=request.id, error="invalid config (invalid field(s) '{}'"
|
||
|
|
")".format(', '.join(invalid_fields))))
|
||
|
|
|
||
|
|
# set configuration to current instance
|
||
|
|
self.token = conf['bot_token']
|
||
|
|
self.chat_id = conf['chat_identifier']
|
||
|
|
self.base_url = 'https://api.telegram.org/bot{token}/'.format(token=self.token)
|
||
|
|
|
||
|
|
# all good
|
||
|
|
self.return_and_continue(Response(id=request.id))
|
||
|
|
case 'SendNotification':
|
||
|
|
# check for required payload
|
||
|
|
if not request.params:
|
||
|
|
self.return_and_exit(Response(id=request.id, error="invalid notification (missing parameters)"))
|
||
|
|
notification = request.params
|
||
|
|
|
||
|
|
# check if notification payload contains a linked object
|
||
|
|
if not 'object' in notification:
|
||
|
|
self.return_and_exit(Response(id=request.id, error="notification is missing its linked object."))
|
||
|
|
|
||
|
|
# send notification to linked telegram chat
|
||
|
|
title = ''
|
||
|
|
if 'service' in notification['object']['tags']:
|
||
|
|
title = "{service} on {host}".format(
|
||
|
|
service=notification['object']['tags']['service'],
|
||
|
|
host=notification['object']['tags']['host']
|
||
|
|
)
|
||
|
|
else:
|
||
|
|
title = notification['object']['tags']['host']
|
||
|
|
self.send_message(text=TelegramBridge.NOTIFICATION_TEMPLATE.format(
|
||
|
|
severity=TelegramBridge.SEVERITY_ICON[notification['incident']['severity']],
|
||
|
|
title=title,
|
||
|
|
id=notification['incident']['id'],
|
||
|
|
url=notification['incident']['url'],
|
||
|
|
url_title="View Incident".format(notification['incident']['id']),
|
||
|
|
text=notification['event']['message']
|
||
|
|
))
|
||
|
|
|
||
|
|
self.return_and_continue(Response(id=request.id))
|
||
|
|
case _:
|
||
|
|
self.log("request method unknown: {}".format(request.method))
|
||
|
|
self.return_and_exit(Response(id=request.id, error="invalid request (method unknown)"))
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: build the bridge and serve requests from stdin until
# the input stream ends or a handler terminates the process.
if __name__ == '__main__':
    TelegramBridge().run()
|