Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

github.com/amlun/linda

Package Overview
Dependencies
Alerts
File Explorer
Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

github.com/amlun/linda

  • v0.0.0-20190604041258-330980f26cfd
  • Source
  • Go
  • Socket score

Version published
Created
Source

Linda

Build Status GoDoc GoReport License

Linda is a background manager to poll jobs from broker and dispatch them to multi workers.

Linda Broker provides a unified API across different broker (queue) services.

Linda Saver provides a unified API across different saver (db) services.

Brokers allow you to defer the processing of a time consuming task.

When job done, use Release func to release the job with a delay (seconds), you can implement a cron job service.

The real period is job.Period + Interval

Inspiration comes from beanstalkd and goworker

Installation

To install Linda, use

go get github.com/amlun/linda

to get the main package, and then use glide

glide install

to install the dependencies

Getting Started

Terminology

  • Broker

message transport [MQ]

  • Saver

job info storage [Database]

  • poller

poll job from the broker and send to local job channels

poller also migrate the expire jobs

  • worker

worker is the main process to work the job

Worker Type

type workerFunc func(...interface{}) error

Register Worker

linda.RegisterWorkers("MyClass", myFunc)

Broker Interface

type Broker interface {
	Connect(url *neturl.URL) error
	Close() error
	MigrateExpiredJobs(queue string)
	Reserve(queue string, timeout int64) (string, error)
	Delete(queue, id string) error
	Release(queue, id string, delay int64) error
	Push(queue, id string) error
	Later(queue, id string, delay int64) error
}

Saver Interface

type Saver interface {
	Connect(url *neturl.URL) error
	Close() error
	Put(job *Job) error
	Get(id string) (*Job, error)
}

Examples

Add jobs to saver and push them to broker

go run example/push_jobs/main.go

example/push_jobs/main.go

package main

import (
	"github.com/amlun/linda"
	"github.com/sirupsen/logrus"
	"time"
)

func main() {
	var err error
	var b linda.Broker
	var s linda.Saver
	// broker
	if b, err = linda.NewBroker("redis://localhost:6379/"); err != nil {
		logrus.Error(err)
		return
	}
	// saver
	if s, err = linda.NewSaver("redis://localhost:6379/"); err != nil {
		logrus.Error(err)
		return
	}
	// job
	var jobID = "1"
	var queue = "test"
	var job = &linda.Job{
		ID:        jobID,
		Queue:     queue,
		Period:    60,
		Retry:     3,
		CreatedAt: time.Now(),
		Payload: linda.Payload{
			Class: "printArgs",
			Args:  []interface{}{"a", "b", "c"},
		},
	}
	// save job
	if err = s.Put(job); err != nil {
		logrus.Error(err)
		return
	}
	// push to broker
	if err = b.Push(queue, jobID); err != nil {
		logrus.Error(err)
		return
	}
}

Worker run to consume the job

go run example/print_args/main.go

example/print_args/main.go

package main

import (
	"fmt"
	"github.com/amlun/linda"
	"github.com/sirupsen/logrus"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func init() {
	linda.RegisterWorkers("printArgs", PrintArgs)
}

func main() {
	logrus.SetLevel(logrus.DebugLevel)
	// broker
	b, _ := linda.NewBroker("redis://localhost:6379/")
	// saver
	s, _ := linda.NewSaver("redis://localhost:6379/")
	// config
	c := linda.Config{
		Queue:     "test",
		Timeout:   60,
		Interval:  time.Second,
		WorkerNum: 4,
	}
	quit := signals()
	linda.Init(c, b, s)
	go func() {
		defer func() {
			linda.Quit()
		}()
		<-quit
	}()

	if err := linda.Run(); err != nil {
		fmt.Println("Error:", err)
	}
}

func PrintArgs(args ...interface{}) error {
	fmt.Println(args)
	return nil
}

// Signal Handling
func signals() <-chan bool {
	quit := make(chan bool)
	go func() {
		signals := make(chan os.Signal)
		defer close(signals)
		signal.Notify(signals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt)
		defer signalStop(signals)
		<-signals
		quit <- true
	}()
	return quit
}

// Stops signals channel.
func signalStop(c chan<- os.Signal) {
	signal.Stop(c)
}

Features

Broker List

  • Redis
  • beanstalkd
  • RabbitMQ

Design

System Design

system-design

Job State

   later                                release
  ----------------> [DELAYED] <------------.
                        |                   |
                   kick | (time passes)     |
                        |                   |
   push                 v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                        ^                   |
                         \                  |
                          `-----------------'
                           kick (time out)
 

Thanks

FAQs

Package last updated on 04 Jun 2019

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc