summaryrefslogtreecommitdiff
path: root/vendor/github.com/docker
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/docker')
-rw-r--r--vendor/github.com/docker/libkv/store/etcd/etcd.go606
-rw-r--r--vendor/github.com/docker/libkv/store/etcd/etcd_test.go58
-rw-r--r--vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go429
-rw-r--r--vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go54
4 files changed, 1147 insertions, 0 deletions
diff --git a/vendor/github.com/docker/libkv/store/etcd/etcd.go b/vendor/github.com/docker/libkv/store/etcd/etcd.go
new file mode 100644
index 00000000..c932ca66
--- /dev/null
+++ b/vendor/github.com/docker/libkv/store/etcd/etcd.go
@@ -0,0 +1,606 @@
+package etcd
+
+import (
+ "crypto/tls"
+ "errors"
+ "log"
+ "net"
+ "net/http"
+ "strings"
+ "time"
+
+ "golang.org/x/net/context"
+
+ etcd "github.com/coreos/etcd/client"
+ "github.com/docker/libkv"
+ "github.com/docker/libkv/store"
+)
+
var (
	// ErrAbortTryLock is thrown when a user stops trying to seek the lock
	// by sending a signal to the stop chan, this is used to verify if the
	// operation succeeded
	ErrAbortTryLock = errors.New("lock operation aborted")
)

// Etcd is the receiver type for the
// Store interface
type Etcd struct {
	client etcd.KeysAPI
}

// etcdLock is the store.Locker implementation returned by Etcd.NewLock.
type etcdLock struct {
	client    etcd.KeysAPI
	stopLock  chan struct{}  // signaled by Unlock to stop the TTL-refresh goroutine
	stopRenew chan struct{}  // optional caller-supplied channel that also stops renewal
	key       string         // normalized key the lock is held on
	value     string         // value written to the key while the lock is held
	last      *etcd.Response // last successful Set response; its ModifiedIndex drives CAS refreshes
	ttl       time.Duration  // lock TTL, periodically refreshed by holdLock
}

