
@things-factory/integration-base
A comprehensive integration framework for the Things Factory ecosystem that enables seamless connectivity with external systems and services through scenario-driven automation.
The Integration Base module provides a scenario-driven automation engine for building complex integration workflows through a visual, step-based approach. A scenario is an ordered list of steps; each step names a task and, where needed, the connection it runs against. The module serves as the central automation engine within the Things Factory framework.
npm install @things-factory/integration-base
import { ScenarioEngine } from '@things-factory/integration-base'
// Create a simple HTTP integration scenario
const scenario = {
  name: 'fetch-user-data',
  description: 'Fetch user data from external API',
  steps: [
    {
      name: 'fetch-users',
      task: 'http-get',
      connection: 'api-server',
      params: {
        path: '/api/users',
        headers: { 'Authorization': 'Bearer ${token}' }
      }
    },
    {
      name: 'store-data',
      task: 'database-query',
      connection: 'main-db',
      params: {
        query: 'INSERT INTO users (id, name, email) VALUES (?, ?, ?)',
        bindings: '${fetch-users.data}'
      }
    }
  ]
}
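The scenario above refers to earlier values with `${...}` placeholders such as `${token}` and `${fetch-users.data}`. How the engine resolves them is not described here, so the following is only a minimal sketch of one plausible approach, assuming a context object keyed by variable and step name. The names `lookup` and `resolvePlaceholders` are hypothetical and are not part of the package.

```typescript
// Illustrative only: resolve "${path}" placeholders against a context of
// variables and previous step results. Not the library's implementation.
type Context = Record<string, unknown>

function lookup(context: Context, path: string): unknown {
  // Walk a dotted path such as "fetch-users.data" one segment at a time.
  let current: unknown = context
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined
    current = (current as Record<string, unknown>)[key]
  }
  return current
}

function resolvePlaceholders(template: string, context: Context): unknown {
  // A template that is exactly one placeholder keeps the referenced value's type.
  const whole = /^\$\{([^}]+)\}$/.exec(template)
  if (whole) return lookup(context, whole[1].trim())
  // Otherwise each placeholder is substituted as text.
  return template.replace(/\$\{([^}]+)\}/g, (_match: string, path: string) => {
    const value = lookup(context, path.trim())
    return value === undefined ? '' : String(value)
  })
}

// Hypothetical context: one variable and the result of the 'fetch-users' step.
const context: Context = {
  token: 'abc123',
  'fetch-users': { data: [1, 'Ada', 'ada@example.com'] }
}
const header = resolvePlaceholders('Bearer ${token}', context)
const bindings = resolvePlaceholders('${fetch-users.data}', context)
```

In this sketch a template consisting of a single placeholder returns the referenced value unchanged, so an array of bindings stays an array instead of being turned into a string.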
// Database connection
const dbConnection = {
  name: 'main-db',
  type: 'postgresql-connector',
  endpoint: 'postgresql://localhost:5432/mydb',
  params: {
    username: 'dbuser',
    password: 'dbpass',
    database: 'myapp'
  }
}
// HTTP API connection
const apiConnection = {
  name: 'api-server',
  type: 'http-connector',
  endpoint: 'https://api.example.com',
  params: {
    timeout: 30000,
    rejectUnauthorized: true
  }
}
Supported connectors:
postgresql-connector
mysql-connector
mssql-connector
oracle-connector
sqlite-connector
http-connector
graphql-connector - Apollo Client with OAuth2 support
mqtt-connector - Async message pub/sub
socket-server - Raw TCP socket communication
headless-connector - Puppeteer-based web automation
pyrun-connector - Python script execution
operato-connector - Things Factory cross-instance
echo-back-connector - Testing and debugging
proxy-connector - Edge appliance delegation
Available tasks:
database-query - Execute SQL queries with parameter binding
graphql-query / graphql-mutate - GraphQL operations
data-mapper - Transform data structures
jsonata - JSONata transformations
csv-readline - CSV file processing
http-get / http-post / http-put / http-patch / http-delete
headless-get / headless-post / headless-put / headless-patch / headless-delete
mqtt-publish / mqtt-subscribe
socket-listener / publish
goto / end - Flow control
switch-goto / switch-range-goto - Conditional branching
sub-scenario - Nested scenario execution
empty-check - Validation tasks
variables - Variable management
headless-scrap - Web scraping
pyrun-execute - Python script execution
sleep / random - Utility functions
script - Custom JavaScript execution
# Database connection
DB_HOST=localhost
DB_PORT=5432
DB_NAME=integration_db
DB_USERNAME=user
DB_PASSWORD=password
# Headless browser pool
HEADLESS_POOL_MIN=2
HEADLESS_POOL_MAX=10
# Logging
DEBUG=things-factory:integration-base:*
LOG_LEVEL=info
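Settings such as `HEADLESS_POOL_MIN` and `HEADLESS_POOL_MAX` arrive as strings, so it can help to validate them before use. The sketch below shows one way to do that; it does not reflect how the module itself reads its environment. The helper names are hypothetical, and the fallback values 2 and 10 are simply copied from the example above, not documented defaults.

```typescript
// Illustrative only: validate pool-size settings taken from an env-like object.
type Env = Record<string, string | undefined>

interface PoolConfig {
  min: number
  max: number
}

function readInt(env: Env, name: string, fallback: number): number {
  const raw = env[name]
  // An unset or blank variable falls back to the supplied value.
  if (raw === undefined || raw.trim() === '') return fallback
  const value = Number(raw)
  if (!Number.isInteger(value) || value < 0) {
    throw new Error(`${name} must be a non-negative integer, got "${raw}"`)
  }
  return value
}

function readPoolConfig(env: Env): PoolConfig {
  // Fallbacks mirror the example values; they are assumptions.
  const min = readInt(env, 'HEADLESS_POOL_MIN', 2)
  const max = readInt(env, 'HEADLESS_POOL_MAX', 10)
  if (min > max) {
    throw new Error('HEADLESS_POOL_MIN must not exceed HEADLESS_POOL_MAX')
  }
  return { min, max }
}

const pool = readPoolConfig({ HEADLESS_POOL_MIN: '2', HEADLESS_POOL_MAX: '10' })
```

In a Node.js process the same function could be called with `process.env`.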
// Domain-specific environment variables
const domainConfig = {
  domain: 'tenant1.example.com',
  envVars: [
    { name: 'Connection::api-server::apiKey', value: 'secret-key-123' },
    { name: 'Connection::database::password', value: 'encrypted-password' }
  ]
}
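The variable names in this example appear to follow a `Connection::<connection name>::<parameter>` pattern. That reading is inferred from the two entries above and is not confirmed by the module, so the parser below is a sketch under that assumption, with hypothetical type and function names.

```typescript
// Illustrative only: split "Connection::<name>::<param>" variable names.
interface EnvVar {
  name: string
  value: string
}

interface ConnectionOverride {
  connection: string
  param: string
  value: string
}

function parseConnectionOverrides(envVars: EnvVar[]): ConnectionOverride[] {
  const overrides: ConnectionOverride[] = []
  for (const { name, value } of envVars) {
    const parts = name.split('::')
    // Skip anything that does not match the assumed three-part pattern.
    if (parts.length !== 3 || parts[0] !== 'Connection') continue
    overrides.push({ connection: parts[1], param: parts[2], value })
  }
  return overrides
}

const overrides = parseConnectionOverrides([
  { name: 'Connection::api-server::apiKey', value: 'secret-key-123' },
  { name: 'Connection::database::password', value: 'encrypted-password' },
  { name: 'UNRELATED_SETTING', value: 'ignored' }
])
```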
type Scenario {
  id: String!
  name: String!
  description: String
  active: Boolean!
  schedule: String
  timezone: String
  steps: [Step!]!
}
type Connection {
  id: String!
  name: String!
  type: String!
  endpoint: String!
  active: Boolean!
  state: ConnectionStatus!
}
type ScenarioInstance {
  id: String!
  scenario: Scenario!
  status: ScenarioStatus!
  startedAt: String
  finishedAt: String
  result: JSON
}
GET /api/unstable/scenarios # List scenarios
POST /api/unstable/scenarios # Create scenario
PUT /api/unstable/scenarios/:id # Update scenario
DELETE /api/unstable/scenarios/:id # Delete scenario
GET /api/unstable/scenario-instances # List instances
POST /api/unstable/run-scenario # Execute scenario
POST /api/unstable/start-scenario # Start async scenario
POST /api/unstable/stop-scenario # Stop running scenario
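The update and delete routes above contain an `:id` segment. A small helper for filling such segments is sketched below; `buildPath` is a hypothetical name, not something the package exports, and request bodies are left out because their shape is not documented here.

```typescript
// Illustrative only: fill ":name" segments in a route template.
function buildPath(template: string, params: Record<string, string | undefined>): string {
  return template.replace(/:([A-Za-z_][A-Za-z0-9_]*)/g, (_match: string, key: string) => {
    const value = params[key]
    if (value === undefined) {
      throw new Error(`Missing path parameter "${key}"`)
    }
    // Encode the value so characters such as spaces or slashes stay inside the segment.
    return encodeURIComponent(value)
  })
}

const updatePath = buildPath('/api/unstable/scenarios/:id', { id: 'abc 123' })
// updatePath === '/api/unstable/scenarios/abc%20123'
```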
const databaseScenario = {
  name: 'sync-customer-data',
  steps: [
    {
      name: 'fetch-customers',
      task: 'database-query',
      connection: 'source-db',
      params: {
        query: 'SELECT * FROM customers WHERE updated_at > ?',
        bindings: ['${lastSync}']
      }
    },
    {
      name: 'transform-data',
      task: 'data-mapper',
      params: {
        mapping: {
          customer_id: 'id',
          full_name: 'name',
          email_address: 'email'
        }
      }
    },
    {
      name: 'insert-customers',
      task: 'database-query',
      connection: 'target-db',
      params: {
        query: 'INSERT INTO customers (id, name, email) VALUES ${fetch-customers.data}',
        batchSize: 1000
      }
    }
  ]
}
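The `batchSize: 1000` parameter in the last step suggests that rows are written in groups. Assuming it means "rows per insert" (the module does not spell this out here), the grouping itself can be sketched as follows; `chunk` is a hypothetical helper.

```typescript
// Illustrative only: split rows into batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error('size must be a positive integer')
  }
  const batches: T[][] = []
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size))
  }
  return batches
}

// 2500 placeholder rows split with the batch size from the example.
const rows = Array.from({ length: 2500 }, (_unused, index) => ({ id: index }))
const batches = chunk(rows, 1000)
// batches.map(b => b.length) is [1000, 1000, 500]
```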
const scrapingScenario = {
  name: 'scrape-product-prices',
  steps: [
    {
      name: 'login-to-site',
      task: 'headless-post',
      connection: 'ecommerce-site',
      params: {
        path: '/login',
        contentType: 'application/json',
        accessor: 'credentials'
      }
    },
    {
      name: 'scrape-products',
      task: 'headless-scrap',
      connection: 'ecommerce-site',
      params: {
        path: '/products',
        selectors: [
          { text: 'productName', value: '.product-title' },
          { text: 'price', value: '.price-display' },
          { text: 'availability', value: '.stock-status' }
        ],
        waitForSelectors: '.product-list',
        waitForTimeout: 5000
      }
    },
    {
      name: 'save-results',
      task: 'database-query',
      connection: 'analytics-db',
      params: {
        query: 'INSERT INTO product_prices (name, price, availability, scraped_at) VALUES ${scrape-products.data}'
      }
    }
  ]
}
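The `save-results` step places `${scrape-products.data}` directly in the SQL text, and how the engine expands it is not described here. Scraped values are untrusted input, so a generic alternative is to generate `?` placeholders and pass the values as bindings, as the `database-query` examples with `bindings` do. The sketch below shows that idea only; `buildInsert` is a hypothetical helper, and the table and column names it receives are assumed to be trusted constants.

```typescript
// Illustrative only: build a multi-row INSERT with "?" placeholders.
type Value = string | number | null
type Row = Record<string, Value>

interface InsertStatement {
  query: string
  bindings: Value[]
}

function buildInsert(table: string, columns: string[], rows: Row[]): InsertStatement {
  if (columns.length === 0 || rows.length === 0) {
    throw new Error('columns and rows must be non-empty')
  }
  // One "(?, ?, ...)" tuple per row, with values collected in matching order.
  const tuple = '(' + columns.map(() => '?').join(', ') + ')'
  const tuples: string[] = []
  const bindings: Value[] = []
  for (const row of rows) {
    tuples.push(tuple)
    for (const column of columns) {
      const value = row[column]
      bindings.push(value === undefined ? null : value)
    }
  }
  // Table and column names are interpolated, so they must not come from user input.
  const query = `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${tuples.join(', ')}`
  return { query, bindings }
}

const statement = buildInsert('product_prices', ['name', 'price'], [
  { name: 'Widget', price: 9.99 },
  { name: 'Gadget', price: 19.5 }
])
// statement.query === 'INSERT INTO product_prices (name, price) VALUES (?, ?), (?, ?)'
```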
const mqttScenario = {
  name: 'iot-data-processing',
  steps: [
    {
      name: 'subscribe-sensors',
      task: 'mqtt-subscribe',
      connection: 'iot-broker',
      params: {
        topic: 'sensors/+/temperature',
        qos: 1
      }
    },
    {
      name: 'validate-data',
      task: 'empty-check',
      params: {
        accessor: 'temperature',
        throwOnEmpty: true
      }
    },
    {
      name: 'store-reading',
      task: 'database-query',
      connection: 'timeseries-db',
      params: {
        query: 'INSERT INTO sensor_readings (sensor_id, temperature, timestamp) VALUES (?, ?, NOW())',
        bindings: ['${deviceId}', '${temperature}']
      }
    },
    {
      name: 'alert-if-critical',
      task: 'switch-range-goto',
      params: {
        value: '${temperature}',
        cases: [
          { min: 80, max: 999, goto: 'send-alert' }
        ]
      }
    },
    {
      name: 'send-alert',
      task: 'http-post',
      connection: 'alert-service',
      params: {
        path: '/alerts',
        contentType: 'application/json',
        accessor: 'alertData'
      }
    }
  ]
}
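The subscription topic `sensors/+/temperature` uses the MQTT single-level wildcard `+`, which matches exactly one topic level; the multi-level wildcard `#` matches all remaining levels and must be the last level of a filter. The sketch below illustrates that matching rule and captures the levels matched by `+`. It is a simplified matcher with a hypothetical name (it ignores the special handling of topics starting with `$`), and how the scenario actually fills `${deviceId}` is not shown in this README.

```typescript
// Illustrative only: match an MQTT topic against a filter with "+" and "#".
// Returns the levels captured by "+" on a match, or null when there is no match.
function matchTopic(filter: string, topic: string): string[] | null {
  const filterLevels = filter.split('/')
  const topicLevels = topic.split('/')
  const captured: string[] = []
  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i]
    if (level === '#') {
      // "#" is only valid as the final level and matches everything after it.
      return i === filterLevels.length - 1 ? captured : null
    }
    if (i >= topicLevels.length) return null
    if (level === '+') {
      captured.push(topicLevels[i])
    } else if (level !== topicLevels[i]) {
      return null
    }
  }
  // Without "#", the filter and topic must have the same number of levels.
  return filterLevels.length === topicLevels.length ? captured : null
}

const matched = matchTopic('sensors/+/temperature', 'sensors/device-42/temperature')
// matched is ['device-42']
const unmatched = matchTopic('sensors/+/temperature', 'sensors/device-42/humidity')
// unmatched is null
```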
Configure automated scenario execution using cron expressions:
const scheduledScenario = {
  name: 'daily-data-sync',
  description: 'Sync data every day at 2 AM',
  schedule: '0 2 * * *', // Cron expression
  timezone: 'Asia/Seoul',
  active: true,
  steps: [/* scenario steps */]
}
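In the standard five-field cron format, `0 2 * * *` reads as minute 0, hour 2, every day of the month, every month, every day of the week. The sketch below only splits an expression into those named fields; it does not evaluate schedules, and `splitCron` is a hypothetical helper. Whether the scheduler also accepts other forms, such as a leading seconds field, is not stated here.

```typescript
// Illustrative only: name the five fields of a standard cron expression.
interface CronFields {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`)
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}

const fields = splitCron('0 2 * * *')
// fields.minute === '0' and fields.hour === '2'; the remaining fields are '*'
```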
# Enable debug logging
DEBUG=things-factory:integration-base:* npm start
# Specific component logging
DEBUG=things-factory:integration-base:scenario-engine npm start
DEBUG=things-factory:integration-base:connector:* npm start
# Subscribe to scenario status updates
subscription {
  scenarioInstanceStatusUpdated {
    id
    status
    progress
    result
    error
  }
}
# Subscribe to connection status
subscription {
  connectionStatusUpdated {
    name
    state
    lastConnected
  }
}
# Clone the repository
git clone https://github.com/hatiolab/things-factory.git
cd things-factory/packages/integration-base
# Install dependencies
npm install
# Build the module
npm run build
# Run tests
npm test
import { Connector } from '@things-factory/integration-base'
export class CustomConnector implements Connector {
  async connect(connection) {
    // Implementation
  }
  async disconnect(connection) {
    // Cleanup
  }
  get parameterSpec() {
    return [
      { type: 'string', name: 'apiKey', label: 'API Key' },
      { type: 'number', name: 'timeout', label: 'Timeout (ms)' }
    ]
  }
  get taskPrefixes() {
    return ['custom']
  }
}
import { TaskRegistry } from '@things-factory/integration-base'
async function CustomTask(step, context) {
  const { params } = step
  const { logger, data, domain } = context
  // Task implementation goes here; `result` is a placeholder for its output
  const result = {}
  return { data: result }
}
CustomTask.parameterSpec = [
  { type: 'string', name: 'parameter1', label: 'Parameter 1' }
]
TaskRegistry.registerTaskHandler('custom-task', CustomTask)
MIT © Hatiolab
Note: This module is part of the Things Factory ecosystem. For complete setup and configuration, refer to the main Things Factory documentation.