"""webEPC: a small web server that renders EPC (SEPA credit transfer) QR codes.

Copyright 2022 Tarek Poltermann
"""
import base64
import html
import http.server
import json
import logging
import math
import os
import sqlite3
import threading
import time
from io import BytesIO
from urllib.parse import parse_qs

try:
    import qrcode
except ImportError:
    # Keep the settings UI usable without the optional dependency; QR
    # generation reports a readable error instead (see ServerHandler).
    qrcode = None

LOG_LEVEL_SCREEN = logging.DEBUG
LOG_LEVEL_FILE = logging.DEBUG

# Keys every usable configuration must provide with a non-empty value.
CONFIG_KEYS = ("name", "iban", "bic", "usecase")

# Content-Type sent for each served file extension.
CONTENT_TYPES = {
    ".html": "text/html",
    ".css": "text/css",
    ".js": "application/javascript",
    ".ico": "image/x-icon",
}


def generate_config(name="", iban="", bic="", usecase=""):
    """Write ``config.json`` into the current working directory.

    Spaces are removed from *iban* before it is stored; the other values are
    stored unchanged.
    """
    new_config = {
        "name": name,
        "iban": iban.replace(" ", ""),
        "bic": bic,
        "usecase": usecase,
    }
    config_path = os.path.join(os.getcwd(), "config.json")
    with open(config_path, "w", encoding="utf-8") as config_file:
        json.dump(new_config, config_file)


def _load_config():
    """Return the parsed content of ``config.json`` in the working directory."""
    config_path = os.path.join(os.getcwd(), "config.json")
    with open(config_path, encoding="utf-8") as config_file:
        return json.load(config_file)


def _read_root_file(file_name):
    """Return the bytes of *file_name* inside the ``root`` directory."""
    with open(os.path.join(os.getcwd(), "root", file_name), "rb") as handle:
        return handle.read()


def _render_template(file_name, replacements):
    """Load a template from ``root`` and fill its ``{{ key }}`` placeholders.

    Every value is HTML-escaped because most of them originate from user
    input (form fields, the request path, the stored configuration).
    """
    body = _read_root_file(file_name)
    for key, value in replacements.items():
        token = ("{{ %s }}" % key).encode("utf-8")
        escaped = html.escape(str(value), quote=True).encode("utf-8")
        body = body.replace(token, escaped)
    return body


def format_amount(raw):
    """Return *raw* as an amount string with exactly two decimals.

    A decimal comma is accepted as well as a decimal point.

    Raises:
        ValueError: if *raw* is not a number, is not finite, or is outside
            0.01 .. 999999999.99 after rounding to two decimals.
    """
    # The comma must be replaced *before* float(); float("12,5") raises.
    amount = float(str(raw).strip().replace(",", "."))
    if not math.isfinite(amount):
        raise ValueError("amount is not finite")
    text = f"{amount:.2f}"
    if not 0.01 <= float(text) <= 999999999.99:
        raise ValueError("amount out of range")
    return text


def build_epc_payload(config, amount, now=None):
    """Build the text that is encoded into the QR code.

    Args:
        config: mapping with the keys ``bic``, ``name``, ``iban``, ``usecase``.
        amount: amount string that is appended to ``EUR``.
        now: ``time.struct_time`` used for the timestamp suffix of the
            remittance text; defaults to the current local time.
    """
    if now is None:
        now = time.localtime()
    return "BCD\n002\n1\nSCT\n%s\n%s\n%s\nEUR%s\n%s\n%s\n%s %s.%s.%s %s:%s:%s" % (
        config["bic"],
        config["name"],
        config["iban"].replace(" ", ""),
        amount,
        "",
        "",
        config["usecase"],
        now.tm_mday, now.tm_mon, now.tm_year,
        now.tm_hour, now.tm_min, now.tm_sec,
    )


class IBANCheck:
    """IBAN check-digit arithmetic (mod 97 on the letter-expanded number)."""

    letter_dic = {"A": 10, "B": 11, "C": 12, "D": 13, "E": 14, "F": 15,
                  "G": 16, "H": 17, "I": 18, "J": 19, "K": 20, "L": 21,
                  "M": 22, "N": 23, "O": 24, "P": 25, "Q": 26, "R": 27,
                  "S": 28, "T": 29, "U": 30, "V": 31, "W": 32, "X": 33,
                  "Y": 34, "Z": 35,
                  "0": 0, "1": 1, "2": 2, "3": 3, "4": 4,
                  "5": 5, "6": 6, "7": 7, "8": 8, "9": 9}
    # Translation table for str.translate: code point -> digit string.
    letters = {ord(k): str(v) for k, v in letter_dic.items()}

    # Shortest registered IBAN (Norway) and the ISO 13616 maximum length.
    MIN_LENGTH = 15
    MAX_LENGTH = 34

    def check_validation_chars_iban(self, iban):
        """Compute the check digits *iban* should carry.

        Returns an ``int`` for values of 10 and above and a zero-padded
        two-character ``str`` for values below 10. That mixed return type is
        kept for backward compatibility; compare the result via ``int()``.

        Raises:
            ValueError: if *iban* contains characters other than A-Z / 0-9.
        """
        zeroed = iban[:2] + '00' + iban[4:]
        rearranged = zeroed[4:] + zeroed[:4]
        numeric = rearranged.translate(self.letters)
        check = 98 - (int(numeric) % 97)
        if check < 10:
            return f"{check:02}"
        return check

    def validate_iban(self, iban):
        """Return the mod-97 remainder of *iban*; a valid IBAN yields 1.

        Raises:
            ValueError: if *iban* contains characters other than A-Z / 0-9.
        """
        rearranged = iban[4:] + iban[:4]
        return int(rearranged.translate(self.letters)) % 97

    def is_valid(self, iban):
        """Return True if *iban* is a well-formed IBAN with correct check digits.

        Spaces are ignored and letters are upper-cased first. Malformed input
        yields False instead of raising.
        """
        candidate = str(iban).replace(" ", "").upper()
        if not self.MIN_LENGTH <= len(candidate) <= self.MAX_LENGTH:
            return False
        if any(char not in self.letter_dic for char in candidate):
            return False
        if not (candidate[:2].isalpha() and candidate[2:4].isdigit()):
            return False
        # int() on both sides: the helper returns "02".."09" as strings.
        expected = int(self.check_validation_chars_iban(candidate))
        return expected == int(candidate[2:4]) and self.validate_iban(candidate) == 1


class ServerHandler(http.server.SimpleHTTPRequestHandler):
    """Serve the whitelisted pages from ``root`` and handle the two forms."""

    logger = logging.getLogger('webEPC.Webserver')
    allowed_paths = ["/favicon.ico", "/", "/css/global.css", "/settings"]
    # Request path -> file below the ``root`` directory.
    mapping = {
        "/settings": "settings.html",
        "/": "index.html",
        "/css/global.css": "css/global.css",
        "/favicon.ico": "favicon.ico",
    }

    def __init__(self, *args, **kwargs):
        directory = os.path.join(os.getcwd(), "root")
        super().__init__(*args, **kwargs, directory=directory)

    def log_message(self, format, *args):
        """Route the base class's log output to the ``webEPC`` logger."""
        if len(args) == 3:
            # log_request() passes (request line, status code, size).
            self.logger.info("IP: %s - HTTP Code: %s - %s",
                             self.client_address[0], args[1], args[0])
        else:
            # Other callers (e.g. log_error) pass arbitrary arguments.
            self.logger.info("IP: %s - %s", self.client_address[0], format % args)

    def _send_body(self, status, content_type, body):
        """Send a complete response with the given status, type and body."""
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_404(self):
        """Answer with a minimal 404 page."""
        self._send_body(404, 'text/html', b"404")

    def do_GET(self):
        """Serve a whitelisted file; ``/settings`` is filled from the config."""
        path = self.path
        if path.endswith("/") and path != "/":
            path = path[:-1]
        file_name = self.mapping.get(path)
        if path not in self.allowed_paths or file_name is None:
            self.do_404()
            return
        try:
            if path == "/settings":
                config_json = _load_config()
                body = _render_template(
                    file_name,
                    {key: config_json.get(key, "") for key in CONFIG_KEYS},
                )
            else:
                body = _read_root_file(file_name)
        except (OSError, ValueError):
            # Missing file or unreadable/invalid config.json.
            self.logger.exception("Could not serve %s", path)
            self.send_error(500)
            return
        extension = os.path.splitext(file_name)[1]
        self._send_body(200, CONTENT_TYPES.get(extension, 'text/plain'), body)

    @staticmethod
    def _field(form, key):
        """Return the first value of *key* in a parse_qs result, or ''."""
        values = form.get(key)
        return values[0] if values else ""

    def _price_input_page(self, form):
        """Render the QR code page (or a message page) for a price form."""
        config_json = _load_config()
        if not all(config_json.get(key) for key in CONFIG_KEYS):
            return _render_template(
                "message.html", {"message": "Keine Einstellungen hinterlegt."})
        try:
            price = format_amount(self._field(form, "price"))
        except ValueError:
            return _render_template(
                "message.html", {"message": "Ungültiger Betrag."})
        if qrcode is None:
            return _render_template(
                "message.html",
                {"message": "QR-Code kann nicht erzeugt werden: "
                            "Python-Modul 'qrcode' fehlt."})
        temp_io = BytesIO()
        qrcode.make(build_epc_payload(config_json, price)).save(temp_io, format="PNG")
        img_base64 = base64.b64encode(temp_io.getvalue()).decode("ascii")
        return _render_template("qrcode.html", {"base64qrcode": img_base64})

    def _save_settings_page(self, form):
        """Validate and store the submitted settings; render the result page."""
        iban = self._field(form, "iban").replace(" ", "").upper()
        if not IBANCheck().is_valid(iban):
            return _render_template(
                "message.html", {"message": "IBAN ist ungültig."})
        generate_config(self._field(form, "name"), iban,
                        self._field(form, "bic"), self._field(form, "usecase"))
        return _render_template(
            "message.html", {"message": "Einstellungen gespeichert."})

    def do_POST(self):
        """Dispatch a form post on its ``action`` field."""
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = 0
        post = self.rfile.read(content_length) if content_length > 0 else b""
        # keep_blank_values: an empty form field must not vanish from the dict.
        decoded_post = parse_qs(post.decode("utf-8", errors="replace"),
                                keep_blank_values=True)
        action = self._field(decoded_post, "action")
        self.logger.info("Server got POST request from %s Action: %s",
                         self.client_address[0], action)
        try:
            if action == "price-input":
                body = self._price_input_page(decoded_post)
            elif action == "save-settings":
                body = self._save_settings_page(decoded_post)
            else:
                body = _render_template(
                    "error.html",
                    {"message": "Ein Fehler ist aufgetreten.", "path": self.path},
                )
        except (OSError, ValueError):
            # Missing template or unreadable/invalid config.json.
            self.logger.exception("Could not handle POST action %r", action)
            self.send_error(500)
            return
        self._send_body(200, 'text/html', body)


class EPC:
    """Create the config file if needed and run the web server (blocking)."""

    def __init__(self, port=8000, listen="0.0.0.0"):
        self.logger = logging.getLogger("webEPC.Webserver")
        self.port = port
        self.listen = listen
        self.httpd = None
        if not os.path.exists(os.path.join(os.getcwd(), "config.json")):
            self.logger.info("Generating new config file...")
            generate_config()
        if qrcode is None:
            self.logger.warning(
                "Python module 'qrcode' is not installed; QR codes cannot be generated.")
        self.logger.info("Starting Webserver...")
        self.logger.info("Port: %s" % str(self.port))
        self.logger.info("Listen on: %s" % str(self.listen))
        self.logger.info("Open: http://%s:%s" % ("localhost", str(self.port)))
        self.handler = ServerHandler
        self.httpd = http.server.HTTPServer((self.listen, self.port), self.handler)
        # The watcher must be started before serve_forever(), which blocks
        # until the server is shut down.
        self.shutdown_thread = threading.Thread(target=self.shutdown_webserver_thread)
        self.shutdown_thread.start()
        try:
            self.httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C: fall through; the watcher logs the shutdown once the
            # main thread has finished.
            pass
        finally:
            self.httpd.server_close()

    def shutdown_webserver_thread(self):
        """Stop the server as soon as the main thread is no longer alive."""
        while True:
            time.sleep(0.1)
            if not threading.main_thread().is_alive():
                logging.getLogger("webEPC.Webserver").info("Shutting down Webserver...")
                self.httpd.shutdown()
                return False


def main():
    """Configure file and console logging, then run the server on port 8083."""
    logger = logging.getLogger('webEPC')
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    log_file_handler = logging.FileHandler('webEPC.log')
    log_file_handler.setLevel(LOG_LEVEL_FILE)
    log_file_handler.setFormatter(formatter)
    logger.addHandler(log_file_handler)

    log_stream_handler = logging.StreamHandler()
    log_stream_handler.setLevel(LOG_LEVEL_SCREEN)
    log_stream_handler.setFormatter(formatter)
    logger.addHandler(log_stream_handler)

    EPC(8083, "0.0.0.0")


if __name__ == '__main__':
    main()