Skip to content

Commit 8a2913b

Browse files
authored
Merge pull request #2 from pyper-dev/dev
Dev
2 parents d5b8b67 + 83ebd7c commit 8a2913b

File tree

10 files changed

+33
-53
lines changed

10 files changed

+33
-53
lines changed

assets/pyper.png

-30.8 KB
Binary file not shown.

docs/Tutorial/Pipeline.md

Lines changed: 0 additions & 1 deletion
This file was deleted.

docs/Tutorial/task.md

Lines changed: 0 additions & 1 deletion
This file was deleted.

docs/index.html

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/pyper/_core/pipeline.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111

1212

1313
class Pipeline:
14-
"""A sequence of at least 1 connected Tasks.
14+
"""A sequence of at least 1 Tasks.
1515
1616
Two pipelines can be piped into another via:
1717
```python
18-
new_pipeline = p1 >> p2
18+
new_pipeline = p1 | p2
1919
# OR
2020
new_pipeline = p1.pipe(p2)
2121
```
@@ -43,8 +43,8 @@ def pipe(self, other) -> Pipeline:
4343
raise TypeError(f"{other} of type {type(other)} cannot be piped into a Pipeline")
4444
return Pipeline(self.tasks + other.tasks)
4545

46-
def __rshift__(self, other: Pipeline) -> Pipeline:
47-
"""Allow the syntax `pipeline1 >> pipeline2`."""
46+
def __or__(self, other: Pipeline) -> Pipeline:
47+
"""Allow the syntax `pipeline1 | pipeline2`."""
4848
return self.pipe(other)
4949

5050
def consume(self, other: Callable) -> Callable:
@@ -55,8 +55,8 @@ def consumer(*args, **kwargs):
5555
return consumer
5656
raise TypeError(f"{other} must be a callable that takes a generator")
5757

58-
def __and__(self, other: Callable) -> Callable:
59-
"""Allow the syntax `pipeline & consumer`."""
58+
def __gt__(self, other: Callable) -> Callable:
59+
"""Allow the syntax `pipeline > consumer`."""
6060
return self.consume(other)
6161

6262
def __repr__(self):

src/pyper/_core/sync_helper/output.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def __call__(self, *args, **kwargs):
3838
tp.raise_error_if_exists()
3939
try:
4040
# Use the timeout strategy for unblocking main thread without busy waiting
41-
if (data := q_out.get(timeout=1)) is StopSentinel:
41+
if (data := q_out.get(timeout=0.1)) is StopSentinel:
4242
tp.raise_error_if_exists()
4343
break
4444
yield data

