Socket
Socket
Sign inDemoInstall

github.com/kumustone/tcpstream

Package Overview
Dependencies
0
Alerts
File Explorer

Install Socket

Detect and block malicious and high-risk dependencies

Install

    github.com/kumustone/tcpstream


Version published

Readme

Source

一个简单的TCP长链的网络库,可以帮助你很简便的开发基于tcp socket连接的业务。同时封装了一个类似于rpc功能的同步调用接口,可以模拟双向RPC调用,服务端主动推送,广播等业务功能。

整个库的核心在于tcpstream,主要支持一下功能:

  • 支持Tcp字节流自动分包功能,使用者感知到的是msg层面的数据,不用关心底层的数据收发;
  • 支持client主动发起的Tcpstream,和Server端Accept派生出来的TcpStream,除了重连功能外,二者其他的调用接口完全相同;SyncClient基于同一条TcpStream实现双向RPC调用;
  • 支持多路复用,多个RPC请求使用一条socket连接即可,内部通信是全异步;在用户端看来SynClient是同步功能;
  • Client的tcpstream默认支持自动重连功能,连接断开后每隔5s向对端发起重连,如果不想重连调用stop接口销毁次tcpstream即可;
  • 不支持默认的心跳包功能,如业务方需要自己在上层实现;

Sync调用功能:

  • client与server建立一个连接后,可通过RequestID的映射功能,实现请求方类似RPC调用功能;
  • 基于一条连接可以实现双向RPC调用的效果;这样就解决了server向client发起rpc调用复杂的问题,client端可以不用再启用rpcServer.
  • 同步调用接口的业务包序列化和反序列化有自己来实现;

同步调用客户端的例子

  • 创建一个客户端
	c := NewSyncClient("127.0.0.1:7001")

基于已经存在tcpstream的创建,client端和server都可以

	c := NewSyncFromTcpStream(tcpstream)
  • 同步调用,输入是要发送的msg,接收到的是对端返回的msg;然后对msg反序列化即可实现自己的业务;

	if resp, err := c.Call(&m, time.Duration(2*time.Second)); err != nil {
		fmt.Println("call fail ", err.Error())
	} else {
		//反序列化resp,do something
	}

一个client和server的例子

server


type Handler struct {
	name string
}

func (h *handler) OnData(conn *TcpStream, msg *Message) error {
	_ = conn.Write(&Message{
		//因为是多路复用的,返回的seq必须要与client请求的seq对应
		Header: ProtoHeader{
			Seq: msg.Header.Seq,
		},
		Body: []byte("I am server"),
	})
	return nil
}
func (h *handler) OnConn(conn *TcpStream)    {}
func (h *handler) OnDisConn(conn *TcpStream) {}

func main() {
	tcpstream.NewTCPServer(address, &Handler{}).Serve()
	select {
	}
}

client:


var address = "127.0.0.1:7001"

type Handler struct{}

func (h *Handler) OnData(conn *tcpstream.TcpStream, msg *tcpstream.Message) error {
	fmt.Println("receive from server: ", string(msg.Body))
	resp := &tcpstream.Message{
		Header: msg.Header,
		Body:   []byte("I am client"),
	}
	time.Sleep(time.Second * 1)
	return conn.Write(resp)
}

func (h *Handler) OnConn(conn *tcpstream.TcpStream) {
	fmt.Println("Connection to ", conn.RemoteAddr)

	_ = conn.Write(&tcpstream.Message{
		Body: []byte("I am client"),
	})
}

func (h *Handler) OnDisConn(conn *tcpstream.TcpStream) {
	fmt.Println("DisConnection to ", conn.RemoteAddr)
}

func main() {
	go func() {
		fmt.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	for i := 0; i < 100; i++ {
		go func() {
			client := tcpstream.NewConnTcpStream(address, &Handler{})
			client.Start()
			time.Sleep(10 * time.Second)
			client.Stop()
		}()
	}
	select {}
}

FAQs

Last updated on 25 Sep 2019

Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc