1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
import socket
import struct
from http.server import BaseHTTPRequestHandler, HTTPServer
from json import load
from os.path import expanduser
from re import compile
from traceback import print_exc
import config
def rcon_packet(id: int, type: int, body: str) -> bytes:
return (
struct.pack(
"<iii",
len(body) + 10,
id,
type,
)
+ body.encode("utf-8")
+ bytes((0, 0))
)
def rcon_login() -> socket.socket:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1", config.rcon_port))
s.send(rcon_packet(0, 3, config.rcon_password))
return s
def rcon(command: str) -> None:
rcon_socket.send(rcon_packet(0, 2, command))
def pipe(command: str) -> None:
with open(expanduser(config.pipe_path), "w") as pipe:
pipe.write(f"{command}\n")
class HttpHandler(BaseHTTPRequestHandler):
def disabled(self) -> None:
self.send_response(503)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write("Endpoint disabled".encode("utf-8"))
def do_GET(self):
if "GET" not in config.enabled_endpoints:
self.disabled()
return
try:
with open(expanduser(config.whitelist_file_path)) as file:
status = 200
response = file.read()
except Exception:
status = 500
print_exc()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(response.encode("utf-8"))
def do_POST(self) -> None:
if "POST" not in config.enabled_endpoints:
self.disabled()
return
try:
length = int(self.headers.get("Content-Length", 0))
username = self.rfile.read(length).decode("utf-8")
if allowed_usernames_regex.match(username) is None:
status = 400
response = "Incorrect username"
elif username in whitelist:
status = 409
response = "Player is already whitelisted"
else:
status = 201
response = f"Added {username} to the whitelist"
whitelist.add(username)
console(f"whitelist add {username}")
except Exception:
status = 500
print_exc()
self.send_response(status)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(response.encode("utf-8"))
def do_DELETE(self) -> None:
if "DELETE" not in config.enabled_endpoints:
self.disabled()
return
try:
length = int(self.headers.get("Content-Length", 0))
username = self.rfile.read(length).decode("utf-8")
if allowed_usernames_regex.match(username) is None:
status = 400
response = "Incorrect username"
elif username not in whitelist:
status = 409
response = "Player is not whitelisted"
else:
status = 200
response = f"Removed {username} from the whitelist"
whitelist.remove(username)
console(f"whitelist remove {username}")
except Exception:
status = 500
print_exc()
self.send_response(status)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(response.encode("utf-8"))
server = HTTPServer(("0.0.0.0", config.http_port), HttpHandler)
allowed_usernames_regex = compile(r"^[a-zA-Z0-9_]+$")
with open(expanduser(config.whitelist_file_path)) as file:
whitelist = {e["name"] for e in load(file)}
if config.whitelist_access_type == "pipe":
console = pipe
elif config.whitelist_access_type == "rcon":
rcon_socket = rcon_login()
console = rcon
try:
server.serve_forever()
except KeyboardInterrupt:
rcon_socket.close()
server.server_close()
|