gigq

A lightweight job queue system with SQLite backend and zero dependencies

Version 0.2.0 (PyPI)

GigQ

Lightweight SQLite Job Queue

GigQ is a lightweight job queue system with SQLite as its backend. It provides a reliable way to manage and execute small jobs ("gigs") locally with atomicity guarantees, and is particularly suited to tasks such as data transformations, API calls, and batch operations.

Features

  • Zero External Dependencies

    • No external packages required
    • Uses Python's built-in sqlite3 module
    • Everything needed is bundled with GigQ - no dependency conflicts to worry about
  • Simple Job Definition & Management

    • Define small jobs with parameters, priority, and basic dependencies
    • Organize jobs into simple workflows
    • Enable job cancellation and status checking
  • SQLite State Storage

    • Maintain job states in SQLite (pending, running, completed, failed)
    • Use transactions to ensure state consistency
    • Simple, efficient schema design optimized for local usage
    • Handle SQLite locking appropriately for local concurrency
  • Lightweight Concurrency

    • Prevent duplicate job execution using SQLite locking mechanisms
    • Support a modest number of workers processing jobs simultaneously
    • Implement transaction-based state transitions
    • Handle worker crashes and job recovery
  • Basic Recovery

    • Configurable retry for failed jobs with backoff
    • Timeout detection for hung jobs
    • Simple but effective error logging
  • CLI Interface

    • Submit and monitor jobs
    • View job queue and history
    • Simple worker management commands

Project Structure

The GigQ library is organized as follows:

gigq/
├── docs/                        # Documentation
│   ├── advanced/               # Advanced topics
│   ├── api/                    # API reference
│   ├── examples/               # Documentation examples
│   ├── getting-started/        # Getting started guides
│   └── user-guide/             # User guides
│
├── examples/                    # Example applications
│   ├── __init__.py
│   └── github_archive.py       # GitHub Archive processing example
│
├── gigq/                        # Main package code
│   ├── __init__.py             # Package initialization and exports
│   ├── job.py                  # Job class implementation
│   ├── job_status.py           # JobStatus enum implementation
│   ├── job_queue.py            # JobQueue class implementation
│   ├── worker.py               # Worker class implementation
│   ├── workflow.py             # Workflow class implementation
│   ├── utils.py                # Utility functions
│   ├── cli.py                  # Command-line interface
│   └── table_formatter.py      # Table formatting utilities
│
├── tests/                       # Test directory
│   ├── __init__.py             # Test package initialization
│   ├── README.md               # Test documentation
│   ├── job_functions.py        # Shared test functions
│   │
│   ├── unit/                   # Unit tests
│   │   ├── __init__.py
│   │   ├── run_all.py          # Run all unit tests
│   │   ├── test_cli.py         # CLI unit tests
│   │   ├── test_cli_formatter.py  # CLI formatter tests
│   │   ├── test_job.py         # Job class tests
│   │   ├── test_job_queue.py   # JobQueue class tests
│   │   ├── test_table_formatter.py  # Table formatter tests
│   │   ├── test_worker.py      # Worker class tests
│   │   ├── test_workflow.py    # Workflow class tests
│   │   └── test_refactoring.py # Tests for refactored modules
│   │
│   └── integration/            # Integration tests
│       ├── __init__.py
│       ├── base.py             # Base class for integration tests
│       ├── run_all.py          # Run all integration tests
│       ├── test_basic.py       # Basic job processing tests
│       ├── test_basic_workflow.py  # Simple workflow tests
│       ├── test_cli.py         # CLI integration tests
│       ├── test_concurrent_workers.py  # Multiple workers tests
│       ├── test_error_handling.py  # Error handling tests
│       ├── test_persistence.py  # Persistence tests
│       ├── test_timeout_handling.py  # Timeout handling tests
│       └── test_workflow_dependencies.py  # Workflow dependencies tests
│
├── .github/                     # GitHub configuration
│   └── workflows/               # GitHub Actions workflows
│       ├── ci.yml              # Continuous integration workflow
│       └── docs.yml            # Documentation deployment workflow
│
├── LICENSE                      # MIT License
├── README.md                    # Project readme
├── README_REFACTORING.md        # Refactoring documentation
├── REFACTORING_SUMMARY.md       # Summary of refactoring changes
├── update_test_imports.py       # Script to update test imports
├── test_refactoring.py          # Script to test refactored modules
├── pyproject.toml               # Project configuration
├── setup.py                     # Package setup script
└── py.typed                     # Type hint marker

Installation

Basic Installation

Install GigQ from PyPI:

pip install gigq

This installs the core package with minimal dependencies.

Development Installation

For contributors and developers:

  • Clone the repository:

    git clone https://github.com/kpouianou/gigq.git
    cd gigq
    
  • Install in development mode with all dependencies:

    # Install core package in development mode
    pip install -e .
    
    # For running examples
    pip install -e ".[examples]"
    
    # For building documentation
    pip install -e ".[docs]"
    
    # For development (linting, testing)
    pip install -e ".[dev]"
    
    # Or install everything at once
    pip install -e ".[examples,docs,dev]"
    

Dependencies

  • Build dependencies: setuptools (>=42) and wheel
  • Core dependencies: Python 3.9+ and tabulate
  • Examples: Additional dependencies for running examples include pandas, requests, and schedule
  • Documentation: MkDocs and related plugins for building the documentation (mkdocs-material, pymdown-extensions, mkdocstrings[python], etc.)
  • Development: Testing and code quality tools (pytest, flake8, coverage, mypy, etc.)

Note: If you're only interested in using the CLI or basic functionality, the standard installation is sufficient.

Quick Start

Define and Submit a Job

from gigq import Job, JobQueue, Worker

# Define a job function
def process_data(filename, threshold=0.5):
    # Process some data
    print(f"Processing {filename} with threshold {threshold}")
    return {"processed": True, "count": 42}

# Define a job
job = Job(
    name="process_data_job",
    function=process_data,
    params={"filename": "data.csv", "threshold": 0.7},
    max_attempts=3,
    timeout=300
)

# Create or connect to a job queue
queue = JobQueue("jobs.db")
job_id = queue.submit(job)

print(f"Submitted job with ID: {job_id}")

Start a Worker

from gigq import Worker

# Start a worker
worker = Worker("jobs.db")
worker.start()  # This blocks until the worker is stopped

Or use the CLI:

# Start a worker
gigq --db jobs.db worker

# Process just one job
gigq --db jobs.db worker --once

Check Job Status

# Check job status
status = queue.get_status(job_id)
print(f"Job status: {status['status']}")

Or use the CLI:

gigq --db jobs.db status your-job-id
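
A script that submits a job often needs to wait for the result while a worker runs in another process. The following is a minimal polling sketch. It assumes only that the status getter returns a dict with a `status` key, as `queue.get_status` does above. The helper name `wait_for_job`, the set of terminal status strings, and the timing parameters are illustrative assumptions, not part of GigQ's API.

```python
import time

# Assumed terminal states, based on the job states and cancellation
# feature mentioned in this README; the exact strings are a guess.
TERMINAL_STATES = {"completed", "failed", "cancelled"}

def wait_for_job(get_status, job_id, poll_interval=1.0, max_wait=60.0):
    """Poll get_status(job_id) until the job reaches a terminal state.

    get_status is any callable that returns a dict with a "status" key,
    for example queue.get_status. Raises TimeoutError after max_wait seconds.
    """
    deadline = time.monotonic() + max_wait
    while True:
        status = get_status(job_id)
        if status["status"] in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Job {job_id} still {status['status']} after {max_wait}s"
            )
        time.sleep(poll_interval)
```

With the queue from the Quick Start, this could be called as `final = wait_for_job(queue.get_status, job_id)`.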

Creating Workflows

GigQ allows you to create workflows of dependent jobs:

from gigq import Job, JobQueue, Workflow

# download_data, process_data, and analyze_data are your own functions,
# defined or imported elsewhere.

# Create a workflow
workflow = Workflow("data_processing")

# Define the jobs
job1 = Job(name="download", function=download_data, params={"url": "https://example.com/data.csv"})
job2 = Job(name="process", function=process_data, params={"filename": "data.csv"})
job3 = Job(name="analyze", function=analyze_data, params={"processed_file": "processed.csv"})

# Add jobs to workflow with dependencies
workflow.add_job(job1)
workflow.add_job(job2, depends_on=[job1])
workflow.add_job(job3, depends_on=[job2])

# Submit all jobs in the workflow
queue = JobQueue("jobs.db")
job_ids = workflow.submit_all(queue)
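
The dependency chain above implies that a job should run only once everything it depends on has completed. Here is a minimal sketch of that check, independent of GigQ's internals. The function name `runnable_jobs` and the plain-dict representation of jobs are assumptions made for illustration.

```python
def runnable_jobs(dependencies, statuses):
    """Return names of pending jobs whose dependencies have all completed.

    dependencies maps a job name to the list of job names it depends on.
    statuses maps a job name to "pending", "running", "completed", or "failed".
    """
    ready = []
    for name, deps in dependencies.items():
        if statuses[name] != "pending":
            continue  # already running or finished
        if all(statuses[dep] == "completed" for dep in deps):
            ready.append(name)
    return ready

dependencies = {"download": [], "process": ["download"], "analyze": ["process"]}
statuses = {"download": "completed", "process": "pending", "analyze": "pending"}
print(runnable_jobs(dependencies, statuses))  # ['process']
```

In this example `analyze` stays blocked until `process` completes, which mirrors the `depends_on` ordering in the workflow.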

CLI Usage

GigQ comes with a command-line interface for common operations:

# Submit a job
gigq submit my_module.my_function --name "My Job" --param "filename=data.csv" --param "threshold=0.7"

# List jobs
gigq list
gigq list --status pending

# Check job status
gigq status your-job-id --show-result

# Cancel a job
gigq cancel your-job-id

# Requeue a failed job
gigq requeue your-job-id

# Start a worker
gigq worker

# Clear completed jobs
gigq clear
gigq clear --before 7  # Clear jobs completed more than 7 days ago
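
The `--param` flag takes `key=value` pairs. How GigQ converts the values to Python types is not described here. The sketch below shows one plausible approach, assuming values are parsed as JSON literals where possible and kept as strings otherwise. The function name `parse_params` is hypothetical and the behavior may differ from the real CLI.

```python
import json

def parse_params(pairs):
    """Turn ["filename=data.csv", "threshold=0.7"] into a keyword dict."""
    params = {}
    for pair in pairs:
        key, sep, raw = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"Expected key=value, got {pair!r}")
        try:
            # Numbers, booleans, null, and quoted strings parse as JSON.
            params[key] = json.loads(raw)
        except json.JSONDecodeError:
            # Anything else is kept as the raw string.
            params[key] = raw
    return params

print(parse_params(["filename=data.csv", "threshold=0.7"]))
# {'filename': 'data.csv', 'threshold': 0.7}
```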

Example: GitHub Archive Processing

See the examples/github_archive.py script for a complete example of using GigQ to process GitHub Archive data.

Technical Details

SQLite Schema

GigQ uses a simple SQLite schema with two main tables:

  • jobs - Stores job definitions and current state
  • job_executions - Tracks individual execution attempts

The schema is designed for simplicity and efficiency with appropriate indexes for common operations.
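
The exact columns are not listed here. As a rough illustration of the two-table layout, the sketch below creates a hypothetical version with Python's `sqlite3` module. Every column and index name is an assumption, not GigQ's actual schema.

```python
import sqlite3

# Hypothetical schema: table names come from the README, columns are guesses.
SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
    id           TEXT PRIMARY KEY,
    name         TEXT NOT NULL,
    status       TEXT NOT NULL DEFAULT 'pending',
    priority     INTEGER NOT NULL DEFAULT 0,
    attempts     INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3,
    created_at   TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS job_executions (
    id          INTEGER PRIMARY KEY,
    job_id      TEXT NOT NULL REFERENCES jobs(id),
    status      TEXT NOT NULL,
    started_at  TEXT NOT NULL,
    finished_at TEXT,
    error       TEXT
);
-- Index for the common "next pending job by priority" lookup.
CREATE INDEX IF NOT EXISTS idx_jobs_status_priority
    ON jobs (status, priority DESC, created_at);
"""

def create_schema(conn):
    """Create the tables and index if they do not exist yet."""
    conn.executescript(SCHEMA)

conn = sqlite3.connect(":memory:")
create_schema(conn)
tables = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
)]
print(tables)  # ['job_executions', 'jobs']
```

Keeping one row per job in `jobs` and one row per attempt in `job_executions` lets the current state stay cheap to query while the attempt history is preserved for debugging.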

Concurrency Handling

GigQ uses SQLite's built-in locking mechanisms to ensure safety when multiple workers are running. Each worker claims jobs using an exclusive transaction, preventing duplicate execution.
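
One way to picture the claim step: open an exclusive transaction, pick the next pending job, and mark it as running before committing. This is a sketch of the general technique using `sqlite3` with a simplified, assumed `jobs` table. The function name `claim_next_job` and the `worker_id` column are hypothetical, and this is not GigQ's actual code.

```python
import sqlite3

def claim_next_job(conn, worker_id):
    """Atomically claim one pending job; return its id, or None if none is pending.

    conn must be opened with isolation_level=None so that transactions
    are controlled explicitly with BEGIN/COMMIT/ROLLBACK.
    """
    # While this transaction is open, no other connection can write.
    conn.execute("BEGIN EXCLUSIVE")
    try:
        row = conn.execute(
            "SELECT id FROM jobs WHERE status = 'pending' "
            "ORDER BY priority DESC, id LIMIT 1"
        ).fetchone()
        if row is None:
            conn.execute("COMMIT")
            return None
        conn.execute(
            "UPDATE jobs SET status = 'running', worker_id = ? WHERE id = ?",
            (worker_id, row[0]),
        )
        conn.execute("COMMIT")
        return row[0]
    except Exception:
        conn.execute("ROLLBACK")
        raise

# Demo with an in-memory database and a hypothetical table layout.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE jobs (id TEXT PRIMARY KEY, status TEXT, priority INTEGER, worker_id TEXT)"
)
conn.executemany(
    "INSERT INTO jobs VALUES (?, 'pending', ?, NULL)", [("a", 0), ("b", 5)]
)
print(claim_next_job(conn, "worker-1"))  # b
print(claim_next_job(conn, "worker-2"))  # a
print(claim_next_job(conn, "worker-3"))  # None
```

Because the select and the update happen inside the same exclusive transaction, two workers cannot both see the same job as pending and claim it.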

Error Handling

Failed jobs can be automatically retried up to a configurable number of times. Detailed error information is stored in the database for debugging. Jobs that exceed their timeout are automatically detected and marked as failed or requeued.
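
The retry and timeout rules can be summarized as a small decision function. This is a sketch under stated assumptions: the function name `next_status`, its arguments, and the choice to requeue a timed-out job while attempts remain are illustrative and not taken from GigQ's source.

```python
from datetime import datetime, timedelta

def next_status(attempts, max_attempts, started_at, timeout, now, failed=False):
    """Decide what to do with a job that is currently running.

    Returns "running" if it is still healthy, "pending" if it should be
    requeued for another attempt, or "failed" if no attempts remain.
    """
    timed_out = now - started_at > timedelta(seconds=timeout)
    if not failed and not timed_out:
        return "running"
    return "pending" if attempts < max_attempts else "failed"

start = datetime(2024, 1, 1, 12, 0, 0)
later = start + timedelta(seconds=301)
print(next_status(1, 3, start, 300, later))               # pending (timed out, retries left)
print(next_status(3, 3, start, 300, later))               # failed (timed out, no retries left)
print(next_status(1, 3, start, 300, start, failed=True))  # pending (error, retries left)
print(next_status(1, 3, start, 300, start))               # running
```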

Development and Contribution

For local development:

  • Clone the repository
  • Create a virtual environment
  • Install build dependencies: pip install setuptools wheel
  • Install in development mode: pip install -e .
  • Run tests: python -m unittest discover tests

License

This project is licensed under the MIT License - see the LICENSE file for details.
