Linea

Linea is a composable stream processing library for Go.
It allows you to build data processing pipelines by connecting three main components:
- Sources: Produce data items (e.g., from slices, channels)
- Flows: Transform data items (e.g., map, filter, batch)
- Sinks: Consume and reduce data items to final results
Key features:
- Type-safe: Uses Go generics for compile-time type checking
- Composable: Components can be chained together flexibly
- Concurrent: Built on Go channels and goroutines
- Cancellable: Supports context-based cancellation
- Parallel: Offers parallel processing flows (MapPar, FlatMapPar)
Quickstart
package main
import (
"context"
"fmt"
"strconv"
"github.com/svenvdam/linea/compose"
"github.com/svenvdam/linea/flows"
"github.com/svenvdam/linea/sinks"
"github.com/svenvdam/linea/sources"
)
func main() {
ctx := context.Background()
stream := compose.SourceThroughFlowToSink2(
sources.Slice([]int{1, 2, 3, 4, 5}),
flows.Filter(func(i int) bool { return i%2 == 0 }),
flows.Map(func(i int) string { return strconv.Itoa(i) }),
sinks.Slice[string](),
)
result := <-stream.Run(ctx)
fmt.Println(result.Value)
}
Installation
go get github.com/svenvdam/linea
Creating Streams
Linea provides several ways to create and compose streams. The recommended approach is to use the pre-built components in the sources
, flows
, and sinks
packages:
stream1 := compose.SourceThroughFlowToSink(
sources.Slice([]int{1, 2, 3}),
flows.Map(func(i int) int { return i * 2 }),
sinks.Slice[int](),
)
stream2 := compose.SourceThroughFlowToSink2(
sources.Slice([]string{"a", "b"}),
flows.Map(strings.ToUpper),
flows.Filter(func(s string) bool { return s != "B" }),
sinks.Slice[string](),
)
The core
package provides more advanced functionality for creating custom components and composing streams manually. However, for most use cases, the pre-built components should be sufficient and are the recommended approach.
Stream Lifecycle Management
Streams in Linea follow a simple lifecycle model that helps manage resources and control execution:
-
Creation: When you create a stream using any of the composition helpers (like SourceThroughFlowToSink
), the stream is initialized but not yet running.
-
Execution: Streams start processing data when you call Run(ctx)
. This method:
- Starts all internal goroutines
- Begins data flow from source through flows to sink
- Returns a channel that will receive the final result
-
Termination: Streams can be terminated in several ways:
- Natural completion: when the source is exhausted
- Context cancellation:
cancel()
- Immediate shutdown:
stream.Cancel()
.
- Graceful shutdown:
stream.Drain()
-
Cleanup: When a stream terminates:
- All internal goroutines are properly terminated
- Channels are closed in the correct order
- Resources are released
- Resource cleanup can be awaited through
stream.AwaitDone()
Shutdown Options
Linea provides multiple ways to stop stream processing, and you can determine how the stream terminated by checking the Result:
func processWithShutdown(data []int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream := compose.SourceThroughFlowToSink2(
sources.Slice(data),
flows.Map(slowOperation),
sinks.Slice[int](),
)
go func() {
time.Sleep(1 * time.Second)
stream.Cancel()
}()
go func() {
time.Sleep(1 * time.Second)
stream.Drain()
}()
go func() {
time.Sleep(1 * time.Second)
cancel()
}()
result := <-stream.Run(ctx)
if !result.Ok {
} else {
processedItems := result.Value
}
}