This repository was archived by the owner on Feb 22, 2020. It is now read-only.
File tree Expand file tree Collapse file tree 4 files changed +13
-7
lines changed Expand file tree Collapse file tree 4 files changed +13
-7
lines changed Original file line number Diff line number Diff line change @@ -63,9 +63,11 @@ def get_default_fn(r_type):
63
63
fn = self .routes .get (resp_type )
64
64
else :
65
65
fn = get_default_fn (type (resp ))
66
-
67
- self .logger .info ('handling response with %s' % fn .__name__ )
68
- return fn (self ._context , resp )
66
+ self .logger .info ('handling response with %s' % fn .__name__ )
67
+ return fn (self ._context , resp )
68
+ else :
69
+ self .logger .warning ('the received message is not response' )
70
+ return None
69
71
70
72
71
73
class ZmqClient :
Original file line number Diff line number Diff line change @@ -44,7 +44,7 @@ class StreamingClient(GrpcClient):
44
44
def __init__ (self , args ):
45
45
super ().__init__ (args )
46
46
47
- self ._request_queue = queue .Queue (maxsize = 1000 )
47
+ self ._request_queue = queue .Queue (maxsize = 10 )
48
48
self ._is_streaming = threading .Event ()
49
49
50
50
self ._dispatch_thread = threading .Thread (target = self ._start )
@@ -63,16 +63,16 @@ def _start(self):
63
63
self ._is_streaming .clear ()
64
64
65
65
def _request_generator (self ):
66
- while self . _is_streaming . is_set () :
66
+ while True :
67
67
try :
68
68
request = self ._request_queue .get (block = True , timeout = 5.0 )
69
69
if request is None :
70
70
break
71
71
yield request
72
72
except queue .Empty :
73
- continue
73
+ break
74
74
except Exception as e :
75
- print ('exception: %s' % str (e ))
75
+ self . logger . error ('exception: %s' % str (e ))
76
76
break
77
77
78
78
@handler .register (NotImplementedError )
Original file line number Diff line number Diff line change @@ -46,6 +46,7 @@ def apply(self, doc: 'gnes_pb2.Document') -> None:
46
46
else :
47
47
idx = np .sort (np .random .choice (len (images ), self .sframes , replace = False ))
48
48
chunk .blob .CopyFrom (array2blob (images [idx ]))
49
+ del images
49
50
else :
50
51
self .logger .error (
51
52
'bad document: "doc.chunks" is empty!' )
Original file line number Diff line number Diff line change @@ -125,6 +125,9 @@ def get_response(num_recv, blocked=False):
125
125
with self .zmq_context as zmq_client :
126
126
127
127
for request in request_iterator :
128
+ num_recv = max (self .pending_request - self .args .max_pending_request , 0 )
129
+ yield from get_response (num_recv , num_recv > 0 )
130
+
128
131
zmq_client .send_message (self .add_envelope (request , zmq_client ), ** self .send_recv_kwargs )
129
132
self .pending_request += 1
130
133
You can’t perform that action at this time.
0 commit comments