šŸš€ Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more →
Socket
Sign inDemoInstall
Socket

github.com/svenvdam/linea

Package Overview
Dependencies
Alerts
File Explorer
Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

github.com/svenvdam/linea

v0.3.0
Source
Go
Version published
Created
Source

Linea

GoDoc GitHub Tag codecov GitHub Actions Workflow Status Go Report Card GitHub License

Linea is a composable stream processing library for Go.

It allows you to build data processing pipelines by connecting three main components:

  • Sources: Produce data items (e.g., from slices, channels)
  • Flows: Transform data items (e.g., map, filter, batch)
  • Sinks: Consume and reduce data items to final results

Key features:

  • Type-safe: Uses Go generics for compile-time type checking
  • Composable: Components can be chained together flexibly
  • Concurrent: Built on Go channels and goroutines
  • Cancellable: Supports context-based cancellation
  • Parallel: Offers parallel processing flows (MapPar, FlatMapPar)

Quickstart

package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/svenvdam/linea/compose"
	"github.com/svenvdam/linea/flows"
	"github.com/svenvdam/linea/sinks"
	"github.com/svenvdam/linea/sources"
)

func main() {
	ctx := context.Background()
	stream := compose.SourceThroughFlowToSink2(
		sources.Slice([]int{1, 2, 3, 4, 5}), // source from slice
		flows.Filter(func(i int) bool { return i%2 == 0 }), // filter even numbers
		flows.Map(func(i int) string { return strconv.Itoa(i) }), // map to string
		sinks.Slice[string](), // sink to slice
	)

	result := <-stream.Run(ctx) // run the stream, blocks until the stream is done

	fmt.Println(result.Value) // prints [2 4]
}

Installation

go get github.com/svenvdam/linea

Creating Streams

Linea provides several ways to create and compose streams. The recommended approach is to use the pre-built components in the sources, flows, and sinks packages:

// Create a stream using SourceThroughFlowToSink helpers
stream1 := compose.SourceThroughFlowToSink(
    sources.Slice([]int{1, 2, 3}),    // Source
    flows.Map(func(i int) int { return i * 2 }), // Flow
    sinks.Slice[int](),               // Sink
)

// Chain multiple flows using SourceThroughFlowToSink2, 3, etc.
stream2 := compose.SourceThroughFlowToSink2(
    sources.Slice([]string{"a", "b"}),
    flows.Map(strings.ToUpper),
    flows.Filter(func(s string) bool { return s != "B" }),
    sinks.Slice[string](),
)

The core package provides more advanced functionality for creating custom components and composing streams manually. However, for most use cases, the pre-built components should be sufficient and are the recommended approach.

Stream Lifecycle Management

Streams in Linea follow a simple lifecycle model that helps manage resources and control execution:

  • Creation: When you create a stream using any of the composition helpers (like SourceThroughFlowToSink), the stream is initialized but not yet running.

  • Execution: Streams start processing data when you call Run(ctx). This method:

    • Starts all internal goroutines
    • Begins data flow from source through flows to sink
    • Returns a channel that will receive the final result
  • Termination: Streams can be terminated in several ways:

    • Natural completion: when the source is exhausted
    • Context cancellation: cancel()
    • Immediate shutdown: stream.Cancel().
    • Graceful shutdown: stream.Drain()
  • Cleanup: When a stream terminates:

    • All internal goroutines are properly terminated
    • Channels are closed in the correct order
    • Resources are released
    • Resource cleanup can be awaited through stream.AwaitDone()

Shutdown Options

Linea provides multiple ways to stop stream processing, and you can determine how the stream terminated by checking the Result:

func processWithShutdown(data []int) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    stream := compose.SourceThroughFlowToSink2(
        sources.Slice(data),
        flows.Map(slowOperation),
        sinks.Slice[int](),
    )

    // Option 1: Immediate shutdown - stops processing immediately
    go func() {
        time.Sleep(1 * time.Second)
        stream.Cancel() // Cancels immediately, may leave items unprocessed. Blocks until all resources have been cleaned up.
    }()

    // Option 2: Graceful shutdown - processes remaining items
    go func() {
        time.Sleep(1 * time.Second)
        stream.Drain() // Stops accepting new items but processes existing ones
    }()

    // Option 3: Context cancellation - similar to Cancel()
    go func() {
        time.Sleep(1 * time.Second)
        cancel() // Cancels via context
    }()

    result := <-stream.Run(ctx)

    if !result.Ok {
        // Stream was cancelled or encountered an error
        // Items may not have been fully processed
    } else {
        // Stream completed successfully
        // All items were processed
        processedItems := result.Value
    }
}

FAQs

Package last updated on 19 Apr 2025

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts