#!/usr/bin/env python3 from __future__ import annotations import json import os import re import sys from json import JSONDecodeError from types import SimpleNamespace class Request: def __init__(self, id: int, method: str, params: dict = None) -> None: self.id = id self.method = method self.params = params class Response: def __init__(self, id: int, error: str = None, result: object = None) -> None: self.id = id self.error = error self.result = result def __str__(self) -> str: try: return json.dumps( Response.__remove_empty_properties(self.__dict__.copy()), default=lambda o: o.__dict__, sort_keys=True, indent=None ) except BaseException as _: return self.__repr__() @staticmethod def __remove_empty_properties(dictionary) -> dict: for k, v in list(dictionary.items()): if v is None: del dictionary[k] elif isinstance(v, dict): Response.__remove_empty_properties(v) return dictionary class FSCLogger: def __init__(self: FSCLogger) -> None: self.token = None self.chat_id = None self.base_url = None self.logfile = None self.log("starting") @staticmethod def log(message: str) -> None: sys.stderr.write("[FSCLogger] {message}{linesep}".format(message=message, linesep=os.linesep)) sys.stdout.flush() @staticmethod def return_and_continue(response: Response) -> None: if response.error: # removing result in case of an error response.result = None FSCLogger.log("returning an error: {}".format(response.error)) FSCLogger.log("returning response: {response}{linesep}".format(response=response, linesep=os.linesep)) sys.stdout.write("{response}{linesep}".format(response=response, linesep=os.linesep)) sys.stdout.flush() @staticmethod def return_and_exit(response: Response) -> None: FSCLogger.return_and_continue(response) if response.error: exit(1) else: exit(0) def run(self) -> None: self.log("waiting for requests") for line in sys.stdin: parsed_id = None raw = None # parse request identifier id_match = re.match(r'.*"id":(\d+)', line) if id_match: parsed_id = id_match.group(1) # parse request try: raw = json.loads(line.strip()) except JSONDecodeError as err: self.return_and_exit(Response(id=parsed_id, error="failed decoding input: {}".format(err.msg))) # map request if raw: request = Request(**raw) self.log("received request: {}".format(vars(request))) self.handle_request(request) else: self.log("dropped input as it couldn't be mapped to a request") self.log("exiting") def handle_request(self, request: Request) -> None: match request.method: case 'GetInfo': # return information to Icinga Notifications about this channel info = SimpleNamespace() info.name = 'FSCLogger' info.version = '0.1.0' info.author = 'FSC' info.config_attrs = [ { "name": "logfile", "type": "string", "label": { "de_DE": "Logfile Pfad", "en_US": "Logfile path" }, "help": { "de_DE": "Pfad zum Logfile", "en_US": "Path to logfile." }, "required": True, "min": None, "max": None }, ] self.return_and_exit(Response(id=request.id, result=info)) case 'SetConfig': missing_fields = [] invalid_fields = [] if not request.params: self.return_and_exit(Response(id=request.id, error="invalid config (missing parameters)")) conf = request.params # check for required fields if not 'logfile' in conf: missing_fields.append('logfile') if len(missing_fields) > 0: # return an error as at least one required field seems missing self.return_and_exit(Response(id=request.id, error="invalid config (missing required field(s) '{}'" ")".format(', '.join(missing_fields)))) self.log("conf: " + conf['logfile']) # validate fields if conf['logfile'] == "": invalid_fields.append('logfile') if len(invalid_fields) > 0: # return an error as at least one field seems invalid self.return_and_exit(Response(id=request.id, error="invalid config (invalid field(s) '{}'" ")".format(', '.join(invalid_fields)))) # set configuration to current instance self.logfile = conf['logfile'] # all good self.return_and_continue(Response(id=request.id)) case 'SendNotification': # check for required payload if not request.params: self.return_and_exit(Response(id=request.id, error="invalid notification (missing parameters)")) notification = request.params # check if notification payload contains a linked object if not 'object' in notification: self.return_and_exit(Response(id=request.id, error="notification is missing its linked object.")) ## Log to file f = open(self.logfile, "a+") f.write("Now the file has more content!") f.close() self.log("getting data %s" % notification) self.return_and_continue(Response(id=request.id)) case _: self.log("request method unknown: {}".format(request.method)) self.return_and_exit(Response(id=request.id, error="invalid request (method unknown)")) # run entrypoint if __name__ == '__main__': app = FSCLogger() app.run()