Skip to content
This repository was archived by the owner on Feb 22, 2020. It is now read-only.

Commit 5583462

Browse files
author
Han Xiao
authored
Merge pull request #326 from gnes-ai/fix-flow-4
feat(flow): add multiple functions to gnes flow
2 parents e3ab1aa + 80cb530 commit 5583462

File tree

3 files changed

+295
-28
lines changed

3 files changed

+295
-28
lines changed

gnes/flow/__init__.py

Lines changed: 249 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ class FlowTopologyError(ValueError):
3333
"""Exception when the topology is ambiguous"""
3434

3535

36+
class FlowMissingNode(ValueError):
    """Exception when a service name referenced by the caller does not exist in the flow"""
38+
39+
3640
class FlowBuildLevelMismatch(ValueError):
3741
"""Exception when required level is higher than the current build level"""
3842

@@ -86,7 +90,7 @@ class Flow:
8690
You can change this behavior by giving an argument `copy_flow=False`.
8791
8892
"""
89-
_supported_orch = {'swarm', 'k8s'}
93+
9094
_service2parser = {
9195
Service.Encoder: set_encoder_parser,
9296
Service.Router: set_router_parser,
@@ -108,36 +112,92 @@ class BuildLevel(BetterEnum):
108112
RUNTIME = 2
109113

110114
def __init__(self, with_frontend: bool = True, **kwargs):
115+
"""
116+
Create a new Flow object.
117+
118+
:param with_frontend: adding frontend service to the flow
119+
:param kwargs: keyword-value arguments that will be shared by all services
120+
"""
111121
self.logger = set_logger(self.__class__.__name__)
112122
self._service_nodes = OrderedDict()
113123
self._service_edges = {}
114124
self._service_name_counter = {k: 0 for k in Flow._service2parser.keys()}
115125
self._service_contexts = []
116-
self._last_add_service = None
126+
self._last_changed_service = []
117127
self._common_kwargs = kwargs
118128
self._frontend = None
119129
self._client = None
120130
self._build_level = Flow.BuildLevel.EMPTY
121131
self._backend = None
132+
self._init_with_frontend = False
122133
if with_frontend:
123134
self.add_frontend(copy_flow=False)
135+
self._init_with_frontend = True
124136
else:
125137
self.logger.warning('with_frontend is set to False, you need to add_frontend() by yourself')
126138

127139
@_build_level(BuildLevel.GRAPH)
128-
def to_yaml(self, orchestration: str) -> str:
129-
if orchestration not in Flow._supported_orch:
130-
raise TypeError(
131-
'%s is not valid type of orchestration, should be one of %s' % (orchestration, Flow._supported_orch))
140+
def to_swarm_yaml(self) -> str:
141+
swarm_yml = ''
142+
return swarm_yml
132143

133-
@staticmethod
134-
def from_yaml(orchestration: str) -> 'Flow':
135-
if orchestration not in Flow._supported_orch:
136-
raise TypeError(
137-
'%s is not valid type of orchestration, should be one of %s' % (orchestration, Flow._supported_orch))
144+
def to_python_code(self, indent: int = 4) -> str:
    """
    Generate the python code of this flow

    :param indent: the number of spaces used to indent the chained ``.add()`` calls
        and the body of the generated ``with`` block
    :return: the generated python code
    """
    pad = ' ' * indent

    # arguments of the ``Flow(...)`` constructor; values are rendered with
    # repr() so that strings are quoted and the generated code stays valid
    flow_args = []
    if not self._init_with_frontend:
        flow_args.append('with_frontend=False')
    flow_args.extend('%s=%s' % (k, repr(v)) for k, v in self._common_kwargs.items())

    py_code = ['from gnes.flow import Flow',
               '',
               'f = (Flow(%s)' % ', '.join(flow_args)]

    known_service = set()
    last_add_name = ''

    for name, node in self._service_nodes.items():
        # only services that are already emitted can be referenced;
        # sorting makes the generated code deterministic
        service_in = sorted(s for s in node['incomes'] if s in known_service)
        service_out = sorted(s for s in node['outgoings'] if s in known_service)

        call_args = ['service=%s' % repr(str(node['service'])),
                     'name=%s' % repr(name)]

        # connecting to the previously added service is the default
        # behavior of add(), hence it does not need to be spelled out
        if service_in and service_in != [last_add_name]:
            call_args.append('service_in=[%s]' % ','.join(repr(s) for s in service_in))
        if service_out:
            call_args.append('service_out=[%s]' % ','.join(repr(s) for s in service_out))

        # per-service kwargs; empty values and the ones already given to
        # the Flow constructor are omitted
        call_args.extend('%s=%s' % (kk, repr(vv))
                         for kk, vv in node['kwargs'].items()
                         if vv and vv != '[]' and kk not in self._common_kwargs)

        known_service.add(name)
        last_add_name = name

        py_code.append('%s.add(%s)' % (pad, ', '.join(call_args)))

    # close the parenthesis opened by ``f = (Flow(...)``
    py_code[-1] += ')'

    py_code.extend(['',
                    '# build the flow and visualize it',
                    'f.build(backend=None).to_url()'
                    ])
    py_code.extend(['',
                    '# use this flow in multi-thread mode for indexing',
                    'with f.build(backend=\'thread\') as fl:',
                    '%sfl.index(txt_file=\'test.txt\')' % pad
                    ])
    py_code.append('')

    return '\n'.join(py_code)
138198

139199
@_build_level(BuildLevel.GRAPH)
140-
def to_mermaid(self, left_right: bool = True):
200+
def to_mermaid(self, left_right: bool = True) -> str:
141201
"""
142202
Output the mermaid graph for visualization
143203
@@ -238,7 +298,7 @@ def to_url(self, **kwargs) -> str:
238298
return 'https://mermaidjs.github.io/mermaid-live-editor/#/view/%s' % encoded_str
239299