src/pyper/_core/task.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88
class Task:
99
"""The representation of a function within a Pipeline."""
10+
11+
__slots__ = "is_gen", "is_async", "func", "join", "concurrency", "throttle", "daemon"
12+
1013
def __init__(
1114
self,
1215
func: Callable,

tests/test_async.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,23 @@ async def consumer(data):
3232

3333
@pytest.mark.asyncio
3434
async def test_pipeline():
35-
p = task(f1) >> task(f2)
35+
p = task(f1) | task(f2)
3636
assert p(1).__next__() == 1
3737

3838
@pytest.mark.asyncio
3939
async def test_joined_pipeline():
40-
p = task(af1) >> task(af2) >> task(af4, join=True)
40+
p = task(af1) | task(af2) | task(af4, join=True)
4141
assert await p(1).__anext__() == 1
4242

4343
@pytest.mark.asyncio
4444
async def test_consumer():
45-
p = task(af1) >> task(af2) & consumer
45+
p = task(af1) | task(af2) > consumer
4646
assert await p(1) == 1
4747

4848
@pytest.mark.asyncio
4949
async def test_invalid_first_stage_concurrency():
5050
try:
51-
p = task(af1, concurrency=2) >> task(af2) & consumer
51+
p = task(af1, concurrency=2) | task(af2) > consumer
5252
await p(1)
5353
except Exception as e:
5454
assert isinstance(e, RuntimeError)
@@ -58,7 +58,7 @@ async def test_invalid_first_stage_concurrency():
5858
@pytest.mark.asyncio
5959
async def test_invalid_first_stage_join():
6060
try:
61-
p = task(af1, join=True) >> task(af2) & consumer
61+
p = task(af1, join=True) | task(af2) > consumer
6262
await p(1)
6363
except Exception as e:
6464
assert isinstance(e, RuntimeError)
@@ -68,7 +68,7 @@ async def test_invalid_first_stage_join():
6868
@pytest.mark.asyncio
6969
async def test_error_handling():
7070
try:
71-
p = task(af1) >> task(af2) >> task(af3) & consumer
71+
p = task(af1) | task(af2) | task(af3) > consumer
7272
await p(1)
7373
except Exception as e:
7474
assert isinstance(e, RuntimeError)
@@ -77,13 +77,13 @@ async def test_error_handling():
7777

7878
@pytest.mark.asyncio
7979
async def test_unified_pipeline():
80-
p = task(af1) >> task(f1) >> task(af2) >> task(f2) & consumer
80+
p = task(af1) | task(f1) | task(af2) | task(f2) > consumer
8181
assert await p(1) == 1
8282

8383
@pytest.mark.asyncio
8484
async def test_error_handling_in_daemon():
8585
try:
86-
p = task(af1) >> task(af2) >> task(f3, daemon=True) & consumer
86+
p = task(af1) | task(af2) | task(f3, daemon=True) > consumer
8787
await p(1)
8888
except Exception as e:
8989
assert isinstance(e, RuntimeError)

tests/test_sync.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def f4(a1, a2, a3, data, k1, k2):
1717

1818
def f5(data):
1919
# Make queue monitor timeout on main thread
20-
time.sleep(1.2)
20+
time.sleep(0.2)
2121
raise RuntimeError
2222

2323
def consumer(data):
@@ -27,28 +27,28 @@ def consumer(data):
2727
return total
2828

2929
def test_pipeline():
30-
p = task(f1) >> task(f2)
30+
p = task(f1) | task(f2)
3131
assert p(1).__next__() == 1
3232

3333
def test_joined_pipeline():
34-
p = task(f1) >> task(f2) >> task(f3, join=True)
34+
p = task(f1) | task(f2) | task(f3, join=True)
3535
assert p(1).__next__() == 1
3636

3737
def test_bind():
38-
p = task(f1) >> task(f4, bind=task.bind(1, 1, 1, k1=1, k2=2))
38+
p = task(f1) | task(f4, bind=task.bind(1, 1, 1, k1=1, k2=2))
3939
assert p(1).__next__() == 1
4040

4141
def test_redundant_bind_ok():
42-
p = task(f1) >> task(f2, bind=task.bind())
42+
p = task(f1) | task(f2, bind=task.bind())
4343
assert p(1).__next__() == 1
4444

4545
def test_consumer():
46-
p = task(f1) >> task(f2) & consumer
46+
p = task(f1) | task(f2) > consumer
4747
assert p(1) == 1
4848

4949
def test_invalid_first_stage_concurrency():
5050
try:
51-
p = task(f1, concurrency=2) >> task(f2) & consumer
51+
p = task(f1, concurrency=2) | task(f2) > consumer
5252
p(1)
5353
except Exception as e:
5454
assert isinstance(e, RuntimeError)
@@ -57,7 +57,7 @@ def test_invalid_first_stage_concurrency():
5757

5858
def test_invalid_first_stage_join():
5959
try:
60-
p = task(f1, join=True) >> task(f2) & consumer
60+
p = task(f1, join=True) | task(f2) > consumer
6161
p(1)
6262
except Exception as e:
6363
assert isinstance(e, RuntimeError)
@@ -66,7 +66,7 @@ def test_invalid_first_stage_join():
6666

6767
def test_error_handling():
6868
try:
69-
p = task(f1) >> task(f2) >> task(f5) & consumer
69+
p = task(f1) | task(f2) | task(f5) > consumer
7070
p(1)
7171
except Exception as e:
7272
print(e)
@@ -76,7 +76,7 @@ def test_error_handling():
7676

7777
def test_error_handling_in_daemon():
7878
try:
79-
p = task(f5, daemon=True) >> task(f2) & consumer
79+
p = task(f5, daemon=True) | task(f2) > consumer
8080
p(1)
8181
except Exception as e:
8282
print(e)

tests/test_task.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,36 +87,36 @@ def test_async_task():
8787
assert isinstance(p, AsyncPipeline)
8888

8989
def test_piped_async_task():
90-
p = task(afunc) >> task(func)
90+
p = task(afunc) | task(func)
9191
assert isinstance(p, AsyncPipeline)
9292

9393
def test_invalid_pipe():
9494
try:
95-
task(func) >> 1
95+
task(func) | 1
9696
except Exception as e:
9797
assert isinstance(e, TypeError)
9898
else:
9999
raise AssertionError
100100

101101
def test_invalid_async_pipe():
102102
try:
103-
task(afunc) >> 1
103+
task(afunc) | 1
104104
except Exception as e:
105105
assert isinstance(e, TypeError)
106106
else:
107107
raise AssertionError
108108

109109
def test_invalid_consumer():
110110
try:
111-
task(func) & 1
111+
task(func) > 1
112112
except Exception as e:
113113
assert isinstance(e, TypeError)
114114
else:
115115
raise AssertionError
116116

117117
def test_invalid_async_consumer():
118118
try:
119-
task(afunc) & func
119+
task(afunc) > func
120120
except Exception as e:
121121
assert isinstance(e, TypeError)
122122
else:

0 commit comments

Comments
 (0)