diff options
| author | Dave Henderson <dhenderson@gmail.com> | 2017-08-02 22:03:04 -0400 |
|---|---|---|
| committer | Dave Henderson <dhenderson@gmail.com> | 2017-08-02 22:03:04 -0400 |
| commit | bb8df1b9ab32d8559a89f576cd3a9e39bcf2b4e0 (patch) | |
| tree | 2ccbbe9ed5efd47cb01c59634cf8788b1791d2bc /vendor/github.com/docker | |
| parent | ab59ea091a2374653ea8aeb5ed7675de61c08461 (diff) | |
Updating dependencies
Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Diffstat (limited to 'vendor/github.com/docker')
4 files changed, 1147 insertions, 0 deletions
diff --git a/vendor/github.com/docker/libkv/store/etcd/etcd.go b/vendor/github.com/docker/libkv/store/etcd/etcd.go new file mode 100644 index 00000000..c932ca66 --- /dev/null +++ b/vendor/github.com/docker/libkv/store/etcd/etcd.go @@ -0,0 +1,606 @@ +package etcd + +import ( + "crypto/tls" + "errors" + "log" + "net" + "net/http" + "strings" + "time" + + "golang.org/x/net/context" + + etcd "github.com/coreos/etcd/client" + "github.com/docker/libkv" + "github.com/docker/libkv/store" +) + +var ( + // ErrAbortTryLock is thrown when a user stops trying to seek the lock + // by sending a signal to the stop chan, this is used to verify if the + // operation succeeded + ErrAbortTryLock = errors.New("lock operation aborted") +) + +// Etcd is the receiver type for the +// Store interface +type Etcd struct { + client etcd.KeysAPI +} + +type etcdLock struct { + client etcd.KeysAPI + stopLock chan struct{} + stopRenew chan struct{} + key string + value string + last *etcd.Response + ttl time.Duration +} + +const ( + periodicSync = 5 * time.Minute + defaultLockTTL = 20 * time.Second + defaultUpdateTime = 5 * time.Second +) + +// Register registers etcd to libkv +func Register() { + libkv.AddStore(store.ETCD, New) +} + +// New creates a new Etcd client given a list +// of endpoints and an optional tls config +func New(addrs []string, options *store.Config) (store.Store, error) { + s := &Etcd{} + + var ( + entries []string + err error + ) + + entries = store.CreateEndpoints(addrs, "http") + cfg := &etcd.Config{ + Endpoints: entries, + Transport: etcd.DefaultTransport, + HeaderTimeoutPerRequest: 3 * time.Second, + } + + // Set options + if options != nil { + if options.TLS != nil { + setTLS(cfg, options.TLS, addrs) + } + if options.ConnectionTimeout != 0 { + setTimeout(cfg, options.ConnectionTimeout) + } + if options.Username != "" { + setCredentials(cfg, options.Username, options.Password) + } + } + + c, err := etcd.New(*cfg) + if err != nil { + log.Fatal(err) + } + + s.client = etcd.NewKeysAPI(c) + + // Periodic Cluster Sync + go func() { + for { + if err := c.AutoSync(context.Background(), periodicSync); err != nil { + return + } + } + }() + + return s, nil +} + +// SetTLS sets the tls configuration given a tls.Config scheme +func setTLS(cfg *etcd.Config, tls *tls.Config, addrs []string) { + entries := store.CreateEndpoints(addrs, "https") + cfg.Endpoints = entries + + // Set transport + t := http.Transport{ + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: tls, + } + + cfg.Transport = &t +} + +// setTimeout sets the timeout used for connecting to the store +func setTimeout(cfg *etcd.Config, time time.Duration) { + cfg.HeaderTimeoutPerRequest = time +} + +// setCredentials sets the username/password credentials for connecting to Etcd +func setCredentials(cfg *etcd.Config, username, password string) { + cfg.Username = username + cfg.Password = password +} + +// Normalize the key for usage in Etcd +func (s *Etcd) normalize(key string) string { + key = store.Normalize(key) + return strings.TrimPrefix(key, "/") +} + +// keyNotFound checks on the error returned by the KeysAPI +// to verify if the key exists in the store or not +func keyNotFound(err error) bool { + if err != nil { + if etcdError, ok := err.(etcd.Error); ok { + if etcdError.Code == etcd.ErrorCodeKeyNotFound || + etcdError.Code == etcd.ErrorCodeNotFile || + etcdError.Code == etcd.ErrorCodeNotDir { + return true + } + } + } + return false +} + +// Get the value at "key", returns the last modified +// index to use in conjunction to Atomic calls +func (s *Etcd) Get(key string) (pair *store.KVPair, err error) { + getOpts := &etcd.GetOptions{ + Quorum: true, + } + + result, err := s.client.Get(context.Background(), s.normalize(key), getOpts) + if err != nil { + if keyNotFound(err) { + return nil, store.ErrKeyNotFound + } + return nil, err + } + + pair = &store.KVPair{ + Key: key, + Value: []byte(result.Node.Value), + LastIndex: result.Node.ModifiedIndex, + } + + return pair, nil +} + +// Put a value at "key" +func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { + setOpts := &etcd.SetOptions{} + + // Set options + if opts != nil { + setOpts.Dir = opts.IsDir + setOpts.TTL = opts.TTL + } + + _, err := s.client.Set(context.Background(), s.normalize(key), string(value), setOpts) + return err +} + +// Delete a value at "key" +func (s *Etcd) Delete(key string) error { + opts := &etcd.DeleteOptions{ + Recursive: false, + } + + _, err := s.client.Delete(context.Background(), s.normalize(key), opts) + if keyNotFound(err) { + return store.ErrKeyNotFound + } + return err +} + +// Exists checks if the key exists inside the store +func (s *Etcd) Exists(key string) (bool, error) { + _, err := s.Get(key) + if err != nil { + if err == store.ErrKeyNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +// Watch for changes on a "key" +// It returns a channel that will receive changes or pass +// on errors. Upon creation, the current value will first +// be sent to the channel. Providing a non-nil stopCh can +// be used to stop watching. +func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { + opts := &etcd.WatcherOptions{Recursive: false} + watcher := s.client.Watcher(s.normalize(key), opts) + + // watchCh is sending back events to the caller + watchCh := make(chan *store.KVPair) + + go func() { + defer close(watchCh) + + // Get the current value + pair, err := s.Get(key) + if err != nil { + return + } + + // Push the current value through the channel. + watchCh <- pair + + for { + // Check if the watch was stopped by the caller + select { + case <-stopCh: + return + default: + } + + result, err := watcher.Next(context.Background()) + + if err != nil { + return + } + + watchCh <- &store.KVPair{ + Key: key, + Value: []byte(result.Node.Value), + LastIndex: result.Node.ModifiedIndex, + } + } + }() + + return watchCh, nil +} + +// WatchTree watches for changes on a "directory" +// It returns a channel that will receive changes or pass +// on errors. Upon creating a watch, the current childs values +// will be sent to the channel. Providing a non-nil stopCh can +// be used to stop watching. +func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { + watchOpts := &etcd.WatcherOptions{Recursive: true} + watcher := s.client.Watcher(s.normalize(directory), watchOpts) + + // watchCh is sending back events to the caller + watchCh := make(chan []*store.KVPair) + + go func() { + defer close(watchCh) + + // Get child values + list, err := s.List(directory) + if err != nil { + return + } + + // Push the current value through the channel. + watchCh <- list + + for { + // Check if the watch was stopped by the caller + select { + case <-stopCh: + return + default: + } + + _, err := watcher.Next(context.Background()) + + if err != nil { + return + } + + list, err = s.List(directory) + if err != nil { + return + } + + watchCh <- list + } + }() + + return watchCh, nil +} + +// AtomicPut puts a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) { + var ( + meta *etcd.Response + err error + ) + + setOpts := &etcd.SetOptions{} + + if previous != nil { + setOpts.PrevExist = etcd.PrevExist + setOpts.PrevIndex = previous.LastIndex + if previous.Value != nil { + setOpts.PrevValue = string(previous.Value) + } + } else { + setOpts.PrevExist = etcd.PrevNoExist + } + + if opts != nil { + if opts.TTL > 0 { + setOpts.TTL = opts.TTL + } + } + + meta, err = s.client.Set(context.Background(), s.normalize(key), string(value), setOpts) + if err != nil { + if etcdError, ok := err.(etcd.Error); ok { + // Compare failed + if etcdError.Code == etcd.ErrorCodeTestFailed { + return false, nil, store.ErrKeyModified + } + // Node exists error (when PrevNoExist) + if etcdError.Code == etcd.ErrorCodeNodeExist { + return false, nil, store.ErrKeyExists + } + } + return false, nil, err + } + + updated := &store.KVPair{ + Key: key, + Value: value, + LastIndex: meta.Node.ModifiedIndex, + } + + return true, updated, nil +} + +// AtomicDelete deletes a value at "key" if the key +// has not been modified in the meantime, throws an +// error if this is the case +func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) { + if previous == nil { + return false, store.ErrPreviousNotSpecified + } + + delOpts := &etcd.DeleteOptions{} + + if previous != nil { + delOpts.PrevIndex = previous.LastIndex + if previous.Value != nil { + delOpts.PrevValue = string(previous.Value) + } + } + + _, err := s.client.Delete(context.Background(), s.normalize(key), delOpts) + if err != nil { + if etcdError, ok := err.(etcd.Error); ok { + // Key Not Found + if etcdError.Code == etcd.ErrorCodeKeyNotFound { + return false, store.ErrKeyNotFound + } + // Compare failed + if etcdError.Code == etcd.ErrorCodeTestFailed { + return false, store.ErrKeyModified + } + } + return false, err + } + + return true, nil +} + +// List child nodes of a given directory +func (s *Etcd) List(directory string) ([]*store.KVPair, error) { + getOpts := &etcd.GetOptions{ + Quorum: true, + Recursive: true, + Sort: true, + } + + resp, err := s.client.Get(context.Background(), s.normalize(directory), getOpts) + if err != nil { + if keyNotFound(err) { + return nil, store.ErrKeyNotFound + } + return nil, err + } + + kv := []*store.KVPair{} + for _, n := range resp.Node.Nodes { + kv = append(kv, &store.KVPair{ + Key: n.Key, + Value: []byte(n.Value), + LastIndex: n.ModifiedIndex, + }) + } + return kv, nil +} + +// DeleteTree deletes a range of keys under a given directory +func (s *Etcd) DeleteTree(directory string) error { + delOpts := &etcd.DeleteOptions{ + Recursive: true, + } + + _, err := s.client.Delete(context.Background(), s.normalize(directory), delOpts) + if keyNotFound(err) { + return store.ErrKeyNotFound + } + return err +} + +// NewLock returns a handle to a lock struct which can +// be used to provide mutual exclusion on a key +func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) { + var value string + ttl := defaultLockTTL + renewCh := make(chan struct{}) + + // Apply options on Lock + if options != nil { + if options.Value != nil { + value = string(options.Value) + } + if options.TTL != 0 { + ttl = options.TTL + } + if options.RenewLock != nil { + renewCh = options.RenewLock + } + } + + // Create lock object + lock = &etcdLock{ + client: s.client, + stopRenew: renewCh, + key: s.normalize(key), + value: value, + ttl: ttl, + } + + return lock, nil +} + +// Lock attempts to acquire the lock and blocks while +// doing so. It returns a channel that is closed if our +// lock is lost or if an error occurs +func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { + + // Lock holder channel + lockHeld := make(chan struct{}) + stopLocking := l.stopRenew + + setOpts := &etcd.SetOptions{ + TTL: l.ttl, + } + + for { + setOpts.PrevExist = etcd.PrevNoExist + resp, err := l.client.Set(context.Background(), l.key, l.value, setOpts) + if err != nil { + if etcdError, ok := err.(etcd.Error); ok { + if etcdError.Code != etcd.ErrorCodeNodeExist { + return nil, err + } + setOpts.PrevIndex = ^uint64(0) + } + } else { + setOpts.PrevIndex = resp.Node.ModifiedIndex + } + + setOpts.PrevExist = etcd.PrevExist + l.last, err = l.client.Set(context.Background(), l.key, l.value, setOpts) + + if err == nil { + // Leader section + l.stopLock = stopLocking + go l.holdLock(l.key, lockHeld, stopLocking) + break + } else { + // If this is a legitimate error, return + if etcdError, ok := err.(etcd.Error); ok { + if etcdError.Code != etcd.ErrorCodeTestFailed { + return nil, err + } + } + + // Seeker section + errorCh := make(chan error) + chWStop := make(chan bool) + free := make(chan bool) + + go l.waitLock(l.key, errorCh, chWStop, free) + + // Wait for the key to be available or for + // a signal to stop trying to lock the key + select { + case <-free: + break + case err := <-errorCh: + return nil, err + case <-stopChan: + return nil, ErrAbortTryLock + } + + // Delete or Expire event occurred + // Retry + } + } + + return lockHeld, nil +} + +// Hold the lock as long as we can +// Updates the key ttl periodically until we receive +// an explicit stop signal from the Unlock method +func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) { + defer close(lockHeld) + + update := time.NewTicker(l.ttl / 3) + defer update.Stop() + + var err error + setOpts := &etcd.SetOptions{TTL: l.ttl} + + for { + select { + case <-update.C: + setOpts.PrevIndex = l.last.Node.ModifiedIndex + l.last, err = l.client.Set(context.Background(), key, l.value, setOpts) + if err != nil { + return + } + + case <-stopLocking: + return + } + } +} + +// WaitLock simply waits for the key to be available for creation +func (l *etcdLock) waitLock(key string, errorCh chan error, stopWatchCh chan bool, free chan<- bool) { + opts := &etcd.WatcherOptions{Recursive: false} + watcher := l.client.Watcher(key, opts) + + for { + event, err := watcher.Next(context.Background()) + if err != nil { + errorCh <- err + return + } + if event.Action == "delete" || event.Action == "expire" { + free <- true + return + } + } +} + +// Unlock the "key". Calling unlock while +// not holding the lock will throw an error +func (l *etcdLock) Unlock() error { + if l.stopLock != nil { + l.stopLock <- struct{}{} + } + if l.last != nil { + delOpts := &etcd.DeleteOptions{ + PrevIndex: l.last.Node.ModifiedIndex, + } + _, err := l.client.Delete(context.Background(), l.key, delOpts) + if err != nil { + return err + } + } + return nil +} + +// Close closes the client connection +func (s *Etcd) Close() { + return +} diff --git a/vendor/github.com/docker/libkv/store/etcd/etcd_test.go b/vendor/github.com/docker/libkv/store/etcd/etcd_test.go new file mode 100644 index 00000000..e2d224c8 --- /dev/null +++ b/vendor/github.com/docker/libkv/store/etcd/etcd_test.go @@ -0,0 +1,58 @@ +package etcd + +import ( + "testing" + "time" + + "github.com/docker/libkv" + "github.com/docker/libkv/store" + "github.com/docker/libkv/testutils" + "github.com/stretchr/testify/assert" +) + +var ( + client = "localhost:4001" +) + +func makeEtcdClient(t *testing.T) store.Store { + kv, err := New( + []string{client}, + &store.Config{ + ConnectionTimeout: 3 * time.Second, + Username: "test", + Password: "very-secure", + }, + ) + + if err != nil { + t.Fatalf("cannot create store: %v", err) + } + + return kv +} + +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore(store.ETCD, []string{client}, nil) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*Etcd); !ok { + t.Fatal("Error registering and initializing etcd") + } +} + +func TestEtcdStore(t *testing.T) { + kv := makeEtcdClient(t) + lockKV := makeEtcdClient(t) + ttlKV := makeEtcdClient(t) + + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + testutils.RunTestWatch(t, kv) + testutils.RunTestLock(t, kv) + testutils.RunTestLockTTL(t, kv, lockKV) + testutils.RunTestTTL(t, kv, ttlKV) + testutils.RunCleanup(t, kv) +} diff --git a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go new file mode 100644 index 00000000..ff8d4ebe --- /dev/null +++ b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go @@ -0,0 +1,429 @@ +package zookeeper + +import ( + "strings" + "time" + + "github.com/docker/libkv" + "github.com/docker/libkv/store" + zk "github.com/samuel/go-zookeeper/zk" +) + +const ( + // SOH control character + SOH = "\x01" + + defaultTimeout = 10 * time.Second +) + +// Zookeeper is the receiver type for +// the Store interface +type Zookeeper struct { + timeout time.Duration + client *zk.Conn +} + +type zookeeperLock struct { + client *zk.Conn + lock *zk.Lock + key string + value []byte +} + +// Register registers zookeeper to libkv +func Register() { + libkv.AddStore(store.ZK, New) +} + +// New creates a new Zookeeper client given a +// list of endpoints and an optional tls config +func New(endpoints []string, options *store.Config) (store.Store, error) { + s := &Zookeeper{} + s.timeout = defaultTimeout + + // Set options + if options != nil { + if options.ConnectionTimeout != 0 { + s.setTimeout(options.ConnectionTimeout) + } + } + + // Connect to Zookeeper + conn, _, err := zk.Connect(endpoints, s.timeout) + if err != nil { + return nil, err + } + s.client = conn + + return s, nil +} + +// setTimeout sets the timeout for connecting to Zookeeper +func (s *Zookeeper) setTimeout(time time.Duration) { + s.timeout = time +} + +// Get the value at "key", returns the last modified index +// to use in conjunction to Atomic calls +func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { + resp, meta, err := s.client.Get(s.normalize(key)) + + if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } + return nil, err + } + + // FIXME handle very rare cases where Get returns the + // SOH control character instead of the actual value + if string(resp) == SOH { + return s.Get(store.Normalize(key)) + } + + pair = &store.KVPair{ + Key: key, + Value: resp, + LastIndex: uint64(meta.Version), + } + + return pair, nil +} + +// createFullPath creates the entire path for a directory +// that does not exist +func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { + for i := 1; i <= len(path); i++ { + newpath := "/" + strings.Join(path[:i], "/") + if i == len(path) && ephemeral { + _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + return err + } + _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + // Skip if node already exists + if err != zk.ErrNodeExists { + return err + } + } + } + return nil +} + +// Put a value at "key" +func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error { + fkey := s.normalize(key) + + exists, err := s.Exists(key) + if err != nil { + return err + } + + if !exists { + if opts != nil && opts.TTL > 0 { + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true) + } else { + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false) + } + } + + _, err = s.client.Set(fkey, value, -1) + return err +} + +// Delete a value at "key" +func (s *Zookeeper) Delete(key string) error { + err := s.client.Delete(s.normalize(key), -1) + if err == zk.ErrNoNode { + return store.ErrKeyNotFound + } + return err +} + +// Exists checks if the key exists inside the store +func (s *Zookeeper) Exists(key string) (bool, error) { + exists, _, err := s.client.Exists(s.normalize(key)) + if err != nil { + return false, err + } + return exists, nil +} + +// Watch for changes on a "key" +// It returns a channel that will receive changes or pass +// on errors. Upon creation, the current value will first +// be sent to the channel. Providing a non-nil stopCh can +// be used to stop watching. +func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { + // Get the key first + pair, err := s.Get(key) + if err != nil { + return nil, err + } + + // Catch zk notifications and fire changes into the channel. + watchCh := make(chan *store.KVPair) + go func() { + defer close(watchCh) + + // Get returns the current value to the channel prior + // to listening to any event that may occur on that key + watchCh <- pair + for { + _, _, eventCh, err := s.client.GetW(s.normalize(key)) + if err != nil { + return + } + select { + case e := <-eventCh: + if e.Type == zk.EventNodeDataChanged { + if entry, err := s.Get(key); err == nil { + watchCh <- entry + } + } + case <-stopCh: + // There is no way to stop GetW so just quit + return + } + } + }() + + return watchCh, nil +} + +// WatchTree watches for changes on a "directory" +// It returns a channel that will receive changes or pass +// on errors. Upon creating a watch, the current childs values +// will be sent to the channel .Providing a non-nil stopCh can +// be used to stop watching. +func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { + // List the childrens first + entries, err := s.List(directory) + if err != nil { + return nil, err + } + + // Catch zk notifications and fire changes into the channel. + watchCh := make(chan []*store.KVPair) + go func() { + defer close(watchCh) + + // List returns the children values to the channel + // prior to listening to any events that may occur + // on those keys + watchCh <- entries + + for { + _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) + if err != nil { + return + } + select { + case e := <-eventCh: + if e.Type == zk.EventNodeChildrenChanged { + if kv, err := s.List(directory); err == nil { + watchCh <- kv + } + } + case <-stopCh: + // There is no way to stop GetW so just quit + return + } + } + }() + + return watchCh, nil +} + +// List child nodes of a given directory +func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { + keys, stat, err := s.client.Children(s.normalize(directory)) + if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } + return nil, err + } + + kv := []*store.KVPair{} + + // FIXME Costly Get request for each child key.. + for _, key := range keys { + pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) + if err != nil { + // If node is not found: List is out of date, retry + if err == store.ErrKeyNotFound { + return s.List(directory) + } + return nil, err + } + + kv = append(kv, &store.KVPair{ + Key: key, + Value: []byte(pair.Value), + LastIndex: uint64(stat.Version), + }) + } + + return kv, nil +} + +// DeleteTree deletes a range of keys under a given directory +func (s *Zookeeper) DeleteTree(directory string) error { + pairs, err := s.List(directory) + if err != nil { + return err + } + + var reqs []interface{} + + for _, pair := range pairs { + reqs = append(reqs, &zk.DeleteRequest{ + Path: s.normalize(directory + "/" + pair.Key), + Version: -1, + }) + } + + _, err = s.client.Multi(reqs...) + return err +} + +// AtomicPut put a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) { + var lastIndex uint64 + + if previous != nil { + meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex)) + if err != nil { + // Compare Failed + if err == zk.ErrBadVersion { + return false, nil, store.ErrKeyModified + } + return false, nil, err + } + lastIndex = uint64(meta.Version) + } else { + // Interpret previous == nil as create operation. + _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + // Directory does not exist + if err == zk.ErrNoNode { + + // Create the directory + parts := store.SplitKey(strings.TrimSuffix(key, "/")) + parts = parts[:len(parts)-1] + if err = s.createFullPath(parts, false); err != nil { + // Failed to create the directory. + return false, nil, err + } + + // Create the node + if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { + // Node exist error (when previous nil) + if err == zk.ErrNodeExists { + return false, nil, store.ErrKeyExists + } + return false, nil, err + } + + } else { + // Node Exists error (when previous nil) + if err == zk.ErrNodeExists { + return false, nil, store.ErrKeyExists + } + + // Unhandled error + return false, nil, err + } + } + lastIndex = 0 // Newly created nodes have version 0. + } + + pair := &store.KVPair{ + Key: key, + Value: value, + LastIndex: lastIndex, + } + + return true, pair, nil +} + +// AtomicDelete deletes a value at "key" if the key +// has not been modified in the meantime, throws an +// error if this is the case +func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, error) { + if previous == nil { + return false, store.ErrPreviousNotSpecified + } + + err := s.client.Delete(s.normalize(key), int32(previous.LastIndex)) + if err != nil { + // Key not found + if err == zk.ErrNoNode { + return false, store.ErrKeyNotFound + } + // Compare failed + if err == zk.ErrBadVersion { + return false, store.ErrKeyModified + } + // General store error + return false, err + } + return true, nil +} + +// NewLock returns a handle to a lock struct which can +// be used to provide mutual exclusion on a key +func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) { + value := []byte("") + + // Apply options + if options != nil { + if options.Value != nil { + value = options.Value + } + } + + lock = &zookeeperLock{ + client: s.client, + key: s.normalize(key), + value: value, + lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)), + } + + return lock, err +} + +// Lock attempts to acquire the lock and blocks while +// doing so. It returns a channel that is closed if our +// lock is lost or if an error occurs +func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { + err := l.lock.Lock() + + if err == nil { + // We hold the lock, we can set our value + // FIXME: The value is left behind + // (problematic for leader election) + _, err = l.client.Set(l.key, l.value, -1) + } + + return make(chan struct{}), err +} + +// Unlock the "key". Calling unlock while +// not holding the lock will throw an error +func (l *zookeeperLock) Unlock() error { + return l.lock.Unlock() +} + +// Close closes the client connection +func (s *Zookeeper) Close() { + s.client.Close() +} + +// Normalize the key for usage in Zookeeper +func (s *Zookeeper) normalize(key string) string { + key = store.Normalize(key) + return strings.TrimSuffix(key, "/") +} diff --git a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go new file mode 100644 index 00000000..c36087e0 --- /dev/null +++ b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go @@ -0,0 +1,54 @@ +package zookeeper + +import ( + "testing" + "time" + + "github.com/docker/libkv" + "github.com/docker/libkv/store" + "github.com/docker/libkv/testutils" + "github.com/stretchr/testify/assert" +) + +var ( + client = "localhost:2181" +) + +func makeZkClient(t *testing.T) store.Store { + kv, err := New( + []string{client}, + &store.Config{ + ConnectionTimeout: 3 * time.Second, + }, + ) + + if err != nil { + t.Fatalf("cannot create store: %v", err) + } + + return kv +} + +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore(store.ZK, []string{client}, nil) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*Zookeeper); !ok { + t.Fatal("Error registering and initializing zookeeper") + } +} + +func TestZkStore(t *testing.T) { + kv := makeZkClient(t) + ttlKV := makeZkClient(t) + + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + testutils.RunTestWatch(t, kv) + testutils.RunTestLock(t, kv) + testutils.RunTestTTL(t, kv, ttlKV) + testutils.RunCleanup(t, kv) +} |
