# __init__.py
# Note: the upstream repository was archived by its owner on Dec 31, 2021
# and is now read-only.
import ast
import atexit
import json
import marshal
import os
import socket
import time
from base64 import b64encode
from contextlib import contextmanager
from pathlib import Path
from urllib.request import Request, urlopen
import docker
from pysandbox.purifier import Insecure, Purifier
# Directory that contains this module, as a plain string. It is handed to
# the Docker client as the `path` argument when the sandbox image is built.
DOCKER_FILE_PATH = str(Path(__file__).parent)
class PySandbox:
    """Run Python snippets inside Docker containers built from this package.

    Each logical sandbox is addressed by an ``idx`` key. The first use of an
    ``idx`` starts a detached container and maps its ``18888/tcp`` port to a
    host port; later uses unpause the same container. Containers are paused
    again after every request and killed by :meth:`quit`, which is also
    registered with ``atexit``.
    """

    # Tag used both to look up / build the image and to start containers.
    _IMAGE_TAG = "pysandbox"
    # Port published by the container and mapped to a host port.
    _CONTAINER_PORT = "18888/tcp"
    # Host-port search starts just above this value when nothing is mapped.
    _BASE_PORT = 1764

    def __init__(self, docker_client, api_client):
        """Store the clients, make sure the image exists and set up state.

        Args:
            docker_client: Object exposing ``images`` and ``containers``
                collections (presumably a ``docker.DockerClient`` - confirm
                against the caller).
            api_client: Stored on the instance; not used by this class.
        """
        self._docker_client = docker_client
        self._api_client = api_client
        self._image = self.obtain_image()
        self._instances = {}  # idx -> container
        self._ports = {}  # container -> host port
        # Seconds to wait after starting a fresh container before using it
        # (presumably to let the server inside the container come up).
        self._delay = 1.5
        self._purifier = Purifier()
        atexit.register(self.quit)

    def run_cmd(self, code, idx=0):
        """Check ``code``, ship it to the sandbox ``idx`` and return the reply.

        The source is parsed, passed through the purifier, compiled,
        marshalled and base64-encoded, then POSTed as ``{"code": ...}`` JSON
        to the container's mapped host port.

        Args:
            code: Python source text to execute.
            idx: Key of the sandbox instance to use (created on first use).

        Returns:
            The JSON-decoded reply of the container, or
            ``{"result": "FAIL", "msg": <Insecure exception>}`` when the
            purifier rejects the code.

        Raises:
            SyntaxError: If ``code`` cannot be parsed (not caught here).
        """
        tree = ast.parse(code)
        try:
            self._purifier.visit(tree)
        except Insecure as exc:
            # NOTE(review): "msg" carries the exception object itself, not a
            # string, so this dict is not JSON-serialisable as-is. Kept for
            # backward compatibility with existing callers.
            return {"result": "FAIL", "msg": exc}

        code_object = compile(tree, "<pysandbox>", "exec")
        encoded = b64encode(marshal.dumps(code_object)).decode()
        payload = json.dumps({"code": encoded})

        with self.run_instance(idx) as container:
            request = Request(
                f"http://localhost:{self._ports[container]}",
                data=payload.encode(),
                method="POST",
            )
            # Close the HTTP response deterministically instead of leaking it.
            with urlopen(request) as reply:
                body = reply.read()
        return json.loads(body.decode())

    @contextmanager
    def run_instance(self, idx):
        """Yield a running container for ``idx`` and pause it afterwards.

        An already-known container is unpaused. Otherwise a new detached
        container is started on a free host port, recorded in the instance
        and port tables, and given ``self._delay`` seconds before use.
        """
        existing = self._instances.get(idx)
        if existing:
            container = existing
        else:
            port = self._get_free_port()
            container = self._docker_client.containers.run(
                self._IMAGE_TAG,
                ports={self._CONTAINER_PORT: port},
                detach=True,
            )
            self._ports[container] = port
            self._instances[idx] = container
        try:
            if existing:
                container.unpause()
            else:
                time.sleep(self._delay)
            yield container
        finally:
            container.pause()

    def quit(self):
        """Kill every known container and forget all port mappings."""
        # Iterate over a snapshot: quit_single() mutates self._instances.
        for idx in list(self._instances):
            self.quit_single(idx)
        self._ports = {}

    def quit_single(self, idx):
        """Kill the container registered under ``idx`` and forget it.

        Returns:
            Whatever the container's ``kill()`` call returns.

        Raises:
            KeyError: If no container is registered under ``idx``.
        """
        container = self._instances.pop(idx)
        # Drop the port entry as well, so a dead container no longer
        # inflates the starting point of the free-port search.
        self._ports.pop(container, None)
        return container.kill()

    def obtain_image(self, force_build=False):
        """Return the sandbox image, building it when needed.

        Args:
            force_build: When true, skip the lookup and always build.

        Returns:
            The existing image tagged ``pysandbox``, or the first element of
            the result of ``images.build`` when it had to be built.
        """
        if not force_build:
            try:
                return self._docker_client.images.get(self._IMAGE_TAG)
            except docker.errors.ImageNotFound:
                pass  # fall through and build it
        return self._docker_client.images.build(
            path=DOCKER_FILE_PATH, tag=self._IMAGE_TAG
        )[0]

    def _get_free_port(self, plus=1):
        """Return the first host port nothing is listening on.

        The search starts ``plus`` above the highest port already handed out
        (or above ``_BASE_PORT`` when none is) and walks upwards one port at
        a time.
        """
        port = max(self._ports.values(), default=self._BASE_PORT) + plus
        # check_empty_port() is True when the port is taken; step by one so
        # that no candidate is skipped.
        while self.check_empty_port(port):
            port += 1
        return port

    @staticmethod
    def check_empty_port(port):
        """Return True when something accepts connections on ``port``.

        Note that, despite the name, a True result means the port on
        127.0.0.1 is in use; False means the connection attempt failed.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            return sock.connect_ex(("127.0.0.1", port)) == 0