Security News
Supply Chain Attack Detected in Solana's web3.js Library
A supply chain attack has been detected in versions 1.95.6 and 1.95.7 of the popular @solana/web3.js library.
github.com/zhaohaijun/go-async-queue
Actor是计算机科学领域中的一个并行计算模型,它把actors当做通用的并行计算原语:一个actor对接收到的消息做出响应,进行本地决策,可以创建更多的actor,或者发送更多的消息;同时准备接收下一条消息。
在Actor理论中,一切都被认为是actor,这和面向对象语言里一切都被看成对象很类似。但包括面向对象语言在内的软件通常是顺序执行的,而Actor模型本质上则是并发的。
每个Actor都有一个(只有一个)Mailbox。Mailbox相当于是一个小型的队列,一旦Sender发送消息,就是将该消息入队到Mailbox中。入队的顺序按照消息发送的时间顺序。Mailbox有多种实现,默认为FIFO。但也可以根据优先级考虑出队顺序,实现算法则不相同。
actor的mailbox容量是无限的,不会造成写入时的阻塞
每个actor中所有消息共用一个mailbox(channel)。
actor并不关心消息的发送方(writer),可以对各模块间的逻辑进行解耦合。
actor可以部署在不同节点上。
Props为声明如何创建Actors提供了基础,下面的例子通过定义如何处理消息的函数声明定义了Actor Props:
var props Props = actor.FromFunc(func(c Context) {
// process messages
})
另外,可以创建一个结构体,通过定义一个Receive方法,实现了Actor的接口:
type MyActor struct {}
func (a *MyActor) Receive(c Context) {
// process messages
}
var props Props = actor.FromProducer(func() Actor { return &MyActor{} })
Spawn和SpawnNamed使用给定的props去创建Actor的运行实例。一旦启动Actor就开始准备处理发来的消息。用系统给定的唯一名称来启动actor,使用:
pid := actor.Spawn(props)
结果返回唯一的PID。自己命名PID请使用 SpawnNamed 来启动Actor。
每次一个actor启动时,一个新的邮箱会被创建并关联PID。消息会发送到该邮箱然后actor来处理这些消息。
Actor通过Receive方法来处理消息,此函数定义为:
Receive(c actor.Context)
系统会保证该方法被同步调用,因此无需做另外的保护措施。
PID是向actors发送消息的主要接口,PID.Tell方法用于向该PID异步的发送消息:
pid.Tell("Hello World")
根据不同的业务需求,actors之间的通讯可以异步或者同步进行,不论任何时候,actors总是通过PID来进行通讯。
当使用PID.Request或者PID.RequestFuture来进行消息发送时,接受消息的actor将会通过Context.Sender方法来回应发送者,该方法返回发送者的PID。
同步通讯方面,actor使用Future来实现,actor再继续下一步之前会等待结果获取。
向actor发送消息并等待结果获取,请使用RequestFuture方法,该方法会返回一个Future:
f := actor.RequestFuture(pid,"Hello", 50 * time.Millisecond)
res, err := f.Result() // waits for pid to reply */
protoactor-go目前可以每秒在两个actor之间传递200万条消息,并且能够保证消息的顺序。
/app/go/bin/go build -o "/tmp/Build performanceTest.go and rungo"
/app/gopath/src/github.com/zhaohaijun/go-async-queue/example/performanceTest.go
start at time: 1516953710985385134
end at time 1516953716291953904
run time:10000000 elapsed time:5306 ms
protoactor-go目前可以在串行同步调用的情况下每秒在client和server间传递超过50万条消息!
goos: linux
goarch: amd64
pkg: github.com/zhaohaijun/go-async-queue/example/benchmark
benchmark iter time/iter bytes alloc allocs
--------- ---- --------- ----------- ------
BenchmarkSyncTest-4 1000000 1967 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4 1000000 1987 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4 1000000 1952 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4 1000000 1975 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4 1000000 1987 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
PASS
ok github.com/zhaohaijun/go-async-queue/example/benchmarks 10.984s
type Hello struct{ Who string }
type HelloActor struct{}
func (state *HelloActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case Hello:
fmt.Printf("Hello %v\n", msg.Who)
}
}
func main() {
props := actor.FromProducer(func() actor.Actor { return &HelloActor{} })
pid := actor.Spawn(props)
pid.Tell(Hello{Who: "Roger"})
console.ReadLine()
}
本例主要描述两个actor之间如何进行异步通讯,主要定义actor接收到信息之后的行为(Receive),包括处理方式和处理后的消息发送给哪个actor,异步通讯保证了actor的利用率。
type ping struct{ val int }
type pingActor struct{}
func (state *pingActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
fmt.Println("Started, initialize actor here")
case *actor.Stopping:
fmt.Println("Stopping, actor is about shut down")
case *actor.Restarting:
fmt.Println("Restarting, actor is about restart")
case *ping:
val := msg.val
if val < 10000000 {
context.Sender().Request(&ping{val: val + 1}, context.Self())
} else {
end := time.Now().UnixNano()
fmt.Printf("%s end %d\n", context.Self().Id, end)
}
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
props := actor.FromProducer(func() actor.Actor { return &pingActor{} })
actora := actor.Spawn(props)
actorb := actor.Spawn(props)
fmt.Printf("begin time %d\n", time.Now().UnixNano())
actora.Request(&ping{val: 1}, actorb)
time.Sleep(10 * time.Second)
actora.Stop()
actorb.Stop()
console.ReadLine()
}
本例主要描述如何与actor(server)进行同步通讯,客户端将需求消息发送给actor,并等待actor的返回结果,该需求可能需要多个actor共同协作完成,多个actor之间采用上面例子中的异步通讯来进行处理,最后处理结果返回给client。
type Request struct {
Who string
}
type Response struct {
Welcome string
}
type Server struct {}
func (server *Server) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
fmt.Println("Started, initialize server actor here")
case *actor.Stopping:
fmt.Println("Stopping, actor is about shut down")
case *actor.Restarting:
fmt.Println("Restarting, actor is about restart")
case *message.Request:
fmt.Println("Receive message", msg.Who)
context.Sender().Request(&message.Response{Welcome: "Welcome!"}, context.Self())
}
}
func (server *Server) Start() *actor.PID{
props := actor.FromProducer(func() actor.Actor { return &Server{} })
pid := actor.Spawn(props)
return pid
}
func (server *Server) Stop(pid *actor.PID) {
pid.Stop()
}
type Client struct {}
//Call the server synchronously
func (client *Client) SyncCall(serverPID *actor.PID) (interface{}, error) {
future := serverPID.RequestFuture(&message.Request{Who: "Ontology"}, 10*time.Second)
result, err := future.Result()
return result, err
}
func main() {
server := &server.Server{}
client := &client.Client{}
serverPID := server.Start()
result, err := client.SyncCall(serverPID)
if err != nil {
fmt.Println("ERROR:", err)
}
fmt.Println(result)
}
Actor可以通过EventHub 进行广播和订阅操作,支持ALL,ROUNDROBIN,RANDOM的广播模式
package main
import (
"github.com/zhaohaijun/go-async-queue/eventhub"
"fmt"
"github.com/zhaohaijun/go-async-queue/actor"
"time"
)
type PubMessage struct{
message string
}
type ResponseMessage struct{
message string
}
func main() {
eh:= eventhub.GlobalEventHub
subprops := actor.FromFunc(func(context actor.Context) {
switch msg := context.Message().(type) {
case PubMessage:
fmt.Println(context.Self().Id + " get message "+msg.message)
context.Sender().Request(ResponseMessage{"response message from "+context.Self().Id },context.Self())
default:
}
})
pubprops := actor.FromFunc(func(context actor.Context) {
switch msg := context.Message().(type) {
case ResponseMessage:
fmt.Println(context.Self().Id + " get message "+msg.message)
//context.Sender().Request(ResponseMessage{"response message from "+context.Self().Id },context.Self())
default:
//fmt.Println("unknown message type")
}
})
publisher, _ := actor.SpawnNamed(pubprops, "publisher")
sub1, _ := actor.SpawnNamed(subprops, "sub1")
sub2, _ := actor.SpawnNamed(subprops, "sub2")
sub3, _ := actor.SpawnNamed(subprops, "sub3")
topic:= "TEST"
eh.Subscribe(topic,sub1)
eh.Subscribe(topic,sub2)
eh.Subscribe(topic,sub3)
event := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyAll}
eh.Publish(event)
time.Sleep(2*time.Second)
fmt.Println("before unsubscribe sleeping...")
eh.Unsubscribe(topic,sub2)
eh.Publish(event)
time.Sleep(2*time.Second)
fmt.Println("random event...")
randomevent := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyRandom}
for i:=0 ;i<10;i++{
eh.Publish(randomevent)
}
time.Sleep(2*time.Second)
fmt.Println("roundrobin event...")
roundevent := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyRoundRobin}
for i:=0 ;i<10;i++{
eh.Publish(roundevent)
}
time.Sleep(2*time.Second)
}
本项目采用两种方式实现跨节点通讯,分别是grpc和zeromq,对应于项目中的remote和zmqremote包,使用过程中请根据需求选择所需导入的包(接口是一样的)。
使用zero mq 需要安装libzmq [https://github.com/zeromq/libzmq]
模式 | 256B大小消息 | 512B大小消息 | 1k大小消息 | 10k大小消息 | 100k大小消息 | 1M大小消息 | 4M大小消息 | 8M大小消息 |
---|---|---|---|---|---|---|---|---|
grpc | 120000/s | 100000/s | 85000/s | 40000/s | 4600/s | 490/s | 123/s | 超出grpc默认4m限制 |
zeromq | 170000/s | 140000/s | 10000/s | 45000/s | 4900/s | 500/s | 123/s | 62/s |
该模块默认采用protobuf进行序列化和反序列化,使用过程中需要定义一系列的protobuf消息结构,为了和Ontology项目中采用的序列化方式集成,减轻系统改造工作量,目前也支持个性化的序列化和反序列化方式:
目前采用的方式是定义了一个通用的系统消息,消息结构为:消息类型 + 消息内容(序列化之后的数据),针对Ontology的常用结构体,目前枚举了六种常用的消息类型(address, block, header, Transaction, TxAttribute, VMCode),如下所示:
enum MsgType {
ADDRESS_MSG_TYPE = 0;
BLOCK_MSG_TYPE = 1;
HEADER_MSG_TYPE = 2;
TX_MSG_TYPE = 3;
TX_ATT_MSG_TYPE = 4;
VM_CODE_MSG_TYPE = 5;
}
message MsgData {
MsgType msgType = 1;
bytes data = 2;
}
使用时现将所需传输的数据采用自定义的方式序列化,然后构造MsgData{msgType:xx, data:xx}, msgType按照上述枚举定义,data即为自定义序列化之后的数据。接收到的消息同样如此,收到消息之后按照msgType来执行相应的反序列化方法去反序列化data。简单的案例如下:
server.go
func main() {
log.Debug("test")
runtime.GOMAXPROCS(runtime.NumCPU() * 1)
runtime.GC()
zmqremote.Start("127.0.0.1:8080")
props := actor.
FromFunc(
func(context actor.Context) {
switch context.Message().(type) {
case *zmqremote.MsgData:
switch MsgData.MsgType:
case 0: //反序列化MsgData.Data
case 1: //反序列化MsgData.Data
case 2: //反序列化MsgData.Data
case 3: //反序列化MsgData.Data
case 4: //反序列化MsgData.Data
case 5: //反序列化MsgData.Data
context.Sender().Tell(&zmqremote.MsgData{MsgType: 1, Data: []byte("123")})
}
}).
WithMailbox(mailbox.Bounded(1000000))
pid, _ := actor.SpawnNamed(props, "remote")
fmt.Println(pid)
for {
time.Sleep(1 * time.Second)
}
}
client.go
func main() {
log.Debug("test")
runtime.GOMAXPROCS(runtime.NumCPU() * 1)
runtime.GC()
var wg sync.WaitGroup
messageCount := 500
zmqremote.Start("127.0.0.1:8081")
props := actor.
FromProducer(newLocalActor(&wg, messageCount)).
WithMailbox(mailbox.Bounded(1000000))
pid := actor.Spawn(props)
fmt.Println(pid)
remotePid := actor.NewPID("127.0.0.1:8080", "remote")
wg.Add(1)
start := time.Now()
fmt.Println("Starting to send")
message := &zmqremote.MsgData{MsgType: 1, Data: []byte("123")}
for i := 0; i < messageCount; i++ {
remotePid.Request(message, pid)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Elapsed %s", elapsed)
x := int(float32(messageCount*2) / (float32(elapsed) / float32(time.Second)))
fmt.Printf("Msg per sec %v", x)
}
node2/main.go
func main() {
runtime.GOMAXPROCS(runtime.NumCPU() * 1)
runtime.GC()
remote.Start("127.0.0.1:8080")
var sender *actor.PID
props := actor.
FromFunc(
func(context actor.Context) {
switch msg := context.Message().(type) {
case *messages.StartRemote:
fmt.Println("Starting")
sender = msg.Sender
context.Respond(&messages.Start{})
case *messages.Ping:
sender.Tell(&messages.Pong{})
}
}).
WithMailbox(mailbox.Bounded(1000000))
actor.SpawnNamed(props, "remote")
for{
time.Sleep(1 * time.Second)
}
}
node1/main.go
type localActor struct {
count int
wgStop *sync.WaitGroup
messageCount int
}
func (state *localActor) Receive(context actor.Context) {
switch context.Message().(type) {
case *messages.Pong:
state.count++
if state.count%50000 == 0 {
fmt.Println(state.count)
}
if state.count == state.messageCount {
state.wgStop.Done()
}
}
}
func newLocalActor(stop *sync.WaitGroup, messageCount int) actor.Producer {
return func() actor.Actor {
return &localActor{
wgStop: stop,
messageCount: messageCount,
}
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU() * 1)
runtime.GC()
var wg sync.WaitGroup
messageCount := 50000
//remote.DefaultSerializerID = 1
remote.Start("127.0.0.1:8081")
props := actor.
FromProducer(newLocalActor(&wg, messageCount)).
WithMailbox(mailbox.Bounded(1000000))
pid := actor.Spawn(props)
remotePid := actor.NewPID("127.0.0.1:8080", "remote")
remotePid.
RequestFuture(&messages.StartRemote{
Sender: pid,
}, 5*time.Second).
Wait()
wg.Add(1)
start := time.Now()
fmt.Println("Starting to send")
bb := bytes.NewBuffer([]byte(""))
for i := 0; i < 2000; i++ {
bb.WriteString("1234567890")
}
message := &messages.Ping{Data: bb.Bytes()}
for i := 0; i < messageCount; i++ {
remotePid.Tell(message)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Elapsed %s", elapsed)
x := int(float32(messageCount*2) / (float32(elapsed) / float32(time.Second)))
fmt.Printf("Msg per sec %v", x)
}
messages/protos.proto
protobuf文件的生成命令: protoc -I=$GOPATH/src -I=$GOPATH/src/github.com/gogo/protobuf/protobuf/ --gogoslick_out=plugins=grpc:. /path/to/protos.proto
syntax = "proto3";
package messages;
import "github.com/zhaohaijun/go-async-queue/actor/protos.proto";
message Start {}
message StartRemote {
actor.PID Sender = 1;
}
message Ping {
bytes Data = 1;
}
message Pong {}
FAQs
Unknown package
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
A supply chain attack has been detected in versions 1.95.6 and 1.95.7 of the popular @solana/web3.js library.
Research
Security News
A malicious npm package targets Solana developers, rerouting funds in 2% of transactions to a hardcoded address.
Security News
Research
Socket researchers have discovered malicious npm packages targeting crypto developers, stealing credentials and wallet data using spyware delivered through typosquats of popular cryptographic libraries.