240300
@_build_level(BuildLevel.GRAPH)
241-
def to_jpg(self, path: str = 'flow.jpg', **kwargs):
301+
def to_jpg(self, path: str = 'flow.jpg', **kwargs) -> None:
242302
"""
243303
Rendering the current flow as a jpg image, this will call :py:meth:`to_mermaid` and it needs internet connection
244304
@@ -249,7 +309,9 @@ def to_jpg(self, path: str = 'flow.jpg', **kwargs):
249309

250310
from urllib.request import Request, urlopen
251311
encoded_str = self.to_url().replace('https://mermaidjs.github.io/mermaid-live-editor/#/view/', '')
252-
self.logger.info('saving jpg...')
312+
self.logger.warning('jpg exporting relies on https://mermaid.ink/, but it is not very stable. '
313+
'some syntax are not supported, please use with caution.')
314+
self.logger.info('downloading as jpg...')
253315
req = Request('https://mermaid.ink/img/%s' % encoded_str, headers={'User-Agent': 'Mozilla/5.0'})
254316
with open(path, 'wb') as fp:
255317
fp.write(urlopen(req).read())
@@ -318,17 +380,154 @@ def add_router(self, *args, **kwargs) -> 'Flow':
318380
"""Add a router to the current flow, a shortcut of :py:meth:`add(Service.Router)`"""
319381
return self.add(Service.Router, *args, **kwargs)
320382

321-
def add(self, service: 'Service',
383+
def set_last_service(self, name: str, copy_flow: bool = True) -> 'Flow':
    """
    Set a service as the last service in the flow, useful when modifying the flow.

    :param name: the name of the existing service
    :param copy_flow: when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification
    :return: a (new) flow object with modification
    :raises FlowMissingNode: when no service with the given name exists in the flow
    """
    op_flow = copy.deepcopy(self) if copy_flow else self

    if name not in op_flow._service_nodes:
        raise FlowMissingNode('service: %s can not be found in this Flow' % name)

    # avoid recording the same service twice in a row
    if not op_flow._last_changed_service or op_flow._last_changed_service[-1] != name:
        op_flow._last_changed_service.append(name)

    # graph is now changed so we need to
    # reset the build level to the lowest
    op_flow._build_level = Flow.BuildLevel.EMPTY

    return op_flow
406+
407+
def set(self, name: str, service_in: Union[str, Tuple[str], List[str], 'Service'] = None,
        service_out: Union[str, Tuple[str], List[str], 'Service'] = None,
        copy_flow: bool = True,
        clear_old_attr: bool = False,
        as_last_service: bool = False,
        **kwargs) -> 'Flow':
    """
    Set the attribute of an existing service (added by :py:meth:`add`) in the flow.
    For the attributes or kwargs that aren't given, they will remain unchanged as before.

    :param name: the name of the existing service
    :param service_in: the name of the service(s) that this service receives data from.
                       One can also use 'Service.Frontend' to indicate the connection with the frontend.
    :param service_out: the name of the service(s) that this service sends data to.
                       One can also use 'Service.Frontend' to indicate the connection with the frontend.
    :param copy_flow: when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification
    :param clear_old_attr: remove old attribute value before setting the new one;
                       when false, the given endpoints and kwargs are merged into the existing ones
    :param as_last_service: whether setting the changed service as the last service in the graph
    :param kwargs: other keyword-value arguments that the service CLI supports
    :return: a (new) flow object with modification
    :raises FlowMissingNode: when no service with the given name exists in the flow
    """
    op_flow = copy.deepcopy(self) if copy_flow else self

    if name not in op_flow._service_nodes:
        raise FlowMissingNode('service: %s can not be found in this Flow' % name)

    node = op_flow._service_nodes[name]
    service = node['service']

    if service_in:
        service_in = op_flow._parse_service_endpoints(op_flow, name, service_in, connect_to_last_service=True)

        if clear_old_attr:
            node['incomes'] = service_in
            # remove all edges point to this service
            for n in op_flow._service_nodes.values():
                n['outgoings'].discard(name)
        else:
            node['incomes'] = node['incomes'].union(service_in)

        # add the new edges back
        for s in service_in:
            op_flow._service_nodes[s]['outgoings'].add(name)

    if service_out:
        service_out = op_flow._parse_service_endpoints(op_flow, name, service_out, connect_to_last_service=False)

        # the old outgoings must only be replaced when clear_old_attr is set,
        # otherwise the new endpoints are merged into the existing ones
        if clear_old_attr:
            node['outgoings'] = service_out
            # remove all edges this service point to
            for n in op_flow._service_nodes.values():
                n['incomes'].discard(name)
        else:
            node['outgoings'] = node['outgoings'].union(service_out)

        for s in service_out:
            op_flow._service_nodes[s]['incomes'].add(name)

    if kwargs:
        if not clear_old_attr:
            # merge the new kwargs on top of the ones stored for this service
            node['kwargs'].update(kwargs)
            kwargs = node['kwargs']
        args, p_args = op_flow._get_parsed_args(op_flow, Flow._service2parser[service], kwargs)
        node['args'] = args
        node['parsed_args'] = p_args
        node['kwargs'] = kwargs

    if as_last_service:
        op_flow.set_last_service(name, False)

    # graph is now changed so we need to
    # reset the build level to the lowest
    op_flow._build_level = Flow.BuildLevel.EMPTY

    return op_flow
483+
484+
def remove(self, name: str = None, copy_flow: bool = True) -> 'Flow':
    """
    Remove a service from the flow.

    :param name: the name of the existing service
    :param copy_flow: when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification
    :return: a (new) flow object with modification
    :raises FlowMissingNode: when no service with the given name exists in the flow
    """

    op_flow = copy.deepcopy(self) if copy_flow else self

    if name not in op_flow._service_nodes:
        raise FlowMissingNode('service: %s can not be found in this Flow' % name)

    op_flow._service_nodes.pop(name)

    # remove all edges from and to this service
    for n in op_flow._service_nodes.values():
        n['outgoings'].discard(name)
        n['incomes'].discard(name)

    # do not keep a dangling reference to a removed frontend, otherwise a
    # later lookup of the frontend node fails with a bare KeyError
    if op_flow._frontend == name:
        op_flow._frontend = None

    if op_flow._service_nodes:
        op_flow._last_changed_service = [v for v in op_flow._last_changed_service if v != name]
    else:
        op_flow._last_changed_service = []

    # graph is now changed so we need to
    # reset the build level to the lowest
    op_flow._build_level = Flow.BuildLevel.EMPTY

    return op_flow
517+
518+
def add(self, service: Union['Service', str],
322519
name: str = None,
323520
service_in: Union[str, Tuple[str], List[str], 'Service'] = None,
324521
service_out: Union[str, Tuple[str], List[str], 'Service'] = None,
325522
copy_flow: bool = True,
326523
**kwargs) -> 'Flow':
327524
"""
328-
Add a service to the current flow object and return the new modified flow object
525+
Add a service to the current flow object and return the new modified flow object.
526+
The attribute of the service can be later changed with :py:meth:`set` or deleted with :py:meth:`remove`
329527
330-
:param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
331-
:param name: the name indentifier of the service, useful in 'service_in' and 'service_out'
528+
:param service: a 'Service' enum or string, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
529+
:param name: the name identifier of the service, can be used in 'service_in',
530+
'service_out', :py:meth:`set` and :py:meth:`remove`.
332531
:param service_in: the name of the service(s) that this service receives data from.
333532
One can also use 'Service.Frontend' to indicate the connection with the frontend.
334533
:param service_out: the name of the service(s) that this service sends data to.
@@ -340,6 +539,9 @@ def add(self, service: 'Service',
340539

341540
op_flow = copy.deepcopy(self) if copy_flow else self
342541

542+
if isinstance(service, str):
543+
service = Service.from_string(service)
544+
343545
if service not in Flow._service2parser:
344546
raise ValueError('service: %s is not supported, should be one of %s' % (service, Flow._service2parser))
345547

@@ -366,15 +568,16 @@ def add(self, service: 'Service',
366568
'parsed_args': p_args,
367569
'args': args,
368570
'incomes': service_in,
369-
'outgoings': service_out}
571+
'outgoings': service_out,
572+
'kwargs': kwargs}
370573

371574
# direct all income services' output to the current service
372575
for s in service_in:
373576
op_flow._service_nodes[s]['outgoings'].add(name)
374577
for s in service_out:
375578
op_flow._service_nodes[s]['incomes'].add(name)
376579

377-
op_flow._last_add_service = name
580+
op_flow.set_last_service(name, False)
378581

379582
# graph is now changed so we need to
380583
# reset the build level to the lowest
@@ -390,16 +593,16 @@ def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connec
390593
elif service_endpoint == Service.Frontend:
391594
service_endpoint = [op_flow._frontend]
392595
elif not service_endpoint:
393-
if op_flow._last_add_service and connect_to_last_service:
394-
service_endpoint = [op_flow._last_add_service]
596+
if op_flow._last_changed_service and connect_to_last_service:
597+
service_endpoint = [op_flow._last_changed_service[-1]]
395598
else:
396599
service_endpoint = []
397600
if isinstance(service_endpoint, list) or isinstance(service_endpoint, tuple):
398601
for s in service_endpoint:
399602
if s == cur_service_name:
400603
raise FlowTopologyError('the income of a service can not be itself')
401604
if s not in op_flow._service_nodes:
402-
raise FlowTopologyError('service_in: %s can not be found in this Flow' % s)
605+
raise FlowMissingNode('service_in: %s can not be found in this Flow' % s)
403606
else:
404607
raise ValueError('service_in=%s is not parsable' % service_endpoint)
405608
return set(service_endpoint)
@@ -439,11 +642,11 @@ def _build_graph(self, copy_flow: bool) -> 'Flow':
439642
if not op_flow._frontend:
440643
raise FlowImcompleteError('frontend does not exist, you may need to add_frontend()')
441644

442-
if not op_flow._last_add_service or not op_flow._service_nodes:
645+
if not op_flow._last_changed_service or not op_flow._service_nodes:
443646
raise FlowTopologyError('flow is empty?')
444647

445648
# close the loop
446-
op_flow._service_nodes[op_flow._frontend]['incomes'].add(op_flow._last_add_service)
649+
op_flow._service_nodes[op_flow._frontend]['incomes'] = {op_flow._last_changed_service[-1]}
447650

448651
# build all edges
449652
for k, v in op_flow._service_nodes.items():
@@ -580,3 +783,24 @@ def __getstate__(self):
580783
def __setstate__(self, d):
581784
self.__dict__.update(d)
582785
self.logger = set_logger(self.__class__.__name__)
786+
787+
def __eq__(self, other):
    """
    Comparing the topology of a flow with another flow.
    Identification is defined by whether two flows share the same set of edges.

    A flow that is not yet built to the graph level is built on a copy
    (with ``backend=None``) before comparing, the operands are left untouched.

    :param other: the second flow object
    :return: true if both flows have the same edges; ``NotImplemented`` when
        ``other`` is not a flow
    """
    if not isinstance(other, Flow):
        # let python fall back to the reflected comparison / identity check
        return NotImplemented

    if self._build_level.value < Flow.BuildLevel.GRAPH.value:
        a = self.build(backend=None, copy_flow=True)
    else:
        a = self

    if other._build_level.value < Flow.BuildLevel.GRAPH.value:
        b = other.build(backend=None, copy_flow=True)
    else:
        b = other

    return a._service_edges == b._service_edges

gnes/service/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def from_string(cls, s):
4242
try:
4343
return cls[s]
4444
except KeyError:
45-
raise ValueError()
45+
raise ValueError('%s is not a valid enum for %s' % (s, cls))
4646

4747

4848
class ReduceOp(BetterEnum):

0 commit comments

Comments
 (0)