Skip to content

Commit

Permalink
added multiple retries on receiving and transmitting in pybird_zmq_cl…
Browse files Browse the repository at this point in the history
…ient

Signed-off-by: syaakov <syaakov@cisco.com>
  • Loading branch information
syaakov committed Mar 29, 2022
1 parent 7a63185 commit cc7339f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -1,35 +1,40 @@
import zmq
import json
from trex.common.trex_ctx import TRexCtx
from trex.common.trex_exceptions import TRexError
from trex.common.trex_logger import ScreenLogger
from trex.common.trex_req_resp_client import JsonRpcClient
from trex.utils.common import get_current_user
import time
import hashlib
import random as rand
from argparse import *
from trex.pybird.bird_cfg_creator import *

class ConnectionException(Exception): pass
class ConfigurationException(Exception): pass

def rand_32_bit():
return rand.randint(0, 0xFFFFFFFF)

# TIMEOUTS
SEND_TIMEOUT = 10000
RCV_TIMEOUT = 10000
RCV_TIMEOUT_ON_EXIT = 500
HEARTBEAT_TIMEOUT = 6000
HEARTBEAT_IVL = 5000
SYNC_TIMEOUT = 10

class PyBirdClient():

CLIENT_VERSION = "1.0" # need to sync with bird zmq sever

def __init__(self, ip = 'localhost', port = 4509):
def __init__(self, ip = 'localhost', port = 4509, verbose_level = "error", logger = None, sync_timeout = None):
self.ip = ip
self.socket = None
self.context = None
self.port = port
self.handler = None # represent non connected client
self.is_connected = False
logger = logger if logger is not None else ScreenLogger()
logger.set_verbose(verbose_level)
sync_timeout = sync_timeout if sync_timeout is not None else SYNC_TIMEOUT
self.ctx = TRexCtx(None,
get_current_user(),
ip,
port,
None,
logger,
sync_timeout,
None)
self.rpc = JsonRpcClient(self.ctx)
self.rpc.set_retry_base(True)

def __del__(self):
try:
Expand All @@ -38,70 +43,21 @@ def __del__(self):
print(e.message)

def _close_conn(self):
if not self.socket.close:
if self.handler is not None:
self.release()
if self.socket is not None:
self.socket.close()
if self.context is not None:
self.context.destroy()

def _get_response(self, id):
while True:
try:
message = self.socket.recv()
except zmq.Again:
raise ConnectionException("Didn't get answer from Pybird Server for %s seconds, probably shutdown before client disconnect" % (RCV_TIMEOUT / 1000))

message = message.decode()
try:
message_parsed = json.loads(message)
except:
print('"Error in parsing response! got: "%s"' % message)
break
if type(message_parsed) is not dict:
print('Error in message: "%s"' % message)
raise Exception('Got from server "{}" type instead of dictionary! content: {}'.format(type(message_parsed), message_parsed))
if 'error' in message_parsed.keys():
print('Error in message: "%s"' % message)
raise Exception('Got exception from server! message: %s' % (message_parsed['error']['message']))
if 'id' not in message_parsed.keys():
print("Got response with no id, waiting for another one")
elif message_parsed['id'] != id:
print("Got response with different id, waiting for another one")
else:
break # found the wanted response
if 'result' in message_parsed.keys():
return message_parsed['result']
raise Exception(message_parsed['error'])
self.rpc.disconnect()

def _call_method(self, method_name, method_params):
rand_id = rand_32_bit() # generate 32 bit random id for request
json_rpc_req = { "jsonrpc": "2.0", "method": method_name , "params": method_params, "id": rand_id}
request = json.dumps(json_rpc_req)
try:
self.socket.send(request.encode('utf-8'))
except zmq.Again:
raise ConnectionException("Didn't get answer from Pybird Server for %s seconds, probably shutdown before client disconnect" % (RCV_TIMEOUT / 1000))
return self._get_response(rand_id)
rc = self.rpc.transmit(method_name=method_name, params=method_params)
if not rc:
raise TRexError(rc.err())
return rc.data()

def connect(self):
'''
Connect client to PyBird server. Only check versions and open the socket to bird.
'''
if not self.is_connected:
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)

self.socket.setsockopt(zmq.SNDTIMEO, SEND_TIMEOUT)
self.socket.setsockopt(zmq.RCVTIMEO, RCV_TIMEOUT)
self.socket.setsockopt(zmq.HEARTBEAT_IVL, HEARTBEAT_IVL)
self.socket.setsockopt(zmq.HEARTBEAT_TIMEOUT, HEARTBEAT_TIMEOUT)

