Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

github.com/zhaohaijun/go-async-queue

Package Overview
Dependencies
Alerts
File Explorer
Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

github.com/zhaohaijun/go-async-queue

  • v0.0.0-20181025075425-96c54ee7c60c
  • Source
  • Go
  • Socket score

Version published
Created
Source

Actor模型的Go语言实现

进程内通讯

跨节点通讯

关于Actor模型

Actor是计算机科学领域中的一个并行计算模型,它把actors当做通用的并行计算原语:一个actor对接收到的消息做出响应,进行本地决策,可以创建更多的actor,或者发送更多的消息;同时准备接收下一条消息。

在Actor理论中,一切都被认为是actor,这和面向对象语言里一切都被看成对象很类似。但包括面向对象语言在内的软件通常是顺序执行的,而Actor模型本质上则是并发的。

每个Actor都有一个(只有一个)Mailbox。Mailbox相当于是一个小型的队列,一旦Sender发送消息,就是将该消息入队到Mailbox中。入队的顺序按照消息发送的时间顺序。Mailbox有多种实现,默认为FIFO。但也可以根据优先级考虑出队顺序,实现算法则不相同。

actor

Actor VS Channel

actor

channel

Actor模型的优点:

  1. actor的mailbox容量是无限的,不会造成写入时的阻塞

  2. 每个actor中所有消息共用一个mailbox(channel)。

  3. actor并不关心消息的发送方(writer),可以对各模块间的逻辑进行解耦合。

  4. actor可以部署在不同节点上。

Actor模型的缺点

  1. 因为Actor被设计为异步模型,同步调用的性能不高。

创建Actors

Props为声明如何创建Actors提供了基础,下面的例子通过定义如何处理消息的函数声明定义了Actor Props:

var props Props = actor.FromFunc(func(c Context) {
	// process messages
})

另外,可以创建一个结构体,通过定义一个Receive方法,实现了Actor的接口:

type MyActor struct {}

func (a *MyActor) Receive(c Context) {
	// process messages
}

var props Props = actor.FromProducer(func() Actor { return &MyActor{} })

Spawn和SpawnNamed使用给定的props去创建Actor的运行实例。一旦启动Actor就开始准备处理发来的消息。用系统给定的唯一名称来启动actor,使用:

pid := actor.Spawn(props)

结果返回唯一的PID。自己命名PID请使用 SpawnNamed 来启动Actor。

每次一个actor启动时,一个新的邮箱会被创建并关联PID。消息会发送到该邮箱然后actor来处理这些消息。

处理消息

Actor通过Receive方法来处理消息,此函数定义为:

Receive(c actor.Context)

系统会保证该方法被同步调用,因此无需做另外的保护措施。

与其他actors通讯

PID是向actors发送消息的主要接口,PID.Tell方法用于向该PID异步的发送消息:

pid.Tell("Hello World")

根据不同的业务需求,actors之间的通讯可以异步或者同步进行,不论任何时候,actors总是通过PID来进行通讯。

当使用PID.Request或者PID.RequestFuture来进行消息发送时,接受消息的actor将会通过Context.Sender方法来回应发送者,该方法返回发送者的PID。

同步通讯方面,actor使用Future来实现,actor再继续下一步之前会等待结果获取。

向actor发送消息并等待结果获取,请使用RequestFuture方法,该方法会返回一个Future:

f := actor.RequestFuture(pid,"Hello", 50 * time.Millisecond)
res, err := f.Result() // waits for pid to reply */

进程内通讯

性能

异步调用

protoactor-go目前可以每秒在两个actor之间传递200万条消息,并且能够保证消息的顺序。

/app/go/bin/go build -o "/tmp/Build performanceTest.go and rungo" 
/app/gopath/src/github.com/zhaohaijun/go-async-queue/example/performanceTest.go
start at time: 1516953710985385134
end at time 1516953716291953904
run time:10000000     elapsed time:5306 ms

串行同步调用

protoactor-go目前可以在串行同步调用的情况下每秒在client和server间传递超过50万条消息!

goos: linux
goarch: amd64
pkg: github.com/zhaohaijun/go-async-queue/example/benchmark
benchmark                  iter                 time/iter          bytes alloc          allocs
---------                  ----                ---------           -----------          ------
BenchmarkSyncTest-4   	 1000000	      1967 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4   	 1000000	      1987 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4   	 1000000	      1952 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4   	 1000000	      1975 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4   	 1000000	      1987 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
PASS
ok  	github.com/zhaohaijun/go-async-queue/example/benchmarks	10.984s

Hello world

type Hello struct{ Who string }
type HelloActor struct{}

func (state *HelloActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case Hello:
        fmt.Printf("Hello %v\n", msg.Who)
    }
}

func main() {
    props := actor.FromProducer(func() actor.Actor { return &HelloActor{} })
    pid := actor.Spawn(props)
    pid.Tell(Hello{Who: "Roger"})
    console.ReadLine()
}

Two actors communicates each other

本例主要描述两个actor之间如何进行异步通讯,主要定义actor接收到信息之后的行为(Receive),包括处理方式和处理后的消息发送给哪个actor,异步通讯保证了actor的利用率。

type ping struct{ val int }
type pingActor struct{}

func (state *pingActor) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		fmt.Println("Started, initialize actor here")
	case *actor.Stopping:
		fmt.Println("Stopping, actor is about shut down")
	case *actor.Restarting:
		fmt.Println("Restarting, actor is about restart")
	case *ping:
		val := msg.val
		if val < 10000000 {
			context.Sender().Request(&ping{val: val + 1}, context.Self())
		} else {
			end := time.Now().UnixNano()
			fmt.Printf("%s end %d\n", context.Self().Id, end)
		}
	}
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	props := actor.FromProducer(func() actor.Actor { return &pingActor{} })
	actora := actor.Spawn(props)
	actorb := actor.Spawn(props)
	fmt.Printf("begin time %d\n", time.Now().UnixNano())
	actora.Request(&ping{val: 1}, actorb)
	time.Sleep(10 * time.Second)
	actora.Stop()
	actorb.Stop()
	console.ReadLine()
}

Server/Client 同步调用

本例主要描述如何与actor(server)进行同步通讯,客户端将需求消息发送给actor,并等待actor的返回结果,该需求可能需要多个actor共同协作完成,多个actor之间采用上面例子中的异步通讯来进行处理,最后处理结果返回给client。

message.go
type Request struct {
	Who string
}

type Response struct {
	Welcome string
}
server.go
type Server struct {}

func (server *Server) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		fmt.Println("Started, initialize server actor here")
	case *actor.Stopping:
		fmt.Println("Stopping, actor is about shut down")
	case *actor.Restarting:
		fmt.Println("Restarting, actor is about restart")
	case *message.Request:
		fmt.Println("Receive message", msg.Who)
		context.Sender().Request(&message.Response{Welcome: "Welcome!"}, context.Self())
	}
}

func (server *Server) Start() *actor.PID{
	props := actor.FromProducer(func() actor.Actor { return &Server{} })
	pid := actor.Spawn(props)
	return pid
}

func (server *Server) Stop(pid *actor.PID) {
	pid.Stop()
}
client.go
type Client struct {}


//Call the server synchronously
func (client *Client) SyncCall(serverPID *actor.PID) (interface{}, error) {
	future := serverPID.RequestFuture(&message.Request{Who: "Ontology"}, 10*time.Second)
	result, err := future.Result()
	return result, err
}
main.go
func main() {
	server := &server.Server{}
	client := &client.Client{}
	serverPID := server.Start()
	result, err := client.SyncCall(serverPID)
	if err != nil {
		fmt.Println("ERROR:", err)
	}
	fmt.Println(result)
}

EventHub

Actor可以通过EventHub 进行广播和订阅操作,支持ALL,ROUNDROBIN,RANDOM的广播模式

Example

package main

import (
	"github.com/zhaohaijun/go-async-queue/eventhub"
	"fmt"
	"github.com/zhaohaijun/go-async-queue/actor"

	"time"
)


type PubMessage struct{
	message string
}

type ResponseMessage struct{
	message string
}

