aboutsummaryrefslogtreecommitdiff
path: root/server.py
blob: 7edd081168261b6034658f5cad3e4c6b3f0cba3b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import socket
import struct
from http.server import BaseHTTPRequestHandler, HTTPServer
from json import load
from os.path import expanduser
from re import compile
from traceback import print_exc

import config


def rcon_packet(id: int, type: int, body: str) -> bytes:
    return (
        struct.pack(
            "<iii",
            len(body) + 10,
            id,
            type,
        )
        + body.encode("utf-8")
        + bytes((0, 0))
    )


def rcon_login() -> socket.socket:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("127.0.0.1", config.rcon_port))
    s.send(rcon_packet(0, 3, config.rcon_password))
    return s


def rcon(command: str) -> None:
    rcon_socket.send(rcon_packet(0, 2, command))


def pipe(command: str) -> None:
    with open(expanduser(config.pipe_path), "w") as pipe:
        pipe.write(f"{command}\n")


class HttpHandler(BaseHTTPRequestHandler):
    def disabled(self) -> None:
        self.send_response(503)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        self.wfile.write("Endpoint disabled".encode("utf-8"))

    def do_GET(self):
        if "GET" not in config.enabled_endpoints:
            self.disabled()
            return

        try:
            with open(expanduser(config.whitelist_file_path)) as file:
                status = 200
                response = file.read()
        except Exception:
            status = 500
            print_exc()

        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(response.encode("utf-8"))

    def do_POST(self) -> None:
        if "POST" not in config.enabled_endpoints:
            self.disabled()
            return

        try:
            length = int(self.headers.get("Content-Length", 0))
            username = self.rfile.read(length).decode("utf-8")

            if allowed_usernames_regex.match(username) is None:
                status = 400
                response = "Incorrect username"
            elif username in whitelist:
                status = 409
                response = "Player is already whitelisted"
            else:
                status = 201
                response = f"Added {username} to the whitelist"

                whitelist.add(username)
                console(f"whitelist add {username}")
        except Exception:
            status = 500
            print_exc()

        self.send_response(status)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        self.wfile.write(response.encode("utf-8"))

    def do_DELETE(self) -> None:
        if "DELETE" not in config.enabled_endpoints:
            self.disabled()
            return

        try:
            length = int(self.headers.get("Content-Length", 0))
            username = self.rfile.read(length).decode("utf-8")

            if allowed_usernames_regex.match(username) is None:
                status = 400
                response = "Incorrect username"
            elif username not in whitelist:
                status = 409
                response = "Player is not whitelisted"
            else:
                status = 200
                response = f"Removed {username} from the whitelist"

                whitelist.remove(username)
                console(f"whitelist remove {username}")
        except Exception:
            status = 500
            print_exc()

        self.send_response(status)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        self.wfile.write(response.encode("utf-8"))


server = HTTPServer(("0.0.0.0", config.http_port), HttpHandler)
allowed_usernames_regex = compile(r"^[a-zA-Z0-9_]+$")

with open(expanduser(config.whitelist_file_path)) as file:
    whitelist = {e["name"] for e in load(file)}

if config.whitelist_access_type == "pipe":
    console = pipe
elif config.whitelist_access_type == "rcon":
    rcon_socket = rcon_login()
    console = rcon


try:
    server.serve_forever()
except KeyboardInterrupt:
    rcon_socket.close()
    server.server_close()