self.socket.connect("tcp://"+str(self.ip)+":"+str(self.port))
if not self.rpc.is_connected():
self.rpc.connect()
result = self._call_method('connect', [PyBirdClient.CLIENT_VERSION])
if result:
self.is_connected = True
return result
else:
raise Exception("PyBird Client is already connected!")
Expand All @@ -119,10 +75,9 @@ def acquire(self, force=False):
+ :exc:`ConnectionException` in case of error
'''
if self.is_connected or force:
if self.rpc.is_connected() or force:
result = self._call_method('acquire', [force])
self.handler = result
self.is_connected = True
else:
raise ConnectionException("Cannot acquire before connect!")
return result
Expand All @@ -131,12 +86,12 @@ def get_config(self):
'''
Query, Return the current bird configuration.
'''
if not self.is_connected:
if not self.rpc.is_connected():
raise ConnectionException("Cannot get config when client is not connected!")
return self._call_method('get_config', [])

def get_protocols_info(self):
if not self.is_connected:
if not self.rpc.is_connected():
raise ConnectionException("Cannot get protocols information when client is not connected!")
return self._call_method('get_protocols_info', [])

Expand Down Expand Up @@ -217,7 +172,6 @@ def release(self):
+ :exc:`ConnectionError` in case client is not acquired
'''
if self.handler is not None:
self.socket.setsockopt(zmq.RCVTIMEO, RCV_TIMEOUT_ON_EXIT)
res = self._call_method('release', [self.handler])

self.handler = None
Expand All @@ -234,12 +188,12 @@ def disconnect(self):
'''
if self.handler is not None:
raise Exception('Client is acquired! run "release" first')
if self.is_connected:
self.socket.setsockopt(zmq.RCVTIMEO, RCV_TIMEOUT_ON_EXIT)
return self._call_method('disconnect', [])
else:
if not self.rpc.is_connected():
raise ConnectionException("Cannot disconnect, client is not connected")
else:
ret = self._call_method('disconnect', [])
self._close_conn()
return ret

def _upload_fragmented(self, rpc_cmd, upload_string):
index_start = 0
Expand All @@ -258,20 +212,11 @@ def _upload_fragmented(self, rpc_cmd, upload_string):
params['md5'] = hashlib.md5(upload_string.encode()).hexdigest()

# send the fragment
json_rpc_req = { "jsonrpc":"2.0","method": rpc_cmd ,"params": params, "id": rand_32_bit()}
request = json.dumps(json_rpc_req)

try:
self.socket.send(request.encode('utf-8'))
except zmq.Again:
raise ConnectionException("Didn't get answer from Pybird Server for %s seconds, probably shutdown before client disconnect" % (RCV_TIMEOUT / 1000))

# wait for server response
respond = self._get_response(json_rpc_req['id'])
respond = self._call_method(rpc_cmd, params)
if respond != 'send_another_frag':
return respond
index_start = index_end
fragment_length = 50000
fragment_length = 500000

raise ConfigurationException("Sent all the fragments, but did not get the configuration response")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import traceback
import sys
import random as rand
from trex.utils.zipmsg import ZippedMsg

import zmq
from trex.common.trex_types import LRU_cache
Expand Down Expand Up @@ -216,6 +217,7 @@ def __init__(self, args, port = 4509):
self.current_handler = None # store random 32bit number for current client
self.connected_clients = 0
self.bird_wrapper = BirdWrapper()
self.zipper = ZippedMsg()

try:
self.IP_address = socket.gethostbyname(socket.gethostname())
Expand Down Expand Up @@ -306,13 +308,26 @@ def check_handler(self, params):
if params['handler'] != self.current_handler:
raise InvalidHandler('Cannot make a change while another client is handled')

def recv_msg(self):
to_compress = False
message = self.socket.recv()
if self.zipper.is_compressed(message):
message = self.zipper.decompress(message)
to_compress = True
return message.decode(), to_compress

def send_msg(self, json_response, to_compress=False):
buffer = json_response.encode('utf-8')
if self.zipper.check_threshold(buffer) and to_compress:
buffer = self.zipper.compress(buffer)
self.socket.send(buffer)

def start(self):
self.logger.info('***Bird Server Started***')
try:
while True:
try:
message = self.socket.recv()
message = message.decode()
message, to_compress = self.recv_msg()

self.logger.info('Received Message: %s' % message)
except zmq.ZMQError as e:
Expand Down Expand Up @@ -351,7 +366,7 @@ def start(self):
json_response = json.dumps(self.bird_wrapper.error_handler(e, req_id))

# Send reply back to client
self.socket.send(json_response.encode('utf-8'))
self.send_msg(json_response, to_compress=to_compress)
if (method == 'shut_down'):
break

Expand Down

0 comments on commit cc7339f

Please sign in to comment.