const (
	// periodicSync is how often the etcd cluster member list is re-synced.
	periodicSync = 5 * time.Minute
	// defaultLockTTL is the TTL applied to locks when the caller gives none.
	defaultLockTTL = 20 * time.Second
	// defaultUpdateTime is the default renewal period for held locks.
	defaultUpdateTime = 5 * time.Second
)
+
+// Register registers etcd to libkv
+func Register() {
+ libkv.AddStore(store.ETCD, New)
+}
+
+// New creates a new Etcd client given a list
+// of endpoints and an optional tls config
+func New(addrs []string, options *store.Config) (store.Store, error) {
+ s := &Etcd{}
+
+ var (
+ entries []string
+ err error
+ )
+
+ entries = store.CreateEndpoints(addrs, "http")
+ cfg := &etcd.Config{
+ Endpoints: entries,
+ Transport: etcd.DefaultTransport,
+ HeaderTimeoutPerRequest: 3 * time.Second,
+ }
+
+ // Set options
+ if options != nil {
+ if options.TLS != nil {
+ setTLS(cfg, options.TLS, addrs)
+ }
+ if options.ConnectionTimeout != 0 {
+ setTimeout(cfg, options.ConnectionTimeout)
+ }
+ if options.Username != "" {
+ setCredentials(cfg, options.Username, options.Password)
+ }
+ }
+
+ c, err := etcd.New(*cfg)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ s.client = etcd.NewKeysAPI(c)
+
+ // Periodic Cluster Sync
+ go func() {
+ for {
+ if err := c.AutoSync(context.Background(), periodicSync); err != nil {
+ return
+ }
+ }
+ }()
+
+ return s, nil
+}
+
+// SetTLS sets the tls configuration given a tls.Config scheme
+func setTLS(cfg *etcd.Config, tls *tls.Config, addrs []string) {
+ entries := store.CreateEndpoints(addrs, "https")
+ cfg.Endpoints = entries
+
+ // Set transport
+ t := http.Transport{
+ Dial: (&net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 30 * time.Second,
+ }).Dial,
+ TLSHandshakeTimeout: 10 * time.Second,
+ TLSClientConfig: tls,
+ }
+
+ cfg.Transport = &t
+}
+
+// setTimeout sets the timeout used for connecting to the store
+func setTimeout(cfg *etcd.Config, time time.Duration) {
+ cfg.HeaderTimeoutPerRequest = time
+}
+
+// setCredentials sets the username/password credentials for connecting to Etcd
+func setCredentials(cfg *etcd.Config, username, password string) {
+ cfg.Username = username
+ cfg.Password = password
+}
+
+// Normalize the key for usage in Etcd
+func (s *Etcd) normalize(key string) string {
+ key = store.Normalize(key)
+ return strings.TrimPrefix(key, "/")
+}
+
+// keyNotFound checks on the error returned by the KeysAPI
+// to verify if the key exists in the store or not
+func keyNotFound(err error) bool {
+ if err != nil {
+ if etcdError, ok := err.(etcd.Error); ok {
+ if etcdError.Code == etcd.ErrorCodeKeyNotFound ||
+ etcdError.Code == etcd.ErrorCodeNotFile ||
+ etcdError.Code == etcd.ErrorCodeNotDir {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+// Get the value at "key", returns the last modified
+// index to use in conjunction to Atomic calls
+func (s *Etcd) Get(key string) (pair *store.KVPair, err error) {
+ getOpts := &etcd.GetOptions{
+ Quorum: true,
+ }
+
+ result, err := s.client.Get(context.Background(), s.normalize(key), getOpts)
+ if err != nil {
+ if keyNotFound(err) {
+ return nil, store.ErrKeyNotFound
+ }
+ return nil, err
+ }
+
+ pair = &store.KVPair{
+ Key: key,
+ Value: []byte(result.Node.Value),
+ LastIndex: result.Node.ModifiedIndex,
+ }
+
+ return pair, nil
+}
+
+// Put a value at "key"
+func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error {
+ setOpts := &etcd.SetOptions{}
+
+ // Set options
+ if opts != nil {
+ setOpts.Dir = opts.IsDir
+ setOpts.TTL = opts.TTL
+ }
+
+ _, err := s.client.Set(context.Background(), s.normalize(key), string(value), setOpts)
+ return err
+}
+
+// Delete a value at "key"
+func (s *Etcd) Delete(key string) error {
+ opts := &etcd.DeleteOptions{
+ Recursive: false,
+ }
+
+ _, err := s.client.Delete(context.Background(), s.normalize(key), opts)
+ if keyNotFound(err) {
+ return store.ErrKeyNotFound
+ }
+ return err
+}
+
+// Exists checks if the key exists inside the store
+func (s *Etcd) Exists(key string) (bool, error) {
+ _, err := s.Get(key)
+ if err != nil {
+ if err == store.ErrKeyNotFound {
+ return false, nil
+ }
+ return false, err
+ }
+ return true, nil
+}
+
// Watch for changes on a "key"
// It returns a channel that will receive changes or pass
// on errors. Upon creation, the current value will first
// be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
//
// NOTE(review): stopCh is only polled between events; if watcher.Next
// blocks with no traffic on the key the goroutine will not observe a
// stop request until the next event. Sends on watchCh also block until
// the caller reads, so a slow consumer stalls the watch.
func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
	opts := &etcd.WatcherOptions{Recursive: false}
	watcher := s.client.Watcher(s.normalize(key), opts)

	// watchCh is sending back events to the caller
	watchCh := make(chan *store.KVPair)

	go func() {
		defer close(watchCh)

		// Get the current value
		pair, err := s.Get(key)
		if err != nil {
			return
		}

		// Push the current value through the channel.
		watchCh <- pair

		for {
			// Check if the watch was stopped by the caller
			select {
			case <-stopCh:
				return
			default:
			}

			// Block until the next change on the key; any watch error
			// silently ends the goroutine and closes watchCh.
			result, err := watcher.Next(context.Background())

			if err != nil {
				return
			}

			watchCh <- &store.KVPair{
				Key:       key,
				Value:     []byte(result.Node.Value),
				LastIndex: result.Node.ModifiedIndex,
			}
		}
	}()

	return watchCh, nil
}
+
// WatchTree watches for changes on a "directory"
// It returns a channel that will receive changes or pass
// on errors. Upon creating a watch, the current childs values
// will be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
//
// NOTE(review): stopCh is only polled between events, and every event
// triggers a full re-List of the directory rather than an incremental
// update; sends on watchCh block until the caller reads.
func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
	watchOpts := &etcd.WatcherOptions{Recursive: true}
	watcher := s.client.Watcher(s.normalize(directory), watchOpts)

	// watchCh is sending back events to the caller
	watchCh := make(chan []*store.KVPair)

	go func() {
		defer close(watchCh)

		// Get child values
		list, err := s.List(directory)
		if err != nil {
			return
		}

		// Push the current value through the channel.
		watchCh <- list

		for {
			// Check if the watch was stopped by the caller
			select {
			case <-stopCh:
				return
			default:
			}

			// Block until anything under the directory changes; any
			// watch error ends the goroutine and closes watchCh.
			_, err := watcher.Next(context.Background())

			if err != nil {
				return
			}

			// Re-read the whole subtree to build the next snapshot.
			list, err = s.List(directory)
			if err != nil {
				return
			}

			watchCh <- list
		}
	}()

	return watchCh, nil
}
+
+// AtomicPut puts a value at "key" if the key has not been
+// modified in the meantime, throws an error if this is the case
+func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) {
+ var (
+ meta *etcd.Response
+ err error
+ )
+
+ setOpts := &etcd.SetOptions{}
+
+ if previous != nil {
+ setOpts.PrevExist = etcd.PrevExist
+ setOpts.PrevIndex = previous.LastIndex
+ if previous.Value != nil {
+ setOpts.PrevValue = string(previous.Value)
+ }
+ } else {
+ setOpts.PrevExist = etcd.PrevNoExist
+ }
+
+ if opts != nil {
+ if opts.TTL > 0 {
+ setOpts.TTL = opts.TTL
+ }
+ }
+
+ meta, err = s.client.Set(context.Background(), s.normalize(key), string(value), setOpts)
+ if err != nil {
+ if etcdError, ok := err.(etcd.Error); ok {
+ // Compare failed
+ if etcdError.Code == etcd.ErrorCodeTestFailed {
+ return false, nil, store.ErrKeyModified
+ }
+ // Node exists error (when PrevNoExist)
+ if etcdError.Code == etcd.ErrorCodeNodeExist {
+ return false, nil, store.ErrKeyExists
+ }
+ }
+ return false, nil, err
+ }
+
+ updated := &store.KVPair{
+ Key: key,
+ Value: value,
+ LastIndex: meta.Node.ModifiedIndex,
+ }
+
+ return true, updated, nil
+}
+
+// AtomicDelete deletes a value at "key" if the key
+// has not been modified in the meantime, throws an
+// error if this is the case
+func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
+ if previous == nil {
+ return false, store.ErrPreviousNotSpecified
+ }
+
+ delOpts := &etcd.DeleteOptions{}
+
+ if previous != nil {
+ delOpts.PrevIndex = previous.LastIndex
+ if previous.Value != nil {
+ delOpts.PrevValue = string(previous.Value)
+ }
+ }
+
+ _, err := s.client.Delete(context.Background(), s.normalize(key), delOpts)
+ if err != nil {
+ if etcdError, ok := err.(etcd.Error); ok {
+ // Key Not Found
+ if etcdError.Code == etcd.ErrorCodeKeyNotFound {
+ return false, store.ErrKeyNotFound
+ }
+ // Compare failed
+ if etcdError.Code == etcd.ErrorCodeTestFailed {
+ return false, store.ErrKeyModified
+ }
+ }
+ return false, err
+ }
+
+ return true, nil
+}
+
+// List child nodes of a given directory
+func (s *Etcd) List(directory string) ([]*store.KVPair, error) {
+ getOpts := &etcd.GetOptions{
+ Quorum: true,
+ Recursive: true,
+ Sort: true,
+ }
+
+ resp, err := s.client.Get(context.Background(), s.normalize(directory), getOpts)
+ if err != nil {
+ if keyNotFound(err) {
+ return nil, store.ErrKeyNotFound
+ }
+ return nil, err
+ }
+
+ kv := []*store.KVPair{}
+ for _, n := range resp.Node.Nodes {
+ kv = append(kv, &store.KVPair{
+ Key: n.Key,
+ Value: []byte(n.Value),
+ LastIndex: n.ModifiedIndex,
+ })
+ }
+ return kv, nil
+}
+
+// DeleteTree deletes a range of keys under a given directory
+func (s *Etcd) DeleteTree(directory string) error {
+ delOpts := &etcd.DeleteOptions{
+ Recursive: true,
+ }
+
+ _, err := s.client.Delete(context.Background(), s.normalize(directory), delOpts)
+ if keyNotFound(err) {
+ return store.ErrKeyNotFound
+ }
+ return err
+}
+
+// NewLock returns a handle to a lock struct which can
+// be used to provide mutual exclusion on a key
+func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
+ var value string
+ ttl := defaultLockTTL
+ renewCh := make(chan struct{})
+
+ // Apply options on Lock
+ if options != nil {
+ if options.Value != nil {
+ value = string(options.Value)
+ }
+ if options.TTL != 0 {
+ ttl = options.TTL
+ }
+ if options.RenewLock != nil {
+ renewCh = options.RenewLock
+ }
+ }
+
+ // Create lock object
+ lock = &etcdLock{
+ client: s.client,
+ stopRenew: renewCh,
+ key: s.normalize(key),
+ value: value,
+ ttl: ttl,
+ }
+
+ return lock, nil
+}
+
// Lock attempts to acquire the lock and blocks while
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {

	// Lock holder channel
	lockHeld := make(chan struct{})
	stopLocking := l.stopRenew

	setOpts := &etcd.SetOptions{
		TTL: l.ttl,
	}

	for {
		// Phase 1: try to create the key outright (compare-and-create).
		setOpts.PrevExist = etcd.PrevNoExist
		resp, err := l.client.Set(context.Background(), l.key, l.value, setOpts)
		if err != nil {
			if etcdError, ok := err.(etcd.Error); ok {
				if etcdError.Code != etcd.ErrorCodeNodeExist {
					return nil, err
				}
				// Key already exists: force the CAS in phase 2 to fail by
				// using an index no real node can have (^uint64(0)).
				setOpts.PrevIndex = ^uint64(0)
			}
		} else {
			// We created the key; CAS in phase 2 against its index.
			setOpts.PrevIndex = resp.Node.ModifiedIndex
		}

		// Phase 2: take ownership via compare-and-swap on PrevIndex.
		setOpts.PrevExist = etcd.PrevExist
		l.last, err = l.client.Set(context.Background(), l.key, l.value, setOpts)

		if err == nil {
			// Leader section: we hold the lock; keep refreshing its TTL
			// until Unlock (or the caller's renew channel) stops us.
			l.stopLock = stopLocking
			go l.holdLock(l.key, lockHeld, stopLocking)
			break
		} else {
			// If this is a legitimate error, return
			if etcdError, ok := err.(etcd.Error); ok {
				if etcdError.Code != etcd.ErrorCodeTestFailed {
					return nil, err
				}
			}

			// Seeker section: someone else holds the lock; watch the key
			// until it is deleted or expires, then retry.
			errorCh := make(chan error)
			chWStop := make(chan bool)
			free := make(chan bool)

			go l.waitLock(l.key, errorCh, chWStop, free)

			// Wait for the key to be available or for
			// a signal to stop trying to lock the key
			select {
			case <-free:
				break
			case err := <-errorCh:
				return nil, err
			case <-stopChan:
				return nil, ErrAbortTryLock
			}

			// Delete or Expire event occurred
			// Retry
		}
	}

	return lockHeld, nil
}
+
// holdLock keeps the lock alive by refreshing the key's TTL every ttl/3
// via CAS on the index of our last successful write. It exits — closing
// lockHeld so the holder learns the lock is gone — when a refresh fails
// or when an explicit stop signal arrives from the Unlock method.
func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) {
	defer close(lockHeld)

	update := time.NewTicker(l.ttl / 3)
	defer update.Stop()

	var err error
	setOpts := &etcd.SetOptions{TTL: l.ttl}

	for {
		select {
		case <-update.C:
			// CAS against our last known index so a concurrent owner
			// change makes the refresh fail instead of stealing back.
			setOpts.PrevIndex = l.last.Node.ModifiedIndex
			l.last, err = l.client.Set(context.Background(), key, l.value, setOpts)
			if err != nil {
				return
			}

		case <-stopLocking:
			return
		}
	}
}
+
+// WaitLock simply waits for the key to be available for creation
+func (l *etcdLock) waitLock(key string, errorCh chan error, stopWatchCh chan bool, free chan<- bool) {
+ opts := &etcd.WatcherOptions{Recursive: false}
+ watcher := l.client.Watcher(key, opts)
+
+ for {
+ event, err := watcher.Next(context.Background())
+ if err != nil {
+ errorCh <- err
+ return
+ }
+ if event.Action == "delete" || event.Action == "expire" {
+ free <- true
+ return
+ }
+ }
+}
+
+// Unlock the "key". Calling unlock while
+// not holding the lock will throw an error
+func (l *etcdLock) Unlock() error {
+ if l.stopLock != nil {
+ l.stopLock <- struct{}{}
+ }
+ if l.last != nil {
+ delOpts := &etcd.DeleteOptions{
+ PrevIndex: l.last.Node.ModifiedIndex,
+ }
+ _, err := l.client.Delete(context.Background(), l.key, delOpts)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Close closes the client connection
+func (s *Etcd) Close() {
+ return
+}
diff --git a/vendor/github.com/docker/libkv/store/etcd/etcd_test.go b/vendor/github.com/docker/libkv/store/etcd/etcd_test.go
new file mode 100644
index 00000000..e2d224c8
--- /dev/null
+++ b/vendor/github.com/docker/libkv/store/etcd/etcd_test.go
@@ -0,0 +1,58 @@
+package etcd
+
+import (
+ "testing"
+ "time"
+
+ "github.com/docker/libkv"
+ "github.com/docker/libkv/store"
+ "github.com/docker/libkv/testutils"
+ "github.com/stretchr/testify/assert"
+)
+
var (
	// client is the etcd endpoint the integration tests connect to.
	client = "localhost:4001"
)
+
+func makeEtcdClient(t *testing.T) store.Store {
+ kv, err := New(
+ []string{client},
+ &store.Config{
+ ConnectionTimeout: 3 * time.Second,
+ Username: "test",
+ Password: "very-secure",
+ },
+ )
+
+ if err != nil {
+ t.Fatalf("cannot create store: %v", err)
+ }
+
+ return kv
+}
+
// TestRegister verifies that Register wires the etcd backend into libkv
// so that NewStore(store.ETCD, ...) yields an *Etcd instance.
func TestRegister(t *testing.T) {
	Register()

	kv, err := libkv.NewStore(store.ETCD, []string{client}, nil)
	assert.NoError(t, err)
	assert.NotNil(t, kv)

	if _, ok := kv.(*Etcd); !ok {
		t.Fatal("Error registering and initializing etcd")
	}
}
+
// TestEtcdStore runs the shared libkv conformance suites against a live
// etcd endpoint (see the client variable); it requires etcd to be
// reachable at that address.
func TestEtcdStore(t *testing.T) {
	kv := makeEtcdClient(t)
	lockKV := makeEtcdClient(t)
	ttlKV := makeEtcdClient(t)

	testutils.RunTestCommon(t, kv)
	testutils.RunTestAtomic(t, kv)
	testutils.RunTestWatch(t, kv)
	testutils.RunTestLock(t, kv)
	testutils.RunTestLockTTL(t, kv, lockKV)
	testutils.RunTestTTL(t, kv, ttlKV)
	testutils.RunCleanup(t, kv)
}
diff --git a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go
new file mode 100644
index 00000000..ff8d4ebe
--- /dev/null
+++ b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go
@@ -0,0 +1,429 @@
+package zookeeper
+
+import (
+ "strings"
+ "time"
+
+ "github.com/docker/libkv"
+ "github.com/docker/libkv/store"
+ zk "github.com/samuel/go-zookeeper/zk"
+)
+
const (
	// SOH control character; Zookeeper very occasionally returns this
	// sentinel instead of the real value (see the FIXME in Get).
	SOH = "\x01"

	// defaultTimeout is the session timeout used when none is configured.
	defaultTimeout = 10 * time.Second
)

// Zookeeper is the receiver type for
// the Store interface
type Zookeeper struct {
	timeout time.Duration // session timeout passed to zk.Connect
	client  *zk.Conn
}

// zookeeperLock is the store.Locker implementation returned by NewLock.
type zookeeperLock struct {
	client *zk.Conn
	lock   *zk.Lock // underlying recipe lock from the zk library
	key    string   // normalized key the lock guards
	value  []byte   // value written to the key once the lock is held
}
+
+// Register registers zookeeper to libkv
+func Register() {
+ libkv.AddStore(store.ZK, New)
+}
+
+// New creates a new Zookeeper client given a
+// list of endpoints and an optional tls config
+func New(endpoints []string, options *store.Config) (store.Store, error) {
+ s := &Zookeeper{}
+ s.timeout = defaultTimeout
+
+ // Set options
+ if options != nil {
+ if options.ConnectionTimeout != 0 {
+ s.setTimeout(options.ConnectionTimeout)
+ }
+ }
+
+ // Connect to Zookeeper
+ conn, _, err := zk.Connect(endpoints, s.timeout)
+ if err != nil {
+ return nil, err
+ }
+ s.client = conn
+
+ return s, nil
+}
+
+// setTimeout sets the timeout for connecting to Zookeeper
+func (s *Zookeeper) setTimeout(time time.Duration) {
+ s.timeout = time
+}
+
// Get the value at "key", returns the last modified index
// to use in conjunction to Atomic calls
func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
	resp, meta, err := s.client.Get(s.normalize(key))

	if err != nil {
		if err == zk.ErrNoNode {
			return nil, store.ErrKeyNotFound
		}
		return nil, err
	}

	// FIXME handle very rare cases where Get returns the
	// SOH control character instead of the actual value
	// NOTE(review): this retries by recursing; if the server kept
	// answering SOH this would loop unboundedly — confirm acceptable.
	if string(resp) == SOH {
		return s.Get(store.Normalize(key))
	}

	// The znode version doubles as the LastIndex used by Atomic* calls.
	pair = &store.KVPair{
		Key:       key,
		Value:     resp,
		LastIndex: uint64(meta.Version),
	}

	return pair, nil
}
+
// createFullPath creates the entire path for a directory
// that does not exist, one path component at a time.
// When ephemeral is true only the deepest node is created ephemeral
// (it disappears with the client session).
func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
	for i := 1; i <= len(path); i++ {
		newpath := "/" + strings.Join(path[:i], "/")
		if i == len(path) && ephemeral {
			// Last component: create it ephemeral and stop; note that an
			// ErrNodeExists here is returned to the caller as-is.
			_, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
			return err
		}
		_, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll))
		if err != nil {
			// Skip if node already exists
			if err != zk.ErrNodeExists {
				return err
			}
		}
	}
	return nil
}
+
+// Put a value at "key"
+func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error {
+ fkey := s.normalize(key)
+
+ exists, err := s.Exists(key)
+ if err != nil {
+ return err
+ }
+
+ if !exists {
+ if opts != nil && opts.TTL > 0 {
+ s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true)
+ } else {
+ s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false)
+ }
+ }
+
+ _, err = s.client.Set(fkey, value, -1)
+ return err
+}
+
+// Delete a value at "key"
+func (s *Zookeeper) Delete(key string) error {
+ err := s.client.Delete(s.normalize(key), -1)
+ if err == zk.ErrNoNode {
+ return store.ErrKeyNotFound
+ }
+ return err
+}
+
+// Exists checks if the key exists inside the store
+func (s *Zookeeper) Exists(key string) (bool, error) {
+ exists, _, err := s.client.Exists(s.normalize(key))
+ if err != nil {
+ return false, err
+ }
+ return exists, nil
+}
+
// Watch for changes on a "key"
// It returns a channel that will receive changes or pass
// on errors. Upon creation, the current value will first
// be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
	// Get the key first
	pair, err := s.Get(key)
	if err != nil {
		return nil, err
	}

	// Catch zk notifications and fire changes into the channel.
	watchCh := make(chan *store.KVPair)
	go func() {
		defer close(watchCh)

		// Get returns the current value to the channel prior
		// to listening to any event that may occur on that key
		watchCh <- pair
		for {
			// Zookeeper watches are one-shot: re-arm on every iteration.
			_, _, eventCh, err := s.client.GetW(s.normalize(key))
			if err != nil {
				return
			}
			select {
			case e := <-eventCh:
				if e.Type == zk.EventNodeDataChanged {
					// Fetch the new value; a Get error here silently
					// drops the notification instead of ending the watch.
					if entry, err := s.Get(key); err == nil {
						watchCh <- entry
					}
				}
			case <-stopCh:
				// There is no way to stop GetW so just quit
				return
			}
		}
	}()

	return watchCh, nil
}
+
// WatchTree watches for changes on a "directory"
// It returns a channel that will receive changes or pass
// on errors. Upon creating a watch, the current childs values
// will be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
	// List the childrens first
	entries, err := s.List(directory)
	if err != nil {
		return nil, err
	}

	// Catch zk notifications and fire changes into the channel.
	watchCh := make(chan []*store.KVPair)
	go func() {
		defer close(watchCh)

		// List returns the children values to the channel
		// prior to listening to any events that may occur
		// on those keys
		watchCh <- entries

		for {
			// Zookeeper child watches are one-shot: re-arm each time.
			_, _, eventCh, err := s.client.ChildrenW(s.normalize(directory))
			if err != nil {
				return
			}
			select {
			case e := <-eventCh:
				if e.Type == zk.EventNodeChildrenChanged {
					// Re-list the whole directory; a List error silently
					// drops the notification.
					if kv, err := s.List(directory); err == nil {
						watchCh <- kv
					}
				}
			case <-stopCh:
				// There is no way to stop ChildrenW so just quit
				return
			}
		}
	}()

	return watchCh, nil
}
+
// List child nodes of a given directory
func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
	keys, stat, err := s.client.Children(s.normalize(directory))
	if err != nil {
		if err == zk.ErrNoNode {
			return nil, store.ErrKeyNotFound
		}
		return nil, err
	}

	kv := []*store.KVPair{}

	// FIXME Costly Get request for each child key..
	for _, key := range keys {
		pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key))
		if err != nil {
			// If node is not found: List is out of date, retry
			if err == store.ErrKeyNotFound {
				return s.List(directory)
			}
			return nil, err
		}

		kv = append(kv, &store.KVPair{
			Key:   key,
			Value: []byte(pair.Value),
			// NOTE(review): stat comes from Children, so this is the
			// *directory's* version, not the child's own — confirm this
			// is intended before relying on it for Atomic* calls.
			LastIndex: uint64(stat.Version),
		})
	}

	return kv, nil
}
+
+// DeleteTree deletes a range of keys under a given directory
+func (s *Zookeeper) DeleteTree(directory string) error {
+ pairs, err := s.List(directory)
+ if err != nil {
+ return err
+ }
+
+ var reqs []interface{}
+
+ for _, pair := range pairs {
+ reqs = append(reqs, &zk.DeleteRequest{
+ Path: s.normalize(directory + "/" + pair.Key),
+ Version: -1,
+ })
+ }
+
+ _, err = s.client.Multi(reqs...)
+ return err
+}
+
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) {
	var lastIndex uint64

	if previous != nil {
		// Compare-and-set: zk rejects the write with ErrBadVersion when
		// the znode's version no longer matches previous.LastIndex.
		meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex))
		if err != nil {
			// Compare Failed
			if err == zk.ErrBadVersion {
				return false, nil, store.ErrKeyModified
			}
			return false, nil, err
		}
		lastIndex = uint64(meta.Version)
	} else {
		// Interpret previous == nil as create operation.
		_, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll))
		if err != nil {
			// Directory does not exist
			if err == zk.ErrNoNode {

				// Create the directory (all parent components, minus the
				// final element which is the key itself)
				parts := store.SplitKey(strings.TrimSuffix(key, "/"))
				parts = parts[:len(parts)-1]
				if err = s.createFullPath(parts, false); err != nil {
					// Failed to create the directory.
					return false, nil, err
				}

				// Create the node
				if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
					// Node exist error (when previous nil)
					if err == zk.ErrNodeExists {
						return false, nil, store.ErrKeyExists
					}
					return false, nil, err
				}

			} else {
				// Node Exists error (when previous nil)
				if err == zk.ErrNodeExists {
					return false, nil, store.ErrKeyExists
				}

				// Unhandled error
				return false, nil, err
			}
		}
		lastIndex = 0 // Newly created nodes have version 0.
	}

	pair := &store.KVPair{
		Key:       key,
		Value:     value,
		LastIndex: lastIndex,
	}

	return true, pair, nil
}
+
+// AtomicDelete deletes a value at "key" if the key
+// has not been modified in the meantime, throws an
+// error if this is the case
+func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
+ if previous == nil {
+ return false, store.ErrPreviousNotSpecified
+ }
+
+ err := s.client.Delete(s.normalize(key), int32(previous.LastIndex))
+ if err != nil {
+ // Key not found
+ if err == zk.ErrNoNode {
+ return false, store.ErrKeyNotFound
+ }
+ // Compare failed
+ if err == zk.ErrBadVersion {
+ return false, store.ErrKeyModified
+ }
+ // General store error
+ return false, err
+ }
+ return true, nil
+}
+
+// NewLock returns a handle to a lock struct which can
+// be used to provide mutual exclusion on a key
+func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
+ value := []byte("")
+
+ // Apply options
+ if options != nil {
+ if options.Value != nil {
+ value = options.Value
+ }
+ }
+
+ lock = &zookeeperLock{
+ client: s.client,
+ key: s.normalize(key),
+ value: value,
+ lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)),
+ }
+
+ return lock, err
+}
+
// Lock attempts to acquire the lock and blocks while
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
//
// NOTE(review): stopChan is ignored here (zk.Lock.Lock cannot be
// aborted), and the returned channel is never closed by this
// implementation, so a lost lock is not actually signaled — confirm
// callers do not rely on either behavior.
func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
	err := l.lock.Lock()

	if err == nil {
		// We hold the lock, we can set our value
		// FIXME: The value is left behind
		// (problematic for leader election)
		_, err = l.client.Set(l.key, l.value, -1)
	}

	return make(chan struct{}), err
}
+
// Unlock the "key". Calling unlock while
// not holding the lock will throw an error
// (the error comes straight from the zk lock recipe).
func (l *zookeeperLock) Unlock() error {
	return l.lock.Unlock()
}
+
// Close closes the client connection; the store must not be used after
// this call.
func (s *Zookeeper) Close() {
	s.client.Close()
}
+
+// Normalize the key for usage in Zookeeper
+func (s *Zookeeper) normalize(key string) string {
+ key = store.Normalize(key)
+ return strings.TrimSuffix(key, "/")
+}
diff --git a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go
new file mode 100644
index 00000000..c36087e0
--- /dev/null
+++ b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go
@@ -0,0 +1,54 @@
+package zookeeper
+
+import (
+ "testing"
+ "time"
+
+ "github.com/docker/libkv"
+ "github.com/docker/libkv/store"
+ "github.com/docker/libkv/testutils"
+ "github.com/stretchr/testify/assert"
+)
+
var (
	// client is the Zookeeper endpoint the integration tests connect to.
	client = "localhost:2181"
)
+
+func makeZkClient(t *testing.T) store.Store {
+ kv, err := New(
+ []string{client},
+ &store.Config{
+ ConnectionTimeout: 3 * time.Second,
+ },
+ )
+
+ if err != nil {
+ t.Fatalf("cannot create store: %v", err)
+ }
+
+ return kv
+}
+
// TestRegister verifies that Register wires the Zookeeper backend into
// libkv so that NewStore(store.ZK, ...) yields a *Zookeeper instance.
func TestRegister(t *testing.T) {
	Register()

	kv, err := libkv.NewStore(store.ZK, []string{client}, nil)
	assert.NoError(t, err)
	assert.NotNil(t, kv)

	if _, ok := kv.(*Zookeeper); !ok {
		t.Fatal("Error registering and initializing zookeeper")
	}
}
+
// TestZkStore runs the shared libkv conformance suites against a live
// Zookeeper endpoint (see the client variable); it requires Zookeeper
// to be reachable at that address.
func TestZkStore(t *testing.T) {
	kv := makeZkClient(t)
	ttlKV := makeZkClient(t)

	testutils.RunTestCommon(t, kv)
	testutils.RunTestAtomic(t, kv)
	testutils.RunTestWatch(t, kv)
	testutils.RunTestLock(t, kv)
	testutils.RunTestTTL(t, kv, ttlKV)
	testutils.RunCleanup(t, kv)
}