Skip to content
This repository was archived by the owner on Feb 22, 2020. It is now read-only.

Commit 758797b

Browse files
authored
Merge pull request #333 from gnes-ai/fix-ctrl-ipc
fix(service): use ipc for control socket
2 parents 59539d7 + 02941b6 commit 758797b

File tree

4 files changed

+41
-11
lines changed

4 files changed

+41
-11
lines changed

docs/chapter/enviromentvars.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,6 @@ Default is not set. A random port will be used.
3838

3939
(*depreciated*) Paths of the third party components. See examples in GNES hub for latest usage.
4040

41+
## `GNES_IPC_SOCK_TMP`
42+
43+
Temp directory for ipc sockets, not used on Windows.

gnes/cli/parser.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ def set_service_parser(parser=None):
202202
parser.add_argument('--squeeze_pb', action=ActionNoYes, default=True,
203203
help='sending bytes and ndarray separately apart from the protobuf message, '
204204
'usually yields better network efficiency')
205+
parser.add_argument('--ctrl_with_ipc', action='store_true', default=False,
206+
help='use ipc protocol for control socket')
205207
return parser
206208

207209

gnes/flow/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ def _build_graph(self, copy_flow: bool) -> 'Flow':
703703
op_flow._build_level = BuildLevel.GRAPH
704704
return op_flow
705705

706-
def build(self, backend: Optional[str] = 'thread', copy_flow: bool = False, *args, **kwargs) -> 'Flow':
706+
def build(self, backend: Optional[str] = 'process', copy_flow: bool = False, *args, **kwargs) -> 'Flow':
707707
"""
708708
Build the current flow and make it ready to use
709709

gnes/service/base.py

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515

1616
import copy
1717
import multiprocessing
18+
import os
1819
import random
20+
import tempfile
1921
import threading
2022
import time
2123
import types
24+
import uuid
2225
from contextlib import ExitStack
2326
from enum import Enum
2427
from typing import Tuple, List, Union, Type
@@ -113,8 +116,19 @@ class EventLoopEnd(Exception):
113116
pass
114117

115118

116-
def build_socket(ctx: 'zmq.Context', host: str, port: int, socket_type: 'SocketType', identity: 'str' = None) -> Tuple[
117-
'zmq.Socket', str]:
119+
def get_random_ipc() -> str:
120+
try:
121+
tmp = os.environ['GNES_IPC_SOCK_TMP']
122+
if not os.path.exists(tmp):
123+
raise ValueError('This directory for sockets ({}) does not seems to exist.'.format(tmp))
124+
tmp = os.path.join(tmp, str(uuid.uuid1())[:8])
125+
except KeyError:
126+
tmp = tempfile.NamedTemporaryFile().name
127+
return 'ipc://%s' % tmp
128+
129+
130+
def build_socket(ctx: 'zmq.Context', host: str, port: int,
131+
socket_type: 'SocketType', identity: 'str' = None, use_ipc: bool = False) -> Tuple['zmq.Socket', str]:
118132
sock = {
119133
SocketType.PULL_BIND: lambda: ctx.socket(zmq.PULL),
120134
SocketType.PULL_CONNECT: lambda: ctx.socket(zmq.PULL),
@@ -129,11 +143,14 @@ def build_socket(ctx: 'zmq.Context', host: str, port: int, socket_type: 'SocketT
129143
}[socket_type]()
130144

131145
if socket_type.is_bind:
132-
host = BaseService.default_host
133-
if port is None:
134-
sock.bind_to_random_port('tcp://%s' % host)
146+
if use_ipc:
147+
sock.bind(host)
135148
else:
136-
sock.bind('tcp://%s:%d' % (host, port))
149+
host = BaseService.default_host
150+
if port is None:
151+
sock.bind_to_random_port('tcp://%s' % host)
152+
else:
153+
sock.bind('tcp://%s:%d' % (host, port))
137154
else:
138155
if port is None:
139156
sock.connect(host)
@@ -329,8 +346,12 @@ def __init__(self, args):
329346
self.last_dump_time = time.perf_counter()
330347
self._model = None
331348
self.use_event_loop = True
332-
self.ctrl_addr = 'tcp://%s:%d' % (self.default_host, self.args.port_ctrl)
333-
self.logger.info('control address: %s' % self.ctrl_addr)
349+
self.ctrl_with_ipc = (os.name != 'nt') and self.args.ctrl_with_ipc
350+
if self.ctrl_with_ipc:
351+
self.ctrl_addr = get_random_ipc()
352+
else:
353+
self.ctrl_addr = 'tcp://%s:%d' % (self.default_host, self.args.port_ctrl)
354+
334355
self.send_recv_kwargs = dict(
335356
check_version=self.args.check_version,
336357
timeout=self.args.timeout,
@@ -401,7 +422,12 @@ def _run(self, ctx):
401422
self.handler.service_context = self
402423
# print('!!!! t_id: %d service_context: %r' % (threading.get_ident(), self.handler.service_context))
403424
self.logger.info('bind sockets...')
404-
ctrl_sock, ctrl_addr = build_socket(ctx, self.default_host, self.args.port_ctrl, SocketType.PAIR_BIND)
425+
if self.ctrl_with_ipc:
426+
ctrl_sock, ctrl_addr = build_socket(ctx, self.ctrl_addr, None, SocketType.PAIR_BIND,
427+
use_ipc=self.ctrl_with_ipc)
428+
else:
429+
ctrl_sock, ctrl_addr = build_socket(ctx, self.default_host, self.args.port_ctrl, SocketType.PAIR_BIND)
430+
405431
self.logger.info('control over %s' % (colored(ctrl_addr, 'yellow')))
406432

407433
in_sock, _ = build_socket(ctx, self.args.host_in, self.args.port_in, self.args.socket_in,
@@ -412,7 +438,6 @@ def _run(self, ctx):
412438
self.args.identity)
413439
self.logger.info('output %s:%s' % (self.args.host_out, colored(self.args.port_out, 'yellow')))
414440

415-
416441
self.logger.info(
417442
'input %s:%s\t output %s:%s\t control over %s' % (
418443
self.args.host_in, colored(self.args.port_in, 'yellow'),

0 commit comments

Comments
 (0)