194 lines
6.8 KiB
Python
194 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from json import JSONDecodeError
|
|
from types import SimpleNamespace
|
|
|
|
class Request:
|
|
def __init__(self, id: int, method: str, params: dict = None) -> None:
|
|
self.id = id
|
|
self.method = method
|
|
self.params = params
|
|
|
|
|
|
class Response:
|
|
def __init__(self, id: int, error: str = None, result: object = None) -> None:
|
|
self.id = id
|
|
self.error = error
|
|
self.result = result
|
|
|
|
def __str__(self) -> str:
|
|
try:
|
|
return json.dumps(
|
|
Response.__remove_empty_properties(self.__dict__.copy()),
|
|
default=lambda o: o.__dict__,
|
|
sort_keys=True,
|
|
indent=None
|
|
)
|
|
except BaseException as _:
|
|
return self.__repr__()
|
|
|
|
@staticmethod
|
|
def __remove_empty_properties(dictionary) -> dict:
|
|
for k, v in list(dictionary.items()):
|
|
if v is None:
|
|
del dictionary[k]
|
|
elif isinstance(v, dict):
|
|
Response.__remove_empty_properties(v)
|
|
return dictionary
|
|
|
|
class FSCLogger:
|
|
|
|
def __init__(self: FSCLogger) -> None:
|
|
self.token = None
|
|
self.chat_id = None
|
|
self.base_url = None
|
|
self.logfile = None
|
|
self.log("starting")
|
|
|
|
@staticmethod
|
|
def log(message: str) -> None:
|
|
sys.stderr.write("[FSCLogger] {message}{linesep}".format(message=message, linesep=os.linesep))
|
|
sys.stdout.flush()
|
|
|
|
@staticmethod
|
|
def return_and_continue(response: Response) -> None:
|
|
if response.error:
|
|
# removing result in case of an error
|
|
response.result = None
|
|
FSCLogger.log("returning an error: {}".format(response.error))
|
|
|
|
FSCLogger.log("returning response: {response}{linesep}".format(response=response, linesep=os.linesep))
|
|
sys.stdout.write("{response}{linesep}".format(response=response, linesep=os.linesep))
|
|
sys.stdout.flush()
|
|
|
|
@staticmethod
|
|
def return_and_exit(response: Response) -> None:
|
|
FSCLogger.return_and_continue(response)
|
|
|
|
if response.error:
|
|
exit(1)
|
|
else:
|
|
exit(0)
|
|
|
|
def run(self) -> None:
|
|
self.log("waiting for requests")
|
|
for line in sys.stdin:
|
|
parsed_id = None
|
|
raw = None
|
|
|
|
# parse request identifier
|
|
id_match = re.match(r'.*"id":(\d+)', line)
|
|
if id_match:
|
|
parsed_id = id_match.group(1)
|
|
|
|
# parse request
|
|
try:
|
|
raw = json.loads(line.strip())
|
|
except JSONDecodeError as err:
|
|
self.return_and_exit(Response(id=parsed_id, error="failed decoding input: {}".format(err.msg)))
|
|
|
|
# map request
|
|
if raw:
|
|
request = Request(**raw)
|
|
self.log("received request: {}".format(vars(request)))
|
|
self.handle_request(request)
|
|
else:
|
|
self.log("dropped input as it couldn't be mapped to a request")
|
|
|
|
self.log("exiting")
|
|
|
|
def handle_request(self, request: Request) -> None:
|
|
match request.method:
|
|
case 'GetInfo':
|
|
# return information to Icinga Notifications about this channel
|
|
info = SimpleNamespace()
|
|
info.name = 'FSCLogger'
|
|
info.version = '0.1.0'
|
|
info.author = 'FSC'
|
|
info.config_attrs = [
|
|
{
|
|
"name": "logfile",
|
|
"type": "string",
|
|
"label": {
|
|
"de_DE": "Logfile Pfad",
|
|
"en_US": "Logfile path"
|
|
},
|
|
"help": {
|
|
"de_DE": "Pfad zum Logfile",
|
|
"en_US": "Path to logfile."
|
|
},
|
|
"required": True,
|
|
"min": None,
|
|
"max": None
|
|
},
|
|
]
|
|
|
|
self.return_and_exit(Response(id=request.id, result=info))
|
|
case 'SetConfig':
|
|
missing_fields = []
|
|
invalid_fields = []
|
|
|
|
if not request.params:
|
|
self.return_and_exit(Response(id=request.id, error="invalid config (missing parameters)"))
|
|
conf = request.params
|
|
|
|
# check for required fields
|
|
if not 'logfile' in conf:
|
|
missing_fields.append('logfile')
|
|
|
|
if len(missing_fields) > 0:
|
|
# return an error as at least one required field seems missing
|
|
self.return_and_exit(Response(id=request.id, error="invalid config (missing required field(s) '{}'"
|
|
")".format(', '.join(missing_fields))))
|
|
# validate fields
|
|
if conf['logfile'] == "":
|
|
invalid_fields.append('logfile')
|
|
else:
|
|
try:
|
|
test = int(conf['logfile'])
|
|
except ValueError as _:
|
|
invalid_fields.append('logfile')
|
|
|
|
if len(invalid_fields) > 0:
|
|
# return an error as at least one field seems invalid
|
|
self.return_and_exit(Response(id=request.id, error="invalid config (invalid field(s) '{}'"
|
|
")".format(', '.join(invalid_fields))))
|
|
|
|
# set configuration to current instance
|
|
self.logfile = conf['logfile']
|
|
|
|
# all good
|
|
self.return_and_continue(Response(id=request.id))
|
|
case 'SendNotification':
|
|
# check for required payload
|
|
if not request.params:
|
|
self.return_and_exit(Response(id=request.id, error="invalid notification (missing parameters)"))
|
|
notification = request.params
|
|
|
|
# check if notification payload contains a linked object
|
|
if not 'object' in notification:
|
|
self.return_and_exit(Response(id=request.id, error="notification is missing its linked object."))
|
|
|
|
## Log to file
|
|
f = open(self.logfile, "a")
|
|
f.write("Now the file has more content!")
|
|
f.close()
|
|
self.log("getting data %s" % notification)
|
|
|
|
|
|
|
|
self.return_and_continue(Response(id=request.id))
|
|
case _:
|
|
self.log("request method unknown: {}".format(request.method))
|
|
self.return_and_exit(Response(id=request.id, error="invalid request (method unknown)"))
|
|
|
|
# run entrypoint
|
|
if __name__ == '__main__':
|
|
app = FSCLogger()
|
|
app.run()
|