
github.com/rulego/streamsql
StreamSQL is a lightweight, SQL-based stream processing engine for IoT edge, enabling efficient data processing and analysis on unbounded streams.
📖 Documentation | Similar to: Apache Flink
Nested field access: dot notation (device.info.name) and array indexing (sensors[0].value) for accessing nested structured data.

Installation:

```bash
go get github.com/rulego/streamsql
```
StreamSQL supports two main processing modes for different business scenarios:
Non-aggregation mode: suitable for scenarios requiring real-time response and low latency, where each data record is processed and output immediately. Typical use cases include data cleaning, enrichment, and filtering.
```go
package main

import (
	"fmt"
	"time"

	"github.com/rulego/streamsql"
)

func main() {
	// Create StreamSQL instance
	ssql := streamsql.New()
	defer ssql.Stop()

	// Non-aggregation SQL: real-time data transformation and filtering.
	// Each input record is processed immediately; there is no need to wait for a window.
	rsql := `SELECT deviceId,
		UPPER(deviceType) as device_type,
		temperature * 1.8 + 32 as temp_fahrenheit,
		CASE WHEN temperature > 30 THEN 'hot'
		     WHEN temperature < 15 THEN 'cold'
		     ELSE 'normal' END as temp_category,
		CONCAT(location, '-', deviceId) as full_identifier,
		NOW() as processed_time
		FROM stream
		WHERE temperature > 0 AND STARTSWITH(deviceId, 'sensor')`

	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}

	// Handle real-time transformation results
	ssql.AddSink(func(results []map[string]interface{}) {
		fmt.Printf("Real-time result: %+v\n", results)
	})

	// Simulate sensor data input
	sensorData := []map[string]interface{}{
		{
			"deviceId":    "sensor001",
			"deviceType":  "temperature",
			"temperature": 25.0,
			"location":    "warehouse-A",
		},
		{
			"deviceId":    "sensor002",
			"deviceType":  "humidity",
			"temperature": 32.5,
			"location":    "warehouse-B",
		},
		{
			"deviceId":    "pump001", // Will be filtered out
			"deviceType":  "actuator",
			"temperature": 20.0,
			"location":    "factory",
		},
	}

	// Process data one by one; each record produces output immediately
	for _, data := range sensorData {
		ssql.Emit(data)
		// changedData, err := ssql.EmitSync(data) // Synchronously obtain the processing result
		time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
	}
	time.Sleep(500 * time.Millisecond) // Wait for processing to complete
}
```
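To make the per-record behaviour of the non-aggregation query concrete, here is a plain-Go sketch of what the SQL above does to a single record: apply the WHERE filter, derive the computed fields, and return the result at once. The `transform` function is hypothetical and only mirrors the query for illustration; it is not how StreamSQL evaluates SQL internally, and it omits `NOW()`.

```go
package main

import (
	"fmt"
	"strings"
)

// transform mirrors the example query for one record (illustration only).
// It returns ok=false when the WHERE clause would filter the record out.
func transform(r map[string]interface{}) (map[string]interface{}, bool) {
	id, _ := r["deviceId"].(string)
	temp, _ := r["temperature"].(float64)
	// WHERE temperature > 0 AND STARTSWITH(deviceId, 'sensor')
	if temp <= 0 || !strings.HasPrefix(id, "sensor") {
		return nil, false
	}
	// CASE WHEN temperature > 30 THEN 'hot' WHEN temperature < 15 THEN 'cold' ELSE 'normal'
	category := "normal"
	switch {
	case temp > 30:
		category = "hot"
	case temp < 15:
		category = "cold"
	}
	deviceType, _ := r["deviceType"].(string)
	location, _ := r["location"].(string)
	return map[string]interface{}{
		"deviceId":        id,
		"device_type":     strings.ToUpper(deviceType),
		"temp_fahrenheit": temp*1.8 + 32,
		"temp_category":   category,
		"full_identifier": location + "-" + id, // CONCAT(location, '-', deviceId)
	}, true
}

func main() {
	for _, r := range []map[string]interface{}{
		{"deviceId": "sensor001", "deviceType": "temperature", "temperature": 25.0, "location": "warehouse-A"},
		{"deviceId": "pump001", "deviceType": "actuator", "temperature": 20.0, "location": "factory"},
	} {
		if out, ok := transform(r); ok {
			fmt.Println(out["full_identifier"], out["temp_category"])
		}
	}
}
```

Running it prints only the `sensor001` row; `pump001` is dropped by the `STARTSWITH` filter, matching the comment in the example above.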
Aggregation mode: suitable for scenarios requiring statistical analysis and batch processing, where data is collected over a period of time for aggregated computation.
```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rulego/streamsql"
)

// StreamSQL Usage Example
// This example demonstrates the complete workflow of StreamSQL: from instance creation to data processing and result handling
func main() {
	// Step 1: Create StreamSQL Instance
	// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
	ssql := streamsql.New()
	defer ssql.Stop()

	// Step 2: Define Stream SQL Query Statement
	// This SQL statement showcases StreamSQL's core capabilities:
	// - SELECT: Choose output fields and aggregation functions
	// - FROM stream: Specify the data source as stream data
	// - WHERE: Filter condition excluding device3 data (the producer below only
	//   generates device1 and device2, so here the filter is illustrative)
	// - GROUP BY: Group by deviceId, combined with tumbling window for aggregation
	// - TumblingWindow('5s'): 5-second tumbling window, triggers computation every 5 seconds
	// - avg(), min(): Aggregation functions for calculating average and minimum values
	// - window_start(), window_end(): Window functions to get window start and end times
	rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," +
		"window_start() as start,window_end() as end FROM stream where deviceId!='device3' group by deviceId,TumblingWindow('5s')"

	// Step 3: Execute SQL Statement and Start Stream Analysis Task
	// The Execute method parses SQL, builds the execution plan, and initializes the window manager and aggregators
	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}

	// Step 4: Setup Test Environment and Concurrency Control
	var wg sync.WaitGroup
	wg.Add(1)
	// Set 30-second test timeout to prevent infinite running
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Step 5: Start Data Producer Goroutine
	// Simulate a real-time data stream, continuously feeding data into StreamSQL
	go func() {
		defer wg.Done()
		// Create ticker to trigger data generation every second
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Generate 10 random data points per second, simulating a high-frequency data stream
				for i := 0; i < 10; i++ {
					// Construct device data containing deviceId, temperature, and humidity
					randomData := map[string]interface{}{
						"deviceId":    fmt.Sprintf("device%d", rand.Intn(2)+1), // Randomly select device1 or device2
						"temperature": 20.0 + rand.Float64()*10,                // Temperature range: 20-30 degrees
						"humidity":    50.0 + rand.Float64()*20,                // Humidity range: 50-70%
					}
					// Add data to the stream; Emit distributes it to the corresponding windows and aggregators
					ssql.Emit(randomData)
				}
			case <-ctx.Done():
				// Timeout or cancellation signal, stop data generation
				return
			}
		}
	}()

	// Step 6: Setup Result Processing Pipeline
	resultChan := make(chan interface{})
	// Add computation result callback function (Sink)
	// When a window triggers computation, results are output through this callback
	ssql.AddSink(func(results []map[string]interface{}) {
		resultChan <- results
	})

	// Step 7: Start Result Consumer Goroutine
	// Count received results for verification. The counter is updated atomically
	// because main reads it while this goroutine may still be incrementing it.
	var resultCount int64
	go func() {
		for result := range resultChan {
			// Print results when window computation is triggered (every 5 seconds)
			fmt.Printf("Window Result [%s]: %v\n", time.Now().Format("15:04:05.000"), result)
			atomic.AddInt64(&resultCount, 1)
		}
	}()

	// Step 8: Wait for Processing Completion
	// Wait for the data producer goroutine to finish (30-second timeout or manual cancellation)
	wg.Wait()

	// Step 9: Display Final Statistics
	// Show total number of window results received during the test period
	fmt.Printf("\nTotal window results received: %d\n", atomic.LoadInt64(&resultCount))
	fmt.Println("StreamSQL processing completed successfully!")
}
```
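For intuition about what a single triggered window of the query above produces, the following sketch performs the same filter, grouping, and `avg`/`min` computation over an in-memory slice of records. `aggregateWindow` is a hypothetical helper written for illustration, not part of StreamSQL, and it assumes every record carries `float64` temperature and humidity values.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

type stats struct {
	sum, count, minHumidity float64
}

// aggregateWindow mimics, for one closed window, the query
// SELECT deviceId, avg(temperature), min(humidity) ... WHERE deviceId != 'device3' GROUP BY deviceId.
// Illustration only; assumes float64 temperature and humidity fields.
func aggregateWindow(records []map[string]interface{}) []map[string]interface{} {
	groups := map[string]*stats{}
	for _, r := range records {
		id, _ := r["deviceId"].(string)
		if id == "device3" { // WHERE deviceId != 'device3'
			continue
		}
		temp, _ := r["temperature"].(float64)
		hum, _ := r["humidity"].(float64)
		g, ok := groups[id]
		if !ok {
			g = &stats{minHumidity: math.Inf(1)}
			groups[id] = g
		}
		g.sum += temp
		g.count++
		g.minHumidity = math.Min(g.minHumidity, hum)
	}
	// Sort group keys so the output order is deterministic.
	ids := make([]string, 0, len(groups))
	for id := range groups {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	out := make([]map[string]interface{}, 0, len(ids))
	for _, id := range ids {
		g := groups[id]
		out = append(out, map[string]interface{}{
			"deviceId":     id,
			"avg_temp":     g.sum / g.count,
			"min_humidity": g.minHumidity,
		})
	}
	return out
}

func main() {
	window := []map[string]interface{}{
		{"deviceId": "device1", "temperature": 20.0, "humidity": 55.0},
		{"deviceId": "device1", "temperature": 24.0, "humidity": 52.0},
		{"deviceId": "device2", "temperature": 28.0, "humidity": 61.0},
		{"deviceId": "device3", "temperature": 99.0, "humidity": 10.0},
	}
	for _, row := range aggregateWindow(window) {
		fmt.Println(row["deviceId"], row["avg_temp"], row["min_humidity"])
	}
	// device1 22 52
	// device2 28 61
}
```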
StreamSQL supports querying nested structured data using dot notation (.) syntax to access nested fields:
```go
// Nested field access example
package main

import (
	"fmt"
	"time"

	"github.com/rulego/streamsql"
)

func main() {
	ssql := streamsql.New()
	defer ssql.Stop()

	// SQL query using nested fields: dot notation accesses nested structures
	rsql := `SELECT device.info.name as device_name,
		device.location,
		AVG(sensor.temperature) as avg_temp,
		COUNT(*) as sensor_count,
		window_start() as start,
		window_end() as end
		FROM stream
		WHERE device.info.type = 'temperature'
		GROUP BY device.location, TumblingWindow('5s')`

	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}

	// Handle aggregation results
	ssql.AddSink(func(results []map[string]interface{}) {
		fmt.Printf("Aggregation result: %+v\n", results)
	})

	// Add nested structured data
	nestedData := map[string]interface{}{
		"device": map[string]interface{}{
			"info": map[string]interface{}{
				"name": "temperature-sensor-001",
				"type": "temperature",
			},
			"location": "smart-greenhouse-A",
		},
		"sensor": map[string]interface{}{
			"temperature": 25.5,
			"humidity":    60.2,
		},
		"timestamp": time.Now().Unix(),
	}
	ssql.Emit(nestedData)

	// Give the 5-second tumbling window time to trigger; otherwise main returns
	// and the deferred Stop() runs before any aggregation result is emitted.
	time.Sleep(6 * time.Second)
}
```
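As a rough model of how a path such as `device.info.name` or `sensors[0].value` can be resolved against the `map[string]interface{}` records passed to `Emit`, here is a small resolver. `getPath` is a hypothetical helper for illustration; StreamSQL's own field resolver may behave differently (for example, this sketch supports at most one index per path segment).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// getPath resolves a dot-notation path with optional array indexing
// (e.g. "device.info.name", "sensors[0].value") against nested maps and slices.
// Hypothetical helper, not part of the StreamSQL API.
func getPath(data interface{}, path string) (interface{}, bool) {
	cur := data
	for _, part := range strings.Split(path, ".") {
		// Split an optional "[n]" suffix off the segment, e.g. "sensors[0]".
		name, idx := part, -1
		if open := strings.Index(part, "["); open >= 0 && strings.HasSuffix(part, "]") {
			n, err := strconv.Atoi(part[open+1 : len(part)-1])
			if err != nil || n < 0 {
				return nil, false
			}
			name, idx = part[:open], n
		}
		if name != "" {
			m, ok := cur.(map[string]interface{})
			if !ok {
				return nil, false
			}
			v, found := m[name]
			if !found {
				return nil, false
			}
			cur = v
		}
		if idx >= 0 {
			s, ok := cur.([]interface{})
			if !ok || idx >= len(s) {
				return nil, false
			}
			cur = s[idx]
		}
	}
	return cur, true
}

func main() {
	record := map[string]interface{}{
		"device": map[string]interface{}{
			"info": map[string]interface{}{"name": "temperature-sensor-001"},
		},
		"sensors": []interface{}{
			map[string]interface{}{"value": 25.5},
		},
	}
	name, _ := getPath(record, "device.info.name")
	value, _ := getPath(record, "sensors[0].value")
	fmt.Println(name, value) // temperature-sensor-001 25.5
}
```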
StreamSQL supports a variety of function types, including mathematical, string, conversion, aggregate, analytic, and window functions; see the documentation for the full list.
StreamSQL supports two main processing modes:

Aggregation mode: used when the SQL query contains aggregate functions (SUM, AVG, COUNT, etc.) or a GROUP BY clause. Data is collected in windows, and aggregated results are output when a window is triggered.

Non-aggregation mode: used for immediate data transformation and filtering without aggregation. Each input record is processed and output immediately, giving low latency for real-time scenarios such as data cleaning, enrichment, and filtering.
Since stream data is unbounded, it cannot be processed as a whole. Windows provide a mechanism to divide unbounded data into a series of bounded data segments for computation. StreamSQL includes the following types of windows:

- Sliding Window
- Tumbling Window
- Count Window
- Session Window
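The sketch below shows how a record timestamp can be assigned to tumbling and sliding windows, using millisecond timestamps aligned to multiples of the window size since the Unix epoch. The function names are hypothetical, and StreamSQL's exact alignment and sliding-window parameters are not confirmed here; count and session windows are not modeled.

```go
package main

import (
	"fmt"
	"time"
)

// tumblingWindow returns the [start, end) bounds, in milliseconds, of the
// tumbling window of length size that contains ts. Starts align to multiples
// of size since the Unix epoch (hypothetical helper).
func tumblingWindow(ts, size int64) (start, end int64) {
	start = ts - ts%size
	return start, start + size
}

// slidingWindows returns the start times of every window of length size,
// advancing by slide, that contains ts (hypothetical helper).
func slidingWindows(ts, size, slide int64) []int64 {
	var starts []int64
	// The latest window containing ts starts at the slide boundary at or before ts;
	// earlier windows still contain ts as long as start+size > ts.
	for s := ts - ts%slide; s > ts-size; s -= slide {
		starts = append([]int64{s}, starts...)
	}
	return starts
}

func main() {
	ts := int64(25_000) // an event at t = 25 s, in ms
	start, end := tumblingWindow(ts, 10_000)
	fmt.Println(start, end)                         // 20000 30000
	fmt.Println(slidingWindows(ts, 30_000, 10_000)) // [0 10000 20000]

	// A 5-minute tumbling window: an event at 10:07:30 UTC falls in [10:05, 10:10).
	t := time.Date(2024, 1, 1, 10, 7, 30, 0, time.UTC)
	s, e := tumblingWindow(t.UnixMilli(), (5 * time.Minute).Milliseconds())
	fmt.Println(time.UnixMilli(s).UTC().Format("15:04"), time.UnixMilli(e).UTC().Format("15:04")) // 10:05 10:10
}
```

A tumbling window is the special case of a sliding window whose slide equals its size, which is why every timestamp belongs to exactly one tumbling window but possibly several sliding ones.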
StreamSQL supports two time concepts that determine how windows are divided and triggered:

Event Time: windows are based on a timestamp field carried in the data (e.g., event_time, timestamp, order_time, etc.). Use WITH (TIMESTAMP='field_name', TIMEUNIT='ms') to specify the event time field:

```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```

Processing Time: windows are based on the system's current time when data arrives (time.Now()). This is the default when WITH (TIMESTAMP=...) is not specified:

```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- No WITH clause specified, defaults to processing time
```
| Feature | Event Time | Processing Time |
|---|---|---|
| Time Source | Timestamp field in data | System current time |
| Window Division | Based on event timestamp | Based on data arrival time |
| Late Data Handling | Supported (Watermark mechanism) | Not supported |
| Out-of-Order Handling | Supported (Watermark mechanism) | Not supported |
| Result Accuracy | Accurate | May be inaccurate |
| Processing Latency | Higher (need to wait for late data) | Low (real-time trigger) |
| Configuration | WITH (TIMESTAMP='field') | Default (no WITH clause) |
| Use Cases | Precise temporal analysis, historical replay | Real-time monitoring, low latency requirements |
Window Start Time: aligned to the window size. For example, with TumblingWindow('5m'), the window start time aligns to multiples of 5 minutes (e.g., 10:00, 10:05, 10:10).

Window End Time: in event-time mode, a window is triggered when watermark >= window_end, where:

Watermark = max(event_time) - MaxOutOfOrderness

Watermark-related parameters in the WITH clause:

- MAXOUTOFORDERNESS: maximum allowed out-of-order time for tolerating data disorder (default: 0, no out-of-order allowed)
- ALLOWEDLATENESS: how long after triggering a window can still accept late data (default: 0, no late data accepted)
- IDLETIMEOUT: timeout for advancing the watermark based on processing time when the data source is idle (default: 0, disabled)

```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
    TIMESTAMP='eventTime',
    TIMEUNIT='ms',
    MAXOUTOFORDERNESS='5s', -- Tolerate 5 seconds of out-of-order data
    ALLOWEDLATENESS='2s',   -- Accept 2 seconds of late data after the window triggers
    IDLETIMEOUT='5s'        -- Advance the watermark based on processing time after 5s of no data
)
```
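The triggering rule above can be modeled in a few lines: track the maximum event time seen, subtract the out-of-order tolerance to get the watermark, and fire a window once the watermark reaches its end. `watermarkTracker` is a hypothetical, simplified model, not StreamSQL's implementation; the allowed-lateness check assumes Flink-style semantics (late records are accepted while watermark < window_end + allowed lateness), and IDLETIMEOUT is not modeled. Times are in milliseconds.

```go
package main

import "fmt"

// watermarkTracker models Watermark = max(event_time) - MaxOutOfOrderness.
// Hypothetical type for illustration; times are in milliseconds.
type watermarkTracker struct {
	maxOutOfOrderness int64
	allowedLateness   int64
	maxEventTime      int64
	seen              bool
}

// observe records an event time; out-of-order events never move the watermark back.
func (w *watermarkTracker) observe(eventTime int64) {
	if !w.seen || eventTime > w.maxEventTime {
		w.maxEventTime = eventTime
		w.seen = true
	}
}

func (w *watermarkTracker) watermark() int64 {
	return w.maxEventTime - w.maxOutOfOrderness
}

// shouldTrigger reports whether a window ending at windowEnd can fire (watermark >= window_end).
func (w *watermarkTracker) shouldTrigger(windowEnd int64) bool {
	return w.seen && w.watermark() >= windowEnd
}

// acceptsLate reports whether a late record for a window ending at windowEnd
// is still within the allowed lateness (assumed Flink-style rule).
func (w *watermarkTracker) acceptsLate(windowEnd int64) bool {
	return w.watermark() < windowEnd+w.allowedLateness
}

func main() {
	// MAXOUTOFORDERNESS='5s', ALLOWEDLATENESS='2s'; window [0, 5m) ends at 300000 ms.
	w := &watermarkTracker{maxOutOfOrderness: 5000, allowedLateness: 2000}
	windowEnd := int64(300_000)

	w.observe(303_000)
	fmt.Println(w.watermark(), w.shouldTrigger(windowEnd)) // 298000 false
	w.observe(306_000)
	fmt.Println(w.watermark(), w.shouldTrigger(windowEnd)) // 301000 true
	fmt.Println(w.acceptsLate(windowEnd))                  // true
}
```

With a 5-second tolerance, an event 3 seconds past the window end is not enough to close the window; the watermark must itself pass 300000 ms.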
Pull requests and issues are welcome. Please ensure that the code conforms to Go standards and include relevant test cases.
Apache License 2.0