|
20 | 20 | import grpc
|
21 | 21 | from google.protobuf.json_format import MessageToJson
|
22 | 22 |
|
23 |
| -from .. import __version__, __proto_version__ |
24 | 23 | from ..client.base import ZmqClient
|
25 | 24 | from ..helper import set_logger, make_route_table
|
26 | 25 | from ..proto import gnes_pb2_grpc, gnes_pb2, router2str, add_route, add_version
|
@@ -59,6 +58,7 @@ def __init__(self, args):
|
59 | 58 | check_version=self.args.check_version,
|
60 | 59 | timeout=self.args.timeout,
|
61 | 60 | squeeze_pb=self.args.squeeze_pb)
|
| 61 | + self.pending_request = 0 |
62 | 62 |
|
63 | 63 | def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'):
|
64 | 64 | msg = gnes_pb2.Message()
|
@@ -104,21 +104,27 @@ def Search(self, request, context):
|
104 | 104 | return self.Call(request, context)
|
105 | 105 |
|
106 | 106 | def StreamCall(self, request_iterator, context):
|
| 107 | + self.pending_request = 0 |
| 108 | + |
| 109 | + def get_response(num_recv, blocked=False): |
| 110 | + for _ in range(num_recv): |
| 111 | + if blocked or zmq_client.receiver.poll(1): |
| 112 | + msg = zmq_client.recv_message(**self.send_recv_kwargs) |
| 113 | + self.pending_request -= 1 |
| 114 | + yield self.remove_envelope(msg) |
| 115 | + |
107 | 116 | with self.zmq_context as zmq_client:
|
108 |
| - num_request = 0 |
109 | 117 |
|
110 | 118 | for request in request_iterator:
|
111 | 119 | zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs)
|
112 |
| - num_request += 1 |
| 120 | + self.pending_request += 1 |
113 | 121 |
|
114 |
| - if zmq_client.receiver.poll(1): |
115 |
| - msg = zmq_client.recv_message(**self.send_recv_kwargs) |
116 |
| - num_request -= 1 |
117 |
| - yield self.remove_envelope(msg) |
| 122 | + num_recv = max(self.pending_request - self.args.max_pending_request, 1) |
| 123 | + |
| 124 | + # switch to blocked recv when too many pending requests |
| 125 | + yield from get_response(num_recv, num_recv > 1) |
118 | 126 |
|
119 |
| - for _ in range(num_request): |
120 |
| - msg = zmq_client.recv_message(**self.send_recv_kwargs) |
121 |
| - yield self.remove_envelope(msg) |
| 127 | + yield from get_response(self.pending_request, blocked=True) |
122 | 128 |
|
123 | 129 | class ZmqContext:
|
124 | 130 | """The zmq context class."""
|
|
0 commit comments