Dendrite
Dendrite is a Go package that implements distributed hash table (DHT) based on Chord Protocol.
Included sub-package 'dtable' is built on top of dendrite and implements
distributed in-memory key/value database, with replication and failover support,
with query interface to Get() or Set() items with different consistency levels.
For better key distribution, dendrite allows configurable number of virtual nodes
per instance (vnodes). The number of replicas in dtable is also configurable.
Calling application can bootstrap the cluster, or join existing one by connecting to any of
existing nodes (must be manually specified). Node discovery is not part of the implementation.
Use consul (consul.io) or something else for that purpose.
Chord protocol defines ring stabilization. In dendrite, stabilization period is configurable.
Node to node (network) communication is built on top of ZeroMQ sockets over TCP for speed, clustering
and reliability. Dendrite starts configurable number of goroutines (default: 10) for load balanced
serving of remote requests, but scales that number up and down depending on the load (aka prefork model).
All messages sent through dendrite are encapsulated in ChordMsg structure, where first byte indicates message type,
and actual data follows. Data part is serialized with protocol buffers.
Dendrite can be extended through two interfaces:
- TransportHook
- DelegateHook
TransportHook allows other packages to provide additional message types, decoders and handlers, while DelegateHook
can be used to capture chord events that dendrite emits:
- EvPredecessorJoined
- EvPredecessorLeft
- EvReplicasChanged
Documentation
Usage
import "github.com/fastfn/dendrite"
import "github.com/fastfn/dendrite/dtable"
...
// Initialize ZMQTransport with timeout set to 5 seconds
transport, err := dendrite.InitZMQTransport("127.0.0.1:5000", 5*time.Second)
if err != nil {
panic(err)
return
}
config := dendrite.DefaultConfig("127.0.0.1:5000")
Bootstrap the cluster (first node)
// Start new cluster
ring, err = dendrite.CreateRing(config, transport)
if err != nil {
panic(err)
}
table = dtable.Init(ring, transport, dtable.LogInfo)
Joining the cluster
// We join the cluster by providing the address of one of existing nodes in the cluster.
ring, err = dendrite.JoinRing(config, transport, "192.168.0.50:5000")
if err != nil {
panic(err)
}
table = dtable.Init(ring, transport, dtable.LogInfo)
DTable Query examples
Set()
query := table.NewQuery()
err := query.Set([]byte("testkey"), []byte("testvalue"))
if err != nil {
panic(err)
}
Set() with consistency
Consistency() is used prior to Set() to request minimum writes before operation returns success.
If dtable runs with 2 replicas, user may request 2 writes (primary + 1 replica) and let dtable
handle final write in the background. If requested value is larger than configured dendrite replicas,
value is reset to default. Default is 1.
query := table.NewQuery()
err := query.Consistency(2).Set([]byte("testkey"), []byte("testvalue"))
if err != nil {
panic(err)
}
Get()
query := table.NewQuery()
item, err := query.Get([]byte("testkey"))
if err != nil {
log.Println("Got error in table Get: ", err)
} else if item == nil {
log.Printf("item not found")
} else {
log.Printf("Value is: %s\n", string(item.Val))
}
GetLocalKeys()
GetLocalKeys() returns the list of all keys stored on local node.
query := table.NewQuery()
for _, key := range query.GetLocalKeys() {
log.Printf("Key: %s\n", string(key))
}
Todo
- dtable: support SetMulti() and GetMulti() on public interface
- dtable: support batches on replication/migration ops
- dendrite: add some kind of security for inter communication between nodes