New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

bb-core

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bb-core

Core package for the brickblock library.

  • 0.3.12
  • PyPI
  • Socket score

Maintainers
1

Example Pipeline with brickblock [bb-core]

Overview

This project demonstrates how to build a flexible data processing pipeline using the bb-core library. The pipeline is designed to process data through a series of functions or modules, supporting both synchronous and asynchronous execution, with real-time updates via server-sent events (SSE).

Installation

  1. Install the repository:

    pip install bb-core
    
  2. Set up a virtual environment (optional but recommended):

    python -m venv venv
    

    Activate the virtual environment:

    • Windows:
      .env\Scriptsctivate
      
    • macOS/Linux:
      source venv/bin/activate
      

Project Structure

  • Pipeline Class: Manages the flow of data through a series of functions.
  • BaseModule Class: Abstract base class that all modules should inherit from. Modules must implement the following methods:
    • run(): The main processing function.
    • onProgressStartMessage(): Sends a progress start message.
    • onProgressEndMessage(): Sends a progress end message.
  • SSE Generator: Asynchronous generator that sends real-time updates to clients.

Example Usage

from your_module import Pipeline, BModule, InputModel2

# Initialize the pipeline
pipeline = Pipeline.init(name="example_pipeline", sse=True)

# Add a module to the pipeline
bmodule = BModule()
pipeline.modules([bmodule])

# Input data for the pipeline
input_data = {"c": 5.0}

# Asynchronously run the pipeline and get SSE events
async def test_sse():
    async for event in pipeline.sse_generator(InputModel2(**input_data)):
        print(event)

# Run the asynchronous SSE generator
asyncio.run(test_sse())

This project is licensed under the MIT License.

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc