forked from nicolargo/glances
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
219 lines (176 loc) · 7.52 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#
"""Manage the Glances server."""
import json
import socket
import sys
from base64 import b64decode
from glances import __version__
from glances.autodiscover import GlancesAutoDiscoverClient
from glances.globals import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
from glances.logger import logger
from glances.stats_server import GlancesStatsServer
from glances.timer import Timer
class GlancesXMLRPCHandler(SimpleXMLRPCRequestHandler):
"""Main XML-RPC handler."""
rpc_paths = ('/RPC2',)
def end_headers(self):
# Hack to add a specific header
# Thk to: https://gist.github.com/rca/4063325
self.send_my_headers()
super().end_headers()
def send_my_headers(self):
# Specific header is here (solved the issue #227)
self.send_header("Access-Control-Allow-Origin", "*")
def authenticate(self, headers):
# auth = headers.get('Authorization')
try:
(basic, _, encoded) = headers.get('Authorization').partition(' ')
except Exception:
# Client did not ask for authentication
# If server need it then exit
return not self.server.isAuth
else:
# Client authentication
(basic, _, encoded) = headers.get('Authorization').partition(' ')
assert basic == 'Basic', 'Only basic authentication supported'
# Encoded portion of the header is a string
# Need to convert to byte-string
encoded_byte_string = encoded.encode()
# Decode base64 byte string to a decoded byte string
decoded_bytes = b64decode(encoded_byte_string)
# Convert from byte string to a regular string
decoded_string = decoded_bytes.decode()
# Get the username and password from the string
(username, _, password) = decoded_string.partition(':')
# Check that username and password match internal global dictionary
return self.check_user(username, password)
def check_user(self, username, password):
# Check username and password in the dictionary
if username in self.server.user_dict:
from glances.password import GlancesPassword
# TODO: config is not taken into account: it may be a problem ?
pwd = GlancesPassword(username=username, config=None)
return pwd.check_password(self.server.user_dict[username], password)
return False
def parse_request(self):
if SimpleXMLRPCRequestHandler.parse_request(self):
# Next we authenticate
if self.authenticate(self.headers):
return True
# if authentication fails, tell the client
self.send_error(401, 'Authentication failed')
return False
def log_message(self, log_format, *args):
# No message displayed on the server side
pass
class GlancesXMLRPCServer(SimpleXMLRPCServer):
"""Init a SimpleXMLRPCServer instance (IPv6-ready)."""
finished = False
def __init__(self, bind_address, bind_port=61209, requestHandler=GlancesXMLRPCHandler, config=None):
self.bind_address = bind_address
self.bind_port = bind_port
self.config = config
try:
self.address_family = socket.getaddrinfo(bind_address, bind_port)[0][0]
except OSError as e:
logger.error(f"Couldn't open socket: {e}")
sys.exit(1)
super().__init__((bind_address, bind_port), requestHandler)
def end(self):
"""Stop the server"""
self.server_close()
self.finished = True
def serve_forever(self):
"""Main loop"""
while not self.finished:
self.handle_request()
class GlancesInstance:
"""All the methods of this class are published as XML-RPC methods."""
def __init__(self, config=None, args=None):
# Init stats
self.stats = GlancesStatsServer(config=config, args=args)
# Initial update
self.stats.update()
# cached_time is the minimum time interval between stats updates
# i.e. XML/RPC calls will not retrieve updated info until the time
# since last update is passed (will retrieve old cached info instead)
self.timer = Timer(0)
self.cached_time = args.cached_time
def __update__(self):
# Never update more than 1 time per cached_time
if self.timer.finished():
self.stats.update()
self.timer = Timer(self.cached_time)
def init(self):
# Return the Glances version
return __version__
def getAll(self):
# Update and return all the stats
self.__update__()
return json.dumps(self.stats.getAll())
def getAllPlugins(self):
# Return the plugins list
return json.dumps(self.stats.getPluginsList())
def getAllLimits(self):
# Return all the plugins limits
return json.dumps(self.stats.getAllLimitsAsDict())
def getAllViews(self):
# Return all the plugins views
return json.dumps(self.stats.getAllViewsAsDict())
def getPlugin(self, plugin):
# Update and return the plugin stat
self.__update__()
return json.dumps(self.stats.get_plugin(plugin).get_raw())
def getPluginView(self, plugin):
# Update and return the plugin view
return json.dumps(self.stats.get_plugin(plugin).get_views())
class GlancesServer:
"""This class creates and manages the TCP server."""
def __init__(self, requestHandler=GlancesXMLRPCHandler, config=None, args=None):
# Args
self.args = args
# Init the XML RPC server
try:
self.server = GlancesXMLRPCServer(args.bind_address, args.port, requestHandler, config=config)
except Exception as e:
logger.critical(f"Cannot start Glances server: {e}")
sys.exit(2)
else:
print(f'Glances XML-RPC server is running on {args.bind_address}:{args.port}')
# The users dict
# username / password couple
# By default, no auth is needed
self.server.user_dict = {}
self.server.isAuth = False
# Register functions
self.server.register_introspection_functions()
self.server.register_instance(GlancesInstance(config, args))
if not self.args.disable_autodiscover:
# Note: The Zeroconf service name will be based on the hostname
# Correct issue: Zeroconf problem with zeroconf service name #889
logger.info('Autodiscover is enabled with service name {}'.format(socket.gethostname().split('.', 1)[0]))
self.autodiscover_client = GlancesAutoDiscoverClient(socket.gethostname().split('.', 1)[0], args)
else:
logger.info("Glances autodiscover announce is disabled")
def add_user(self, username, password):
"""Add an user to the dictionary."""
self.server.user_dict[username] = password
self.server.isAuth = True
def serve_forever(self):
"""Call the main loop."""
# Set the server login/password (if -P/--password tag)
if self.args.password != "":
self.add_user(self.args.username, self.args.password)
# Serve forever
self.server.serve_forever()
def end(self):
"""End of the Glances server session."""
if not self.args.disable_autodiscover:
self.autodiscover_client.close()
self.server.end()