pipeline module

class pipeline.Buffer(*args, n=2, **kwds)[source]

Bases: Processor

A Processor with an explicit buffer that can be used to adjust buffer sizes. E.g. group or split up items.

full() bool[source]
class pipeline.Combiner(*args, n=2, **kwds)[source]

Bases: Buffer

Many-to-one e.g. Combiner(n=2) transforms items into pairs of items

process_item(item)[source]

Override this method to change the default identiy method.

ready_to_process() bool[source]
class pipeline.Distributer(*args, n=2, **kwds)[source]

Bases: Buffer

One-to-many e.g. Distributer(n=2) transforms pairs into items

append(group_of_items)[source]
extend(groups_of_items)[source]
class pipeline.Pipeline(items=[], *, processors: List[Processor] = [])[source]

Bases: Processor

A Processor that combines of multiple processing-stages.

Note that len(Pipeline) returns the length of the buffer of the whole Pipeline and not the total length of all components.

init_processors(processors: List[Processor])[source]
class pipeline.Processor(items=[])[source]

Bases: object

A queue-like object that can “process” items.

The superclass list allows the “queue” to be sorted and grouped. * Use .append() or .extend() to add items to the queue/buffer. * Use .process() to handle or process an item.

append(*args)[source]
clear()[source]
extend(*args)[source]
static from_function(pure_func)[source]

Decorator that defines a “stateless” processor.

Usage

new_func = Processor.from_function(func)

Note that new_func and func must have the same name in order to be compatible with pickle.

Parameters

pure_func : (object) -> object

process(item=None)[source]

Add an item to a queue, process the next item in the queue and return the result. Override this method to change queue behaviour into e.g. FIFO.

process_item(item)[source]

Override this method to change the default identiy method.

ready_to_process() bool[source]
class pipeline.PushPull(*args, strategy=Strategy.constant, **kwds)[source]

Bases: Pipeline

append(item)[source]
extend(items)[source]
property in_queue
init_resources(strategy)[source]
n_processors = 1
property out_queue
process(item=None)[source]

Process an item and then yield the result

process_buffer()[source]
process_item(item)[source]

Override this method to change the default identiy method.

start_resources()[source]
class pipeline.Resource(processor: pipeline.Processor, delivery_queues: Tuple[queue.Queue, queue.Queue], demand_queues: Tuple[queue.Queue, queue.Queue] = None, strategy: pipeline.Strategy = <Strategy.push: 1>)[source]

Bases: object

delivery_queues: Tuple[Queue, Queue]
demand_input()[source]
demand_queues: Tuple[Queue, Queue] = None
property in_queue: Queue
property out_queue: Queue
process()[source]
processor: Processor
put(item)[source]
start(max_items=None)[source]
strategy: Strategy = 1
wait_for_input_demand()[source]
class pipeline.Strategy(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

constant = 3
pull = 2
push = 1
pipeline.constant_(*args)[source]
pipeline.duplicate_(x)[source]
pipeline.identity_(x)[source]