pipeline: remove old tests, integrate with our test suite (#5729)

These have probably not been run by anyone in ages, better to move the
code to our test suite where it is regularly exercised. In fact, the
latter covers most of the cases already. The only missing tests seem to
be those were exceptions are raised in the first or last stage. Thus,
this adds such tests.
This commit is contained in:
Šarūnas Nejus 2025-05-06 13:58:09 +01:00 committed by GitHub
commit e1101b7a38
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 36 additions and 69 deletions

View file

@ -492,64 +492,3 @@ class Pipeline:
msgs = next_msgs
for msg in msgs:
yield msg
# Smoke test.
if __name__ == "__main__":
import time
# Test a normally-terminating pipeline both in sequence and
# in parallel.
def produce():
for i in range(5):
print("generating %i" % i)
time.sleep(1)
yield i
def work():
num = yield
while True:
print("processing %i" % num)
time.sleep(2)
num = yield num * 2
def consume():
while True:
num = yield
time.sleep(1)
print("received %i" % num)
ts_start = time.time()
Pipeline([produce(), work(), consume()]).run_sequential()
ts_seq = time.time()
Pipeline([produce(), work(), consume()]).run_parallel()
ts_par = time.time()
Pipeline([produce(), (work(), work()), consume()]).run_parallel()
ts_end = time.time()
print("Sequential time:", ts_seq - ts_start)
print("Parallel time:", ts_par - ts_seq)
print("Multiply-parallel time:", ts_end - ts_par)
print()
# Test a pipeline that raises an exception.
def exc_produce():
for i in range(10):
print("generating %i" % i)
time.sleep(1)
yield i
def exc_work():
num = yield
while True:
print("processing %i" % num)
time.sleep(3)
if num == 3:
raise Exception()
num = yield num * 2
def exc_consume():
while True:
num = yield
print("received %i" % num)
Pipeline([exc_produce(), exc_work(), exc_consume()]).run_parallel(1)

View file

@ -39,11 +39,16 @@ def _consume(result):
result.append(i)
# A worker that raises an exception.
# Pipeline stages that raise an exception.
class PipelineError(Exception):
pass
def _exc_produce(num=5):
yield from range(num)
raise PipelineError()
def _exc_work(num=3):
i = None
while True:
@ -53,6 +58,14 @@ def _exc_work(num=3):
i *= 2
def _exc_consume(result, num=4):
while True:
i = yield
if i == num:
raise PipelineError()
result.append(i)
# A worker that yields a bubble.
def _bub_work(num=3):
i = None
@ -121,17 +134,32 @@ class ParallelStageTest(unittest.TestCase):
class ExceptionTest(unittest.TestCase):
def setUp(self):
self.result = []
self.pl = pipeline.Pipeline(
(_produce(), _exc_work(), _consume(self.result))
)
def run_sequential(self, *stages):
pl = pipeline.Pipeline(stages)
with pytest.raises(PipelineError):
pl.run_sequential()
def run_parallel(self, *stages):
pl = pipeline.Pipeline(stages)
with pytest.raises(PipelineError):
pl.run_parallel()
def test_run_sequential(self):
with pytest.raises(PipelineError):
self.pl.run_sequential()
"""Test that exceptions from various stages of the pipeline are
properly propagated when running sequentially.
"""
self.run_sequential(_exc_produce(), _work(), _consume(self.result))
self.run_sequential(_produce(), _exc_work(), _consume(self.result))
self.run_sequential(_produce(), _work(), _exc_consume(self.result))
def test_run_parallel(self):
with pytest.raises(PipelineError):
self.pl.run_parallel()
"""Test that exceptions from various stages of the pipeline are
properly propagated when running in parallel.
"""
self.run_parallel(_exc_produce(), _work(), _consume(self.result))
self.run_parallel(_produce(), _exc_work(), _consume(self.result))
self.run_parallel(_produce(), _work(), _exc_consume(self.result))
def test_pull(self):
pl = pipeline.Pipeline((_produce(), _exc_work()))