icinga-channels/RequestLogger.py

194 lines
6.8 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import re
import sys
from json import JSONDecodeError
from types import SimpleNamespace
class Request:
def __init__(self, id: int, method: str, params: dict = None) -> None:
self.id = id
self.method = method
self.params = params
class Response:
def __init__(self, id: int, error: str = None, result: object = None) -> None:
self.id = id
self.error = error
self.result = result
def __str__(self) -> str:
try:
return json.dumps(
Response.__remove_empty_properties(self.__dict__.copy()),
default=lambda o: o.__dict__,
sort_keys=True,
indent=None
)
except BaseException as _:
return self.__repr__()
@staticmethod
def __remove_empty_properties(dictionary) -> dict:
for k, v in list(dictionary.items()):
if v is None:
del dictionary[k]
elif isinstance(v, dict):
Response.__remove_empty_properties(v)
return dictionary
class FSCLogger:
def __init__(self: FSCLogger) -> None:
self.token = None
self.chat_id = None
self.base_url = None
2025-04-12 22:15:22 +02:00
self.logfile = None
self.log("starting")
@staticmethod
def log(message: str) -> None:
sys.stderr.write("[FSCLogger] {message}{linesep}".format(message=message, linesep=os.linesep))
sys.stdout.flush()
@staticmethod
def return_and_continue(response: Response) -> None:
if response.error:
# removing result in case of an error
response.result = None
FSCLogger.log("returning an error: {}".format(response.error))
FSCLogger.log("returning response: {response}{linesep}".format(response=response, linesep=os.linesep))
sys.stdout.write("{response}{linesep}".format(response=response, linesep=os.linesep))
sys.stdout.flush()
@staticmethod
def return_and_exit(response: Response) -> None:
FSCLogger.return_and_continue(response)
if response.error:
exit(1)
else:
exit(0)
def run(self) -> None:
self.log("waiting for requests")
for line in sys.stdin:
parsed_id = None
raw = None
# parse request identifier
id_match = re.match(r'.*"id":(\d+)', line)
if id_match:
parsed_id = id_match.group(1)
# parse request
try:
raw = json.loads(line.strip())
except JSONDecodeError as err:
self.return_and_exit(Response(id=parsed_id, error="failed decoding input: {}".format(err.msg)))
# map request
if raw:
request = Request(**raw)
self.log("received request: {}".format(vars(request)))
self.handle_request(request)
else:
self.log("dropped input as it couldn't be mapped to a request")
self.log("exiting")
def handle_request(self, request: Request) -> None:
match request.method:
case 'GetInfo':
# return information to Icinga Notifications about this channel
info = SimpleNamespace()
info.name = 'FSCLogger'
info.version = '0.1.0'
info.author = 'FSC'
info.config_attrs = [
{
"name": "logfile",
"type": "string",
"label": {
"de_DE": "Logfile Pfad",
"en_US": "Logfile path"
},
"help": {
"de_DE": "Pfad zum Logfile",
"en_US": "Path to logfile."
},
"required": True,
"min": None,
"max": None
},
]
self.return_and_exit(Response(id=request.id, result=info))
2025-04-12 22:15:22 +02:00
case 'SetConfig':
missing_fields = []
invalid_fields = []
if not request.params:
self.return_and_exit(Response(id=request.id, error="invalid config (missing parameters)"))
conf = request.params
# check for required fields
if not 'logfile' in conf:
missing_fields.append('logfile')
if len(missing_fields) > 0:
# return an error as at least one required field seems missing
self.return_and_exit(Response(id=request.id, error="invalid config (missing required field(s) '{}'"
")".format(', '.join(missing_fields))))
# validate fields
2025-04-12 22:30:10 +02:00
if len(conf['logfile']) == "":
2025-04-12 22:15:22 +02:00
invalid_fields.append('logfile')
else:
try:
test = int(conf['logfile'])
except ValueError as _:
invalid_fields.append('logfile')
if len(invalid_fields) > 0:
# return an error as at least one field seems invalid
self.return_and_exit(Response(id=request.id, error="invalid config (invalid field(s) '{}'"
")".format(', '.join(invalid_fields))))
# set configuration to current instance
self.logfile = conf['logfile']
# all good
self.return_and_continue(Response(id=request.id))
case 'SendNotification':
# check for required payload
if not request.params:
self.return_and_exit(Response(id=request.id, error="invalid notification (missing parameters)"))
notification = request.params
# check if notification payload contains a linked object
if not 'object' in notification:
self.return_and_exit(Response(id=request.id, error="notification is missing its linked object."))
## Log to file
f = open(self.logfile, "a")
f.write("Now the file has more content!")
f.close()
self.log("getting data %s" % notification)
self.return_and_continue(Response(id=request.id))
case _:
self.log("request method unknown: {}".format(request.method))
self.return_and_exit(Response(id=request.id, error="invalid request (method unknown)"))
# run entrypoint
if __name__ == '__main__':
app = FSCLogger()
app.run()