go-floc
Floc: Orchestrate goroutines with ease.
The goal of the project is to make the process of running goroutines in parallel
and synchronizing them easy.
Announcements
Version v2
was released on the 1st of December, 2017 and is now the main maintained branch.
Branch master
became v1_dev
. Please note that this change may break your clones of the master
branch.
Installation and requirements
The package requires Go v1.8.
To install the package use go get gopkg.in/workanator/go-floc.v1
Documentation and examples
Please refer to the Godoc reference
of the package for more details.
Some examples are available at the Godoc reference. Additional examples can
be found in go-floc-showcase.
Features
- Easy to use functional interface.
- Simple parallelism and synchronization of jobs.
- As little overhead as possible, in comparison to direct use of goroutines
and sync primitives.
- Better control over execution with one entry point and one exit
point, achieved by allowing any job to finish execution with
Cancel
or
Complete
.
Introduction
Floc introduces some terms which are widely used through the package.
Flow
Flow is the overall process which can be controlled through floc.Flow
. A flow
can be canceled or completed with any arbitrary data at any point of execution.
A flow has only one entry point and only one exit point.
job := run.Sequence(do, something, here, ...)
floc.Run(flow, state, update, job)
result, data := flow.Result()
State
State is arbitrary data shared across all jobs in the flow. Since floc.State
contains shared data, it provides methods which return the data along with
read-only and/or read/write lockers. Returned lockers are not locked;
the caller is responsible for obtaining and releasing locks.
data, locker := state.DataWithReadLocker()
locker.Lock()
container := data.(*MyContainer)
name := container.Name
date := container.Date
locker.Unlock()
data, locker := state.DataWithWriteLocker()
locker.Lock()
container := data.(*MyContainer)
container.Counter = container.Counter + 1
locker.Unlock()
Floc does not force the use of the state locking methods; safe data read-write
operations can also be done using, for example, sync/atomic
. Likewise, Floc does
not restrict what data the state holds. The state can contain, say, channels for
communication between jobs.
type ChunkStream chan []byte
func WriteToDisk(flow floc.Flow, state floc.State, update floc.Update) {
	stream := state.Data().(ChunkStream)
	file, err := os.Create("/tmp/file")
	if err != nil {
		flow.Cancel(err)
		return
	}
	defer file.Close()
	for {
		select {
		case <-flow.Done():
			// return, not break: break would only leave the select,
			// not the enclosing loop
			return
		case chunk := <-stream:
			file.Write(chunk)
		}
	}
}
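As a stdlib-only illustration of the sync/atomic approach mentioned above (independent of Floc; the Counters type and the worker counts are made up for this sketch), shared state can be mutated safely without lockers:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Counters is a hypothetical state container whose field is
// updated lock-free with sync/atomic instead of state lockers.
type Counters struct {
	Processed int64
}

// countParallel runs `workers` goroutines, each incrementing the
// shared counter `perWorker` times, and returns the final value.
func countParallel(workers, perWorker int) int64 {
	state := &Counters{}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				atomic.AddInt64(&state.Processed, 1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&state.Processed)
}

func main() {
	fmt.Println(countParallel(4, 1000)) // prints 4000
}
```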
Update
Update is a function with the prototype floc.Update
which is responsible for
updating state. The key
identifies which piece of state should be updated,
while value
contains the data which should be written. It's up to the
implementation how to interpret key
and value
.
type Dictionary map[string]interface{}
func UpdateMap(flow floc.Flow, state floc.State, key string, value interface{}) {
	data, locker := state.DataWithWriteLocker()
	locker.Lock()
	defer locker.Unlock()
	m := data.(Dictionary)
	m[key] = value
}
Job
A job in Floc is the smallest piece of a flow. The prototype of a job function is
floc.Job
. Each job has access to floc.State
and floc.Update
, so it can
read/write state data, and to floc.Flow
, which allows it to finish the flow with
Cancel()
or Complete()
.
The Cancel()
and Complete()
methods of floc.Flow
have a permanent effect, so once
finished, a flow cannot be canceled or completed again.
func ValidateContentLength(flow floc.Flow, state floc.State, update floc.Update) {
	request := state.Data().(http.Request)
	if request.ContentLength > MaxContentLength {
		flow.Cancel(errors.New("content is too big"))
	}
}
Example
Let's have some fun and write a simple example which calculates some statistics
on a given text. The example is designed so that it does not require locking
because each part of the Statistics
struct is accessed by only one job at a time.
const Text = `Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed
do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim
veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo
consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum
dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident,
sunt in culpa qui officia deserunt mollit anim id est laborum.`
var sanitizeWordRe = regexp.MustCompile(`\W`)
type Statistics struct {
	Words      []string
	Characters int
	Occurrence map[string]int
}
SplitToWords := func(flow floc.Flow, state floc.State, update floc.Update) {
	statistics := state.Data().(*Statistics)
	// strings.Fields splits on any whitespace, including the
	// newlines in Text, which splitting on " " would miss
	statistics.Words = strings.Fields(Text)
	for i, word := range statistics.Words {
		statistics.Words[i] = sanitizeWordRe.ReplaceAllString(word, "")
	}
}
CountCharacters := func(flow floc.Flow, state floc.State, update floc.Update) {
	statistics := state.Data().(*Statistics)
	for _, word := range statistics.Words {
		statistics.Characters += len(word)
	}
}

CountUniqueWords := func(flow floc.Flow, state floc.State, update floc.Update) {
	statistics := state.Data().(*Statistics)
	statistics.Occurrence = make(map[string]int)
	for _, word := range statistics.Words {
		statistics.Occurrence[word] = statistics.Occurrence[word] + 1
	}
}

PrintResult := func(flow floc.Flow, state floc.State, update floc.Update) {
	statistics := state.Data().(*Statistics)
	fmt.Printf("Words Total : %d\n", len(statistics.Words))
	fmt.Printf("Unique Word Count : %d\n", len(statistics.Occurrence))
	fmt.Printf("Character Count : %d\n", statistics.Characters)
}
job := run.Sequence(
	SplitToWords,
	run.Parallel(
		CountCharacters,
		CountUniqueWords,
	),
	PrintResult,
)

floc.Run(
	floc.NewFlow(),
	floc.NewState(new(Statistics)),
	nil,
	job,
)
Contributing
Please find information about contributing in CONTRIBUTING.md.