pipeline
![Go Report Card](https://goreportcard.com/badge/deliveryhero/pipeline)
Pipeline is a go library that helps you build pipelines without worrying about channel management and concurrency.
It contains common fan-in and fan-out operations as well as useful utility funcs for batch processing and scaling.
If you have another common use case you would like to see covered by this package, please open a feature request.
Cookbook
Functions
func Buffer[Item any](size int, in <-chan Item) <-chan Item
Buffer creates a buffered channel that will close after the input
is closed and the buffer is fully drained
func Cancel[Item any](ctx context.Context, cancel func(Item, error), in <-chan Item) <-chan Item
Cancel passes an Item any
from the in <-chan Item
directly to the out <-chan Item
until the Context
is canceled.
After the context is canceled, everything from in <-chan Item
is sent to the cancel
func instead with the ctx.Err()
.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
p := pipeline.Delay(ctx, time.Second/4,
pipeline.Emit(1, 2, 3, 4, 5),
)
p = pipeline.Cancel(ctx, func(i int, err error) {
log.Printf("%+v could not be processed, %s", i, err)
}, p)
for out := range p {
log.Printf("process: %+v", out)
}
Output:
process: 1
process: 2
process: 3
process: 4
5 could not be processed, context deadline exceeded
func Collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) <-chan []Item
Collect collects Item
s from its in channel and returns []Item
from its out channel.
It will collect up to maxSize
inputs from the in <-chan Item
over up to maxDuration
before returning them as []Item
.
That means when maxSize
is reached before maxDuration
, [maxSize]Item
will be passed to the out channel.
But if maxDuration
is reached before maxSize
inputs are collected, [< maxSize]Item
will be passed to the out channel.
When the context
is canceled, everything in the buffer will be flushed to the out channel.
func Delay[Item any](ctx context.Context, duration time.Duration, in <-chan Item) <-chan Item
Delay delays reading each input by duration
.
If the context is canceled, the delay will not be applied.
func Drain[Item any](in <-chan Item)
Drain empties the input and blocks until the channel is closed
func Emit[Item any](is ...Item) <-chan Item
Emit fans `is ...Item` out to a
`<-chan Item`
func Emitter[Item any](ctx context.Context, next func() Item) <-chan Item
Emitter continuously emits new items generated by the next func
until the context is canceled
func Merge[Item any](ins ...<-chan Item) <-chan Item
Merge fans multiple channels in to a single channel
one := pipeline.Emit(1)
two := pipeline.Emit(2, 2)
three := pipeline.Emit(3, 3, 3)
for i := range pipeline.Merge(one, two, three) {
fmt.Printf("output: %d\n", i)
}
fmt.Println("done")
Output:
output: 1
output: 3
output: 2
output: 2
output: 3
output: 3
done
func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output
Process takes each input from the in <-chan Input
and calls Processor.Process
on it.
When Processor.Process
returns an Output
, it will be sent to the output <-chan Output
.
If Processor.Process
returns an error, Processor.Cancel
will be called with the corresponding input and error message.
Finally, if the Context
is canceled, all inputs remaining in the in <-chan Input
will go directly to Processor.Cancel
.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
return in * 10, nil
}, func(i int, err error) {
fmt.Printf("error: could not multiply %v, %s\n", i, err)
}), p)
for result := range p {
fmt.Printf("result: %d\n", result)
}
Output:
result: 10
result: 20
result: 30
result: 40
result: 50
error: could not multiply 6, context deadline exceeded
func ProcessBatch[Input, Output any]( ctx context.Context, maxSize int, maxDuration time.Duration, processor Processor[[]Input, []Output], in <-chan Input, ) <-chan Output
ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of Input
s.
It passes a []Input to the Processor.Process
method and expects a []Output back.
It passes []Input batches of inputs to the Processor.Cancel
method.
If the receiver is backed up, ProcessBatch can hold up to 2x maxSize.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))
p = pipeline.ProcessBatch(ctx, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, is []int) ([]int, error) {
o := 1
for _, i := range is {
o *= i
}
return []int{o}, nil
}, func(is []int, err error) {
fmt.Printf("error: could not multiply %v, %s\n", is, err)
}), p)
for result := range p {
fmt.Printf("result: %d\n", result)
}
Output:
result: 2
result: 12
error: could not multiply [5 6], context deadline exceeded
func ProcessBatchConcurrently[Input, Output any]( ctx context.Context, concurrently, maxSize int, maxDuration time.Duration, processor Processor[[]Input, []Output], in <-chan Input, ) <-chan Output
ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently,
then it fans the out channels of the batch Processors back into a single out chan
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9)
p = pipeline.Delay(ctx, time.Second, p)
p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, ins []int) ([]int, error) {
return ins, nil
}, func(i []int, err error) {
fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)
for result := range p {
fmt.Printf("result: %d\n", result)
}
func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], in <-chan Input) <-chan Output
ProcessConcurrently fans the in channel out to multiple Processors running concurrently,
then it fans the out channels of the Processors back into a single out chan
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7)
p = pipeline.Delay(ctx, 2*time.Second, p)
p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
return in, nil
}, func(i int, err error) {
fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)
for result := range p {
log.Printf("result: %d\n", result)
}
func Split[Item any](in <-chan []Item) <-chan Item
Split takes an interface from Collect and splits it back out into individual elements
Examples
PipelineShutsDownOnError
The following example shows how you can shutdown a pipeline
gracefully when it receives an error message
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
if i != 1 {
cancel()
return i, fmt.Errorf("%d caused the shutdown", i)
}
return i, nil
}, func(i int, err error) {
fmt.Printf("could not process %d: %s\n", i, err)
}), p)
for result := range p {
fmt.Printf("result: %d\n", result)
}
fmt.Println("exiting the pipeline after all data is processed")
Output:
could not process 2: 2 caused the shutdown
result: 1
could not process 3: context canceled
could not process 4: context canceled
could not process 5: context canceled
could not process 6: context canceled
could not process 7: context canceled
could not process 8: context canceled
could not process 9: context canceled
could not process 10: context canceled
exiting the pipeline after all data is processed
PipelineShutsDownWhenContainerIsKilled
This example demonstrates a pipeline
that runs until the os / container the pipeline is running in kills it
ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer cancel()
var count int
p := pipeline.Emitter(ctx, func() int {
count++
return count
})
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
if i%2 == 0 {
return i, nil
}
return i, fmt.Errorf("'%d' is an odd number", i)
}, func(i int, err error) {
fmt.Printf("error processing '%v': %s\n", i, err)
}), p)
go func() {
time.Sleep(time.Millisecond / 10)
fmt.Print("\n--- os kills the app ---\n\n")
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()
for result := range p {
fmt.Printf("result: %d\n", result)
}
fmt.Println("exiting after the input channel is closed")
Output:
error processing '1': '1' is an odd number
result: 2
--- os kills the app ---
error processing '3': '3' is an odd number
error processing '4': context canceled
exiting after the input channel is closed
PipelineShutsDownWhenInputChannelIsClosed
The following example demonstrates a pipeline
that naturally finishes its run when the input channel is closed
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
return i * 2, nil
}, func(i int, err error) {
fmt.Printf("could not multiply %d: %s\n", i, err)
}), p)
for result := range p {
fmt.Printf("result: %d\n", result)
}
fmt.Println("exiting after the input channel is closed")
Output:
result: 2
result: 4
result: 6
result: 8
result: 10
result: 12
result: 14
result: 16
result: 18
result: 20
exiting after the input channel is closed