func main() {

	eh:= eventhub.GlobalEventHub
	subprops := actor.FromFunc(func(context actor.Context) {
		switch msg := context.Message().(type) {

		case PubMessage:
			fmt.Println(context.Self().Id + " get message "+msg.message)
			context.Sender().Request(ResponseMessage{"response message from "+context.Self().Id },context.Self())

		default:

		}
	})

	pubprops := actor.FromFunc(func(context actor.Context) {
		switch msg := context.Message().(type) {

		case ResponseMessage:
			fmt.Println(context.Self().Id + " get message "+msg.message)
			//context.Sender().Request(ResponseMessage{"response message from "+context.Self().Id },context.Self())

		default:
			//fmt.Println("unknown message type")
		}
	})


	publisher, _ := actor.SpawnNamed(pubprops, "publisher")
	sub1, _ := actor.SpawnNamed(subprops, "sub1")
	sub2, _ := actor.SpawnNamed(subprops, "sub2")
	sub3, _ := actor.SpawnNamed(subprops, "sub3")

	topic:= "TEST"
	eh.Subscribe(topic,sub1)
	eh.Subscribe(topic,sub2)
	eh.Subscribe(topic,sub3)

	event := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyAll}
	eh.Publish(event)
	time.Sleep(2*time.Second)
	fmt.Println("before unsubscribe sleeping...")
	eh.Unsubscribe(topic,sub2)
	eh.Publish(event)
	time.Sleep(2*time.Second)

	fmt.Println("random event...")
	randomevent := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyRandom}
	for i:=0 ;i<10;i++{
		eh.Publish(randomevent)
	}

	time.Sleep(2*time.Second)

	fmt.Println("roundrobin event...")
	roundevent := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyRoundRobin}
	for i:=0 ;i<10;i++{
		eh.Publish(roundevent)
	}

	time.Sleep(2*time.Second)
}


跨节点通讯

本项目采用两种方式实现跨节点通讯,分别是grpc和zeromq,对应于项目中的remote和zmqremote包,使用过程中请根据需求选择所需导入的包(接口是一样的)。

使用zero mq 需要安装libzmq [https://github.com/zeromq/libzmq]

性能

微软云跨节点

模式256B大小消息512B大小消息1k大小消息10k大小消息100k大小消息1M大小消息4M大小消息8M大小消息
grpc120000/s100000/s85000/s40000/s4600/s490/s123/s超出grpc默认4m限制
zeromq170000/s140000/s10000/s45000/s4900/s500/s123/s62/s

序列化

该模块默认采用protobuf进行序列化和反序列化,使用过程中需要定义一系列的protobuf消息结构,为了和Ontology项目中采用的序列化方式集成,减轻系统改造工作量,目前也支持个性化的序列化和反序列化方式:

目前采用的方式是定义了一个通用的系统消息,消息结构为:消息类型 + 消息内容(序列化之后的数据),针对Ontology的常用结构体,目前枚举了六种常用的消息类型(address, block, header, Transaction, TxAttribute, VMCode),如下所示:

enum MsgType {
  ADDRESS_MSG_TYPE = 0;
  BLOCK_MSG_TYPE = 1;
  HEADER_MSG_TYPE = 2;
  TX_MSG_TYPE    = 3;
  TX_ATT_MSG_TYPE = 4;
  VM_CODE_MSG_TYPE = 5;
}

message MsgData {
  MsgType msgType = 1;
  bytes data = 2;
}

使用时现将所需传输的数据采用自定义的方式序列化,然后构造MsgData{msgType:xx, data:xx}, msgType按照上述枚举定义,data即为自定义序列化之后的数据。接收到的消息同样如此,收到消息之后按照msgType来执行相应的反序列化方法去反序列化data。简单的案例如下:

server.go

func main() {
	log.Debug("test")
	runtime.GOMAXPROCS(runtime.NumCPU() * 1)
	runtime.GC()

	zmqremote.Start("127.0.0.1:8080")

	props := actor.
		FromFunc(
			func(context actor.Context) {
				switch context.Message().(type) {
				case *zmqremote.MsgData:
					switch MsgData.MsgType:
						case 0:  //反序列化MsgData.Data
						case 1:  //反序列化MsgData.Data
						case 2:  //反序列化MsgData.Data
						case 3:  //反序列化MsgData.Data
						case 4:  //反序列化MsgData.Data
						case 5:  //反序列化MsgData.Data
					context.Sender().Tell(&zmqremote.MsgData{MsgType: 1, Data: []byte("123")})
				}
			}).
		WithMailbox(mailbox.Bounded(1000000))

	pid, _ := actor.SpawnNamed(props, "remote")
	fmt.Println(pid)

	for {
		time.Sleep(1 * time.Second)
	}
}

client.go

func main() {

	log.Debug("test")
	runtime.GOMAXPROCS(runtime.NumCPU() * 1)
	runtime.GC()

	var wg sync.WaitGroup

	messageCount := 500

	zmqremote.Start("127.0.0.1:8081")

	props := actor.
		FromProducer(newLocalActor(&wg, messageCount)).
		WithMailbox(mailbox.Bounded(1000000))

	pid := actor.Spawn(props)
	fmt.Println(pid)

	remotePid := actor.NewPID("127.0.0.1:8080", "remote")
	wg.Add(1)

	start := time.Now()
	fmt.Println("Starting to send")

	message := &zmqremote.MsgData{MsgType: 1, Data: []byte("123")}
	for i := 0; i < messageCount; i++ {
		remotePid.Request(message, pid)
	}

	wg.Wait()
	elapsed := time.Since(start)
	fmt.Printf("Elapsed %s", elapsed)

	x := int(float32(messageCount*2) / (float32(elapsed) / float32(time.Second)))
	fmt.Printf("Msg per sec %v", x)
}

Benchmark

node2/main.go

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU() * 1)
	runtime.GC()

	remote.Start("127.0.0.1:8080")
	var sender *actor.PID
	props := actor.
		FromFunc(
			func(context actor.Context) {
				switch msg := context.Message().(type) {
				case *messages.StartRemote:
					fmt.Println("Starting")
					sender = msg.Sender
					context.Respond(&messages.Start{})
				case *messages.Ping:
					sender.Tell(&messages.Pong{})
				}
			}).
		WithMailbox(mailbox.Bounded(1000000))
	actor.SpawnNamed(props, "remote")
	for{
		time.Sleep(1 * time.Second)
	}
}

