#!/usr/bin/env ruby

# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Worker and worker service implementation
18
# Make this script's directory and the sibling `lib` directory requirable.
# lib_dir is handled first so that this_dir, when newly added, ends up ahead
# of it on the load path.
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(this_dir), 'lib')
[lib_dir, this_dir].each do |dir|
  $LOAD_PATH.unshift(dir) unless $LOAD_PATH.include?(dir)
end
23
require 'grpc'
require 'optparse'
require 'histogram'
require 'etc'
require 'facter'
require 'client'
require 'qps-common'
require 'server'
require 'src/proto/grpc/testing/worker_service_services_pb'
vjpaifdeacd92016-03-17 21:52:2033
# Implementation of the benchmark WorkerService: starts benchmark servers and
# clients on request, reports their stats, and lets the driver shut the worker
# down.
class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
  # s  - the server object that quit_worker stops.
  # sp - port passed to every BenchmarkServer created by run_server.
  def initialize(s, sp)
    @server = s
    @server_port = sp
    # Only set once quit_worker has been called.
    @shutdown_thread = nil
  end

  # Processor count as reported by Facter.
  def cpu_cores
    Facter.value('processors')['count']
  end

  # Handles a stream of server requests. A 'setup' request creates a
  # BenchmarkServer and replies with its initial stats and port; a 'mark'
  # request replies with the current stats and the core count. Requests with
  # any other argtype are ignored. Replies are produced on a background thread
  # and handed back through the queue's each_item.
  #
  # A 'mark' that arrives before any 'setup' still raises NoMethodError.
  def run_server(reqs)
    q = EnumeratorQueue.new(self)
    Thread.new do
      bms = nil
      gtss = Grpc::Testing::ServerStatus
      reqs.each do |req|
        case req.argtype.to_s
        when 'setup'
          bms = BenchmarkServer.new(req.setup, @server_port)
          q.push(gtss.new(stats: bms.mark(false), port: bms.get_port))
        when 'mark'
          q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores))
        end
      end
      # The stream may end without a 'setup'; then there is nothing to stop.
      bms.stop if bms
      # Push the object the queue was constructed with (presumably its
      # end-of-stream sentinel; see EnumeratorQueue).
      q.push(self)
    end
    q.each_item
  end

  # Handles a stream of client requests. A 'setup' request creates a
  # BenchmarkClient and replies with its initial stats; a 'mark' request
  # replies with the current stats. Requests with any other argtype are
  # ignored. Replies are produced on a background thread and handed back
  # through the queue's each_item.
  #
  # A 'mark' that arrives before any 'setup' still raises NoMethodError.
  def run_client(reqs)
    q = EnumeratorQueue.new(self)
    Thread.new do
      client = nil
      reqs.each do |req|
        case req.argtype.to_s
        when 'setup'
          client = BenchmarkClient.new(req.setup)
          q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false)))
        when 'mark'
          stats = client.mark(req.mark.reset)
          q.push(Grpc::Testing::ClientStatus.new(stats: stats))
        end
      end
      # The stream may end without a 'setup'; then there is nothing to shut down.
      client.shutdown if client
      # Push the object the queue was constructed with (presumably its
      # end-of-stream sentinel; see EnumeratorQueue).
      q.push(self)
    end
    q.each_item
  end

  # Replies with the number of cores on this machine.
  def core_count(_args, _call)
    Grpc::Testing::CoreResponse.new(cores: cpu_cores)
  end

  # Stops the server from a separate thread and replies immediately.
  # NOTE(review): the extra thread presumably lets this RPC return its reply
  # before the server finishes stopping -- confirm against RpcServer#stop.
  def quit_worker(_args, _call)
    @shutdown_thread = Thread.new { @server.stop }
    Grpc::Testing::Void.new
  end

  # Waits for the thread started by quit_worker. Does nothing if quit_worker
  # was never called.
  def join_shutdown_thread
    @shutdown_thread.join if @shutdown_thread
  end
end
93
# Parses the command line, starts the worker's RPC server and serves until it
# is stopped.
#
# Options:
#   --driver_port PORT  port the worker service listens on (default 0)
#   --server_port PORT  port handed to WorkerServiceImpl for benchmark
#                       servers (default 0)
def main
  options = {
    'driver_port' => 0,
    'server_port' => 0
  }
  OptionParser.new do |opts|
    opts.banner = 'Usage: [--driver_port <port>] [--server_port <port>]'
    # Both options take a single PORT argument and are stored under their
    # own name.
    %w[driver_port server_port].each do |name|
      opts.on("--#{name} PORT", '<port>') do |v|
        options[name] = v
      end
    end
  end.parse!

  # Configure any errors with client or server child threads to surface
  Thread.abort_on_exception = true

  s = GRPC::RpcServer.new(poll_period: 3)
  s.add_http2_port("0.0.0.0:#{options['driver_port']}",
                   :this_port_is_insecure)
  worker_service = WorkerServiceImpl.new(s, options['server_port'].to_i)
  s.handle(worker_service)
  s.run
  # Once run returns, wait for the thread that quit_worker started to stop
  # the server.
  worker_service.join_shutdown_thread
end

main