diff options
| author | Dave Henderson <dhenderson@gmail.com> | 2017-12-05 08:10:13 -0500 |
|---|---|---|
| committer | Dave Henderson <dhenderson@gmail.com> | 2017-12-05 08:10:13 -0500 |
| commit | 149f2b13d110f34f048e5942466faea4e1a4a870 (patch) | |
| tree | 41a259362fefcfc99f95b26fda532a26fd4ba4f0 /vendor/github.com/docker | |
| parent | f6e20ca5ecb47c067fccca8d61e937f35348e7a5 (diff) | |
Pruning dependencies with `dep prune`
Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Diffstat (limited to 'vendor/github.com/docker')
| -rw-r--r-- | vendor/github.com/docker/libkv/docs/compatibility.md | 82 | ||||
| -rw-r--r-- | vendor/github.com/docker/libkv/docs/examples.md | 157 | ||||
| -rw-r--r-- | vendor/github.com/docker/libkv/script/.validate | 33 | ||||
| -rwxr-xr-x | vendor/github.com/docker/libkv/script/coverage | 21 | ||||
| -rwxr-xr-x | vendor/github.com/docker/libkv/script/travis_consul.sh | 18 | ||||
| -rwxr-xr-x | vendor/github.com/docker/libkv/script/travis_etcd.sh | 11 | ||||
| -rwxr-xr-x | vendor/github.com/docker/libkv/script/travis_zk.sh | 12 | ||||
| -rwxr-xr-x | vendor/github.com/docker/libkv/script/validate-gofmt | 30 | ||||
| -rw-r--r-- | vendor/github.com/docker/libkv/store/etcd/etcd.go | 606 | ||||
| -rw-r--r-- | vendor/github.com/docker/libkv/store/etcd/etcd_test.go | 58 | ||||
| -rw-r--r-- | vendor/github.com/docker/libkv/store/mock/mock.go | 113 | ||||
| -rw-r--r-- | vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go | 429 | ||||
| -rw-r--r-- | vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go | 54 | ||||
| -rw-r--r-- | vendor/github.com/docker/libkv/testutils/utils.go | 622 |
14 files changed, 0 insertions, 2246 deletions
diff --git a/vendor/github.com/docker/libkv/docs/compatibility.md b/vendor/github.com/docker/libkv/docs/compatibility.md deleted file mode 100644 index c4f27e9c..00000000 --- a/vendor/github.com/docker/libkv/docs/compatibility.md +++ /dev/null @@ -1,82 +0,0 @@ -#Cross-Backend Compatibility - -The value of `libkv` is not to duplicate the code for programs that should support multiple distributed K/V stores like the classic `Consul`/`etcd`/`zookeeper` trio. - -This document provides with general guidelines for users willing to support those backends with the same code using `libkv`. - -Please note that most of those workarounds are going to disappear in the future with `etcd` APIv3. - -##Etcd directory/key distinction - -`etcd` with APIv2 makes the distinction between keys and directories. The result with `libkv` is that when using the etcd driver: - -- You cannot store values on directories -- You cannot invoke `WatchTree` (watching on child values), on a regular key - -This is fundamentaly different than `Consul` and `zookeeper` which are more permissive and allow the same set of operations on keys and directories (called a Node for zookeeper). - -Apiv3 is in the work for `etcd`, which removes this key/directory distinction, but until then you should follow these workarounds to make your `libkv` code work across backends. - -###Put - -`etcd` cannot put values on directories, so this puts a major restriction compared to `Consul` and `zookeeper`. - -If you want to support all those three backends, you should make sure to only put data on **leaves**. - -For example: - -```go -_ := kv.Put("path/to/key/bis", []byte("foo"), nil) -_ := kv.Put("path/to/key", []byte("bar"), nil) -``` - -Will work on `Consul` and `zookeeper` but fail for `etcd`. This is because the first `Put` in the case of `etcd` will recursively create the directory hierarchy and `path/to/key` is now considered as a directory. Thus, values should always be stored on leaves if the support for the three backends is planned. - -###WatchTree - -When initializing the `WatchTree`, the natural way to do so is through the following code: - -```go -key := "path/to/key" -if !kv.Exists(key) { - err := kv.Put(key, []byte("data"), nil) -} -events, err := kv.WatchTree(key, nil) -``` - -The code above will not work across backends and etcd will fail on the `WatchTree` call. What happens exactly: - -- `Consul` will create a regular `key` because it has no distinction between directories and keys. This is not an issue as we can invoke `WatchTree` on regular keys. -- `zookeeper` is going to create a `node` that can either be a directory or a key during the lifetime of a program but it does not matter as a directory can hold values and be watchable like a regular key. -- `etcd` is going to create a regular `key`. We cannot invoke `WatchTree` on regular keys using etcd. - -To be cross-compatible between those three backends for `WatchTree`, we need to enforce a parameter that is only interpreted with `etcd` and which tells the client to create a `directory` instead of a key. - -```go -key := "path/to/key" -if !kv.Exists(key) { - // We enforce IsDir = true to make sure etcd creates a directory - err := kv.Put(key, []byte("data"), &store.WriteOptions{IsDir:true}) -} -events, err := kv.WatchTree(key, nil) -``` - -The code above will work for the three backends but make sure to not try to store any value at that path as the call to `Put` will fail for `etcd` (you can only put at `path/to/key/foo`, `path/to/key/bar` for example). - -##Etcd distributed locking - -There is `Lock` mechanisms baked in the `coreos/etcd/client` for now. Instead, `libkv` has its own implementation of a `Lock` on top of `etcd`. - -The general workflow for the `Lock` is as follows: - -- Call Lock concurrently on a `key` between threads/programs -- Only one will create that key, others are going to fail because the key has already been created -- The thread locking the key can get the right index to set the value of the key using Compare And Swap and effectively Lock and hold the key -- Other threads are given a wrong index to fail the Compare and Swap and block until the key has been released by the thread holding the Lock -- Lock seekers are setting up a Watch listening on that key and events happening on the key -- When the thread/program stops holding the lock, it deletes the key triggering a `delete` event that will notify all the other threads. In case the program crashes, the key has a TTL attached that will send an `expire` event when this TTL expires. -- Once everyone is notified, back to the first step. First come, first served with the Lock. - -The whole Lock process is highly dependent on the `delete`/`expire` events of `etcd`. So don't expect the key to be still there once the Lock is released. - -For example if the whole logic is to `Lock` a key and expect the value to still be there after it has been unlocked, it is not going to be cross-backend compatible with `Consul` and `zookeeper`. On the other end the `etcd` Lock can still be used to do Leader Election for example and still be cross-compatible with other backends.
\ No newline at end of file diff --git a/vendor/github.com/docker/libkv/docs/examples.md b/vendor/github.com/docker/libkv/docs/examples.md deleted file mode 100644 index 09752db1..00000000 --- a/vendor/github.com/docker/libkv/docs/examples.md +++ /dev/null @@ -1,157 +0,0 @@ -#Examples - -This document contains useful example of usage for `libkv`. It might not be complete but provides with general informations on how to use the client. - -##Create a store and use Put/Get/Delete - -```go -package main - -import ( - "fmt" - "time" - "log" - - "github.com/docker/libkv" - "github.com/docker/libkv/store" - "github.com/docker/libkv/store/consul" -) - -func init() { - // Register consul store to libkv - consul.Register() - - // We can register as many backends that are supported by libkv - etcd.Register() - zookeeper.Register() - boltdb.Register() -} - -func main() { - client := "localhost:8500" - - // Initialize a new store with consul - kv, err := libkv.NewStore( - store.CONSUL, // or "consul" - []string{client}, - &store.Config{ - ConnectionTimeout: 10*time.Second, - }, - ) - if err != nil { - log.Fatal("Cannot create store consul") - } - - key := "foo" - err = kv.Put(key, []byte("bar"), nil) - if err != nil { - fmt.Errorf("Error trying to put value at key: %v", key) - } - - pair, err := kv.Get(key) - if err != nil { - fmt.Errorf("Error trying accessing value at key: %v", key) - } - - err = kv.Delete(key) - if err != nil { - fmt.Errorf("Error trying to delete key %v", key) - } - - log.Info("value: ", string(pair.Value)) -} -``` - -##List keys - -```go -// List will list all the keys under `key` if it contains a set of child keys/values -entries, err := kv.List(key) -for _, pair := range entries { - fmt.Printf("key=%v - value=%v", pair.Key, string(pair.Value)) -} - -``` - -##Watching for events on a single key (Watch) - -You can use watches to watch modifications on a key. First you need to check if the key exists. If this is not the case, we need to create it using the `Put` function. - -```go -// Checking on the key before watching -if !kv.Exists(key) { - err := kv.Put(key, []byte("bar"), nil) - if err != nil { - fmt.Errorf("Something went wrong when initializing key %v", key) - } -} - -stopCh := make(<-chan struct{}) -events, err := kv.Watch(key, stopCh) - -select { - case pair := <-events: - // Do something with events - fmt.Printf("value changed on key %v: new value=%v", key, pair.Value) -} - -``` - -##Watching for events happening on child keys (WatchTree) - -You can use watches to watch modifications on a key. First you need to check if the key exists. If this is not the case, we need to create it using the `Put` function. There is a special step here though if you want your code to work across backends. Because `etcd` is a special case and it makes the distinction between directories and keys, we need to make sure that the created key is considered as a directory by enforcing `IsDir` at `true`. - -```go -// Checking on the key before watching -if !kv.Exists(key) { - // Don't forget IsDir:true if the code is used cross-backend - err := kv.Put(key, []byte("bar"), &store.WriteOptions{IsDir:true}) - if err != nil { - fmt.Errorf("Something went wrong when initializing key %v", key) - } -} - -stopCh := make(<-chan struct{}) -events, err := kv.WatchTree(key, stopCh) - -select { - case pairs := <-events: - // Do something with events - for _, pair := range pairs { - fmt.Printf("value changed on key %v: new value=%v", key, pair.Value) - } -} - -``` - -## Distributed Locking, using Lock/Unlock - -```go -key := "lockKey" -value := []byte("bar") - -// Initialize a distributed lock. TTL is optional, it is here to make sure that -// the lock is released after the program that is holding the lock ends or crashes -lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second}) -if err != nil { - fmt.Errorf("something went wrong when trying to initialize the Lock") -} - -// Try to lock the key, the call to Lock() is blocking -_, err := lock.Lock(nil) -if err != nil { - fmt.Errorf("something went wrong when trying to lock key %v", key) -} - -// Get should work because we are holding the key -pair, err := kv.Get(key) -if err != nil { - fmt.Errorf("key %v has value %v", key, pair.Value) -} - -// Unlock the key -err = lock.Unlock() -if err != nil { - fmt.Errorf("something went wrong when trying to unlock key %v", key) -} -```
\ No newline at end of file diff --git a/vendor/github.com/docker/libkv/script/.validate b/vendor/github.com/docker/libkv/script/.validate deleted file mode 100644 index 3767f422..00000000 --- a/vendor/github.com/docker/libkv/script/.validate +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash - -if [ -z "$VALIDATE_UPSTREAM" ]; then - # this is kind of an expensive check, so let's not do this twice if we - # are running more than one validate bundlescript - - VALIDATE_REPO='https://github.com/docker/libkv.git' - VALIDATE_BRANCH='master' - - if [ "$TRAVIS" = 'true' -a "$TRAVIS_PULL_REQUEST" != 'false' ]; then - VALIDATE_REPO="https://github.com/${TRAVIS_REPO_SLUG}.git" - VALIDATE_BRANCH="${TRAVIS_BRANCH}" - fi - - VALIDATE_HEAD="$(git rev-parse --verify HEAD)" - - git fetch -q "$VALIDATE_REPO" "refs/heads/$VALIDATE_BRANCH" - VALIDATE_UPSTREAM="$(git rev-parse --verify FETCH_HEAD)" - - VALIDATE_COMMIT_LOG="$VALIDATE_UPSTREAM..$VALIDATE_HEAD" - VALIDATE_COMMIT_DIFF="$VALIDATE_UPSTREAM...$VALIDATE_HEAD" - - validate_diff() { - if [ "$VALIDATE_UPSTREAM" != "$VALIDATE_HEAD" ]; then - git diff "$VALIDATE_COMMIT_DIFF" "$@" - fi - } - validate_log() { - if [ "$VALIDATE_UPSTREAM" != "$VALIDATE_HEAD" ]; then - git log "$VALIDATE_COMMIT_LOG" "$@" - fi - } -fi diff --git a/vendor/github.com/docker/libkv/script/coverage b/vendor/github.com/docker/libkv/script/coverage deleted file mode 100755 index a7a13f45..00000000 --- a/vendor/github.com/docker/libkv/script/coverage +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash - -MODE="mode: count" -ROOT=${TRAVIS_BUILD_DIR:-.}/../../.. - -# Grab the list of packages. -# Exclude the API and CLI from coverage as it will be covered by integration tests. -PACKAGES=`go list ./...` - -# Create the empty coverage file. -echo $MODE > goverage.report - -# Run coverage on every package. -for package in $PACKAGES; do - output="$ROOT/$package/coverage.out" - - go test -test.short -covermode=count -coverprofile=$output $package - if [ -f "$output" ] ; then - cat "$output" | grep -v "$MODE" >> goverage.report - fi -done diff --git a/vendor/github.com/docker/libkv/script/travis_consul.sh b/vendor/github.com/docker/libkv/script/travis_consul.sh deleted file mode 100755 index 7b63d6b6..00000000 --- a/vendor/github.com/docker/libkv/script/travis_consul.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -if [ $# -gt 0 ] ; then - CONSUL_VERSION="$1" -else - CONSUL_VERSION="0.5.2" -fi - -# install consul -wget "https://releases.hashicorp.com/consul/${CONSUL_VERSION}/consul_${CONSUL_VERSION}_linux_amd64.zip" -unzip "consul_${CONSUL_VERSION}_linux_amd64.zip" - -# make config for minimum ttl -touch config.json -echo "{\"session_ttl_min\": \"1s\"}" >> config.json - -# check -./consul --version diff --git a/vendor/github.com/docker/libkv/script/travis_etcd.sh b/vendor/github.com/docker/libkv/script/travis_etcd.sh deleted file mode 100755 index bee8567f..00000000 --- a/vendor/github.com/docker/libkv/script/travis_etcd.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash - -if [ $# -gt 0 ] ; then - ETCD_VERSION="$1" -else - ETCD_VERSION="2.2.0" -fi - -curl -L https://github.com/coreos/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz -o etcd-v$ETCD_VERSION-linux-amd64.tar.gz -tar xzvf etcd-v$ETCD_VERSION-linux-amd64.tar.gz -mv etcd-v$ETCD_VERSION-linux-amd64 etcd diff --git a/vendor/github.com/docker/libkv/script/travis_zk.sh b/vendor/github.com/docker/libkv/script/travis_zk.sh deleted file mode 100755 index 636a2407..00000000 --- a/vendor/github.com/docker/libkv/script/travis_zk.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash - -if [ $# -gt 0 ] ; then - ZK_VERSION="$1" -else - ZK_VERSION="3.4.7" -fi - -wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/zookeeper-${ZK_VERSION}.tar.gz" -tar -xvf "zookeeper-${ZK_VERSION}.tar.gz" -mv zookeeper-$ZK_VERSION zk -mv ./zk/conf/zoo_sample.cfg ./zk/conf/zoo.cfg diff --git a/vendor/github.com/docker/libkv/script/validate-gofmt b/vendor/github.com/docker/libkv/script/validate-gofmt deleted file mode 100755 index c565976b..00000000 --- a/vendor/github.com/docker/libkv/script/validate-gofmt +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/bash - -source "$(dirname "$BASH_SOURCE")/.validate" - -IFS=$'\n' -files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^Godeps/' || true) ) -unset IFS - -badFiles=() -for f in "${files[@]}"; do - # we use "git show" here to validate that what's committed is formatted - if [ "$(git show "$VALIDATE_HEAD:$f" | gofmt -s -l)" ]; then - badFiles+=( "$f" ) - fi -done - -if [ ${#badFiles[@]} -eq 0 ]; then - echo 'Congratulations! All Go source files are properly formatted.' -else - { - echo "These files are not properly gofmt'd:" - for f in "${badFiles[@]}"; do - echo " - $f" - done - echo - echo 'Please reformat the above files using "gofmt -s -w" and commit the result.' - echo - } >&2 - false -fi diff --git a/vendor/github.com/docker/libkv/store/etcd/etcd.go b/vendor/github.com/docker/libkv/store/etcd/etcd.go deleted file mode 100644 index c932ca66..00000000 --- a/vendor/github.com/docker/libkv/store/etcd/etcd.go +++ /dev/null @@ -1,606 +0,0 @@ -package etcd - -import ( - "crypto/tls" - "errors" - "log" - "net" - "net/http" - "strings" - "time" - - "golang.org/x/net/context" - - etcd "github.com/coreos/etcd/client" - "github.com/docker/libkv" - "github.com/docker/libkv/store" -) - -var ( - // ErrAbortTryLock is thrown when a user stops trying to seek the lock - // by sending a signal to the stop chan, this is used to verify if the - // operation succeeded - ErrAbortTryLock = errors.New("lock operation aborted") -) - -// Etcd is the receiver type for the -// Store interface -type Etcd struct { - client etcd.KeysAPI -} - -type etcdLock struct { - client etcd.KeysAPI - stopLock chan struct{} - stopRenew chan struct{} - key string - value string - last *etcd.Response - ttl time.Duration -} - -const ( - periodicSync = 5 * time.Minute - defaultLockTTL = 20 * time.Second - defaultUpdateTime = 5 * time.Second -) - -// Register registers etcd to libkv -func Register() { - libkv.AddStore(store.ETCD, New) -} - -// New creates a new Etcd client given a list -// of endpoints and an optional tls config -func New(addrs []string, options *store.Config) (store.Store, error) { - s := &Etcd{} - - var ( - entries []string - err error - ) - - entries = store.CreateEndpoints(addrs, "http") - cfg := &etcd.Config{ - Endpoints: entries, - Transport: etcd.DefaultTransport, - HeaderTimeoutPerRequest: 3 * time.Second, - } - - // Set options - if options != nil { - if options.TLS != nil { - setTLS(cfg, options.TLS, addrs) - } - if options.ConnectionTimeout != 0 { - setTimeout(cfg, options.ConnectionTimeout) - } - if options.Username != "" { - setCredentials(cfg, options.Username, options.Password) - } - } - - c, err := etcd.New(*cfg) - if err != nil { - log.Fatal(err) - } - - s.client = etcd.NewKeysAPI(c) - - // Periodic Cluster Sync - go func() { - for { - if err := c.AutoSync(context.Background(), periodicSync); err != nil { - return - } - } - }() - - return s, nil -} - -// SetTLS sets the tls configuration given a tls.Config scheme -func setTLS(cfg *etcd.Config, tls *tls.Config, addrs []string) { - entries := store.CreateEndpoints(addrs, "https") - cfg.Endpoints = entries - - // Set transport - t := http.Transport{ - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - TLSClientConfig: tls, - } - - cfg.Transport = &t -} - -// setTimeout sets the timeout used for connecting to the store -func setTimeout(cfg *etcd.Config, time time.Duration) { - cfg.HeaderTimeoutPerRequest = time -} - -// setCredentials sets the username/password credentials for connecting to Etcd -func setCredentials(cfg *etcd.Config, username, password string) { - cfg.Username = username - cfg.Password = password -} - -// Normalize the key for usage in Etcd -func (s *Etcd) normalize(key string) string { - key = store.Normalize(key) - return strings.TrimPrefix(key, "/") -} - -// keyNotFound checks on the error returned by the KeysAPI -// to verify if the key exists in the store or not -func keyNotFound(err error) bool { - if err != nil { - if etcdError, ok := err.(etcd.Error); ok { - if etcdError.Code == etcd.ErrorCodeKeyNotFound || - etcdError.Code == etcd.ErrorCodeNotFile || - etcdError.Code == etcd.ErrorCodeNotDir { - return true - } - } - } - return false -} - -// Get the value at "key", returns the last modified -// index to use in conjunction to Atomic calls -func (s *Etcd) Get(key string) (pair *store.KVPair, err error) { - getOpts := &etcd.GetOptions{ - Quorum: true, - } - - result, err := s.client.Get(context.Background(), s.normalize(key), getOpts) - if err != nil { - if keyNotFound(err) { - return nil, store.ErrKeyNotFound - } - return nil, err - } - - pair = &store.KVPair{ - Key: key, - Value: []byte(result.Node.Value), - LastIndex: result.Node.ModifiedIndex, - } - - return pair, nil -} - -// Put a value at "key" -func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { - setOpts := &etcd.SetOptions{} - - // Set options - if opts != nil { - setOpts.Dir = opts.IsDir - setOpts.TTL = opts.TTL - } - - _, err := s.client.Set(context.Background(), s.normalize(key), string(value), setOpts) - return err -} - -// Delete a value at "key" -func (s *Etcd) Delete(key string) error { - opts := &etcd.DeleteOptions{ - Recursive: false, - } - - _, err := s.client.Delete(context.Background(), s.normalize(key), opts) - if keyNotFound(err) { - return store.ErrKeyNotFound - } - return err -} - -// Exists checks if the key exists inside the store -func (s *Etcd) Exists(key string) (bool, error) { - _, err := s.Get(key) - if err != nil { - if err == store.ErrKeyNotFound { - return false, nil - } - return false, err - } - return true, nil -} - -// Watch for changes on a "key" -// It returns a channel that will receive changes or pass -// on errors. Upon creation, the current value will first -// be sent to the channel. Providing a non-nil stopCh can -// be used to stop watching. -func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - opts := &etcd.WatcherOptions{Recursive: false} - watcher := s.client.Watcher(s.normalize(key), opts) - - // watchCh is sending back events to the caller - watchCh := make(chan *store.KVPair) - - go func() { - defer close(watchCh) - - // Get the current value - pair, err := s.Get(key) - if err != nil { - return - } - - // Push the current value through the channel. - watchCh <- pair - - for { - // Check if the watch was stopped by the caller - select { - case <-stopCh: - return - default: - } - - result, err := watcher.Next(context.Background()) - - if err != nil { - return - } - - watchCh <- &store.KVPair{ - Key: key, - Value: []byte(result.Node.Value), - LastIndex: result.Node.ModifiedIndex, - } - } - }() - - return watchCh, nil -} - -// WatchTree watches for changes on a "directory" -// It returns a channel that will receive changes or pass -// on errors. Upon creating a watch, the current childs values -// will be sent to the channel. Providing a non-nil stopCh can -// be used to stop watching. -func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - watchOpts := &etcd.WatcherOptions{Recursive: true} - watcher := s.client.Watcher(s.normalize(directory), watchOpts) - - // watchCh is sending back events to the caller - watchCh := make(chan []*store.KVPair) - - go func() { - defer close(watchCh) - - // Get child values - list, err := s.List(directory) - if err != nil { - return - } - - // Push the current value through the channel. - watchCh <- list - - for { - // Check if the watch was stopped by the caller - select { - case <-stopCh: - return - default: - } - - _, err := watcher.Next(context.Background()) - - if err != nil { - return - } - - list, err = s.List(directory) - if err != nil { - return - } - - watchCh <- list - } - }() - - return watchCh, nil -} - -// AtomicPut puts a value at "key" if the key has not been -// modified in the meantime, throws an error if this is the case -func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) { - var ( - meta *etcd.Response - err error - ) - - setOpts := &etcd.SetOptions{} - - if previous != nil { - setOpts.PrevExist = etcd.PrevExist - setOpts.PrevIndex = previous.LastIndex - if previous.Value != nil { - setOpts.PrevValue = string(previous.Value) - } - } else { - setOpts.PrevExist = etcd.PrevNoExist - } - - if opts != nil { - if opts.TTL > 0 { - setOpts.TTL = opts.TTL - } - } - - meta, err = s.client.Set(context.Background(), s.normalize(key), string(value), setOpts) - if err != nil { - if etcdError, ok := err.(etcd.Error); ok { - // Compare failed - if etcdError.Code == etcd.ErrorCodeTestFailed { - return false, nil, store.ErrKeyModified - } - // Node exists error (when PrevNoExist) - if etcdError.Code == etcd.ErrorCodeNodeExist { - return false, nil, store.ErrKeyExists - } - } - return false, nil, err - } - - updated := &store.KVPair{ - Key: key, - Value: value, - LastIndex: meta.Node.ModifiedIndex, - } - - return true, updated, nil -} - -// AtomicDelete deletes a value at "key" if the key -// has not been modified in the meantime, throws an -// error if this is the case -func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) { - if previous == nil { - return false, store.ErrPreviousNotSpecified - } - - delOpts := &etcd.DeleteOptions{} - - if previous != nil { - delOpts.PrevIndex = previous.LastIndex - if previous.Value != nil { - delOpts.PrevValue = string(previous.Value) - } - } - - _, err := s.client.Delete(context.Background(), s.normalize(key), delOpts) - if err != nil { - if etcdError, ok := err.(etcd.Error); ok { - // Key Not Found - if etcdError.Code == etcd.ErrorCodeKeyNotFound { - return false, store.ErrKeyNotFound - } - // Compare failed - if etcdError.Code == etcd.ErrorCodeTestFailed { - return false, store.ErrKeyModified - } - } - return false, err - } - - return true, nil -} - -// List child nodes of a given directory -func (s *Etcd) List(directory string) ([]*store.KVPair, error) { - getOpts := &etcd.GetOptions{ - Quorum: true, - Recursive: true, - Sort: true, - } - - resp, err := s.client.Get(context.Background(), s.normalize(directory), getOpts) - if err != nil { - if keyNotFound(err) { - return nil, store.ErrKeyNotFound - } - return nil, err - } - - kv := []*store.KVPair{} - for _, n := range resp.Node.Nodes { - kv = append(kv, &store.KVPair{ - Key: n.Key, - Value: []byte(n.Value), - LastIndex: n.ModifiedIndex, - }) - } - return kv, nil -} - -// DeleteTree deletes a range of keys under a given directory -func (s *Etcd) DeleteTree(directory string) error { - delOpts := &etcd.DeleteOptions{ - Recursive: true, - } - - _, err := s.client.Delete(context.Background(), s.normalize(directory), delOpts) - if keyNotFound(err) { - return store.ErrKeyNotFound - } - return err -} - -// NewLock returns a handle to a lock struct which can -// be used to provide mutual exclusion on a key -func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) { - var value string - ttl := defaultLockTTL - renewCh := make(chan struct{}) - - // Apply options on Lock - if options != nil { - if options.Value != nil { - value = string(options.Value) - } - if options.TTL != 0 { - ttl = options.TTL - } - if options.RenewLock != nil { - renewCh = options.RenewLock - } - } - - // Create lock object - lock = &etcdLock{ - client: s.client, - stopRenew: renewCh, - key: s.normalize(key), - value: value, - ttl: ttl, - } - - return lock, nil -} - -// Lock attempts to acquire the lock and blocks while -// doing so. It returns a channel that is closed if our -// lock is lost or if an error occurs -func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { - - // Lock holder channel - lockHeld := make(chan struct{}) - stopLocking := l.stopRenew - - setOpts := &etcd.SetOptions{ - TTL: l.ttl, - } - - for { - setOpts.PrevExist = etcd.PrevNoExist - resp, err := l.client.Set(context.Background(), l.key, l.value, setOpts) - if err != nil { - if etcdError, ok := err.(etcd.Error); ok { - if etcdError.Code != etcd.ErrorCodeNodeExist { - return nil, err - } - setOpts.PrevIndex = ^uint64(0) - } - } else { - setOpts.PrevIndex = resp.Node.ModifiedIndex - } - - setOpts.PrevExist = etcd.PrevExist - l.last, err = l.client.Set(context.Background(), l.key, l.value, setOpts) - - if err == nil { - // Leader section - l.stopLock = stopLocking - go l.holdLock(l.key, lockHeld, stopLocking) - break - } else { - // If this is a legitimate error, return - if etcdError, ok := err.(etcd.Error); ok { - if etcdError.Code != etcd.ErrorCodeTestFailed { - return nil, err - } - } - - // Seeker section - errorCh := make(chan error) - chWStop := make(chan bool) - free := make(chan bool) - - go l.waitLock(l.key, errorCh, chWStop, free) - - // Wait for the key to be available or for - // a signal to stop trying to lock the key - select { - case <-free: - break - case err := <-errorCh: - return nil, err - case <-stopChan: - return nil, ErrAbortTryLock - } - - // Delete or Expire event occurred - // Retry - } - } - - return lockHeld, nil -} - -// Hold the lock as long as we can -// Updates the key ttl periodically until we receive -// an explicit stop signal from the Unlock method -func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) { - defer close(lockHeld) - - update := time.NewTicker(l.ttl / 3) - defer update.Stop() - - var err error - setOpts := &etcd.SetOptions{TTL: l.ttl} - - for { - select { - case <-update.C: - setOpts.PrevIndex = l.last.Node.ModifiedIndex - l.last, err = l.client.Set(context.Background(), key, l.value, setOpts) - if err != nil { - return - } - - case <-stopLocking: - return - } - } -} - -// WaitLock simply waits for the key to be available for creation -func (l *etcdLock) waitLock(key string, errorCh chan error, stopWatchCh chan bool, free chan<- bool) { - opts := &etcd.WatcherOptions{Recursive: false} - watcher := l.client.Watcher(key, opts) - - for { - event, err := watcher.Next(context.Background()) - if err != nil { - errorCh <- err - return - } - if event.Action == "delete" || event.Action == "expire" { - free <- true - return - } - } -} - -// Unlock the "key". Calling unlock while -// not holding the lock will throw an error -func (l *etcdLock) Unlock() error { - if l.stopLock != nil { - l.stopLock <- struct{}{} - } - if l.last != nil { - delOpts := &etcd.DeleteOptions{ - PrevIndex: l.last.Node.ModifiedIndex, - } - _, err := l.client.Delete(context.Background(), l.key, delOpts) - if err != nil { - return err - } - } - return nil -} - -// Close closes the client connection -func (s *Etcd) Close() { - return -} diff --git a/vendor/github.com/docker/libkv/store/etcd/etcd_test.go b/vendor/github.com/docker/libkv/store/etcd/etcd_test.go deleted file mode 100644 index e2d224c8..00000000 --- a/vendor/github.com/docker/libkv/store/etcd/etcd_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package etcd - -import ( - "testing" - "time" - - "github.com/docker/libkv" - "github.com/docker/libkv/store" - "github.com/docker/libkv/testutils" - "github.com/stretchr/testify/assert" -) - -var ( - client = "localhost:4001" -) - -func makeEtcdClient(t *testing.T) store.Store { - kv, err := New( - []string{client}, - &store.Config{ - ConnectionTimeout: 3 * time.Second, - Username: "test", - Password: "very-secure", - }, - ) - - if err != nil { - t.Fatalf("cannot create store: %v", err) - } - - return kv -} - -func TestRegister(t *testing.T) { - Register() - - kv, err := libkv.NewStore(store.ETCD, []string{client}, nil) - assert.NoError(t, err) - assert.NotNil(t, kv) - - if _, ok := kv.(*Etcd); !ok { - t.Fatal("Error registering and initializing etcd") - } -} - -func TestEtcdStore(t *testing.T) { - kv := makeEtcdClient(t) - lockKV := makeEtcdClient(t) - ttlKV := makeEtcdClient(t) - - testutils.RunTestCommon(t, kv) - testutils.RunTestAtomic(t, kv) - testutils.RunTestWatch(t, kv) - testutils.RunTestLock(t, kv) - testutils.RunTestLockTTL(t, kv, lockKV) - testutils.RunTestTTL(t, kv, ttlKV) - testutils.RunCleanup(t, kv) -} diff --git a/vendor/github.com/docker/libkv/store/mock/mock.go b/vendor/github.com/docker/libkv/store/mock/mock.go deleted file mode 100644 index 82a5b03b..00000000 --- a/vendor/github.com/docker/libkv/store/mock/mock.go +++ /dev/null @@ -1,113 +0,0 @@ -package mock - -import ( - "github.com/docker/libkv/store" - "github.com/stretchr/testify/mock" -) - -// Mock store. Mocks all Store functions using testify.Mock -type Mock struct { - mock.Mock - - // Endpoints passed to InitializeMock - Endpoints []string - - // Options passed to InitializeMock - Options *store.Config -} - -// New creates a Mock store -func New(endpoints []string, options *store.Config) (store.Store, error) { - s := &Mock{} - s.Endpoints = endpoints - s.Options = options - return s, nil -} - -// Put mock -func (s *Mock) Put(key string, value []byte, opts *store.WriteOptions) error { - args := s.Mock.Called(key, value, opts) - return args.Error(0) -} - -// Get mock -func (s *Mock) Get(key string) (*store.KVPair, error) { - args := s.Mock.Called(key) - return args.Get(0).(*store.KVPair), args.Error(1) -} - -// Delete mock -func (s *Mock) Delete(key string) error { - args := s.Mock.Called(key) - return args.Error(0) -} - -// Exists mock -func (s *Mock) Exists(key string) (bool, error) { - args := s.Mock.Called(key) - return args.Bool(0), args.Error(1) -} - -// Watch mock -func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - args := s.Mock.Called(key, stopCh) - return args.Get(0).(<-chan *store.KVPair), args.Error(1) -} - -// WatchTree mock -func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - args := s.Mock.Called(prefix, stopCh) - return args.Get(0).(chan []*store.KVPair), args.Error(1) -} - -// NewLock mock -func (s *Mock) NewLock(key string, options *store.LockOptions) (store.Locker, error) { - args := s.Mock.Called(key, options) - return args.Get(0).(store.Locker), args.Error(1) -} - -// List mock -func (s *Mock) List(prefix string) ([]*store.KVPair, error) { - args := s.Mock.Called(prefix) - return args.Get(0).([]*store.KVPair), args.Error(1) -} - -// DeleteTree mock -func (s *Mock) DeleteTree(prefix string) error { - args := s.Mock.Called(prefix) - return args.Error(0) -} - -// AtomicPut mock -func (s *Mock) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) { - args := s.Mock.Called(key, value, previous, opts) - return args.Bool(0), args.Get(1).(*store.KVPair), args.Error(2) -} - -// AtomicDelete mock -func (s *Mock) AtomicDelete(key string, previous *store.KVPair) (bool, error) { - args := s.Mock.Called(key, previous) - return args.Bool(0), args.Error(1) -} - -// Lock mock implementation of Locker -type Lock struct { - mock.Mock -} - -// Lock mock -func (l *Lock) Lock(stopCh chan struct{}) (<-chan struct{}, error) { - args := l.Mock.Called(stopCh) - return args.Get(0).(<-chan struct{}), args.Error(1) -} - -// Unlock mock -func (l *Lock) Unlock() error { - args := l.Mock.Called() - return args.Error(0) -} - -// Close mock -func (s *Mock) Close() { - return -} diff --git a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go deleted file mode 100644 index ff8d4ebe..00000000 --- a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper.go +++ /dev/null @@ -1,429 +0,0 @@ -package zookeeper - -import ( - "strings" - "time" - - "github.com/docker/libkv" - "github.com/docker/libkv/store" - zk "github.com/samuel/go-zookeeper/zk" -) - -const ( - // SOH control character - SOH = "\x01" - - defaultTimeout = 10 * time.Second -) - -// Zookeeper is the receiver type for -// the Store interface -type Zookeeper struct { - timeout time.Duration - client *zk.Conn -} - -type zookeeperLock struct { - client *zk.Conn - lock *zk.Lock - key string - value []byte -} - -// Register registers zookeeper to libkv -func Register() { - libkv.AddStore(store.ZK, New) -} - -// New creates a new Zookeeper client given a -// list of endpoints and an optional tls config -func New(endpoints []string, options *store.Config) (store.Store, error) { - s := &Zookeeper{} - s.timeout = defaultTimeout - - // Set options - if options != nil { - if options.ConnectionTimeout != 0 { - s.setTimeout(options.ConnectionTimeout) - } - } - - // Connect to Zookeeper - conn, _, err := zk.Connect(endpoints, s.timeout) - if err != nil { - return nil, err - } - s.client = conn - - return s, nil -} - -// setTimeout sets the timeout for connecting to Zookeeper -func (s *Zookeeper) setTimeout(time time.Duration) { - s.timeout = time -} - -// Get the value at "key", returns the last modified index -// to use in conjunction to Atomic calls -func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { - resp, meta, err := s.client.Get(s.normalize(key)) - - if err != nil { - if err == zk.ErrNoNode { - return nil, store.ErrKeyNotFound - } - return nil, err - } - - // FIXME handle very rare cases where Get returns the - // SOH control character instead of the actual value - if string(resp) == SOH { - return s.Get(store.Normalize(key)) - } - - pair = &store.KVPair{ - Key: key, - Value: resp, - LastIndex: uint64(meta.Version), - } - - return pair, nil -} - -// createFullPath creates the entire path for a directory -// that does not exist -func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { - for i := 1; i <= len(path); i++ { - newpath := "/" + strings.Join(path[:i], "/") - if i == len(path) && ephemeral { - _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) - return err - } - _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll)) - if err != nil { - // Skip if node already exists - if err != zk.ErrNodeExists { - return err - } - } - } - return nil -} - -// Put a value at "key" -func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error { - fkey := s.normalize(key) - - exists, err := s.Exists(key) - if err != nil { - return err - } - - if !exists { - if opts != nil && opts.TTL > 0 { - s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true) - } else { - s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false) - } - } - - _, err = s.client.Set(fkey, value, -1) - return err -} - -// Delete a value at "key" -func (s *Zookeeper) Delete(key string) error { - err := s.client.Delete(s.normalize(key), -1) - if err == zk.ErrNoNode { - return store.ErrKeyNotFound - } - return err -} - -// Exists checks if the key exists inside the store -func (s *Zookeeper) Exists(key string) (bool, error) { - exists, _, err := s.client.Exists(s.normalize(key)) - if err != nil { - return false, err - } - return exists, nil -} - -// Watch for changes on a "key" -// It returns a channel that will receive changes or pass -// on errors. Upon creation, the current value will first -// be sent to the channel. Providing a non-nil stopCh can -// be used to stop watching. -func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - // Get the key first - pair, err := s.Get(key) - if err != nil { - return nil, err - } - - // Catch zk notifications and fire changes into the channel. - watchCh := make(chan *store.KVPair) - go func() { - defer close(watchCh) - - // Get returns the current value to the channel prior - // to listening to any event that may occur on that key - watchCh <- pair - for { - _, _, eventCh, err := s.client.GetW(s.normalize(key)) - if err != nil { - return - } - select { - case e := <-eventCh: - if e.Type == zk.EventNodeDataChanged { - if entry, err := s.Get(key); err == nil { - watchCh <- entry - } - } - case <-stopCh: - // There is no way to stop GetW so just quit - return - } - } - }() - - return watchCh, nil -} - -// WatchTree watches for changes on a "directory" -// It returns a channel that will receive changes or pass -// on errors. Upon creating a watch, the current childs values -// will be sent to the channel .Providing a non-nil stopCh can -// be used to stop watching. -func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - // List the childrens first - entries, err := s.List(directory) - if err != nil { - return nil, err - } - - // Catch zk notifications and fire changes into the channel. - watchCh := make(chan []*store.KVPair) - go func() { - defer close(watchCh) - - // List returns the children values to the channel - // prior to listening to any events that may occur - // on those keys - watchCh <- entries - - for { - _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) - if err != nil { - return - } - select { - case e := <-eventCh: - if e.Type == zk.EventNodeChildrenChanged { - if kv, err := s.List(directory); err == nil { - watchCh <- kv - } - } - case <-stopCh: - // There is no way to stop GetW so just quit - return - } - } - }() - - return watchCh, nil -} - -// List child nodes of a given directory -func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { - keys, stat, err := s.client.Children(s.normalize(directory)) - if err != nil { - if err == zk.ErrNoNode { - return nil, store.ErrKeyNotFound - } - return nil, err - } - - kv := []*store.KVPair{} - - // FIXME Costly Get request for each child key.. - for _, key := range keys { - pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) - if err != nil { - // If node is not found: List is out of date, retry - if err == store.ErrKeyNotFound { - return s.List(directory) - } - return nil, err - } - - kv = append(kv, &store.KVPair{ - Key: key, - Value: []byte(pair.Value), - LastIndex: uint64(stat.Version), - }) - } - - return kv, nil -} - -// DeleteTree deletes a range of keys under a given directory -func (s *Zookeeper) DeleteTree(directory string) error { - pairs, err := s.List(directory) - if err != nil { - return err - } - - var reqs []interface{} - - for _, pair := range pairs { - reqs = append(reqs, &zk.DeleteRequest{ - Path: s.normalize(directory + "/" + pair.Key), - Version: -1, - }) - } - - _, err = s.client.Multi(reqs...) - return err -} - -// AtomicPut put a value at "key" if the key has not been -// modified in the meantime, throws an error if this is the case -func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) { - var lastIndex uint64 - - if previous != nil { - meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex)) - if err != nil { - // Compare Failed - if err == zk.ErrBadVersion { - return false, nil, store.ErrKeyModified - } - return false, nil, err - } - lastIndex = uint64(meta.Version) - } else { - // Interpret previous == nil as create operation. - _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)) - if err != nil { - // Directory does not exist - if err == zk.ErrNoNode { - - // Create the directory - parts := store.SplitKey(strings.TrimSuffix(key, "/")) - parts = parts[:len(parts)-1] - if err = s.createFullPath(parts, false); err != nil { - // Failed to create the directory. - return false, nil, err - } - - // Create the node - if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { - // Node exist error (when previous nil) - if err == zk.ErrNodeExists { - return false, nil, store.ErrKeyExists - } - return false, nil, err - } - - } else { - // Node Exists error (when previous nil) - if err == zk.ErrNodeExists { - return false, nil, store.ErrKeyExists - } - - // Unhandled error - return false, nil, err - } - } - lastIndex = 0 // Newly created nodes have version 0. - } - - pair := &store.KVPair{ - Key: key, - Value: value, - LastIndex: lastIndex, - } - - return true, pair, nil -} - -// AtomicDelete deletes a value at "key" if the key -// has not been modified in the meantime, throws an -// error if this is the case -func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, error) { - if previous == nil { - return false, store.ErrPreviousNotSpecified - } - - err := s.client.Delete(s.normalize(key), int32(previous.LastIndex)) - if err != nil { - // Key not found - if err == zk.ErrNoNode { - return false, store.ErrKeyNotFound - } - // Compare failed - if err == zk.ErrBadVersion { - return false, store.ErrKeyModified - } - // General store error - return false, err - } - return true, nil -} - -// NewLock returns a handle to a lock struct which can -// be used to provide mutual exclusion on a key -func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) { - value := []byte("") - - // Apply options - if options != nil { - if options.Value != nil { - value = options.Value - } - } - - lock = &zookeeperLock{ - client: s.client, - key: s.normalize(key), - value: value, - lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)), - } - - return lock, err -} - -// Lock attempts to acquire the lock and blocks while -// doing so. It returns a channel that is closed if our -// lock is lost or if an error occurs -func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { - err := l.lock.Lock() - - if err == nil { - // We hold the lock, we can set our value - // FIXME: The value is left behind - // (problematic for leader election) - _, err = l.client.Set(l.key, l.value, -1) - } - - return make(chan struct{}), err -} - -// Unlock the "key". Calling unlock while -// not holding the lock will throw an error -func (l *zookeeperLock) Unlock() error { - return l.lock.Unlock() -} - -// Close closes the client connection -func (s *Zookeeper) Close() { - s.client.Close() -} - -// Normalize the key for usage in Zookeeper -func (s *Zookeeper) normalize(key string) string { - key = store.Normalize(key) - return strings.TrimSuffix(key, "/") -} diff --git a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go b/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go deleted file mode 100644 index c36087e0..00000000 --- a/vendor/github.com/docker/libkv/store/zookeeper/zookeeper_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package zookeeper - -import ( - "testing" - "time" - - "github.com/docker/libkv" - "github.com/docker/libkv/store" - "github.com/docker/libkv/testutils" - "github.com/stretchr/testify/assert" -) - -var ( - client = "localhost:2181" -) - -func makeZkClient(t *testing.T) store.Store { - kv, err := New( - []string{client}, - &store.Config{ - ConnectionTimeout: 3 * time.Second, - }, - ) - - if err != nil { - t.Fatalf("cannot create store: %v", err) - } - - return kv -} - -func TestRegister(t *testing.T) { - Register() - - kv, err := libkv.NewStore(store.ZK, []string{client}, nil) - assert.NoError(t, err) - assert.NotNil(t, kv) - - if _, ok := kv.(*Zookeeper); !ok { - t.Fatal("Error registering and initializing zookeeper") - } -} - -func TestZkStore(t *testing.T) { - kv := makeZkClient(t) - ttlKV := makeZkClient(t) - - testutils.RunTestCommon(t, kv) - testutils.RunTestAtomic(t, kv) - testutils.RunTestWatch(t, kv) - testutils.RunTestLock(t, kv) - testutils.RunTestTTL(t, kv, ttlKV) - testutils.RunCleanup(t, kv) -} diff --git a/vendor/github.com/docker/libkv/testutils/utils.go b/vendor/github.com/docker/libkv/testutils/utils.go deleted file mode 100644 index 5385bac6..00000000 --- a/vendor/github.com/docker/libkv/testutils/utils.go +++ /dev/null @@ -1,622 +0,0 @@ -package testutils - -import ( - "fmt" - "testing" - "time" - - "github.com/docker/libkv/store" - "github.com/stretchr/testify/assert" -) - -// RunTestCommon tests the minimal required APIs which -// should be supported by all K/V backends -func RunTestCommon(t *testing.T, kv store.Store) { - testPutGetDeleteExists(t, kv) - testList(t, kv) - testDeleteTree(t, kv) -} - -// RunTestAtomic tests the Atomic operations by the K/V -// backends -func RunTestAtomic(t *testing.T, kv store.Store) { - testAtomicPut(t, kv) - testAtomicPutCreate(t, kv) - testAtomicPutWithSlashSuffixKey(t, kv) - testAtomicDelete(t, kv) -} - -// RunTestWatch tests the watch/monitor APIs supported -// by the K/V backends. -func RunTestWatch(t *testing.T, kv store.Store) { - testWatch(t, kv) - testWatchTree(t, kv) -} - -// RunTestLock tests the KV pair Lock/Unlock APIs supported -// by the K/V backends. -func RunTestLock(t *testing.T, kv store.Store) { - testLockUnlock(t, kv) -} - -// RunTestLockTTL tests the KV pair Lock with TTL APIs supported -// by the K/V backends. -func RunTestLockTTL(t *testing.T, kv store.Store, backup store.Store) { - testLockTTL(t, kv, backup) -} - -// RunTestTTL tests the TTL functionality of the K/V backend. -func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) { - testPutTTL(t, kv, backup) -} - -func testPutGetDeleteExists(t *testing.T, kv store.Store) { - // Get a not exist key should return ErrKeyNotFound - pair, err := kv.Get("testPutGetDelete_not_exist_key") - assert.Equal(t, store.ErrKeyNotFound, err) - - value := []byte("bar") - for _, key := range []string{ - "testPutGetDeleteExists", - "testPutGetDeleteExists/", - "testPutGetDeleteExists/testbar/", - "testPutGetDeleteExists/testbar/testfoobar", - } { - failMsg := fmt.Sprintf("Fail key %s", key) - - // Put the key - err = kv.Put(key, value, nil) - assert.NoError(t, err, failMsg) - - // Get should return the value and an incremented index - pair, err = kv.Get(key) - assert.NoError(t, err, failMsg) - if assert.NotNil(t, pair, failMsg) { - assert.NotNil(t, pair.Value, failMsg) - } - assert.Equal(t, pair.Value, value, failMsg) - assert.NotEqual(t, pair.LastIndex, 0, failMsg) - - // Exists should return true - exists, err := kv.Exists(key) - assert.NoError(t, err, failMsg) - assert.True(t, exists, failMsg) - - // Delete the key - err = kv.Delete(key) - assert.NoError(t, err, failMsg) - - // Get should fail - pair, err = kv.Get(key) - assert.Error(t, err, failMsg) - assert.Nil(t, pair, failMsg) - - // Exists should return false - exists, err = kv.Exists(key) - assert.NoError(t, err, failMsg) - assert.False(t, exists, failMsg) - } -} - -func testWatch(t *testing.T, kv store.Store) { - key := "testWatch" - value := []byte("world") - newValue := []byte("world!") - - // Put the key - err := kv.Put(key, value, nil) - assert.NoError(t, err) - - stopCh := make(<-chan struct{}) - events, err := kv.Watch(key, stopCh) - assert.NoError(t, err) - assert.NotNil(t, events) - - // Update loop - go func() { - timeout := time.After(1 * time.Second) - tick := time.Tick(250 * time.Millisecond) - for { - select { - case <-timeout: - return - case <-tick: - err := kv.Put(key, newValue, nil) - if assert.NoError(t, err) { - continue - } - return - } - } - }() - - // Check for updates - eventCount := 1 - for { - select { - case event := <-events: - assert.NotNil(t, event) - if eventCount == 1 { - assert.Equal(t, event.Key, key) - assert.Equal(t, event.Value, value) - } else { - assert.Equal(t, event.Key, key) - assert.Equal(t, event.Value, newValue) - } - eventCount++ - // We received all the events we wanted to check - if eventCount >= 4 { - return - } - case <-time.After(4 * time.Second): - t.Fatal("Timeout reached") - return - } - } -} - -func testWatchTree(t *testing.T, kv store.Store) { - dir := "testWatchTree" - - node1 := "testWatchTree/node1" - value1 := []byte("node1") - - node2 := "testWatchTree/node2" - value2 := []byte("node2") - - node3 := "testWatchTree/node3" - value3 := []byte("node3") - - err := kv.Put(node1, value1, nil) - assert.NoError(t, err) - err = kv.Put(node2, value2, nil) - assert.NoError(t, err) - err = kv.Put(node3, value3, nil) - assert.NoError(t, err) - - stopCh := make(<-chan struct{}) - events, err := kv.WatchTree(dir, stopCh) - assert.NoError(t, err) - assert.NotNil(t, events) - - // Update loop - go func() { - timeout := time.After(500 * time.Millisecond) - for { - select { - case <-timeout: - err := kv.Delete(node3) - assert.NoError(t, err) - return - } - } - }() - - // Check for updates - eventCount := 1 - for { - select { - case event := <-events: - assert.NotNil(t, event) - // We received the Delete event on a child node - // Exit test successfully - if eventCount == 2 { - return - } - eventCount++ - case <-time.After(4 * time.Second): - t.Fatal("Timeout reached") - return - } - } -} - -func testAtomicPut(t *testing.T, kv store.Store) { - key := "testAtomicPut" - value := []byte("world") - - // Put the key - err := kv.Put(key, value, nil) - assert.NoError(t, err) - - // Get should return the value and an incremented index - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - // This CAS should fail: previous exists. - success, _, err := kv.AtomicPut(key, []byte("WORLD"), nil, nil) - assert.Error(t, err) - assert.False(t, success) - - // This CAS should succeed - success, _, err = kv.AtomicPut(key, []byte("WORLD"), pair, nil) - assert.NoError(t, err) - assert.True(t, success) - - // This CAS should fail, key exists. - pair.LastIndex = 6744 - success, _, err = kv.AtomicPut(key, []byte("WORLDWORLD"), pair, nil) - assert.Error(t, err) - assert.False(t, success) -} - -func testAtomicPutCreate(t *testing.T, kv store.Store) { - // Use a key in a new directory to ensure Stores will create directories - // that don't yet exist. - key := "testAtomicPutCreate/create" - value := []byte("putcreate") - - // AtomicPut the key, previous = nil indicates create. - success, _, err := kv.AtomicPut(key, value, nil, nil) - assert.NoError(t, err) - assert.True(t, success) - - // Get should return the value and an incremented index - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - - // Attempting to create again should fail. - success, _, err = kv.AtomicPut(key, value, nil, nil) - assert.Error(t, store.ErrKeyExists) - assert.False(t, success) - - // This CAS should succeed, since it has the value from Get() - success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil) - assert.NoError(t, err) - assert.True(t, success) -} - -func testAtomicPutWithSlashSuffixKey(t *testing.T, kv store.Store) { - k1 := "testAtomicPutWithSlashSuffixKey/key/" - success, _, err := kv.AtomicPut(k1, []byte{}, nil, nil) - assert.Nil(t, err) - assert.True(t, success) -} - -func testAtomicDelete(t *testing.T, kv store.Store) { - key := "testAtomicDelete" - value := []byte("world") - - // Put the key - err := kv.Put(key, value, nil) - assert.NoError(t, err) - - // Get should return the value and an incremented index - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - tempIndex := pair.LastIndex - - // AtomicDelete should fail - pair.LastIndex = 6744 - success, err := kv.AtomicDelete(key, pair) - assert.Error(t, err) - assert.False(t, success) - - // AtomicDelete should succeed - pair.LastIndex = tempIndex - success, err = kv.AtomicDelete(key, pair) - assert.NoError(t, err) - assert.True(t, success) - - // Delete a non-existent key; should fail - success, err = kv.AtomicDelete(key, pair) - assert.Error(t, store.ErrKeyNotFound) - assert.False(t, success) -} - -func testLockUnlock(t *testing.T, kv store.Store) { - key := "testLockUnlock" - value := []byte("bar") - - // We should be able to create a new lock on key - lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second}) - assert.NoError(t, err) - assert.NotNil(t, lock) - - // Lock should successfully succeed or block - lockChan, err := lock.Lock(nil) - assert.NoError(t, err) - assert.NotNil(t, lockChan) - - // Get should work - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - // Unlock should succeed - err = lock.Unlock() - assert.NoError(t, err) - - // Lock should succeed again - lockChan, err = lock.Lock(nil) - assert.NoError(t, err) - assert.NotNil(t, lockChan) - - // Get should work - pair, err = kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - err = lock.Unlock() - assert.NoError(t, err) -} - -func testLockTTL(t *testing.T, kv store.Store, otherConn store.Store) { - key := "testLockTTL" - value := []byte("bar") - - renewCh := make(chan struct{}) - - // We should be able to create a new lock on key - lock, err := otherConn.NewLock(key, &store.LockOptions{ - Value: value, - TTL: 2 * time.Second, - RenewLock: renewCh, - }) - assert.NoError(t, err) - assert.NotNil(t, lock) - - // Lock should successfully succeed - lockChan, err := lock.Lock(nil) - assert.NoError(t, err) - assert.NotNil(t, lockChan) - - // Get should work - pair, err := otherConn.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - time.Sleep(3 * time.Second) - - done := make(chan struct{}) - stop := make(chan struct{}) - - value = []byte("foobar") - - // Create a new lock with another connection - lock, err = kv.NewLock( - key, - &store.LockOptions{ - Value: value, - TTL: 3 * time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, lock) - - // Lock should block, the session on the lock - // is still active and renewed periodically - go func(<-chan struct{}) { - _, _ = lock.Lock(stop) - done <- struct{}{} - }(done) - - select { - case _ = <-done: - t.Fatal("Lock succeeded on a key that is supposed to be locked by another client") - case <-time.After(4 * time.Second): - // Stop requesting the lock as we are blocked as expected - stop <- struct{}{} - break - } - - // Close the connection - otherConn.Close() - - // Force stop the session renewal for the lock - close(renewCh) - - // Let the session on the lock expire - time.Sleep(3 * time.Second) - locked := make(chan struct{}) - - // Lock should now succeed for the other client - go func(<-chan struct{}) { - lockChan, err = lock.Lock(nil) - assert.NoError(t, err) - assert.NotNil(t, lockChan) - locked <- struct{}{} - }(locked) - - select { - case _ = <-locked: - break - case <-time.After(4 * time.Second): - t.Fatal("Unable to take the lock, timed out") - } - - // Get should work with the new value - pair, err = kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - err = lock.Unlock() - assert.NoError(t, err) -} - -func testPutTTL(t *testing.T, kv store.Store, otherConn store.Store) { - firstKey := "testPutTTL" - firstValue := []byte("foo") - - secondKey := "second" - secondValue := []byte("bar") - - // Put the first key with the Ephemeral flag - err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{TTL: 2 * time.Second}) - assert.NoError(t, err) - - // Put a second key with the Ephemeral flag - err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{TTL: 2 * time.Second}) - assert.NoError(t, err) - - // Get on firstKey should work - pair, err := kv.Get(firstKey) - assert.NoError(t, err) - assert.NotNil(t, pair) - - // Get on secondKey should work - pair, err = kv.Get(secondKey) - assert.NoError(t, err) - assert.NotNil(t, pair) - - // Close the connection - otherConn.Close() - - // Let the session expire - time.Sleep(3 * time.Second) - - // Get on firstKey shouldn't work - pair, err = kv.Get(firstKey) - assert.Error(t, err) - assert.Nil(t, pair) - - // Get on secondKey shouldn't work - pair, err = kv.Get(secondKey) - assert.Error(t, err) - assert.Nil(t, pair) -} - -func testList(t *testing.T, kv store.Store) { - prefix := "testList" - - firstKey := "testList/first" - firstValue := []byte("first") - - secondKey := "testList/second" - secondValue := []byte("second") - - // Put the first key - err := kv.Put(firstKey, firstValue, nil) - assert.NoError(t, err) - - // Put the second key - err = kv.Put(secondKey, secondValue, nil) - assert.NoError(t, err) - - // List should work and return the two correct values - for _, parent := range []string{prefix, prefix + "/"} { - pairs, err := kv.List(parent) - assert.NoError(t, err) - if assert.NotNil(t, pairs) { - assert.Equal(t, len(pairs), 2) - } - - // Check pairs, those are not necessarily in Put order - for _, pair := range pairs { - if pair.Key == firstKey { - assert.Equal(t, pair.Value, firstValue) - } - if pair.Key == secondKey { - assert.Equal(t, pair.Value, secondValue) - } - } - } - - // List should fail: the key does not exist - pairs, err := kv.List("idontexist") - assert.Equal(t, store.ErrKeyNotFound, err) - assert.Nil(t, pairs) -} - -func testDeleteTree(t *testing.T, kv store.Store) { - prefix := "testDeleteTree" - - firstKey := "testDeleteTree/first" - firstValue := []byte("first") - - secondKey := "testDeleteTree/second" - secondValue := []byte("second") - - // Put the first key - err := kv.Put(firstKey, firstValue, nil) - assert.NoError(t, err) - - // Put the second key - err = kv.Put(secondKey, secondValue, nil) - assert.NoError(t, err) - - // Get should work on the first Key - pair, err := kv.Get(firstKey) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, firstValue) - assert.NotEqual(t, pair.LastIndex, 0) - - // Get should work on the second Key - pair, err = kv.Get(secondKey) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, secondValue) - assert.NotEqual(t, pair.LastIndex, 0) - - // Delete Values under directory `nodes` - err = kv.DeleteTree(prefix) - assert.NoError(t, err) - - // Get should fail on both keys - pair, err = kv.Get(firstKey) - assert.Error(t, err) - assert.Nil(t, pair) - - pair, err = kv.Get(secondKey) - assert.Error(t, err) - assert.Nil(t, pair) -} - -// RunCleanup cleans up keys introduced by the tests -func RunCleanup(t *testing.T, kv store.Store) { - for _, key := range []string{ - "testAtomicPutWithSlashSuffixKey", - "testPutGetDeleteExists", - "testWatch", - "testWatchTree", - "testAtomicPut", - "testAtomicPutCreate", - "testAtomicDelete", - "testLockUnlock", - "testLockTTL", - "testPutTTL", - "testList", - "testDeleteTree", - } { - err := kv.DeleteTree(key) - assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete tree key %s: %v", key, err)) - err = kv.Delete(key) - assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete key %s: %v", key, err)) - } -} |