node1/main.go

type localActor struct {
	count        int
	wgStop       *sync.WaitGroup
	messageCount int
}

func (state *localActor) Receive(context actor.Context) {
	switch context.Message().(type) {
	case *messages.Pong:
		state.count++
		if state.count%50000 == 0 {
			fmt.Println(state.count)
		}
		if state.count == state.messageCount {
			state.wgStop.Done()
		}
	}
}

func newLocalActor(stop *sync.WaitGroup, messageCount int) actor.Producer {
	return func() actor.Actor {
		return &localActor{
			wgStop:       stop,
			messageCount: messageCount,
		}
	}
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU() * 1)
	runtime.GC()

	var wg sync.WaitGroup

	messageCount := 50000
	//remote.DefaultSerializerID = 1
	remote.Start("127.0.0.1:8081")

	props := actor.
		FromProducer(newLocalActor(&wg, messageCount)).
		WithMailbox(mailbox.Bounded(1000000))

	pid := actor.Spawn(props)

	remotePid := actor.NewPID("127.0.0.1:8080", "remote")
	remotePid.
		RequestFuture(&messages.StartRemote{
			Sender: pid,
		}, 5*time.Second).
		Wait()

	wg.Add(1)

	start := time.Now()
	fmt.Println("Starting to send")

	bb := bytes.NewBuffer([]byte(""))
	for i := 0; i < 2000; i++ {
		bb.WriteString("1234567890")
	}
	message := &messages.Ping{Data: bb.Bytes()}
	for i := 0; i < messageCount; i++ {
		remotePid.Tell(message)
	}

	wg.Wait()
	elapsed := time.Since(start)
	fmt.Printf("Elapsed %s", elapsed)

	x := int(float32(messageCount*2) / (float32(elapsed) / float32(time.Second)))
	fmt.Printf("Msg per sec %v", x)
}

messages/protos.proto

protobuf文件的生成命令: protoc -I=$GOPATH/src -I=$GOPATH/src/github.com/gogo/protobuf/protobuf/ --gogoslick_out=plugins=grpc:. /path/to/protos.proto

syntax = "proto3";
package messages;
import "github.com/zhaohaijun/go-async-queue/actor/protos.proto";

message Start {}
message StartRemote {
    actor.PID Sender = 1;
}
message Ping {
    bytes Data = 1;
}
message Pong {}

FAQs

Package last updated on 25 Oct 2018

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc