summaryrefslogtreecommitdiff
path: root/vendor/github.com/docker
diff options
context:
space:
mode:
authorDave Henderson <dhenderson@gmail.com>2017-12-05 08:10:13 -0500
committerDave Henderson <dhenderson@gmail.com>2017-12-05 08:10:13 -0500
commit149f2b13d110f34f048e5942466faea4e1a4a870 (patch)
tree41a259362fefcfc99f95b26fda532a26fd4ba4f0 /vendor/github.com/docker
parentf6e20ca5ecb47c067fccca8d61e937f35348e7a5 (diff)
Pruning dependencies with `dep prune`
Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Diffstat (limited to 'vendor/github.com/docker')
-rw-r--r--vendor/github.com/docker/libkv/docs/compatibility.md82
-rw-r--r--vendor/github.com/docker/libkv/docs/examples.md157
-rw-r--r--vendor/github.com/docker/libkv/script/.validate33
-rwxr-xr-xvendor/github.com/docker/libkv/script/coverage21
-rwxr-xr-xvendor/github.com/docker/libkv/script/travis_consul.sh18
-rwxr-xr-xvendor/github.com/docker/libkv/script/travis_etcd.sh11
-rwxr-xr-xvendor/github.com/docker/libkv/script/travis_zk.sh12
-rwxr-xr-xvendor/github.com/docker/libkv/script/validate-gofmt30
-rw-r--r--vendor/github.com/docker/libkv/store/etcd/etcd.go606
-rw-r--r--vendor/github.com/docker/libkv/store/etcd/etcd_test.go58
-rw-r--r--vendor/github.com/docker/libkv/store/mock/mock.go113
-rw-r--r--vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go429
-rw-r--r--vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go54
-rw-r--r--vendor/github.com/docker/libkv/testutils/utils.go622
14 files changed, 0 insertions, 2246 deletions
diff --git a/vendor/github.com/docker/libkv/docs/compatibility.md b/vendor/github.com/docker/libkv/docs/compatibility.md
deleted file mode 100644
index c4f27e9c..00000000
--- a/vendor/github.com/docker/libkv/docs/compatibility.md
+++ /dev/null
@@ -1,82 +0,0 @@
-#Cross-Backend Compatibility
-
-The value of `libkv` is not to duplicate the code for programs that should support multiple distributed K/V stores like the classic `Consul`/`etcd`/`zookeeper` trio.
-
-This document provides with general guidelines for users willing to support those backends with the same code using `libkv`.
-
-Please note that most of those workarounds are going to disappear in the future with `etcd` APIv3.
-
-##Etcd directory/key distinction
-
-`etcd` with APIv2 makes the distinction between keys and directories. The result with `libkv` is that when using the etcd driver:
-
-- You cannot store values on directories
-- You cannot invoke `WatchTree` (watching on child values), on a regular key
-
-This is fundamentaly different than `Consul` and `zookeeper` which are more permissive and allow the same set of operations on keys and directories (called a Node for zookeeper).
-
-Apiv3 is in the work for `etcd`, which removes this key/directory distinction, but until then you should follow these workarounds to make your `libkv` code work across backends.
-
-###Put
-
-`etcd` cannot put values on directories, so this puts a major restriction compared to `Consul` and `zookeeper`.
-
-If you want to support all those three backends, you should make sure to only put data on **leaves**.
-
-For example:
-
-```go
-_ := kv.Put("path/to/key/bis", []byte("foo"), nil)
-_ := kv.Put("path/to/key", []byte("bar"), nil)
-```
-
-Will work on `Consul` and `zookeeper` but fail for `etcd`. This is because the first `Put` in the case of `etcd` will recursively create the directory hierarchy and `path/to/key` is now considered as a directory. Thus, values should always be stored on leaves if the support for the three backends is planned.
-
-###WatchTree
-
-When initializing the `WatchTree`, the natural way to do so is through the following code:
-
-```go
-key := "path/to/key"
-if !kv.Exists(key) {
- err := kv.Put(key, []byte("data"), nil)
-}
-events, err := kv.WatchTree(key, nil)
-```
-
-The code above will not work across backends and etcd will fail on the `WatchTree` call. What happens exactly:
-
-- `Consul` will create a regular `key` because it has no distinction between directories and keys. This is not an issue as we can invoke `WatchTree` on regular keys.
-- `zookeeper` is going to create a `node` that can either be a directory or a key during the lifetime of a program but it does not matter as a directory can hold values and be watchable like a regular key.
-- `etcd` is going to create a regular `key`. We cannot invoke `WatchTree` on regular keys using etcd.
-
-To be cross-compatible between those three backends for `WatchTree`, we need to enforce a parameter that is only interpreted with `etcd` and which tells the client to create a `directory` instead of a key.
-
-```go
-key := "path/to/key"
-if !kv.Exists(key) {
- // We enforce IsDir = true to make sure etcd creates a directory
- err := kv.Put(key, []byte("data"), &store.WriteOptions{IsDir:true})
-}
-events, err := kv.WatchTree(key, nil)
-```
-
-The code above will work for the three backends but make sure to not try to store any value at that path as the call to `Put` will fail for `etcd` (you can only put at `path/to/key/foo`, `path/to/key/bar` for example).
-
-##Etcd distributed locking
-
-There is `Lock` mechanisms baked in the `coreos/etcd/client` for now. Instead, `libkv` has its own implementation of a `Lock` on top of `etcd`.
-
-The general workflow for the `Lock` is as follows:
-
-- Call Lock concurrently on a `key` between threads/programs
-- Only one will create that key, others are going to fail because the key has already been created
-- The thread locking the key can get the right index to set the value of the key using Compare And Swap and effectively Lock and hold the key
-- Other threads are given a wrong index to fail the Compare and Swap and block until the key has been released by the thread holding the Lock
-- Lock seekers are setting up a Watch listening on that key and events happening on the key
-- When the thread/program stops holding the lock, it deletes the key triggering a `delete` event that will notify all the other threads. In case the program crashes, the key has a TTL attached that will send an `expire` event when this TTL expires.
-- Once everyone is notified, back to the first step. First come, first served with the Lock.
-
-The whole Lock process is highly dependent on the `delete`/`expire` events of `etcd`. So don't expect the key to be still there once the Lock is released.
-
-For example if the whole logic is to `Lock` a key and expect the value to still be there after it has been unlocked, it is not going to be cross-backend compatible with `Consul` and `zookeeper`. On the other end the `etcd` Lock can still be used to do Leader Election for example and still be cross-compatible with other backends. \ No newline at end of file
diff --git a/vendor/github.com/docker/libkv/docs/examples.md b/vendor/github.com/docker/libkv/docs/examples.md
deleted file mode 100644
index 09752db1..00000000
--- a/vendor/github.com/docker/libkv/docs/examples.md
+++ /dev/null
@@ -1,157 +0,0 @@
-#Examples
-
-This document contains useful example of usage for `libkv`. It might not be complete but provides with general informations on how to use the client.
-
-##Create a store and use Put/Get/Delete
-
-```go
-package main
-
-import (
- "fmt"
- "time"
- "log"
-
- "github.com/docker/libkv"
- "github.com/docker/libkv/store"
- "github.com/docker/libkv/store/consul"
-)
-
-func init() {
- // Register consul store to libkv
- consul.Register()
-
- // We can register as many backends that are supported by libkv
- etcd.Register()
- zookeeper.Register()
- boltdb.Register()
-}
-
-func main() {
- client := "localhost:8500"
-
- // Initialize a new store with consul
- kv, err := libkv.NewStore(
- store.CONSUL, // or "consul"
- []string{client},
- &store.Config{
- ConnectionTimeout: 10*time.Second,
- },
- )
- if err != nil {
- log.Fatal("Cannot create store consul")
- }
-
- key := "foo"
- err = kv.Put(key, []byte("bar"), nil)
- if err != nil {
- fmt.Errorf("Error trying to put value at key: %v", key)
- }
-
- pair, err := kv.Get(key)
- if err != nil {
- fmt.Errorf("Error trying accessing value at key: %v", key)
- }
-
- err = kv.Delete(key)
- if err != nil {
- fmt.Errorf("Error trying to delete key %v", key)
- }
-
- log.Info("value: ", string(pair.Value))
-}
-```
-
-##List keys
-
-```go
-// List will list all the keys under `key` if it contains a set of child keys/values
-entries, err := kv.List(key)
-for _, pair := range entries {
- fmt.Printf("key=%v - value=%v", pair.Key, string(pair.Value))
-}
-
-```
-
-##Watching for events on a single key (Watch)
-
-You can use watches to watch modifications on a key. First you need to check if the key exists. If this is not the case, we need to create it using the `Put` function.
-
-```go
-// Checking on the key before watching
-if !kv.Exists(key) {
- err := kv.Put(key, []byte("bar"), nil)
- if err != nil {
- fmt.Errorf("Something went wrong when initializing key %v", key)
- }
-}
-
-stopCh := make(<-chan struct{})
-events, err := kv.Watch(key, stopCh)
-
-select {
- case pair := <-events:
- // Do something with events
- fmt.Printf("value changed on key %v: new value=%v", key, pair.Value)
-}
-
-```
-
-##Watching for events happening on child keys (WatchTree)
-
-You can use watches to watch modifications on a key. First you need to check if the key exists. If this is not the case, we need to create it using the `Put` function. There is a special step here though if you want your code to work across backends. Because `etcd` is a special case and it makes the distinction between directories and keys, we need to make sure that the created key is considered as a directory by enforcing `IsDir` at `true`.
-
-```go
-// Checking on the key before watching
-if !kv.Exists(key) {
- // Don't forget IsDir:true if the code is used cross-backend
- err := kv.Put(key, []byte("bar"), &store.WriteOptions{IsDir:true})
- if err != nil {
- fmt.Errorf("Something went wrong when initializing key %v", key)
- }
-}
-
-stopCh := make(<-chan struct{})
-events, err := kv.WatchTree(key, stopCh)
-
-select {
- case pairs := <-events:
- // Do something with events
- for _, pair := range pairs {
- fmt.Printf("value changed on key %v: new value=%v", key, pair.Value)
- }
-}
-
-```
-
-## Distributed Locking, using Lock/Unlock
-
-```go
-key := "lockKey"
-value := []byte("bar")
-
-// Initialize a distributed lock. TTL is optional, it is here to make sure that
-// the lock is released after the program that is holding the lock ends or crashes
-lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second})
-if err != nil {
- fmt.Errorf("something went wrong when trying to initialize the Lock")
-}
-
-// Try to lock the key, the call to Lock() is blocking
-_, err := lock.Lock(nil)
-if err != nil {
- fmt.Errorf("something went wrong when trying to lock key %v", key)
-}
-
-// Get should work because we are holding the key
-pair, err := kv.Get(key)
-if err != nil {
- fmt.Errorf("key %v has value %v", key, pair.Value)
-}
-
-// Unlock the key
-err = lock.Unlock()
-if err != nil {
- fmt.Errorf("something went wrong when trying to unlock key %v", key)
-}
-``` \ No newline at end of file
diff --git a/vendor/github.com/docker/libkv/script/.validate b/vendor/github.com/docker/libkv/script/.validate
deleted file mode 100644
index 3767f422..00000000
--- a/vendor/github.com/docker/libkv/script/.validate
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-
-if [ -z "$VALIDATE_UPSTREAM" ]; then
- # this is kind of an expensive check, so let's not do this twice if we
- # are running more than one validate bundlescript
-
- VALIDATE_REPO='https://github.com/docker/libkv.git'
- VALIDATE_BRANCH='master'
-
- if [ "$TRAVIS" = 'true' -a "$TRAVIS_PULL_REQUEST" != 'false' ]; then
- VALIDATE_REPO="https://github.com/${TRAVIS_REPO_SLUG}.git"
- VALIDATE_BRANCH="${TRAVIS_BRANCH}"
- fi
-
- VALIDATE_HEAD="$(git rev-parse --verify HEAD)"
-
- git fetch -q "$VALIDATE_REPO" "refs/heads/$VALIDATE_BRANCH"
- VALIDATE_UPSTREAM="$(git rev-parse --verify FETCH_HEAD)"
-
- VALIDATE_COMMIT_LOG="$VALIDATE_UPSTREAM..$VALIDATE_HEAD"
- VALIDATE_COMMIT_DIFF="$VALIDATE_UPSTREAM...$VALIDATE_HEAD"
-
- validate_diff() {
- if [ "$VALIDATE_UPSTREAM" != "$VALIDATE_HEAD" ]; then
- git diff "$VALIDATE_COMMIT_DIFF" "$@"
- fi
- }
- validate_log() {
- if [ "$VALIDATE_UPSTREAM" != "$VALIDATE_HEAD" ]; then
- git log "$VALIDATE_COMMIT_LOG" "$@"
- fi
- }
-fi
diff --git a/vendor/github.com/docker/libkv/script/coverage b/vendor/github.com/docker/libkv/script/coverage
deleted file mode 100755
index a7a13f45..00000000
--- a/vendor/github.com/docker/libkv/script/coverage
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/bin/bash
-
-MODE="mode: count"
-ROOT=${TRAVIS_BUILD_DIR:-.}/../../..
-
-# Grab the list of packages.
-# Exclude the API and CLI from coverage as it will be covered by integration tests.
-PACKAGES=`go list ./...`
-
-# Create the empty coverage file.
-echo $MODE > goverage.report
-
-# Run coverage on every package.
-for package in $PACKAGES; do
- output="$ROOT/$package/coverage.out"
-
- go test -test.short -covermode=count -coverprofile=$output $package
- if [ -f "$output" ] ; then
- cat "$output" | grep -v "$MODE" >> goverage.report
- fi
-done
diff --git a/vendor/github.com/docker/libkv/script/travis_consul.sh b/vendor/github.com/docker/libkv/script/travis_consul.sh
deleted file mode 100755
index 7b63d6b6..00000000
--- a/vendor/github.com/docker/libkv/script/travis_consul.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/bin/bash
-
-if [ $# -gt 0 ] ; then
- CONSUL_VERSION="$1"
-else
- CONSUL_VERSION="0.5.2"
-fi
-
-# install consul
-wget "https://releases.hashicorp.com/consul/${CONSUL_VERSION}/consul_${CONSUL_VERSION}_linux_amd64.zip"
-unzip "consul_${CONSUL_VERSION}_linux_amd64.zip"
-
-# make config for minimum ttl
-touch config.json
-echo "{\"session_ttl_min\": \"1s\"}" >> config.json
-
-# check
-./consul --version
diff --git a/vendor/github.com/docker/libkv/script/travis_etcd.sh b/vendor/github.com/docker/libkv/script/travis_etcd.sh
deleted file mode 100755
index bee8567f..00000000
--- a/vendor/github.com/docker/libkv/script/travis_etcd.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/bin/bash
-
-if [ $# -gt 0 ] ; then
- ETCD_VERSION="$1"
-else
- ETCD_VERSION="2.2.0"
-fi
-
-curl -L https://github.com/coreos/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz -o etcd-v$ETCD_VERSION-linux-amd64.tar.gz
-tar xzvf etcd-v$ETCD_VERSION-linux-amd64.tar.gz
-mv etcd-v$ETCD_VERSION-linux-amd64 etcd
diff --git a/vendor/github.com/docker/libkv/script/travis_zk.sh b/vendor/github.com/docker/libkv/script/travis_zk.sh
deleted file mode 100755
index 636a2407..00000000
--- a/vendor/github.com/docker/libkv/script/travis_zk.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/bash
-
-if [ $# -gt 0 ] ; then
- ZK_VERSION="$1"
-else
- ZK_VERSION="3.4.7"
-fi
-
-wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/zookeeper-${ZK_VERSION}.tar.gz"
-tar -xvf "zookeeper-${ZK_VERSION}.tar.gz"
-mv zookeeper-$ZK_VERSION zk
-mv ./zk/conf/zoo_sample.cfg ./zk/conf/zoo.cfg
diff --git a/vendor/github.com/docker/libkv/script/validate-gofmt b/vendor/github.com/docker/libkv/script/validate-gofmt
deleted file mode 100755
index c565976b..00000000
--- a/vendor/github.com/docker/libkv/script/validate-gofmt
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/bin/bash
-
-source "$(dirname "$BASH_SOURCE")/.validate"
-
-IFS=$'\n'
-files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^Godeps/' || true) )
-unset IFS
-
-badFiles=()
-for f in "${files[@]}"; do
- # we use "git show" here to validate that what's committed is formatted
- if [ "$(git show "$VALIDATE_HEAD:$f" | gofmt -s -l)" ]; then
- badFiles+=( "$f" )
- fi
-done
-
-if [ ${#badFiles[@]} -eq 0 ]; then
- echo 'Congratulations! All Go source files are properly formatted.'
-else
- {
- echo "These files are not properly gofmt'd:"
- for f in "${badFiles[@]}"; do
- echo " - $f"
- done
- echo
- echo 'Please reformat the above files using "gofmt -s -w" and commit the result.'
- echo
- } >&2
- false
-fi
diff --git a/vendor/github.com/docker/libkv/store/etcd/etcd.go b/vendor/github.com/docker/libkv/store/etcd/etcd.go
deleted file mode 100644
index c932ca66..00000000
--- a/vendor/github.com/docker/libkv/store/etcd/etcd.go
+++ /dev/null
@@ -1,606 +0,0 @@
-package etcd
-
-import (
- "crypto/tls"
- "errors"
- "log"
- "net"
- "net/http"
- "strings"
- "time"
-
- "golang.org/x/net/context"
-
- etcd "github.com/coreos/etcd/client"
- "github.com/docker/libkv"
- "github.com/docker/libkv/store"
-)
-
-var (
- // ErrAbortTryLock is thrown when a user stops trying to seek the lock
- // by sending a signal to the stop chan, this is used to verify if the
- // operation succeeded
- ErrAbortTryLock = errors.New("lock operation aborted")
-)
-
-// Etcd is the receiver type for the
-// Store interface
-type Etcd struct {
- client etcd.KeysAPI
-}
-
-type etcdLock struct {
- client etcd.KeysAPI
- stopLock chan struct{}
- stopRenew chan struct{}
- key string
- value string
- last *etcd.Response
- ttl time.Duration
-}
-
-const (
- periodicSync = 5 * time.Minute
- defaultLockTTL = 20 * time.Second
- defaultUpdateTime = 5 * time.Second
-)
-
-// Register registers etcd to libkv
-func Register() {
- libkv.AddStore(store.ETCD, New)
-}
-
-// New creates a new Etcd client given a list
-// of endpoints and an optional tls config
-func New(addrs []string, options *store.Config) (store.Store, error) {
- s := &Etcd{}
-
- var (
- entries []string
- err error
- )
-
- entries = store.CreateEndpoints(addrs, "http")
- cfg := &etcd.Config{
- Endpoints: entries,
- Transport: etcd.DefaultTransport,
- HeaderTimeoutPerRequest: 3 * time.Second,
- }
-
- // Set options
- if options != nil {
- if options.TLS != nil {
- setTLS(cfg, options.TLS, addrs)
- }
- if options.ConnectionTimeout != 0 {
- setTimeout(cfg, options.ConnectionTimeout)
- }
- if options.Username != "" {
- setCredentials(cfg, options.Username, options.Password)
- }
- }
-
- c, err := etcd.New(*cfg)
- if err != nil {
- log.Fatal(err)
- }
-
- s.client = etcd.NewKeysAPI(c)
-
- // Periodic Cluster Sync
- go func() {
- for {
- if err := c.AutoSync(context.Background(), periodicSync); err != nil {
- return
- }
- }
- }()
-
- return s, nil
-}
-
-// SetTLS sets the tls configuration given a tls.Config scheme
-func setTLS(cfg *etcd.Config, tls *tls.Config, addrs []string) {
- entries := store.CreateEndpoints(addrs, "https")
- cfg.Endpoints = entries
-
- // Set transport
- t := http.Transport{
- Dial: (&net.Dialer{
- Timeout: 30 * time.Second,
- KeepAlive: 30 * time.Second,
- }).Dial,
- TLSHandshakeTimeout: 10 * time.Second,
- TLSClientConfig: tls,
- }
-
- cfg.Transport = &t
-}
-
-// setTimeout sets the timeout used for connecting to the store
-func setTimeout(cfg *etcd.Config, time time.Duration) {
- cfg.HeaderTimeoutPerRequest = time
-}
-
-// setCredentials sets the username/password credentials for connecting to Etcd
-func setCredentials(cfg *etcd.Config, username, password string) {
- cfg.Username = username
- cfg.Password = password
-}
-
-// Normalize the key for usage in Etcd
-func (s *Etcd) normalize(key string) string {
- key = store.Normalize(key)
- return strings.TrimPrefix(key, "/")
-}
-
-// keyNotFound checks on the error returned by the KeysAPI
-// to verify if the key exists in the store or not
-func keyNotFound(err error) bool {
- if err != nil {
- if etcdError, ok := err.(etcd.Error); ok {
- if etcdError.Code == etcd.ErrorCodeKeyNotFound ||
- etcdError.Code == etcd.ErrorCodeNotFile ||
- etcdError.Code == etcd.ErrorCodeNotDir {
- return true
- }
- }
- }
- return false
-}
-
-// Get the value at "key", returns the last modified
-// index to use in conjunction to Atomic calls
-func (s *Etcd) Get(key string) (pair *store.KVPair, err error) {
- getOpts := &etcd.GetOptions{
- Quorum: true,
- }
-
- result, err := s.client.Get(context.Background(), s.normalize(key), getOpts)
- if err != nil {
- if keyNotFound(err) {
- return nil, store.ErrKeyNotFound
- }
- return nil, err
- }
-
- pair = &store.KVPair{
- Key: key,
- Value: []byte(result.Node.Value),
- LastIndex: result.Node.ModifiedIndex,
- }
-
- return pair, nil
-}
-
-// Put a value at "key"
-func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error {
- setOpts := &etcd.SetOptions{}
-
- // Set options
- if opts != nil {
- setOpts.Dir = opts.IsDir
- setOpts.TTL = opts.TTL
- }
-
- _, err := s.client.Set(context.Background(), s.normalize(key), string(value), setOpts)
- return err
-}
-
-// Delete a value at "key"
-func (s *Etcd) Delete(key string) error {
- opts := &etcd.DeleteOptions{
- Recursive: false,
- }
-
- _, err := s.client.Delete(context.Background(), s.normalize(key), opts)
- if keyNotFound(err) {
- return store.ErrKeyNotFound
- }
- return err
-}
-
-// Exists checks if the key exists inside the store
-func (s *Etcd) Exists(key string) (bool, error) {
- _, err := s.Get(key)
- if err != nil {
- if err == store.ErrKeyNotFound {
- return false, nil
- }
- return false, err
- }
- return true, nil
-}
-
-// Watch for changes on a "key"
-// It returns a channel that will receive changes or pass
-// on errors. Upon creation, the current value will first
-// be sent to the channel. Providing a non-nil stopCh can
-// be used to stop watching.
-func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
- opts := &etcd.WatcherOptions{Recursive: false}
- watcher := s.client.Watcher(s.normalize(key), opts)
-
- // watchCh is sending back events to the caller
- watchCh := make(chan *store.KVPair)
-
- go func() {
- defer close(watchCh)
-
- // Get the current value
- pair, err := s.Get(key)
- if err != nil {
- return
- }
-
- // Push the current value through the channel.
- watchCh <- pair
-
- for {
- // Check if the watch was stopped by the caller
- select {
- case <-stopCh:
- return
- default:
- }
-
- result, err := watcher.Next(context.Background())
-
- if err != nil {
- return
- }
-
- watchCh <- &store.KVPair{
- Key: key,
- Value: []byte(result.Node.Value),
- LastIndex: result.Node.ModifiedIndex,
- }
- }
- }()
-
- return watchCh, nil
-}
-
-// WatchTree watches for changes on a "directory"
-// It returns a channel that will receive changes or pass
-// on errors. Upon creating a watch, the current childs values
-// will be sent to the channel. Providing a non-nil stopCh can
-// be used to stop watching.
-func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
- watchOpts := &etcd.WatcherOptions{Recursive: true}
- watcher := s.client.Watcher(s.normalize(directory), watchOpts)
-
- // watchCh is sending back events to the caller
- watchCh := make(chan []*store.KVPair)
-
- go func() {
- defer close(watchCh)
-
- // Get child values
- list, err := s.List(directory)
- if err != nil {
- return
- }
-
- // Push the current value through the channel.
- watchCh <- list
-
- for {
- // Check if the watch was stopped by the caller
- select {
- case <-stopCh:
- return
- default:
- }
-
- _, err := watcher.Next(context.Background())
-
- if err != nil {
- return
- }
-
- list, err = s.List(directory)
- if err != nil {
- return
- }
-
- watchCh <- list
- }
- }()
-
- return watchCh, nil
-}
-
-// AtomicPut puts a value at "key" if the key has not been
-// modified in the meantime, throws an error if this is the case
-func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) {
- var (
- meta *etcd.Response
- err error
- )
-
- setOpts := &etcd.SetOptions{}
-
- if previous != nil {
- setOpts.PrevExist = etcd.PrevExist
- setOpts.PrevIndex = previous.LastIndex
- if previous.Value != nil {
- setOpts.PrevValue = string(previous.Value)
- }
- } else {
- setOpts.PrevExist = etcd.PrevNoExist
- }
-
- if opts != nil {
- if opts.TTL > 0 {
- setOpts.TTL = opts.TTL
- }
- }
-
- meta, err = s.client.Set(context.Background(), s.normalize(key), string(value), setOpts)
- if err != nil {
- if etcdError, ok := err.(etcd.Error); ok {
- // Compare failed
- if etcdError.Code == etcd.ErrorCodeTestFailed {
- return false, nil, store.ErrKeyModified
- }
- // Node exists error (when PrevNoExist)
- if etcdError.Code == etcd.ErrorCodeNodeExist {
- return false, nil, store.ErrKeyExists
- }
- }
- return false, nil, err
- }
-
- updated := &store.KVPair{
- Key: key,
- Value: value,
- LastIndex: meta.Node.ModifiedIndex,
- }
-
- return true, updated, nil
-}
-
-// AtomicDelete deletes a value at "key" if the key
-// has not been modified in the meantime, throws an
-// error if this is the case
-func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
- if previous == nil {
- return false, store.ErrPreviousNotSpecified
- }
-
- delOpts := &etcd.DeleteOptions{}
-
- if previous != nil {
- delOpts.PrevIndex = previous.LastIndex
- if previous.Value != nil {
- delOpts.PrevValue = string(previous.Value)
- }
- }
-
- _, err := s.client.Delete(context.Background(), s.normalize(key), delOpts)
- if err != nil {
- if etcdError, ok := err.(etcd.Error); ok {
- // Key Not Found
- if etcdError.Code == etcd.ErrorCodeKeyNotFound {
- return false, store.ErrKeyNotFound
- }
- // Compare failed
- if etcdError.Code == etcd.ErrorCodeTestFailed {
- return false, store.ErrKeyModified
- }
- }
- return false, err
- }
-
- return true, nil
-}
-
-// List child nodes of a given directory
-func (s *Etcd) List(directory string) ([]*store.KVPair, error) {
- getOpts := &etcd.GetOptions{
- Quorum: true,
- Recursive: true,
- Sort: true,
- }
-
- resp, err := s.client.Get(context.Background(), s.normalize(directory), getOpts)
- if err != nil {
- if keyNotFound(err) {
- return nil, store.ErrKeyNotFound
- }
- return nil, err
- }
-
- kv := []*store.KVPair{}
- for _, n := range resp.Node.Nodes {
- kv = append(kv, &store.KVPair{
- Key: n.Key,
- Value: []byte(n.Value),
- LastIndex: n.ModifiedIndex,
- })
- }
- return kv, nil
-}
-
-// DeleteTree deletes a range of keys under a given directory
-func (s *Etcd) DeleteTree(directory string) error {
- delOpts := &etcd.DeleteOptions{
- Recursive: true,
- }
-
- _, err := s.client.Delete(context.Background(), s.normalize(directory), delOpts)
- if keyNotFound(err) {
- return store.ErrKeyNotFound
- }
- return err
-}
-
-// NewLock returns a handle to a lock struct which can
-// be used to provide mutual exclusion on a key
-func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
- var value string
- ttl := defaultLockTTL
- renewCh := make(chan struct{})
-
- // Apply options on Lock
- if options != nil {
- if options.Value != nil {
- value = string(options.Value)
- }
- if options.TTL != 0 {
- ttl = options.TTL
- }
- if options.RenewLock != nil {
- renewCh = options.RenewLock
- }
- }
-
- // Create lock object
- lock = &etcdLock{
- client: s.client,
- stopRenew: renewCh,
- key: s.normalize(key),
- value: value,
- ttl: ttl,
- }
-
- return lock, nil
-}
-
-// Lock attempts to acquire the lock and blocks while
-// doing so. It returns a channel that is closed if our
-// lock is lost or if an error occurs
-func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
-
- // Lock holder channel
- lockHeld := make(chan struct{})
- stopLocking := l.stopRenew
-
- setOpts := &etcd.SetOptions{
- TTL: l.ttl,
- }
-
- for {
- setOpts.PrevExist = etcd.PrevNoExist
- resp, err := l.client.Set(context.Background(), l.key, l.value, setOpts)
- if err != nil {
- if etcdError, ok := err.(etcd.Error); ok {
- if etcdError.Code != etcd.ErrorCodeNodeExist {
- return nil, err
- }
- setOpts.PrevIndex = ^uint64(0)
- }
- } else {
- setOpts.PrevIndex = resp.Node.ModifiedIndex
- }
-
- setOpts.PrevExist = etcd.PrevExist
- l.last, err = l.client.Set(context.Background(), l.key, l.value, setOpts)
-
- if err == nil {
- // Leader section
- l.stopLock = stopLocking
- go l.holdLock(l.key, lockHeld, stopLocking)
- break
- } else {
- // If this is a legitimate error, return
- if etcdError, ok := err.(etcd.Error); ok {
- if etcdError.Code != etcd.ErrorCodeTestFailed {
- return nil, err
- }
- }
-
- // Seeker section
- errorCh := make(chan error)
- chWStop := make(chan bool)
- free := make(chan bool)
-
- go l.waitLock(l.key, errorCh, chWStop, free)
-
- // Wait for the key to be available or for
- // a signal to stop trying to lock the key
- select {
- case <-free:
- break
- case err := <-errorCh:
- return nil, err
- case <-stopChan:
- return nil, ErrAbortTryLock
- }
-
- // Delete or Expire event occurred
- // Retry
- }
- }
-
- return lockHeld, nil
-}
-
-// Hold the lock as long as we can
-// Updates the key ttl periodically until we receive
-// an explicit stop signal from the Unlock method
-func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) {
- defer close(lockHeld)
-
- update := time.NewTicker(l.ttl / 3)
- defer update.Stop()
-
- var err error
- setOpts := &etcd.SetOptions{TTL: l.ttl}
-
- for {
- select {
- case <-update.C:
- setOpts.PrevIndex = l.last.Node.ModifiedIndex
- l.last, err = l.client.Set(context.Background(), key, l.value, setOpts)
- if err != nil {
- return
- }
-
- case <-stopLocking:
- return
- }
- }
-}
-
-// WaitLock simply waits for the key to be available for creation
-func (l *etcdLock) waitLock(key string, errorCh chan error, stopWatchCh chan bool, free chan<- bool) {
- opts := &etcd.WatcherOptions{Recursive: false}
- watcher := l.client.Watcher(key, opts)
-
- for {
- event, err := watcher.Next(context.Background())
- if err != nil {
- errorCh <- err
- return
- }
- if event.Action == "delete" || event.Action == "expire" {
- free <- true
- return
- }
- }
-}
-
-// Unlock the "key". Calling unlock while
-// not holding the lock will throw an error
-func (l *etcdLock) Unlock() error {
- if l.stopLock != nil {
- l.stopLock <- struct{}{}
- }
- if l.last != nil {
- delOpts := &etcd.DeleteOptions{
- PrevIndex: l.last.Node.ModifiedIndex,
- }
- _, err := l.client.Delete(context.Background(), l.key, delOpts)
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-// Close closes the client connection
-func (s *Etcd) Close() {
- return
-}
diff --git a/vendor/github.com/docker/libkv/store/etcd/etcd_test.go b/vendor/github.com/docker/libkv/store/etcd/etcd_test.go
deleted file mode 100644
index e2d224c8..00000000
--- a/vendor/github.com/docker/libkv/store/etcd/etcd_test.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package etcd
-
-import (
- "testing"
- "time"
-
- "github.com/docker/libkv"
- "github.com/docker/libkv/store"
- "github.com/docker/libkv/testutils"
- "github.com/stretchr/testify/assert"
-)
-
-var (
- client = "localhost:4001"
-)
-
-func makeEtcdClient(t *testing.T) store.Store {
- kv, err := New(
- []string{client},
- &store.Config{
- ConnectionTimeout: 3 * time.Second,
- Username: "test",
- Password: "very-secure",
- },
- )
-
- if err != nil {
- t.Fatalf("cannot create store: %v", err)
- }
-
- return kv
-}
-
-func TestRegister(t *testing.T) {
- Register()
-
- kv, err := libkv.NewStore(store.ETCD, []string{client}, nil)
- assert.NoError(t, err)
- assert.NotNil(t, kv)
-
- if _, ok := kv.(*Etcd); !ok {
- t.Fatal("Error registering and initializing etcd")
- }
-}
-
-func TestEtcdStore(t *testing.T) {
- kv := makeEtcdClient(t)
- lockKV := makeEtcdClient(t)
- ttlKV := makeEtcdClient(t)
-
- testutils.RunTestCommon(t, kv)
- testutils.RunTestAtomic(t, kv)
- testutils.RunTestWatch(t, kv)
- testutils.RunTestLock(t, kv)
- testutils.RunTestLockTTL(t, kv, lockKV)
- testutils.RunTestTTL(t, kv, ttlKV)
- testutils.RunCleanup(t, kv)
-}
diff --git a/vendor/github.com/docker/libkv/store/mock/mock.go b/vendor/github.com/docker/libkv/store/mock/mock.go
deleted file mode 100644
index 82a5b03b..00000000
--- a/vendor/github.com/docker/libkv/store/mock/mock.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package mock
-
-import (
- "github.com/docker/libkv/store"
- "github.com/stretchr/testify/mock"
-)
-
-// Mock store. Mocks all Store functions using testify.Mock
-type Mock struct {
- mock.Mock
-
- // Endpoints passed to InitializeMock
- Endpoints []string
-
- // Options passed to InitializeMock
- Options *store.Config
-}
-
-// New creates a Mock store
-func New(endpoints []string, options *store.Config) (store.Store, error) {
- s := &Mock{}
- s.Endpoints = endpoints
- s.Options = options
- return s, nil
-}
-
-// Put mock
-func (s *Mock) Put(key string, value []byte, opts *store.WriteOptions) error {
- args := s.Mock.Called(key, value, opts)
- return args.Error(0)
-}
-
-// Get mock
-func (s *Mock) Get(key string) (*store.KVPair, error) {
- args := s.Mock.Called(key)
- return args.Get(0).(*store.KVPair), args.Error(1)
-}
-
-// Delete mock
-func (s *Mock) Delete(key string) error {
- args := s.Mock.Called(key)
- return args.Error(0)
-}
-
-// Exists mock
-func (s *Mock) Exists(key string) (bool, error) {
- args := s.Mock.Called(key)
- return args.Bool(0), args.Error(1)
-}
-
-// Watch mock
-func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
- args := s.Mock.Called(key, stopCh)
- return args.Get(0).(<-chan *store.KVPair), args.Error(1)
-}
-
-// WatchTree mock
-func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
- args := s.Mock.Called(prefix, stopCh)
- return args.Get(0).(chan []*store.KVPair), args.Error(1)
-}
-
-// NewLock mock
-func (s *Mock) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
- args := s.Mock.Called(key, options)
- return args.Get(0).(store.Locker), args.Error(1)
-}
-
-// List mock
-func (s *Mock) List(prefix string) ([]*store.KVPair, error) {
- args := s.Mock.Called(prefix)
- return args.Get(0).([]*store.KVPair), args.Error(1)
-}
-
-// DeleteTree mock
-func (s *Mock) DeleteTree(prefix string) error {
- args := s.Mock.Called(prefix)
- return args.Error(0)
-}
-
-// AtomicPut mock
-func (s *Mock) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) {
- args := s.Mock.Called(key, value, previous, opts)
- return args.Bool(0), args.Get(1).(*store.KVPair), args.Error(2)
-}
-
-// AtomicDelete mock
-func (s *Mock) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
- args := s.Mock.Called(key, previous)
- return args.Bool(0), args.Error(1)
-}
-
-// Lock mock implementation of Locker
-type Lock struct {
- mock.Mock
-}
-
-// Lock mock
-func (l *Lock) Lock(stopCh chan struct{}) (<-chan struct{}, error) {
- args := l.Mock.Called(stopCh)
- return args.Get(0).(<-chan struct{}), args.Error(1)
-}
-
-// Unlock mock
-func (l *Lock) Unlock() error {
- args := l.Mock.Called()
- return args.Error(0)
-}
-
-// Close mock
-func (s *Mock) Close() {
- return
-}
diff --git a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go
deleted file mode 100644
index ff8d4ebe..00000000
--- a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go
+++ /dev/null
@@ -1,429 +0,0 @@
-package zookeeper
-
-import (
- "strings"
- "time"
-
- "github.com/docker/libkv"
- "github.com/docker/libkv/store"
- zk "github.com/samuel/go-zookeeper/zk"
-)
-
-const (
- // SOH control character
- SOH = "\x01"
-
- defaultTimeout = 10 * time.Second
-)
-
-// Zookeeper is the receiver type for
-// the Store interface
-type Zookeeper struct {
- timeout time.Duration
- client *zk.Conn
-}
-
-type zookeeperLock struct {
- client *zk.Conn
- lock *zk.Lock
- key string
- value []byte
-}
-
-// Register registers zookeeper to libkv
-func Register() {
- libkv.AddStore(store.ZK, New)
-}
-
-// New creates a new Zookeeper client given a
-// list of endpoints and an optional tls config
-func New(endpoints []string, options *store.Config) (store.Store, error) {
- s := &Zookeeper{}
- s.timeout = defaultTimeout
-
- // Set options
- if options != nil {
- if options.ConnectionTimeout != 0 {
- s.setTimeout(options.ConnectionTimeout)
- }
- }
-
- // Connect to Zookeeper
- conn, _, err := zk.Connect(endpoints, s.timeout)
- if err != nil {
- return nil, err
- }
- s.client = conn
-
- return s, nil
-}
-
-// setTimeout sets the timeout for connecting to Zookeeper
-func (s *Zookeeper) setTimeout(time time.Duration) {
- s.timeout = time
-}
-
-// Get the value at "key", returns the last modified index
-// to use in conjunction to Atomic calls
-func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
- resp, meta, err := s.client.Get(s.normalize(key))
-
- if err != nil {
- if err == zk.ErrNoNode {
- return nil, store.ErrKeyNotFound
- }
- return nil, err
- }
-
- // FIXME handle very rare cases where Get returns the
- // SOH control character instead of the actual value
- if string(resp) == SOH {
- return s.Get(store.Normalize(key))
- }
-
- pair = &store.KVPair{
- Key: key,
- Value: resp,
- LastIndex: uint64(meta.Version),
- }
-
- return pair, nil
-}
-
-// createFullPath creates the entire path for a directory
-// that does not exist
-func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
- for i := 1; i <= len(path); i++ {
- newpath := "/" + strings.Join(path[:i], "/")
- if i == len(path) && ephemeral {
- _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
- return err
- }
- _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll))
- if err != nil {
- // Skip if node already exists
- if err != zk.ErrNodeExists {
- return err
- }
- }
- }
- return nil
-}
-
-// Put a value at "key"
-func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error {
- fkey := s.normalize(key)
-
- exists, err := s.Exists(key)
- if err != nil {
- return err
- }
-
- if !exists {
- if opts != nil && opts.TTL > 0 {
- s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true)
- } else {
- s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false)
- }
- }
-
- _, err = s.client.Set(fkey, value, -1)
- return err
-}
-
-// Delete a value at "key"
-func (s *Zookeeper) Delete(key string) error {
- err := s.client.Delete(s.normalize(key), -1)
- if err == zk.ErrNoNode {
- return store.ErrKeyNotFound
- }
- return err
-}
-
-// Exists checks if the key exists inside the store
-func (s *Zookeeper) Exists(key string) (bool, error) {
- exists, _, err := s.client.Exists(s.normalize(key))
- if err != nil {
- return false, err
- }
- return exists, nil
-}
-
-// Watch for changes on a "key"
-// It returns a channel that will receive changes or pass
-// on errors. Upon creation, the current value will first
-// be sent to the channel. Providing a non-nil stopCh can
-// be used to stop watching.
-func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
- // Get the key first
- pair, err := s.Get(key)
- if err != nil {
- return nil, err
- }
-
- // Catch zk notifications and fire changes into the channel.
- watchCh := make(chan *store.KVPair)
- go func() {
- defer close(watchCh)
-
- // Get returns the current value to the channel prior
- // to listening to any event that may occur on that key
- watchCh <- pair
- for {
- _, _, eventCh, err := s.client.GetW(s.normalize(key))
- if err != nil {
- return
- }
- select {
- case e := <-eventCh:
- if e.Type == zk.EventNodeDataChanged {
- if entry, err := s.Get(key); err == nil {
- watchCh <- entry
- }
- }
- case <-stopCh:
- // There is no way to stop GetW so just quit
- return
- }
- }
- }()
-
- return watchCh, nil
-}
-
-// WatchTree watches for changes on a "directory"
-// It returns a channel that will receive changes or pass
-// on errors. Upon creating a watch, the current childs values
-// will be sent to the channel .Providing a non-nil stopCh can
-// be used to stop watching.
-func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
- // List the childrens first
- entries, err := s.List(directory)
- if err != nil {
- return nil, err
- }
-
- // Catch zk notifications and fire changes into the channel.
- watchCh := make(chan []*store.KVPair)
- go func() {
- defer close(watchCh)
-
- // List returns the children values to the channel
- // prior to listening to any events that may occur
- // on those keys
- watchCh <- entries
-
- for {
- _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory))
- if err != nil {
- return
- }
- select {
- case e := <-eventCh:
- if e.Type == zk.EventNodeChildrenChanged {
- if kv, err := s.List(directory); err == nil {
- watchCh <- kv
- }
- }
- case <-stopCh:
- // There is no way to stop GetW so just quit
- return
- }
- }
- }()
-
- return watchCh, nil
-}
-
-// List child nodes of a given directory
-func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
- keys, stat, err := s.client.Children(s.normalize(directory))
- if err != nil {
- if err == zk.ErrNoNode {
- return nil, store.ErrKeyNotFound
- }
- return nil, err
- }
-
- kv := []*store.KVPair{}
-
- // FIXME Costly Get request for each child key..
- for _, key := range keys {
- pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key))
- if err != nil {
- // If node is not found: List is out of date, retry
- if err == store.ErrKeyNotFound {
- return s.List(directory)
- }
- return nil, err
- }
-
- kv = append(kv, &store.KVPair{
- Key: key,
- Value: []byte(pair.Value),
- LastIndex: uint64(stat.Version),
- })
- }
-
- return kv, nil
-}
-
-// DeleteTree deletes a range of keys under a given directory
-func (s *Zookeeper) DeleteTree(directory string) error {
- pairs, err := s.List(directory)
- if err != nil {
- return err
- }
-
- var reqs []interface{}
-
- for _, pair := range pairs {
- reqs = append(reqs, &zk.DeleteRequest{
- Path: s.normalize(directory + "/" + pair.Key),
- Version: -1,
- })
- }
-
- _, err = s.client.Multi(reqs...)
- return err
-}
-
-// AtomicPut put a value at "key" if the key has not been
-// modified in the meantime, throws an error if this is the case
-func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) {
- var lastIndex uint64
-
- if previous != nil {
- meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex))
- if err != nil {
- // Compare Failed
- if err == zk.ErrBadVersion {
- return false, nil, store.ErrKeyModified
- }
- return false, nil, err
- }
- lastIndex = uint64(meta.Version)
- } else {
- // Interpret previous == nil as create operation.
- _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll))
- if err != nil {
- // Directory does not exist
- if err == zk.ErrNoNode {
-
- // Create the directory
- parts := store.SplitKey(strings.TrimSuffix(key, "/"))
- parts = parts[:len(parts)-1]
- if err = s.createFullPath(parts, false); err != nil {
- // Failed to create the directory.
- return false, nil, err
- }
-
- // Create the node
- if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
- // Node exist error (when previous nil)
- if err == zk.ErrNodeExists {
- return false, nil, store.ErrKeyExists
- }
- return false, nil, err
- }
-
- } else {
- // Node Exists error (when previous nil)
- if err == zk.ErrNodeExists {
- return false, nil, store.ErrKeyExists
- }
-
- // Unhandled error
- return false, nil, err
- }
- }
- lastIndex = 0 // Newly created nodes have version 0.
- }
-
- pair := &store.KVPair{
- Key: key,
- Value: value,
- LastIndex: lastIndex,
- }
-
- return true, pair, nil
-}
-
-// AtomicDelete deletes a value at "key" if the key
-// has not been modified in the meantime, throws an
-// error if this is the case
-func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
- if previous == nil {
- return false, store.ErrPreviousNotSpecified
- }
-
- err := s.client.Delete(s.normalize(key), int32(previous.LastIndex))
- if err != nil {
- // Key not found
- if err == zk.ErrNoNode {
- return false, store.ErrKeyNotFound
- }
- // Compare failed
- if err == zk.ErrBadVersion {
- return false, store.ErrKeyModified
- }
- // General store error
- return false, err
- }
- return true, nil
-}
-
-// NewLock returns a handle to a lock struct which can
-// be used to provide mutual exclusion on a key
-func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
- value := []byte("")
-
- // Apply options
- if options != nil {
- if options.Value != nil {
- value = options.Value
- }
- }
-
- lock = &zookeeperLock{
- client: s.client,
- key: s.normalize(key),
- value: value,
- lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)),
- }
-
- return lock, err
-}
-
-// Lock attempts to acquire the lock and blocks while
-// doing so. It returns a channel that is closed if our
-// lock is lost or if an error occurs
-func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
- err := l.lock.Lock()
-
- if err == nil {
- // We hold the lock, we can set our value
- // FIXME: The value is left behind
- // (problematic for leader election)
- _, err = l.client.Set(l.key, l.value, -1)
- }
-
- return make(chan struct{}), err
-}
-
-// Unlock the "key". Calling unlock while
-// not holding the lock will throw an error
-func (l *zookeeperLock) Unlock() error {
- return l.lock.Unlock()
-}
-
-// Close closes the client connection
-func (s *Zookeeper) Close() {
- s.client.Close()
-}
-
-// Normalize the key for usage in Zookeeper
-func (s *Zookeeper) normalize(key string) string {
- key = store.Normalize(key)
- return strings.TrimSuffix(key, "/")
-}
diff --git a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go
deleted file mode 100644
index c36087e0..00000000
--- a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package zookeeper
-
-import (
- "testing"
- "time"
-
- "github.com/docker/libkv"
- "github.com/docker/libkv/store"
- "github.com/docker/libkv/testutils"
- "github.com/stretchr/testify/assert"
-)
-
-var (
- client = "localhost:2181"
-)
-
-func makeZkClient(t *testing.T) store.Store {
- kv, err := New(
- []string{client},
- &store.Config{
- ConnectionTimeout: 3 * time.Second,
- },
- )
-
- if err != nil {
- t.Fatalf("cannot create store: %v", err)
- }
-
- return kv
-}
-
-func TestRegister(t *testing.T) {
- Register()
-
- kv, err := libkv.NewStore(store.ZK, []string{client}, nil)
- assert.NoError(t, err)
- assert.NotNil(t, kv)
-
- if _, ok := kv.(*Zookeeper); !ok {
- t.Fatal("Error registering and initializing zookeeper")
- }
-}
-
-func TestZkStore(t *testing.T) {
- kv := makeZkClient(t)
- ttlKV := makeZkClient(t)
-
- testutils.RunTestCommon(t, kv)
- testutils.RunTestAtomic(t, kv)
- testutils.RunTestWatch(t, kv)
- testutils.RunTestLock(t, kv)
- testutils.RunTestTTL(t, kv, ttlKV)
- testutils.RunCleanup(t, kv)
-}
diff --git a/vendor/github.com/docker/libkv/testutils/utils.go b/vendor/github.com/docker/libkv/testutils/utils.go
deleted file mode 100644
index 5385bac6..00000000
--- a/vendor/github.com/docker/libkv/testutils/utils.go
+++ /dev/null
@@ -1,622 +0,0 @@
-package testutils
-
-import (
- "fmt"
- "testing"
- "time"
-
- "github.com/docker/libkv/store"
- "github.com/stretchr/testify/assert"
-)
-
-// RunTestCommon tests the minimal required APIs which
-// should be supported by all K/V backends
-func RunTestCommon(t *testing.T, kv store.Store) {
- testPutGetDeleteExists(t, kv)
- testList(t, kv)
- testDeleteTree(t, kv)
-}
-
-// RunTestAtomic tests the Atomic operations by the K/V
-// backends
-func RunTestAtomic(t *testing.T, kv store.Store) {
- testAtomicPut(t, kv)
- testAtomicPutCreate(t, kv)
- testAtomicPutWithSlashSuffixKey(t, kv)
- testAtomicDelete(t, kv)
-}
-
-// RunTestWatch tests the watch/monitor APIs supported
-// by the K/V backends.
-func RunTestWatch(t *testing.T, kv store.Store) {
- testWatch(t, kv)
- testWatchTree(t, kv)
-}
-
-// RunTestLock tests the KV pair Lock/Unlock APIs supported
-// by the K/V backends.
-func RunTestLock(t *testing.T, kv store.Store) {
- testLockUnlock(t, kv)
-}
-
-// RunTestLockTTL tests the KV pair Lock with TTL APIs supported
-// by the K/V backends.
-func RunTestLockTTL(t *testing.T, kv store.Store, backup store.Store) {
- testLockTTL(t, kv, backup)
-}
-
-// RunTestTTL tests the TTL functionality of the K/V backend.
-func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) {
- testPutTTL(t, kv, backup)
-}
-
-func testPutGetDeleteExists(t *testing.T, kv store.Store) {
- // Get a not exist key should return ErrKeyNotFound
- pair, err := kv.Get("testPutGetDelete_not_exist_key")
- assert.Equal(t, store.ErrKeyNotFound, err)
-
- value := []byte("bar")
- for _, key := range []string{
- "testPutGetDeleteExists",
- "testPutGetDeleteExists/",
- "testPutGetDeleteExists/testbar/",
- "testPutGetDeleteExists/testbar/testfoobar",
- } {
- failMsg := fmt.Sprintf("Fail key %s", key)
-
- // Put the key
- err = kv.Put(key, value, nil)
- assert.NoError(t, err, failMsg)
-
- // Get should return the value and an incremented index
- pair, err = kv.Get(key)
- assert.NoError(t, err, failMsg)
- if assert.NotNil(t, pair, failMsg) {
- assert.NotNil(t, pair.Value, failMsg)
- }
- assert.Equal(t, pair.Value, value, failMsg)
- assert.NotEqual(t, pair.LastIndex, 0, failMsg)
-
- // Exists should return true
- exists, err := kv.Exists(key)
- assert.NoError(t, err, failMsg)
- assert.True(t, exists, failMsg)
-
- // Delete the key
- err = kv.Delete(key)
- assert.NoError(t, err, failMsg)
-
- // Get should fail
- pair, err = kv.Get(key)
- assert.Error(t, err, failMsg)
- assert.Nil(t, pair, failMsg)
-
- // Exists should return false
- exists, err = kv.Exists(key)
- assert.NoError(t, err, failMsg)
- assert.False(t, exists, failMsg)
- }
-}
-
-func testWatch(t *testing.T, kv store.Store) {
- key := "testWatch"
- value := []byte("world")
- newValue := []byte("world!")
-
- // Put the key
- err := kv.Put(key, value, nil)
- assert.NoError(t, err)
-
- stopCh := make(<-chan struct{})
- events, err := kv.Watch(key, stopCh)
- assert.NoError(t, err)
- assert.NotNil(t, events)
-
- // Update loop
- go func() {
- timeout := time.After(1 * time.Second)
- tick := time.Tick(250 * time.Millisecond)
- for {
- select {
- case <-timeout:
- return
- case <-tick:
- err := kv.Put(key, newValue, nil)
- if assert.NoError(t, err) {
- continue
- }
- return
- }
- }
- }()
-
- // Check for updates
- eventCount := 1
- for {
- select {
- case event := <-events:
- assert.NotNil(t, event)
- if eventCount == 1 {
- assert.Equal(t, event.Key, key)
- assert.Equal(t, event.Value, value)
- } else {
- assert.Equal(t, event.Key, key)
- assert.Equal(t, event.Value, newValue)
- }
- eventCount++
- // We received all the events we wanted to check
- if eventCount >= 4 {
- return
- }
- case <-time.After(4 * time.Second):
- t.Fatal("Timeout reached")
- return
- }
- }
-}
-
-func testWatchTree(t *testing.T, kv store.Store) {
- dir := "testWatchTree"
-
- node1 := "testWatchTree/node1"
- value1 := []byte("node1")
-
- node2 := "testWatchTree/node2"
- value2 := []byte("node2")
-
- node3 := "testWatchTree/node3"
- value3 := []byte("node3")
-
- err := kv.Put(node1, value1, nil)
- assert.NoError(t, err)
- err = kv.Put(node2, value2, nil)
- assert.NoError(t, err)
- err = kv.Put(node3, value3, nil)
- assert.NoError(t, err)
-
- stopCh := make(<-chan struct{})
- events, err := kv.WatchTree(dir, stopCh)
- assert.NoError(t, err)
- assert.NotNil(t, events)
-
- // Update loop
- go func() {
- timeout := time.After(500 * time.Millisecond)
- for {
- select {
- case <-timeout:
- err := kv.Delete(node3)
- assert.NoError(t, err)
- return
- }
- }
- }()
-
- // Check for updates
- eventCount := 1
- for {
- select {
- case event := <-events:
- assert.NotNil(t, event)
- // We received the Delete event on a child node
- // Exit test successfully
- if eventCount == 2 {
- return
- }
- eventCount++
- case <-time.After(4 * time.Second):
- t.Fatal("Timeout reached")
- return
- }
- }
-}
-
-func testAtomicPut(t *testing.T, kv store.Store) {
- key := "testAtomicPut"
- value := []byte("world")
-
- // Put the key
- err := kv.Put(key, value, nil)
- assert.NoError(t, err)
-
- // Get should return the value and an incremented index
- pair, err := kv.Get(key)
- assert.NoError(t, err)
- if assert.NotNil(t, pair) {
- assert.NotNil(t, pair.Value)
- }
- assert.Equal(t, pair.Value, value)
- assert.NotEqual(t, pair.LastIndex, 0)
-
- // This CAS should fail: previous exists.
- success, _, err := kv.AtomicPut(key, []byte("WORLD"), nil, nil)
- assert.Error(t, err)
- assert.False(t, success)
-
- // This CAS should succeed
- success, _, err = kv.AtomicPut(key, []byte("WORLD"), pair, nil)
- assert.NoError(t, err)
- assert.True(t, success)
-
- // This CAS should fail, key exists.
- pair.LastIndex = 6744
- success, _, err = kv.AtomicPut(key, []byte("WORLDWORLD"), pair, nil)
- assert.Error(t, err)
- assert.False(t, success)
-}
-
-func testAtomicPutCreate(t *testing.T, kv store.Store) {
- // Use a key in a new directory to ensure Stores will create directories
- // that don't yet exist.
- key := "testAtomicPutCreate/create"
- value := []byte("putcreate")
-
- // AtomicPut the key, previous = nil indicates create.
- success, _, err := kv.AtomicPut(key, value, nil, nil)
- assert.NoError(t, err)
- assert.True(t, success)
-
- // Get should return the value and an incremented index
- pair, err := kv.Get(key)
- assert.NoError(t, err)
- if assert.NotNil(t, pair) {
- assert.NotNil(t, pair.Value)
- }
- assert.Equal(t, pair.Value, value)
-
- // Attempting to create again should fail.
- success, _, err = kv.AtomicPut(key, value, nil, nil)
- assert.Error(t, store.ErrKeyExists)
- assert.False(t, success)
-
- // This CAS should succeed, since it has the value from Get()
- success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil)
- assert.NoError(t, err)
- assert.True(t, success)
-}
-
-func testAtomicPutWithSlashSuffixKey(t *testing.T, kv store.Store) {
- k1 := "testAtomicPutWithSlashSuffixKey/key/"
- success, _, err := kv.AtomicPut(k1, []byte{}, nil, nil)
- assert.Nil(t, err)
- assert.True(t, success)
-}
-
-func testAtomicDelete(t *testing.T, kv store.Store) {
- key := "testAtomicDelete"
- value := []byte("world")
-
- // Put the key
- err := kv.Put(key, value, nil)
- assert.NoError(t, err)
-
- // Get should return the value and an incremented index
- pair, err := kv.Get(key)
- assert.NoError(t, err)
- if assert.NotNil(t, pair) {
- assert.NotNil(t, pair.Value)
- }
- assert.Equal(t, pair.Value, value)
- assert.NotEqual(t, pair.LastIndex, 0)
-
- tempIndex := pair.LastIndex
-
- // AtomicDelete should fail
- pair.LastIndex = 6744
- success, err := kv.AtomicDelete(key, pair)
- assert.Error(t, err)
- assert.False(t, success)
-
- // AtomicDelete should succeed
- pair.LastIndex = tempIndex
- success, err = kv.AtomicDelete(key, pair)
- assert.NoError(t, err)
- assert.True(t, success)
-
- // Delete a non-existent key; should fail
- success, err = kv.AtomicDelete(key, pair)
- assert.Error(t, store.ErrKeyNotFound)
- assert.False(t, success)
-}
-
-func testLockUnlock(t *testing.T, kv store.Store) {
- key := "testLockUnlock"
- value := []byte("bar")
-
- // We should be able to create a new lock on key
- lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second})
- assert.NoError(t, err)
- assert.NotNil(t, lock)
-
- // Lock should successfully succeed or block
- lockChan, err := lock.Lock(nil)
- assert.NoError(t, err)
- assert.NotNil(t, lockChan)
-
- // Get should work
- pair, err := kv.Get(key)
- assert.NoError(t, err)
- if assert.NotNil(t, pair) {
- assert.NotNil(t, pair.Value)
- }
- assert.Equal(t, pair.Value, value)
- assert.NotEqual(t, pair.LastIndex, 0)
-
- // Unlock should succeed
- err = lock.Unlock()
- assert.NoError(t, err)
-
- // Lock should succeed again
- lockChan, err = lock.Lock(nil)
- assert.NoError(t, err)
- assert.NotNil(t, lockChan)
-
- // Get should work
- pair, err = kv.Get(key)
- assert.NoError(t, err)
- if assert.NotNil(t, pair) {
- assert.NotNil(t, pair.Value)
- }
- assert.Equal(t, pair.Value, value)
- assert.NotEqual(t, pair.LastIndex, 0)
-
- err = lock.Unlock()
- assert.NoError(t, err)
-}
-
-func testLockTTL(t *testing.T, kv store.Store, otherConn store.Store) {
- key := "testLockTTL"
- value := []byte("bar")
-
- renewCh := make(chan struct{})
-
- // We should be able to create a new lock on key
- lock, err := otherConn.NewLock(key, &store.LockOptions{
- Value: value,
- TTL: 2 * time.Second,
- RenewLock: renewCh,
- })
- assert.NoError(t, err)
- assert.NotNil(t, lock)
-
- // Lock should successfully succeed
- lockChan, err := lock.Lock(nil)
- assert.NoError(t, err)
- assert.NotNil(t, lockChan)
-
- // Get should work
- pair, err := otherConn.Get(key)
- assert.NoError(t, err)
- if assert.NotNil(t, pair) {
- assert.NotNil(t, pair.Value)
- }
- assert.Equal(t, pair.Value, value)
- assert.NotEqual(t, pair.LastIndex, 0)
-
- time.Sleep(3 * time.Second)
-
- done := make(chan struct{})
- stop := make(chan struct{})
-
- value = []byte("foobar")
-
- // Create a new lock with another connection
- lock, err = kv.NewLock(
- key,
- &store.LockOptions{
- Value: value,
- TTL: 3 * time.Second,
- },
- )
- assert.NoError(t, err)
- assert.NotNil(t, lock)
-
- // Lock should block, the session on the lock
- // is still active and renewed periodically
- go func(<-chan struct{}) {
- _, _ = lock.Lock(stop)
- done <- struct{}{}
- }(done)
-
- select {
- case _ = <-done:
- t.Fatal("Lock succeeded on a key that is supposed to be locked by another client")
- case <-time.After(4 * time.Second):
- // Stop requesting the lock as we are blocked as expected
- stop <- struct{}{}
- break
- }
-
- // Close the connection
- otherConn.Close()
-
- // Force stop the session renewal for the lock
- close(renewCh)
-
- // Let the session on the lock expire
- time.Sleep(3 * time.Second)
- locked := make(chan struct{})
-
- // Lock should now succeed for the other client
- go func(<-chan struct{}) {
- lockChan, err = lock.Lock(nil)
- assert.NoError(t, err)
- assert.NotNil(t, lockChan)
- locked <- struct{}{}
- }(locked)
-
- select {
- case _ = <-locked:
- break
- case <-time.After(4 * time.Second):
- t.Fatal("Unable to take the lock, timed out")
- }
-
- // Get should work with the new value
- pair, err = kv.Get(key)
- assert.NoError(t, err)
- if assert.NotNil(t, pair) {
- assert.NotNil(t, pair.Value)
- }
- assert.Equal(t, pair.Value, value)
- assert.NotEqual(t, pair.LastIndex, 0)
-
- err = lock.Unlock()
- assert.NoError(t, err)
-}
-
-func testPutTTL(t *testing.T, kv store.Store, otherConn store.Store) {
- firstKey := "testPutTTL"
- firstValue := []byte("foo")
-
- secondKey := "second"
- secondValue := []byte("bar")
-
- // Put the first key with the Ephemeral flag
- err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{TTL: 2 * time.Second})
- assert.NoError(t, err)
-
- // Put a second key with the Ephemeral flag
- err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{TTL: 2 * time.Second})
- assert.NoError(t, err)
-
- // Get on firstKey should work
- pair, err := kv.Get(firstKey)
- assert.NoError(t, err)
- assert.NotNil(t, pair)
-
- // Get on secondKey should work
- pair, err = kv.Get(secondKey)
- assert.NoError(t, err)
- assert.NotNil(t, pair)
-
- // Close the connection
- otherConn.Close()
-
- // Let the session expire
- time.Sleep(3 * time.Second)
-
- // Get on firstKey shouldn't work
- pair, err = kv.Get(firstKey)
- assert.Error(t, err)
- assert.Nil(t, pair)
-
- // Get on secondKey shouldn't work
- pair, err = kv.Get(secondKey)
- assert.Error(t, err)
- assert.Nil(t, pair)
-}
-
-func testList(t *testing.T, kv store.Store) {
- prefix := "testList"
-
- firstKey := "testList/first"
- firstValue := []byte("first")
-
- secondKey := "testList/second"
- secondValue := []byte("second")
-
- // Put the first key
- err := kv.Put(firstKey, firstValue, nil)
- assert.NoError(t, err)
-
- // Put the second key
- err = kv.Put(secondKey, secondValue, nil)
- assert.NoError(t, err)
-
- // List should work and return the two correct values
- for _, parent := range []string{prefix, prefix + "/"} {
- pairs, err := kv.List(parent)
- assert.NoError(t, err)
- if assert.NotNil(t, pairs) {
- assert.Equal(t, len(pairs), 2)
- }
-
- // Check pairs, those are not necessarily in Put order
- for _, pair := range pairs {
- if pair.Key == firstKey {
- assert.Equal(t, pair.Value, firstValue)
- }
- if pair.Key == secondKey {
- assert.Equal(t, pair.Value, secondValue)
- }
- }
- }
-
- // List should fail: the key does not exist
- pairs, err := kv.List("idontexist")
- assert.Equal(t, store.ErrKeyNotFound, err)
- assert.Nil(t, pairs)
-}
-
-func testDeleteTree(t *testing.T, kv store.Store) {
- prefix := "testDeleteTree"
-
- firstKey := "testDeleteTree/first"
- firstValue := []byte("first")
-
- secondKey := "testDeleteTree/second"
- secondValue := []byte("second")
-
- // Put the first key
- err := kv.Put(firstKey, firstValue, nil)
- assert.NoError(t, err)
-
- // Put the second key
- err = kv.Put(secondKey, secondValue, nil)
- assert.NoError(t, err)
-
- // Get should work on the first Key
- pair, err := kv.Get(firstKey)
- assert.NoError(t, err)
- if assert.NotNil(t, pair) {
- assert.NotNil(t, pair.Value)
- }
- assert.Equal(t, pair.Value, firstValue)
- assert.NotEqual(t, pair.LastIndex, 0)
-
- // Get should work on the second Key
- pair, err = kv.Get(secondKey)
- assert.NoError(t, err)
- if assert.NotNil(t, pair) {
- assert.NotNil(t, pair.Value)
- }
- assert.Equal(t, pair.Value, secondValue)
- assert.NotEqual(t, pair.LastIndex, 0)
-
- // Delete Values under directory `nodes`
- err = kv.DeleteTree(prefix)
- assert.NoError(t, err)
-
- // Get should fail on both keys
- pair, err = kv.Get(firstKey)
- assert.Error(t, err)
- assert.Nil(t, pair)
-
- pair, err = kv.Get(secondKey)
- assert.Error(t, err)
- assert.Nil(t, pair)
-}
-
-// RunCleanup cleans up keys introduced by the tests
-func RunCleanup(t *testing.T, kv store.Store) {
- for _, key := range []string{
- "testAtomicPutWithSlashSuffixKey",
- "testPutGetDeleteExists",
- "testWatch",
- "testWatchTree",
- "testAtomicPut",
- "testAtomicPutCreate",
- "testAtomicDelete",
- "testLockUnlock",
- "testLockTTL",
- "testPutTTL",
- "testList",
- "testDeleteTree",
- } {
- err := kv.DeleteTree(key)
- assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete tree key %s: %v", key, err))
- err = kv.Delete(key)
- assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete key %s: %v", key, err))
- }
-}