github.com/andy2046/pool

Source: Go
Version: v1.3.0

pool

makes concurrency easier

worker pool with job queue

This library is inspired by the Go blog post "Go Concurrency Patterns: Pipelines and cancellation".

the challenge

While working on a microservice, the goal was to handle a large number of requests from thousands of endpoints.

The initial implementation used plain goroutines, one goroutine per request, but this quickly proved not to work well at scale. There was no way to control how many goroutines were spawned, and as the number of requests grew, the service ran out of memory (OOM) and crashed.

The second iteration created a buffered channel where requests were queued up and then processed by a handler. Since the maximum number of requests in the queue is fixed, there were no more OOM crashes.
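The buffered-channel iteration can be sketched roughly as follows. This is an illustration only, not code from this library: processQueued and its parameters are hypothetical names, and the "handler" here simply sums integer payloads. The point it shows is that a send on a full buffered channel blocks, so the number of waiting requests never exceeds the buffer size.

```go
package main

import "fmt"

// processQueued is a hypothetical helper: it pushes every request into a
// buffered channel and lets one handler goroutine drain it.
func processQueued(requests []int, queueSize int) int {
	queue := make(chan int, queueSize) // at most queueSize requests wait here
	result := make(chan int)

	// The single handler goroutine consumes requests until the queue is closed.
	go func() {
		total := 0
		for r := range queue {
			total += r
		}
		result <- total
	}()

	for _, r := range requests {
		queue <- r // blocks while the buffer is full, which bounds memory use
	}
	close(queue)
	return <-result
}

func main() {
	fmt.Println(processQueued([]int{1, 2, 3, 4, 5}, 2)) // prints 15
}
```

With a single handler the queue bounds memory, but throughput is limited to what one goroutine can process.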

A better solution is to use the fan-out pattern from the blog post and create a two-tier channel system: one channel for queuing requests and another to control how many workers operate on the queue concurrently.
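One way to read the two-tier idea is sketched below. It is an assumption about the general pattern, not this library's implementation: runTwoTier, jobs, idle and inbox are hypothetical names. The first channel queues requests; the second holds the inboxes of idle workers, so a request is only handed out when a worker is free. The sketch assumes workers is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// runTwoTier is a hypothetical helper that sums requests using two channels:
// jobs (tier one, the request queue) and idle (tier two, inboxes of free workers).
func runTwoTier(requests []int, queueSize, workers int) int {
	jobs := make(chan int, queueSize)
	idle := make(chan chan int, workers)

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			inbox := make(chan int)
			for {
				idle <- inbox // announce that this worker is free
				r, ok := <-inbox
				if !ok {
					return // inbox closed: no more work
				}
				mu.Lock()
				total += r
				mu.Unlock()
			}
		}()
	}

	// Dispatcher: pair each queued request with an idle worker, then
	// close every worker's inbox once the queue is drained.
	go func() {
		for r := range jobs {
			inbox := <-idle
			inbox <- r
		}
		for w := 0; w < workers; w++ {
			close(<-idle)
		}
	}()

	for _, r := range requests {
		jobs <- r
	}
	close(jobs)
	wg.Wait()
	return total
}

func main() {
	fmt.Println(runTwoTier([]int{1, 2, 3, 4, 5, 6}, 2, 3)) // prints 21
}
```

The queue size bounds how many requests wait in memory, and the worker count bounds how many are processed at once.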

how it works

When the pool is instantiated, it creates a request queue of size JobQueueBufferSize and InitPoolNum dispatchers, each with WorkerNum workers. Every dispatcher reads from the same queue until it is closed and distributes requests among its workers to process them in parallel.
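The layout described above can be approximated with the standard library alone. The sketch below is not the library's source: poolConfig and runPool are hypothetical, and only the three field names are taken from the description. Each dispatcher owns a private channel feeding its workers, and all dispatchers read from the one shared queue.

```go
package main

import (
	"fmt"
	"sync"
)

// poolConfig is a hypothetical stand-in that reuses the field names above.
type poolConfig struct {
	JobQueueBufferSize int
	InitPoolNum        int
	WorkerNum          int
}

// runPool feeds jobs through a shared queue to InitPoolNum dispatchers,
// each of which fans out to WorkerNum workers, and waits for all to finish.
func runPool(cfg poolConfig, jobs []int, handle func(int)) {
	queue := make(chan int, cfg.JobQueueBufferSize)
	var wg sync.WaitGroup

	for d := 0; d < cfg.InitPoolNum; d++ {
		work := make(chan int) // private channel between one dispatcher and its workers

		for w := 0; w < cfg.WorkerNum; w++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for j := range work {
					handle(j)
				}
			}()
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			defer close(work) // lets this dispatcher's workers exit
			for j := range queue {
				work <- j
			}
		}()
	}

	for _, j := range jobs {
		queue <- j
	}
	close(queue) // dispatchers stop once the queue is drained
	wg.Wait()
}

func main() {
	data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	results := make(chan int, len(data))
	cfg := poolConfig{JobQueueBufferSize: 4, InitPoolNum: 2, WorkerNum: 5}
	runPool(cfg, data, func(j int) { results <- j })
	close(results)

	sum := 0
	for r := range results {
		sum += r
	}
	fmt.Println(sum) // prints 55
}
```

Closing the shared queue is what ends the run here: each dispatcher drains what is left, closes its private channel, and its workers return.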

[Figure: flow diagram]

Install

go get github.com/andy2046/pool

Usage

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/andy2046/pool"
)

func main() {
	done := make(chan struct{})
	mu := &sync.RWMutex{}
	data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	sum := 0

	// Each generated handler adds the job's payload to sum under the lock.
	jobHandlerGenerator := func() pool.JobHandler {
		return func(j pool.Job) error {
			mu.Lock()
			defer mu.Unlock()
			sum += j.Data.(int)
			return nil
		}
	}

	// Configure 2 dispatchers with 5 workers each.
	size := 2
	opt := func(c *pool.Config) error {
		c.InitPoolNum = size
		c.WorkerNum = 5
		return nil
	}

	p := pool.New(done, jobHandlerGenerator, opt)
	p.Start()

	// Queue one job per element of data.
	for i := range data {
		p.JobQueue <- pool.Job{
			Data: data[i],
		}
	}

	close(done)

	// wait for jobs to finish
	for {
		time.Sleep(1 * time.Second)
		if p.Closed() {
			break
		}
	}
	mu.RLock()
	fmt.Println(sum)
	// Output: 55
	mu.RUnlock()
}

Package last updated on 24 Oct 2018
