From ab59ea091a2374653ea8aeb5ed7675de61c08461 Mon Sep 17 00:00:00 2001 From: Stuart Clark Date: Thu, 3 Aug 2017 01:23:08 +0100 Subject: Add Consul & BoltDB datasource support (#178) * Add libkv support * Add vendoring --- vendor/github.com/docker/libkv/.travis.yml | 31 + vendor/github.com/docker/libkv/LICENSE.code | 191 +++++++ vendor/github.com/docker/libkv/LICENSE.docs | 425 ++++++++++++++ vendor/github.com/docker/libkv/MAINTAINERS | 40 ++ vendor/github.com/docker/libkv/README.md | 107 ++++ .../github.com/docker/libkv/docs/compatibility.md | 82 +++ vendor/github.com/docker/libkv/docs/examples.md | 157 ++++++ vendor/github.com/docker/libkv/libkv.go | 40 ++ vendor/github.com/docker/libkv/libkv_test.go | 24 + vendor/github.com/docker/libkv/script/.validate | 33 ++ vendor/github.com/docker/libkv/script/coverage | 21 + .../docker/libkv/script/travis_consul.sh | 18 + .../github.com/docker/libkv/script/travis_etcd.sh | 11 + vendor/github.com/docker/libkv/script/travis_zk.sh | 12 + .../github.com/docker/libkv/script/validate-gofmt | 30 + .../github.com/docker/libkv/store/boltdb/boltdb.go | 474 ++++++++++++++++ .../docker/libkv/store/boltdb/boltdb_test.go | 144 +++++ .../github.com/docker/libkv/store/consul/consul.go | 558 ++++++++++++++++++ .../docker/libkv/store/consul/consul_test.go | 84 +++ vendor/github.com/docker/libkv/store/helpers.go | 47 ++ vendor/github.com/docker/libkv/store/mock/mock.go | 113 ++++ vendor/github.com/docker/libkv/store/store.go | 132 +++++ vendor/github.com/docker/libkv/testutils/utils.go | 622 +++++++++++++++++++++ 23 files changed, 3396 insertions(+) create mode 100644 vendor/github.com/docker/libkv/.travis.yml create mode 100644 vendor/github.com/docker/libkv/LICENSE.code create mode 100644 vendor/github.com/docker/libkv/LICENSE.docs create mode 100644 vendor/github.com/docker/libkv/MAINTAINERS create mode 100644 vendor/github.com/docker/libkv/README.md create mode 100644 vendor/github.com/docker/libkv/docs/compatibility.md create mode 100644 vendor/github.com/docker/libkv/docs/examples.md create mode 100644 vendor/github.com/docker/libkv/libkv.go create mode 100644 vendor/github.com/docker/libkv/libkv_test.go create mode 100644 vendor/github.com/docker/libkv/script/.validate create mode 100755 vendor/github.com/docker/libkv/script/coverage create mode 100755 vendor/github.com/docker/libkv/script/travis_consul.sh create mode 100755 vendor/github.com/docker/libkv/script/travis_etcd.sh create mode 100755 vendor/github.com/docker/libkv/script/travis_zk.sh create mode 100755 vendor/github.com/docker/libkv/script/validate-gofmt create mode 100644 vendor/github.com/docker/libkv/store/boltdb/boltdb.go create mode 100644 vendor/github.com/docker/libkv/store/boltdb/boltdb_test.go create mode 100644 vendor/github.com/docker/libkv/store/consul/consul.go create mode 100644 vendor/github.com/docker/libkv/store/consul/consul_test.go create mode 100644 vendor/github.com/docker/libkv/store/helpers.go create mode 100644 vendor/github.com/docker/libkv/store/mock/mock.go create mode 100644 vendor/github.com/docker/libkv/store/store.go create mode 100644 vendor/github.com/docker/libkv/testutils/utils.go (limited to 'vendor/github.com/docker/libkv') diff --git a/vendor/github.com/docker/libkv/.travis.yml b/vendor/github.com/docker/libkv/.travis.yml new file mode 100644 index 00000000..a7a3bcff --- /dev/null +++ b/vendor/github.com/docker/libkv/.travis.yml @@ -0,0 +1,31 @@ +language: go + +go: + - 1.7.1 + +# let us have speedy Docker-based Travis workers +sudo: false + +before_install: + # Symlink below is needed for Travis CI to work correctly on personal forks of libkv + - ln -s $HOME/gopath/src/github.com/${TRAVIS_REPO_SLUG///libkv/} $HOME/gopath/src/github.com/docker + - go get golang.org/x/tools/cmd/cover + - go get github.com/mattn/goveralls + - go get github.com/golang/lint/golint + - go get github.com/GeertJohan/fgt + +before_script: + - script/travis_consul.sh 0.6.3 + - script/travis_etcd.sh 3.0.0 + - script/travis_zk.sh 3.5.1-alpha + +script: + - ./consul agent -server -bootstrap -advertise=127.0.0.1 -data-dir /tmp/consul -config-file=./config.json 1>/dev/null & + - ./etcd/etcd --listen-client-urls 'http://0.0.0.0:4001' --advertise-client-urls 'http://127.0.0.1:4001' >/dev/null 2>&1 & + - ./zk/bin/zkServer.sh start ./zk/conf/zoo.cfg 1> /dev/null + - script/validate-gofmt + - go vet ./... + - fgt golint ./... + - go test -v -race ./... + - script/coverage + - goveralls -service=travis-ci -coverprofile=goverage.report diff --git a/vendor/github.com/docker/libkv/LICENSE.code b/vendor/github.com/docker/libkv/LICENSE.code new file mode 100644 index 00000000..34c4ea7c --- /dev/null +++ b/vendor/github.com/docker/libkv/LICENSE.code @@ -0,0 +1,191 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2014-2016 Docker, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/docker/libkv/LICENSE.docs b/vendor/github.com/docker/libkv/LICENSE.docs new file mode 100644 index 00000000..e26cd4fc --- /dev/null +++ b/vendor/github.com/docker/libkv/LICENSE.docs @@ -0,0 +1,425 @@ +Attribution-ShareAlike 4.0 International + +======================================================================= + +Creative Commons Corporation ("Creative Commons") is not a law firm and +does not provide legal services or legal advice. Distribution of +Creative Commons public licenses does not create a lawyer-client or +other relationship. Creative Commons makes its licenses and related +information available on an "as-is" basis. Creative Commons gives no +warranties regarding its licenses, any material licensed under their +terms and conditions, or any related information. Creative Commons +disclaims all liability for damages resulting from their use to the +fullest extent possible. + +Using Creative Commons Public Licenses + +Creative Commons public licenses provide a standard set of terms and +conditions that creators and other rights holders may use to share +original works of authorship and other material subject to copyright +and certain other rights specified in the public license below. The +following considerations are for informational purposes only, are not +exhaustive, and do not form part of our licenses. + + Considerations for licensors: Our public licenses are + intended for use by those authorized to give the public + permission to use material in ways otherwise restricted by + copyright and certain other rights. Our licenses are + irrevocable. Licensors should read and understand the terms + and conditions of the license they choose before applying it. + Licensors should also secure all rights necessary before + applying our licenses so that the public can reuse the + material as expected. Licensors should clearly mark any + material not subject to the license. This includes other CC- + licensed material, or material used under an exception or + limitation to copyright. More considerations for licensors: + wiki.creativecommons.org/Considerations_for_licensors + + Considerations for the public: By using one of our public + licenses, a licensor grants the public permission to use the + licensed material under specified terms and conditions. If + the licensor's permission is not necessary for any reason--for + example, because of any applicable exception or limitation to + copyright--then that use is not regulated by the license. Our + licenses grant only permissions under copyright and certain + other rights that a licensor has authority to grant. Use of + the licensed material may still be restricted for other + reasons, including because others have copyright or other + rights in the material. A licensor may make special requests, + such as asking that all changes be marked or described. + Although not required by our licenses, you are encouraged to + respect those requests where reasonable. More_considerations + for the public: + wiki.creativecommons.org/Considerations_for_licensees + +======================================================================= + +Creative Commons Attribution-ShareAlike 4.0 International Public +License + +By exercising the Licensed Rights (defined below), You accept and agree +to be bound by the terms and conditions of this Creative Commons +Attribution-ShareAlike 4.0 International Public License ("Public +License"). To the extent this Public License may be interpreted as a +contract, You are granted the Licensed Rights in consideration of Your +acceptance of these terms and conditions, and the Licensor grants You +such rights in consideration of benefits the Licensor receives from +making the Licensed Material available under these terms and +conditions. + + +Section 1 -- Definitions. + + a. Adapted Material means material subject to Copyright and Similar + Rights that is derived from or based upon the Licensed Material + and in which the Licensed Material is translated, altered, + arranged, transformed, or otherwise modified in a manner requiring + permission under the Copyright and Similar Rights held by the + Licensor. For purposes of this Public License, where the Licensed + Material is a musical work, performance, or sound recording, + Adapted Material is always produced where the Licensed Material is + synched in timed relation with a moving image. + + b. Adapter's License means the license You apply to Your Copyright + and Similar Rights in Your contributions to Adapted Material in + accordance with the terms and conditions of this Public License. + + c. BY-SA Compatible License means a license listed at + creativecommons.org/compatiblelicenses, approved by Creative + Commons as essentially the equivalent of this Public License. + + d. Copyright and Similar Rights means copyright and/or similar rights + closely related to copyright including, without limitation, + performance, broadcast, sound recording, and Sui Generis Database + Rights, without regard to how the rights are labeled or + categorized. For purposes of this Public License, the rights + specified in Section 2(b)(1)-(2) are not Copyright and Similar + Rights. + + e. Effective Technological Measures means those measures that, in the + absence of proper authority, may not be circumvented under laws + fulfilling obligations under Article 11 of the WIPO Copyright + Treaty adopted on December 20, 1996, and/or similar international + agreements. + + f. Exceptions and Limitations means fair use, fair dealing, and/or + any other exception or limitation to Copyright and Similar Rights + that applies to Your use of the Licensed Material. + + g. License Elements means the license attributes listed in the name + of a Creative Commons Public License. The License Elements of this + Public License are Attribution and ShareAlike. + + h. Licensed Material means the artistic or literary work, database, + or other material to which the Licensor applied this Public + License. + + i. Licensed Rights means the rights granted to You subject to the + terms and conditions of this Public License, which are limited to + all Copyright and Similar Rights that apply to Your use of the + Licensed Material and that the Licensor has authority to license. + + j. Licensor means the individual(s) or entity(ies) granting rights + under this Public License. + + k. Share means to provide material to the public by any means or + process that requires permission under the Licensed Rights, such + as reproduction, public display, public performance, distribution, + dissemination, communication, or importation, and to make material + available to the public including in ways that members of the + public may access the material from a place and at a time + individually chosen by them. + + l. Sui Generis Database Rights means rights other than copyright + resulting from Directive 96/9/EC of the European Parliament and of + the Council of 11 March 1996 on the legal protection of databases, + as amended and/or succeeded, as well as other essentially + equivalent rights anywhere in the world. + + m. You means the individual or entity exercising the Licensed Rights + under this Public License. Your has a corresponding meaning. + + +Section 2 -- Scope. + + a. License grant. + + 1. Subject to the terms and conditions of this Public License, + the Licensor hereby grants You a worldwide, royalty-free, + non-sublicensable, non-exclusive, irrevocable license to + exercise the Licensed Rights in the Licensed Material to: + + a. reproduce and Share the Licensed Material, in whole or + in part; and + + b. produce, reproduce, and Share Adapted Material. + + 2. Exceptions and Limitations. For the avoidance of doubt, where + Exceptions and Limitations apply to Your use, this Public + License does not apply, and You do not need to comply with + its terms and conditions. + + 3. Term. The term of this Public License is specified in Section + 6(a). + + 4. Media and formats; technical modifications allowed. The + Licensor authorizes You to exercise the Licensed Rights in + all media and formats whether now known or hereafter created, + and to make technical modifications necessary to do so. The + Licensor waives and/or agrees not to assert any right or + authority to forbid You from making technical modifications + necessary to exercise the Licensed Rights, including + technical modifications necessary to circumvent Effective + Technological Measures. For purposes of this Public License, + simply making modifications authorized by this Section 2(a) + (4) never produces Adapted Material. + + 5. Downstream recipients. + + a. Offer from the Licensor -- Licensed Material. Every + recipient of the Licensed Material automatically + receives an offer from the Licensor to exercise the + Licensed Rights under the terms and conditions of this + Public License. + + b. Additional offer from the Licensor -- Adapted Material. + Every recipient of Adapted Material from You + automatically receives an offer from the Licensor to + exercise the Licensed Rights in the Adapted Material + under the conditions of the Adapter's License You apply. + + c. No downstream restrictions. You may not offer or impose + any additional or different terms or conditions on, or + apply any Effective Technological Measures to, the + Licensed Material if doing so restricts exercise of the + Licensed Rights by any recipient of the Licensed + Material. + + 6. No endorsement. Nothing in this Public License constitutes or + may be construed as permission to assert or imply that You + are, or that Your use of the Licensed Material is, connected + with, or sponsored, endorsed, or granted official status by, + the Licensor or others designated to receive attribution as + provided in Section 3(a)(1)(A)(i). + + b. Other rights. + + 1. Moral rights, such as the right of integrity, are not + licensed under this Public License, nor are publicity, + privacy, and/or other similar personality rights; however, to + the extent possible, the Licensor waives and/or agrees not to + assert any such rights held by the Licensor to the limited + extent necessary to allow You to exercise the Licensed + Rights, but not otherwise. + + 2. Patent and trademark rights are not licensed under this + Public License. + + 3. To the extent possible, the Licensor waives any right to + collect royalties from You for the exercise of the Licensed + Rights, whether directly or through a collecting society + under any voluntary or waivable statutory or compulsory + licensing scheme. In all other cases the Licensor expressly + reserves any right to collect such royalties. + + +Section 3 -- License Conditions. + +Your exercise of the Licensed Rights is expressly made subject to the +following conditions. + + a. Attribution. + + 1. If You Share the Licensed Material (including in modified + form), You must: + + a. retain the following if it is supplied by the Licensor + with the Licensed Material: + + i. identification of the creator(s) of the Licensed + Material and any others designated to receive + attribution, in any reasonable manner requested by + the Licensor (including by pseudonym if + designated); + + ii. a copyright notice; + + iii. a notice that refers to this Public License; + + iv. a notice that refers to the disclaimer of + warranties; + + v. a URI or hyperlink to the Licensed Material to the + extent reasonably practicable; + + b. indicate if You modified the Licensed Material and + retain an indication of any previous modifications; and + + c. indicate the Licensed Material is licensed under this + Public License, and include the text of, or the URI or + hyperlink to, this Public License. + + 2. You may satisfy the conditions in Section 3(a)(1) in any + reasonable manner based on the medium, means, and context in + which You Share the Licensed Material. For example, it may be + reasonable to satisfy the conditions by providing a URI or + hyperlink to a resource that includes the required + information. + + 3. If requested by the Licensor, You must remove any of the + information required by Section 3(a)(1)(A) to the extent + reasonably practicable. + + b. ShareAlike. + + In addition to the conditions in Section 3(a), if You Share + Adapted Material You produce, the following conditions also apply. + + 1. The Adapter's License You apply must be a Creative Commons + license with the same License Elements, this version or + later, or a BY-SA Compatible License. + + 2. You must include the text of, or the URI or hyperlink to, the + Adapter's License You apply. You may satisfy this condition + in any reasonable manner based on the medium, means, and + context in which You Share Adapted Material. + + 3. You may not offer or impose any additional or different terms + or conditions on, or apply any Effective Technological + Measures to, Adapted Material that restrict exercise of the + rights granted under the Adapter's License You apply. + + +Section 4 -- Sui Generis Database Rights. + +Where the Licensed Rights include Sui Generis Database Rights that +apply to Your use of the Licensed Material: + + a. for the avoidance of doubt, Section 2(a)(1) grants You the right + to extract, reuse, reproduce, and Share all or a substantial + portion of the contents of the database; + + b. if You include all or a substantial portion of the database + contents in a database in which You have Sui Generis Database + Rights, then the database in which You have Sui Generis Database + Rights (but not its individual contents) is Adapted Material, + + including for purposes of Section 3(b); and + c. You must comply with the conditions in Section 3(a) if You Share + all or a substantial portion of the contents of the database. + +For the avoidance of doubt, this Section 4 supplements and does not +replace Your obligations under this Public License where the Licensed +Rights include other Copyright and Similar Rights. + + +Section 5 -- Disclaimer of Warranties and Limitation of Liability. + + a. UNLESS OTHERWISE SEPARATELY UNDERTAKEN BY THE LICENSOR, TO THE + EXTENT POSSIBLE, THE LICENSOR OFFERS THE LICENSED MATERIAL AS-IS + AND AS-AVAILABLE, AND MAKES NO REPRESENTATIONS OR WARRANTIES OF + ANY KIND CONCERNING THE LICENSED MATERIAL, WHETHER EXPRESS, + IMPLIED, STATUTORY, OR OTHER. THIS INCLUDES, WITHOUT LIMITATION, + WARRANTIES OF TITLE, MERCHANTABILITY, FITNESS FOR A PARTICULAR + PURPOSE, NON-INFRINGEMENT, ABSENCE OF LATENT OR OTHER DEFECTS, + ACCURACY, OR THE PRESENCE OR ABSENCE OF ERRORS, WHETHER OR NOT + KNOWN OR DISCOVERABLE. WHERE DISCLAIMERS OF WARRANTIES ARE NOT + ALLOWED IN FULL OR IN PART, THIS DISCLAIMER MAY NOT APPLY TO YOU. + + b. TO THE EXTENT POSSIBLE, IN NO EVENT WILL THE LICENSOR BE LIABLE + TO YOU ON ANY LEGAL THEORY (INCLUDING, WITHOUT LIMITATION, + NEGLIGENCE) OR OTHERWISE FOR ANY DIRECT, SPECIAL, INDIRECT, + INCIDENTAL, CONSEQUENTIAL, PUNITIVE, EXEMPLARY, OR OTHER LOSSES, + COSTS, EXPENSES, OR DAMAGES ARISING OUT OF THIS PUBLIC LICENSE OR + USE OF THE LICENSED MATERIAL, EVEN IF THE LICENSOR HAS BEEN + ADVISED OF THE POSSIBILITY OF SUCH LOSSES, COSTS, EXPENSES, OR + DAMAGES. WHERE A LIMITATION OF LIABILITY IS NOT ALLOWED IN FULL OR + IN PART, THIS LIMITATION MAY NOT APPLY TO YOU. + + c. The disclaimer of warranties and limitation of liability provided + above shall be interpreted in a manner that, to the extent + possible, most closely approximates an absolute disclaimer and + waiver of all liability. + + +Section 6 -- Term and Termination. + + a. This Public License applies for the term of the Copyright and + Similar Rights licensed here. However, if You fail to comply with + this Public License, then Your rights under this Public License + terminate automatically. + + b. Where Your right to use the Licensed Material has terminated under + Section 6(a), it reinstates: + + 1. automatically as of the date the violation is cured, provided + it is cured within 30 days of Your discovery of the + violation; or + + 2. upon express reinstatement by the Licensor. + + For the avoidance of doubt, this Section 6(b) does not affect any + right the Licensor may have to seek remedies for Your violations + of this Public License. + + c. For the avoidance of doubt, the Licensor may also offer the + Licensed Material under separate terms or conditions or stop + distributing the Licensed Material at any time; however, doing so + will not terminate this Public License. + + d. Sections 1, 5, 6, 7, and 8 survive termination of this Public + License. + + +Section 7 -- Other Terms and Conditions. + + a. The Licensor shall not be bound by any additional or different + terms or conditions communicated by You unless expressly agreed. + + b. Any arrangements, understandings, or agreements regarding the + Licensed Material not stated herein are separate from and + independent of the terms and conditions of this Public License. + + +Section 8 -- Interpretation. + + a. For the avoidance of doubt, this Public License does not, and + shall not be interpreted to, reduce, limit, restrict, or impose + conditions on any use of the Licensed Material that could lawfully + be made without permission under this Public License. + + b. To the extent possible, if any provision of this Public License is + deemed unenforceable, it shall be automatically reformed to the + minimum extent necessary to make it enforceable. If the provision + cannot be reformed, it shall be severed from this Public License + without affecting the enforceability of the remaining terms and + conditions. + + c. No term or condition of this Public License will be waived and no + failure to comply consented to unless expressly agreed to by the + Licensor. + + d. Nothing in this Public License constitutes or may be interpreted + as a limitation upon, or waiver of, any privileges and immunities + that apply to the Licensor or You, including from the legal + processes of any jurisdiction or authority. + + +======================================================================= + +Creative Commons is not a party to its public licenses. +Notwithstanding, Creative Commons may elect to apply one of its public +licenses to material it publishes and in those instances will be +considered the "Licensor." Except for the limited purpose of indicating +that material is shared under a Creative Commons public license or as +otherwise permitted by the Creative Commons policies published at +creativecommons.org/policies, Creative Commons does not authorize the +use of the trademark "Creative Commons" or any other trademark or logo +of Creative Commons without its prior written consent including, +without limitation, in connection with any unauthorized modifications +to any of its public licenses or any other arrangements, +understandings, or agreements concerning use of licensed material. For +the avoidance of doubt, this paragraph does not form part of the public +licenses. + +Creative Commons may be contacted at creativecommons.org. diff --git a/vendor/github.com/docker/libkv/MAINTAINERS b/vendor/github.com/docker/libkv/MAINTAINERS new file mode 100644 index 00000000..4a8bbc61 --- /dev/null +++ b/vendor/github.com/docker/libkv/MAINTAINERS @@ -0,0 +1,40 @@ +# Libkv maintainers file +# +# This file describes who runs the docker/libkv project and how. +# This is a living document - if you see something out of date or missing, speak up! +# +# It is structured to be consumable by both humans and programs. +# To extract its contents programmatically, use any TOML-compliant parser. +# +# This file is compiled into the MAINTAINERS file in docker/opensource. +# +[Org] + [Org."Core maintainers"] + people = [ + "aluzzardi", + "sanimej", + "vieux", + ] + +[people] + +# A reference list of all people associated with the project. +# All other sections should refer to people by their canonical key +# in the people section. + + # ADD YOURSELF HERE IN ALPHABETICAL ORDER + + [people.aluzzardi] + Name = "Andrea Luzzardi" + Email = "al@docker.com" + GitHub = "aluzzardi" + + [people.sanimej] + Name = "Santhosh Manohar" + Email = "santhosh@docker.com" + GitHub = "sanimej" + + [people.vieux] + Name = "Victor Vieux" + Email = "vieux@docker.com" + GitHub = "vieux" diff --git a/vendor/github.com/docker/libkv/README.md b/vendor/github.com/docker/libkv/README.md new file mode 100644 index 00000000..ff2cc446 --- /dev/null +++ b/vendor/github.com/docker/libkv/README.md @@ -0,0 +1,107 @@ +# libkv + +[![GoDoc](https://godoc.org/github.com/docker/libkv?status.png)](https://godoc.org/github.com/docker/libkv) +[![Build Status](https://travis-ci.org/docker/libkv.svg?branch=master)](https://travis-ci.org/docker/libkv) +[![Coverage Status](https://coveralls.io/repos/docker/libkv/badge.svg)](https://coveralls.io/r/docker/libkv) +[![Go Report Card](https://goreportcard.com/badge/github.com/docker/libkv)](https://goreportcard.com/report/github.com/docker/libkv) + +`libkv` provides a `Go` native library to store metadata. + +The goal of `libkv` is to abstract common store operations for multiple distributed and/or local Key/Value store backends. + +For example, you can use it to store your metadata or for service discovery to register machines and endpoints inside your cluster. + +You can also easily implement a generic *Leader Election* on top of it (see the [docker/leadership](https://github.com/docker/leadership) repository). + +As of now, `libkv` offers support for `Consul`, `Etcd`, `Zookeeper` (**Distributed** store) and `BoltDB` (**Local** store). + +## Usage + +`libkv` is meant to be used as an abstraction layer over existing distributed Key/Value stores. It is especially useful if you plan to support `consul`, `etcd` and `zookeeper` using the same codebase. + +It is ideal if you plan for something written in Go that should support: + +- A simple metadata storage, distributed or local +- A lightweight discovery service for your nodes +- A distributed lock mechanism + +You can find examples of usage for `libkv` under in `docs/examples.go`. Optionally you can also take a look at the `docker/swarm` or `docker/libnetwork` repositories which are using `docker/libkv` for all the use cases listed above. + +## Supported versions + +`libkv` supports: +- Consul versions >= `0.5.1` because it uses Sessions with `Delete` behavior for the use of `TTLs` (mimics zookeeper's Ephemeral node support), If you don't plan to use `TTLs`: you can use Consul version `0.4.0+`. +- Etcd versions >= `2.0` because it uses the new `coreos/etcd/client`, this might change in the future as the support for `APIv3` comes along and adds more capabilities. +- Zookeeper versions >= `3.4.5`. Although this might work with previous version but this remains untested as of now. +- Boltdb, which shouldn't be subject to any version dependencies. + +## Interface + +A **storage backend** in `libkv` should implement (fully or partially) this interface: + +```go +type Store interface { + Put(key string, value []byte, options *WriteOptions) error + Get(key string) (*KVPair, error) + Delete(key string) error + Exists(key string) (bool, error) + Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) + WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error) + NewLock(key string, options *LockOptions) (Locker, error) + List(directory string) ([]*KVPair, error) + DeleteTree(directory string) error + AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) + AtomicDelete(key string, previous *KVPair) (bool, error) + Close() +} +``` + +## Compatibility matrix + +Backend drivers in `libkv` are generally divided between **local drivers** and **distributed drivers**. Distributed backends offer enhanced capabilities like `Watches` and/or distributed `Locks`. + +Local drivers are usually used in complement to the distributed drivers to store informations that only needs to be available locally. + +| Calls | Consul | Etcd | Zookeeper | BoltDB | +|-----------------------|:----------:|:------:|:-----------:|:--------:| +| Put | X | X | X | X | +| Get | X | X | X | X | +| Delete | X | X | X | X | +| Exists | X | X | X | X | +| Watch | X | X | X | | +| WatchTree | X | X | X | | +| NewLock (Lock/Unlock) | X | X | X | | +| List | X | X | X | X | +| DeleteTree | X | X | X | X | +| AtomicPut | X | X | X | X | +| Close | X | X | X | X | + +## Limitations + +Distributed Key/Value stores often have different concepts for managing and formatting keys and their associated values. Even though `libkv` tries to abstract those stores aiming for some consistency, in some cases it can't be applied easily. + +Please refer to the `docs/compatibility.md` to see what are the special cases for cross-backend compatibility. + +Other than those special cases, you should expect the same experience for basic operations like `Get`/`Put`, etc. + +Calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). Although you should be able to use it successfully to watch on events in an interchangeable way (see the **docker/leadership** repository or the **pkg/discovery/kv** package in **docker/docker**). + +## TLS + +Only `Consul` and `etcd` have support for TLS and you should build and provide your own `config.TLS` object to feed the client. Support is planned for `zookeeper`. + +## Roadmap + +- Make the API nicer to use (using `options`) +- Provide more options (`consistency` for example) +- Improve performance (remove extras `Get`/`List` operations) +- Better key formatting +- New backends? + +## Contributing + +Want to hack on libkv? [Docker's contributions guidelines](https://github.com/docker/docker/blob/master/CONTRIBUTING.md) apply. + +## Copyright and license + +Copyright © 2014-2016 Docker, Inc. All rights reserved, except as follows. Code is released under the Apache 2.0 license. The README.md file, and files in the "docs" folder are licensed under the Creative Commons Attribution 4.0 International License under the terms and conditions set forth in the file "LICENSE.docs". You may obtain a duplicate copy of the same license, titled CC-BY-SA-4.0, at http://creativecommons.org/licenses/by/4.0/. diff --git a/vendor/github.com/docker/libkv/docs/compatibility.md b/vendor/github.com/docker/libkv/docs/compatibility.md new file mode 100644 index 00000000..c4f27e9c --- /dev/null +++ b/vendor/github.com/docker/libkv/docs/compatibility.md @@ -0,0 +1,82 @@ +#Cross-Backend Compatibility + +The value of `libkv` is not to duplicate the code for programs that should support multiple distributed K/V stores like the classic `Consul`/`etcd`/`zookeeper` trio. + +This document provides with general guidelines for users willing to support those backends with the same code using `libkv`. + +Please note that most of those workarounds are going to disappear in the future with `etcd` APIv3. + +##Etcd directory/key distinction + +`etcd` with APIv2 makes the distinction between keys and directories. The result with `libkv` is that when using the etcd driver: + +- You cannot store values on directories +- You cannot invoke `WatchTree` (watching on child values), on a regular key + +This is fundamentaly different than `Consul` and `zookeeper` which are more permissive and allow the same set of operations on keys and directories (called a Node for zookeeper). + +Apiv3 is in the work for `etcd`, which removes this key/directory distinction, but until then you should follow these workarounds to make your `libkv` code work across backends. + +###Put + +`etcd` cannot put values on directories, so this puts a major restriction compared to `Consul` and `zookeeper`. + +If you want to support all those three backends, you should make sure to only put data on **leaves**. + +For example: + +```go +_ := kv.Put("path/to/key/bis", []byte("foo"), nil) +_ := kv.Put("path/to/key", []byte("bar"), nil) +``` + +Will work on `Consul` and `zookeeper` but fail for `etcd`. This is because the first `Put` in the case of `etcd` will recursively create the directory hierarchy and `path/to/key` is now considered as a directory. Thus, values should always be stored on leaves if the support for the three backends is planned. + +###WatchTree + +When initializing the `WatchTree`, the natural way to do so is through the following code: + +```go +key := "path/to/key" +if !kv.Exists(key) { + err := kv.Put(key, []byte("data"), nil) +} +events, err := kv.WatchTree(key, nil) +``` + +The code above will not work across backends and etcd will fail on the `WatchTree` call. What happens exactly: + +- `Consul` will create a regular `key` because it has no distinction between directories and keys. This is not an issue as we can invoke `WatchTree` on regular keys. +- `zookeeper` is going to create a `node` that can either be a directory or a key during the lifetime of a program but it does not matter as a directory can hold values and be watchable like a regular key. +- `etcd` is going to create a regular `key`. We cannot invoke `WatchTree` on regular keys using etcd. + +To be cross-compatible between those three backends for `WatchTree`, we need to enforce a parameter that is only interpreted with `etcd` and which tells the client to create a `directory` instead of a key. + +```go +key := "path/to/key" +if !kv.Exists(key) { + // We enforce IsDir = true to make sure etcd creates a directory + err := kv.Put(key, []byte("data"), &store.WriteOptions{IsDir:true}) +} +events, err := kv.WatchTree(key, nil) +``` + +The code above will work for the three backends but make sure to not try to store any value at that path as the call to `Put` will fail for `etcd` (you can only put at `path/to/key/foo`, `path/to/key/bar` for example). + +##Etcd distributed locking + +There is `Lock` mechanisms baked in the `coreos/etcd/client` for now. Instead, `libkv` has its own implementation of a `Lock` on top of `etcd`. + +The general workflow for the `Lock` is as follows: + +- Call Lock concurrently on a `key` between threads/programs +- Only one will create that key, others are going to fail because the key has already been created +- The thread locking the key can get the right index to set the value of the key using Compare And Swap and effectively Lock and hold the key +- Other threads are given a wrong index to fail the Compare and Swap and block until the key has been released by the thread holding the Lock +- Lock seekers are setting up a Watch listening on that key and events happening on the key +- When the thread/program stops holding the lock, it deletes the key triggering a `delete` event that will notify all the other threads. In case the program crashes, the key has a TTL attached that will send an `expire` event when this TTL expires. +- Once everyone is notified, back to the first step. First come, first served with the Lock. + +The whole Lock process is highly dependent on the `delete`/`expire` events of `etcd`. So don't expect the key to be still there once the Lock is released. + +For example if the whole logic is to `Lock` a key and expect the value to still be there after it has been unlocked, it is not going to be cross-backend compatible with `Consul` and `zookeeper`. On the other end the `etcd` Lock can still be used to do Leader Election for example and still be cross-compatible with other backends. \ No newline at end of file diff --git a/vendor/github.com/docker/libkv/docs/examples.md b/vendor/github.com/docker/libkv/docs/examples.md new file mode 100644 index 00000000..09752db1 --- /dev/null +++ b/vendor/github.com/docker/libkv/docs/examples.md @@ -0,0 +1,157 @@ +#Examples + +This document contains useful example of usage for `libkv`. It might not be complete but provides with general informations on how to use the client. + +##Create a store and use Put/Get/Delete + +```go +package main + +import ( + "fmt" + "time" + "log" + + "github.com/docker/libkv" + "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" +) + +func init() { + // Register consul store to libkv + consul.Register() + + // We can register as many backends that are supported by libkv + etcd.Register() + zookeeper.Register() + boltdb.Register() +} + +func main() { + client := "localhost:8500" + + // Initialize a new store with consul + kv, err := libkv.NewStore( + store.CONSUL, // or "consul" + []string{client}, + &store.Config{ + ConnectionTimeout: 10*time.Second, + }, + ) + if err != nil { + log.Fatal("Cannot create store consul") + } + + key := "foo" + err = kv.Put(key, []byte("bar"), nil) + if err != nil { + fmt.Errorf("Error trying to put value at key: %v", key) + } + + pair, err := kv.Get(key) + if err != nil { + fmt.Errorf("Error trying accessing value at key: %v", key) + } + + err = kv.Delete(key) + if err != nil { + fmt.Errorf("Error trying to delete key %v", key) + } + + log.Info("value: ", string(pair.Value)) +} +``` + +##List keys + +```go +// List will list all the keys under `key` if it contains a set of child keys/values +entries, err := kv.List(key) +for _, pair := range entries { + fmt.Printf("key=%v - value=%v", pair.Key, string(pair.Value)) +} + +``` + +##Watching for events on a single key (Watch) + +You can use watches to watch modifications on a key. First you need to check if the key exists. If this is not the case, we need to create it using the `Put` function. + +```go +// Checking on the key before watching +if !kv.Exists(key) { + err := kv.Put(key, []byte("bar"), nil) + if err != nil { + fmt.Errorf("Something went wrong when initializing key %v", key) + } +} + +stopCh := make(<-chan struct{}) +events, err := kv.Watch(key, stopCh) + +select { + case pair := <-events: + // Do something with events + fmt.Printf("value changed on key %v: new value=%v", key, pair.Value) +} + +``` + +##Watching for events happening on child keys (WatchTree) + +You can use watches to watch modifications on a key. First you need to check if the key exists. If this is not the case, we need to create it using the `Put` function. There is a special step here though if you want your code to work across backends. Because `etcd` is a special case and it makes the distinction between directories and keys, we need to make sure that the created key is considered as a directory by enforcing `IsDir` at `true`. + +```go +// Checking on the key before watching +if !kv.Exists(key) { + // Don't forget IsDir:true if the code is used cross-backend + err := kv.Put(key, []byte("bar"), &store.WriteOptions{IsDir:true}) + if err != nil { + fmt.Errorf("Something went wrong when initializing key %v", key) + } +} + +stopCh := make(<-chan struct{}) +events, err := kv.WatchTree(key, stopCh) + +select { + case pairs := <-events: + // Do something with events + for _, pair := range pairs { + fmt.Printf("value changed on key %v: new value=%v", key, pair.Value) + } +} + +``` + +## Distributed Locking, using Lock/Unlock + +```go +key := "lockKey" +value := []byte("bar") + +// Initialize a distributed lock. TTL is optional, it is here to make sure that +// the lock is released after the program that is holding the lock ends or crashes +lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second}) +if err != nil { + fmt.Errorf("something went wrong when trying to initialize the Lock") +} + +// Try to lock the key, the call to Lock() is blocking +_, err := lock.Lock(nil) +if err != nil { + fmt.Errorf("something went wrong when trying to lock key %v", key) +} + +// Get should work because we are holding the key +pair, err := kv.Get(key) +if err != nil { + fmt.Errorf("key %v has value %v", key, pair.Value) +} + +// Unlock the key +err = lock.Unlock() +if err != nil { + fmt.Errorf("something went wrong when trying to unlock key %v", key) +} +``` \ No newline at end of file diff --git a/vendor/github.com/docker/libkv/libkv.go b/vendor/github.com/docker/libkv/libkv.go new file mode 100644 index 00000000..bdb8c752 --- /dev/null +++ b/vendor/github.com/docker/libkv/libkv.go @@ -0,0 +1,40 @@ +package libkv + +import ( + "fmt" + "sort" + "strings" + + "github.com/docker/libkv/store" +) + +// Initialize creates a new Store object, initializing the client +type Initialize func(addrs []string, options *store.Config) (store.Store, error) + +var ( + // Backend initializers + initializers = make(map[store.Backend]Initialize) + + supportedBackend = func() string { + keys := make([]string, 0, len(initializers)) + for k := range initializers { + keys = append(keys, string(k)) + } + sort.Strings(keys) + return strings.Join(keys, ", ") + }() +) + +// NewStore creates an instance of store +func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) { + if init, exists := initializers[backend]; exists { + return init(addrs, options) + } + + return nil, fmt.Errorf("%s %s", store.ErrBackendNotSupported.Error(), supportedBackend) +} + +// AddStore adds a new store backend to libkv +func AddStore(store store.Backend, init Initialize) { + initializers[store] = init +} diff --git a/vendor/github.com/docker/libkv/libkv_test.go b/vendor/github.com/docker/libkv/libkv_test.go new file mode 100644 index 00000000..fe7af6b0 --- /dev/null +++ b/vendor/github.com/docker/libkv/libkv_test.go @@ -0,0 +1,24 @@ +package libkv + +import ( + "testing" + "time" + + "github.com/docker/libkv/store" + "github.com/stretchr/testify/assert" +) + +func TestNewStoreUnsupported(t *testing.T) { + client := "localhost:9999" + + kv, err := NewStore( + "unsupported", + []string{client}, + &store.Config{ + ConnectionTimeout: 10 * time.Second, + }, + ) + assert.Error(t, err) + assert.Nil(t, kv) + assert.Equal(t, "Backend storage not supported yet, please choose one of ", err.Error()) +} diff --git a/vendor/github.com/docker/libkv/script/.validate b/vendor/github.com/docker/libkv/script/.validate new file mode 100644 index 00000000..3767f422 --- /dev/null +++ b/vendor/github.com/docker/libkv/script/.validate @@ -0,0 +1,33 @@ +#!/bin/bash + +if [ -z "$VALIDATE_UPSTREAM" ]; then + # this is kind of an expensive check, so let's not do this twice if we + # are running more than one validate bundlescript + + VALIDATE_REPO='https://github.com/docker/libkv.git' + VALIDATE_BRANCH='master' + + if [ "$TRAVIS" = 'true' -a "$TRAVIS_PULL_REQUEST" != 'false' ]; then + VALIDATE_REPO="https://github.com/${TRAVIS_REPO_SLUG}.git" + VALIDATE_BRANCH="${TRAVIS_BRANCH}" + fi + + VALIDATE_HEAD="$(git rev-parse --verify HEAD)" + + git fetch -q "$VALIDATE_REPO" "refs/heads/$VALIDATE_BRANCH" + VALIDATE_UPSTREAM="$(git rev-parse --verify FETCH_HEAD)" + + VALIDATE_COMMIT_LOG="$VALIDATE_UPSTREAM..$VALIDATE_HEAD" + VALIDATE_COMMIT_DIFF="$VALIDATE_UPSTREAM...$VALIDATE_HEAD" + + validate_diff() { + if [ "$VALIDATE_UPSTREAM" != "$VALIDATE_HEAD" ]; then + git diff "$VALIDATE_COMMIT_DIFF" "$@" + fi + } + validate_log() { + if [ "$VALIDATE_UPSTREAM" != "$VALIDATE_HEAD" ]; then + git log "$VALIDATE_COMMIT_LOG" "$@" + fi + } +fi diff --git a/vendor/github.com/docker/libkv/script/coverage b/vendor/github.com/docker/libkv/script/coverage new file mode 100755 index 00000000..a7a13f45 --- /dev/null +++ b/vendor/github.com/docker/libkv/script/coverage @@ -0,0 +1,21 @@ +#!/bin/bash + +MODE="mode: count" +ROOT=${TRAVIS_BUILD_DIR:-.}/../../.. + +# Grab the list of packages. +# Exclude the API and CLI from coverage as it will be covered by integration tests. +PACKAGES=`go list ./...` + +# Create the empty coverage file. +echo $MODE > goverage.report + +# Run coverage on every package. +for package in $PACKAGES; do + output="$ROOT/$package/coverage.out" + + go test -test.short -covermode=count -coverprofile=$output $package + if [ -f "$output" ] ; then + cat "$output" | grep -v "$MODE" >> goverage.report + fi +done diff --git a/vendor/github.com/docker/libkv/script/travis_consul.sh b/vendor/github.com/docker/libkv/script/travis_consul.sh new file mode 100755 index 00000000..7b63d6b6 --- /dev/null +++ b/vendor/github.com/docker/libkv/script/travis_consul.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +if [ $# -gt 0 ] ; then + CONSUL_VERSION="$1" +else + CONSUL_VERSION="0.5.2" +fi + +# install consul +wget "https://releases.hashicorp.com/consul/${CONSUL_VERSION}/consul_${CONSUL_VERSION}_linux_amd64.zip" +unzip "consul_${CONSUL_VERSION}_linux_amd64.zip" + +# make config for minimum ttl +touch config.json +echo "{\"session_ttl_min\": \"1s\"}" >> config.json + +# check +./consul --version diff --git a/vendor/github.com/docker/libkv/script/travis_etcd.sh b/vendor/github.com/docker/libkv/script/travis_etcd.sh new file mode 100755 index 00000000..bee8567f --- /dev/null +++ b/vendor/github.com/docker/libkv/script/travis_etcd.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +if [ $# -gt 0 ] ; then + ETCD_VERSION="$1" +else + ETCD_VERSION="2.2.0" +fi + +curl -L https://github.com/coreos/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz -o etcd-v$ETCD_VERSION-linux-amd64.tar.gz +tar xzvf etcd-v$ETCD_VERSION-linux-amd64.tar.gz +mv etcd-v$ETCD_VERSION-linux-amd64 etcd diff --git a/vendor/github.com/docker/libkv/script/travis_zk.sh b/vendor/github.com/docker/libkv/script/travis_zk.sh new file mode 100755 index 00000000..636a2407 --- /dev/null +++ b/vendor/github.com/docker/libkv/script/travis_zk.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +if [ $# -gt 0 ] ; then + ZK_VERSION="$1" +else + ZK_VERSION="3.4.7" +fi + +wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/zookeeper-${ZK_VERSION}.tar.gz" +tar -xvf "zookeeper-${ZK_VERSION}.tar.gz" +mv zookeeper-$ZK_VERSION zk +mv ./zk/conf/zoo_sample.cfg ./zk/conf/zoo.cfg diff --git a/vendor/github.com/docker/libkv/script/validate-gofmt b/vendor/github.com/docker/libkv/script/validate-gofmt new file mode 100755 index 00000000..c565976b --- /dev/null +++ b/vendor/github.com/docker/libkv/script/validate-gofmt @@ -0,0 +1,30 @@ +#!/bin/bash + +source "$(dirname "$BASH_SOURCE")/.validate" + +IFS=$'\n' +files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^Godeps/' || true) ) +unset IFS + +badFiles=() +for f in "${files[@]}"; do + # we use "git show" here to validate that what's committed is formatted + if [ "$(git show "$VALIDATE_HEAD:$f" | gofmt -s -l)" ]; then + badFiles+=( "$f" ) + fi +done + +if [ ${#badFiles[@]} -eq 0 ]; then + echo 'Congratulations! All Go source files are properly formatted.' +else + { + echo "These files are not properly gofmt'd:" + for f in "${badFiles[@]}"; do + echo " - $f" + done + echo + echo 'Please reformat the above files using "gofmt -s -w" and commit the result.' + echo + } >&2 + false +fi diff --git a/vendor/github.com/docker/libkv/store/boltdb/boltdb.go b/vendor/github.com/docker/libkv/store/boltdb/boltdb.go new file mode 100644 index 00000000..cdfd74f8 --- /dev/null +++ b/vendor/github.com/docker/libkv/store/boltdb/boltdb.go @@ -0,0 +1,474 @@ +package boltdb + +import ( + "bytes" + "encoding/binary" + "errors" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + + "github.com/boltdb/bolt" + "github.com/docker/libkv" + "github.com/docker/libkv/store" +) + +var ( + // ErrMultipleEndpointsUnsupported is thrown when multiple endpoints specified for + // BoltDB. Endpoint has to be a local file path + ErrMultipleEndpointsUnsupported = errors.New("boltdb supports one endpoint and should be a file path") + // ErrBoltBucketOptionMissing is thrown when boltBcuket config option is missing + ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing") +) + +const ( + filePerm os.FileMode = 0644 +) + +//BoltDB type implements the Store interface +type BoltDB struct { + client *bolt.DB + boltBucket []byte + dbIndex uint64 + path string + timeout time.Duration + // By default libkv opens and closes the bolt DB connection for every + // get/put operation. This allows multiple apps to use a Bolt DB at the + // same time. + // PersistConnection flag provides an option to override ths behavior. + // ie: open the connection in New and use it till Close is called. + PersistConnection bool + sync.Mutex +} + +const ( + libkvmetadatalen = 8 + transientTimeout = time.Duration(10) * time.Second +) + +// Register registers boltdb to libkv +func Register() { + libkv.AddStore(store.BOLTDB, New) +} + +// New opens a new BoltDB connection to the specified path and bucket +func New(endpoints []string, options *store.Config) (store.Store, error) { + var ( + db *bolt.DB + err error + boltOptions *bolt.Options + timeout = transientTimeout + ) + + if len(endpoints) > 1 { + return nil, ErrMultipleEndpointsUnsupported + } + + if (options == nil) || (len(options.Bucket) == 0) { + return nil, ErrBoltBucketOptionMissing + } + + dir, _ := filepath.Split(endpoints[0]) + if err = os.MkdirAll(dir, 0750); err != nil { + return nil, err + } + + if options.PersistConnection { + boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout} + db, err = bolt.Open(endpoints[0], filePerm, boltOptions) + if err != nil { + return nil, err + } + } + + if options.ConnectionTimeout != 0 { + timeout = options.ConnectionTimeout + } + + b := &BoltDB{ + client: db, + path: endpoints[0], + boltBucket: []byte(options.Bucket), + timeout: timeout, + PersistConnection: options.PersistConnection, + } + + return b, nil +} + +func (b *BoltDB) reset() { + b.path = "" + b.boltBucket = []byte{} +} + +func (b *BoltDB) getDBhandle() (*bolt.DB, error) { + var ( + db *bolt.DB + err error + ) + if !b.PersistConnection { + boltOptions := &bolt.Options{Timeout: b.timeout} + if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil { + return nil, err + } + b.client = db + } + + return b.client, nil +} + +func (b *BoltDB) releaseDBhandle() { + if !b.PersistConnection { + b.client.Close() + } +} + +// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by +// by a atomic counter maintained by the libkv and appened to the value passed by the client. +func (b *BoltDB) Get(key string) (*store.KVPair, error) { + var ( + val []byte + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if db, err = b.getDBhandle(); err != nil { + return nil, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + v := bucket.Get([]byte(key)) + val = make([]byte, len(v)) + copy(val, v) + + return nil + }) + + if len(val) == 0 { + return nil, store.ErrKeyNotFound + } + if err != nil { + return nil, err + } + + dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen]) + val = val[libkvmetadatalen:] + + return &store.KVPair{Key: key, Value: val, LastIndex: (dbIndex)}, nil +} + +//Put the key, value pair. index number metadata is prepended to the value +func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error { + var ( + dbIndex uint64 + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + dbval := make([]byte, libkvmetadatalen) + + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(b.boltBucket) + if err != nil { + return err + } + + dbIndex = atomic.AddUint64(&b.dbIndex, 1) + binary.LittleEndian.PutUint64(dbval, dbIndex) + dbval = append(dbval, value...) + + err = bucket.Put([]byte(key), dbval) + if err != nil { + return err + } + return nil + }) + return err +} + +//Delete the value for the given key. +func (b *BoltDB) Delete(key string) error { + var ( + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + err := bucket.Delete([]byte(key)) + return err + }) + return err +} + +// Exists checks if the key exists inside the store +func (b *BoltDB) Exists(key string) (bool, error) { + var ( + val []byte + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if db, err = b.getDBhandle(); err != nil { + return false, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + val = bucket.Get([]byte(key)) + + return nil + }) + + if len(val) == 0 { + return false, err + } + return true, err +} + +// List returns the range of keys starting with the passed in prefix +func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) { + var ( + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + kv := []*store.KVPair{} + + if db, err = b.getDBhandle(); err != nil { + return nil, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + cursor := bucket.Cursor() + prefix := []byte(keyPrefix) + + for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() { + + dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen]) + v = v[libkvmetadatalen:] + val := make([]byte, len(v)) + copy(val, v) + + kv = append(kv, &store.KVPair{ + Key: string(key), + Value: val, + LastIndex: dbIndex, + }) + } + return nil + }) + if len(kv) == 0 { + return nil, store.ErrKeyNotFound + } + return kv, err +} + +// AtomicDelete deletes a value at "key" if the key +// has not been modified in the meantime, throws an +// error if this is the case +func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) { + var ( + val []byte + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if previous == nil { + return false, store.ErrPreviousNotSpecified + } + if db, err = b.getDBhandle(); err != nil { + return false, err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + val = bucket.Get([]byte(key)) + if val == nil { + return store.ErrKeyNotFound + } + dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen]) + if dbIndex != previous.LastIndex { + return store.ErrKeyModified + } + err := bucket.Delete([]byte(key)) + return err + }) + if err != nil { + return false, err + } + return true, err +} + +// AtomicPut puts a value at "key" if the key has not been +// modified since the last Put, throws an error if this is the case +func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { + var ( + val []byte + dbIndex uint64 + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + dbval := make([]byte, libkvmetadatalen) + + if db, err = b.getDBhandle(); err != nil { + return false, nil, err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + var err error + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + if previous != nil { + return store.ErrKeyNotFound + } + bucket, err = tx.CreateBucket(b.boltBucket) + if err != nil { + return err + } + } + // AtomicPut is equivalent to Put if previous is nil and the Ky + // doesn't exist in the DB. + val = bucket.Get([]byte(key)) + if previous == nil && len(val) != 0 { + return store.ErrKeyExists + } + if previous != nil { + if len(val) == 0 { + return store.ErrKeyNotFound + } + dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen]) + if dbIndex != previous.LastIndex { + return store.ErrKeyModified + } + } + dbIndex = atomic.AddUint64(&b.dbIndex, 1) + binary.LittleEndian.PutUint64(dbval, b.dbIndex) + dbval = append(dbval, value...) + return (bucket.Put([]byte(key), dbval)) + }) + if err != nil { + return false, nil, err + } + + updated := &store.KVPair{ + Key: key, + Value: value, + LastIndex: dbIndex, + } + + return true, updated, nil +} + +// Close the db connection to the BoltDB +func (b *BoltDB) Close() { + b.Lock() + defer b.Unlock() + + if !b.PersistConnection { + b.reset() + } else { + b.client.Close() + } + return +} + +// DeleteTree deletes a range of keys with a given prefix +func (b *BoltDB) DeleteTree(keyPrefix string) error { + var ( + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + cursor := bucket.Cursor() + prefix := []byte(keyPrefix) + + for key, _ := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, _ = cursor.Next() { + _ = bucket.Delete([]byte(key)) + } + return nil + }) + + return err +} + +// NewLock has to implemented at the library level since its not supported by BoltDB +func (b *BoltDB) NewLock(key string, options *store.LockOptions) (store.Locker, error) { + return nil, store.ErrCallNotSupported +} + +// Watch has to implemented at the library level since its not supported by BoltDB +func (b *BoltDB) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { + return nil, store.ErrCallNotSupported +} + +// WatchTree has to implemented at the library level since its not supported by BoltDB +func (b *BoltDB) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { + return nil, store.ErrCallNotSupported +} diff --git a/vendor/github.com/docker/libkv/store/boltdb/boltdb_test.go b/vendor/github.com/docker/libkv/store/boltdb/boltdb_test.go new file mode 100644 index 00000000..3eb4e84b --- /dev/null +++ b/vendor/github.com/docker/libkv/store/boltdb/boltdb_test.go @@ -0,0 +1,144 @@ +package boltdb + +import ( + "os" + "testing" + "time" + + "github.com/docker/libkv" + "github.com/docker/libkv/store" + "github.com/docker/libkv/testutils" + "github.com/stretchr/testify/assert" +) + +func makeBoltDBClient(t *testing.T) store.Store { + kv, err := New([]string{"/tmp/not_exist_dir/__boltdbtest"}, &store.Config{Bucket: "boltDBTest"}) + + if err != nil { + t.Fatalf("cannot create store: %v", err) + } + + return kv +} + +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore( + store.BOLTDB, + []string{"/tmp/not_exist_dir/__boltdbtest"}, + &store.Config{Bucket: "boltDBTest"}, + ) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*BoltDB); !ok { + t.Fatal("Error registering and initializing boltDB") + } + + _ = os.Remove("/tmp/not_exist_dir/__boltdbtest") +} + +// TestMultiplePersistConnection tests the second connection to a +// BoltDB fails when one is already open with PersistConnection flag +func TestMultiplePersistConnection(t *testing.T) { + kv, err := libkv.NewStore( + store.BOLTDB, + []string{"/tmp/not_exist_dir/__boltdbtest"}, + &store.Config{ + Bucket: "boltDBTest", + ConnectionTimeout: 1 * time.Second, + PersistConnection: true}, + ) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*BoltDB); !ok { + t.Fatal("Error registering and initializing boltDB") + } + + // Must fail if multiple boltdb requests are made with a valid timeout + kv, err = libkv.NewStore( + store.BOLTDB, + []string{"/tmp/not_exist_dir/__boltdbtest"}, + &store.Config{ + Bucket: "boltDBTest", + ConnectionTimeout: 1 * time.Second, + PersistConnection: true}, + ) + assert.Error(t, err) + + _ = os.Remove("/tmp/not_exist_dir/__boltdbtest") +} + +// TestConcurrentConnection tests simultaenous get/put using +// two handles. +func TestConcurrentConnection(t *testing.T) { + var err error + kv1, err1 := libkv.NewStore( + store.BOLTDB, + []string{"/tmp/__boltdbtest"}, + &store.Config{ + Bucket: "boltDBTest", + ConnectionTimeout: 1 * time.Second}, + ) + assert.NoError(t, err1) + assert.NotNil(t, kv1) + + kv2, err2 := libkv.NewStore( + store.BOLTDB, + []string{"/tmp/__boltdbtest"}, + &store.Config{Bucket: "boltDBTest", + ConnectionTimeout: 1 * time.Second}, + ) + assert.NoError(t, err2) + assert.NotNil(t, kv2) + + key1 := "TestKV1" + value1 := []byte("TestVal1") + err = kv1.Put(key1, value1, nil) + assert.NoError(t, err) + + key2 := "TestKV2" + value2 := []byte("TestVal2") + err = kv2.Put(key2, value2, nil) + assert.NoError(t, err) + + pair1, err1 := kv1.Get(key1) + assert.NoError(t, err) + if assert.NotNil(t, pair1) { + assert.NotNil(t, pair1.Value) + } + assert.Equal(t, pair1.Value, value1) + + pair2, err2 := kv2.Get(key2) + assert.NoError(t, err) + if assert.NotNil(t, pair2) { + assert.NotNil(t, pair2.Value) + } + assert.Equal(t, pair2.Value, value2) + + // AtomicPut using kv1 and kv2 should succeed + _, _, err = kv1.AtomicPut(key1, []byte("TestnewVal1"), pair1, nil) + assert.NoError(t, err) + + _, _, err = kv2.AtomicPut(key2, []byte("TestnewVal2"), pair2, nil) + assert.NoError(t, err) + + testutils.RunTestCommon(t, kv1) + testutils.RunTestCommon(t, kv2) + + kv1.Close() + kv2.Close() + + _ = os.Remove("/tmp/__boltdbtest") +} + +func TestBoldDBStore(t *testing.T) { + kv := makeBoltDBClient(t) + + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + + _ = os.Remove("/tmp/not_exist_dir/__boltdbtest") +} diff --git a/vendor/github.com/docker/libkv/store/consul/consul.go b/vendor/github.com/docker/libkv/store/consul/consul.go new file mode 100644 index 00000000..cb64be72 --- /dev/null +++ b/vendor/github.com/docker/libkv/store/consul/consul.go @@ -0,0 +1,558 @@ +package consul + +import ( + "crypto/tls" + "errors" + "net/http" + "strings" + "sync" + "time" + + "github.com/docker/libkv" + "github.com/docker/libkv/store" + api "github.com/hashicorp/consul/api" +) + +const ( + // DefaultWatchWaitTime is how long we block for at a + // time to check if the watched key has changed. This + // affects the minimum time it takes to cancel a watch. + DefaultWatchWaitTime = 15 * time.Second + + // RenewSessionRetryMax is the number of time we should try + // to renew the session before giving up and throwing an error + RenewSessionRetryMax = 5 + + // MaxSessionDestroyAttempts is the maximum times we will try + // to explicitely destroy the session attached to a lock after + // the connectivity to the store has been lost + MaxSessionDestroyAttempts = 5 + + // defaultLockTTL is the default ttl for the consul lock + defaultLockTTL = 20 * time.Second +) + +var ( + // ErrMultipleEndpointsUnsupported is thrown when there are + // multiple endpoints specified for Consul + ErrMultipleEndpointsUnsupported = errors.New("consul does not support multiple endpoints") + + // ErrSessionRenew is thrown when the session can't be + // renewed because the Consul version does not support sessions + ErrSessionRenew = errors.New("cannot set or renew session for ttl, unable to operate on sessions") +) + +// Consul is the receiver type for the +// Store interface +type Consul struct { + sync.Mutex + config *api.Config + client *api.Client +} + +type consulLock struct { + lock *api.Lock + renewCh chan struct{} +} + +// Register registers consul to libkv +func Register() { + libkv.AddStore(store.CONSUL, New) +} + +// New creates a new Consul client given a list +// of endpoints and optional tls config +func New(endpoints []string, options *store.Config) (store.Store, error) { + if len(endpoints) > 1 { + return nil, ErrMultipleEndpointsUnsupported + } + + s := &Consul{} + + // Create Consul client + config := api.DefaultConfig() + s.config = config + config.HttpClient = http.DefaultClient + config.Address = endpoints[0] + config.Scheme = "http" + + // Set options + if options != nil { + if options.TLS != nil { + s.setTLS(options.TLS) + } + if options.ConnectionTimeout != 0 { + s.setTimeout(options.ConnectionTimeout) + } + } + + // Creates a new client + client, err := api.NewClient(config) + if err != nil { + return nil, err + } + s.client = client + + return s, nil +} + +// SetTLS sets Consul TLS options +func (s *Consul) setTLS(tls *tls.Config) { + s.config.HttpClient.Transport = &http.Transport{ + TLSClientConfig: tls, + } + s.config.Scheme = "https" +} + +// SetTimeout sets the timeout for connecting to Consul +func (s *Consul) setTimeout(time time.Duration) { + s.config.WaitTime = time +} + +// Normalize the key for usage in Consul +func (s *Consul) normalize(key string) string { + key = store.Normalize(key) + return strings.TrimPrefix(key, "/") +} + +func (s *Consul) renewSession(pair *api.KVPair, ttl time.Duration) error { + // Check if there is any previous session with an active TTL + session, err := s.getActiveSession(pair.Key) + if err != nil { + return err + } + + if session == "" { + entry := &api.SessionEntry{ + Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires + TTL: (ttl / 2).String(), // Consul multiplies the TTL by 2x + LockDelay: 1 * time.Millisecond, // Virtually disable lock delay + } + + // Create the key session + session, _, err = s.client.Session().Create(entry, nil) + if err != nil { + return err + } + + lockOpts := &api.LockOptions{ + Key: pair.Key, + Session: session, + } + + // Lock and ignore if lock is held + // It's just a placeholder for the + // ephemeral behavior + lock, _ := s.client.LockOpts(lockOpts) + if lock != nil { + lock.Lock(nil) + } + } + + _, _, err = s.client.Session().Renew(session, nil) + return err +} + +// getActiveSession checks if the key already has +// a session attached +func (s *Consul) getActiveSession(key string) (string, error) { + pair, _, err := s.client.KV().Get(key, nil) + if err != nil { + return "", err + } + if pair != nil && pair.Session != "" { + return pair.Session, nil + } + return "", nil +} + +// Get the value at "key", returns the last modified index +// to use in conjunction to CAS calls +func (s *Consul) Get(key string) (*store.KVPair, error) { + options := &api.QueryOptions{ + AllowStale: false, + RequireConsistent: true, + } + + pair, meta, err := s.client.KV().Get(s.normalize(key), options) + if err != nil { + return nil, err + } + + // If pair is nil then the key does not exist + if pair == nil { + return nil, store.ErrKeyNotFound + } + + return &store.KVPair{Key: pair.Key, Value: pair.Value, LastIndex: meta.LastIndex}, nil +} + +// Put a value at "key" +func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error { + key = s.normalize(key) + + p := &api.KVPair{ + Key: key, + Value: value, + Flags: api.LockFlagValue, + } + + if opts != nil && opts.TTL > 0 { + // Create or renew a session holding a TTL. Operations on sessions + // are not deterministic: creating or renewing a session can fail + for retry := 1; retry <= RenewSessionRetryMax; retry++ { + err := s.renewSession(p, opts.TTL) + if err == nil { + break + } + if retry == RenewSessionRetryMax { + return ErrSessionRenew + } + } + } + + _, err := s.client.KV().Put(p, nil) + return err +} + +// Delete a value at "key" +func (s *Consul) Delete(key string) error { + if _, err := s.Get(key); err != nil { + return err + } + _, err := s.client.KV().Delete(s.normalize(key), nil) + return err +} + +// Exists checks that the key exists inside the store +func (s *Consul) Exists(key string) (bool, error) { + _, err := s.Get(key) + if err != nil { + if err == store.ErrKeyNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +// List child nodes of a given directory +func (s *Consul) List(directory string) ([]*store.KVPair, error) { + pairs, _, err := s.client.KV().List(s.normalize(directory), nil) + if err != nil { + return nil, err + } + if len(pairs) == 0 { + return nil, store.ErrKeyNotFound + } + + kv := []*store.KVPair{} + + for _, pair := range pairs { + if pair.Key == directory { + continue + } + kv = append(kv, &store.KVPair{ + Key: pair.Key, + Value: pair.Value, + LastIndex: pair.ModifyIndex, + }) + } + + return kv, nil +} + +// DeleteTree deletes a range of keys under a given directory +func (s *Consul) DeleteTree(directory string) error { + if _, err := s.List(directory); err != nil { + return err + } + _, err := s.client.KV().DeleteTree(s.normalize(directory), nil) + return err +} + +// Watch for changes on a "key" +// It returns a channel that will receive changes or pass +// on errors. Upon creation, the current value will first +// be sent to the channel. Providing a non-nil stopCh can +// be used to stop watching. +func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { + kv := s.client.KV() + watchCh := make(chan *store.KVPair) + + go func() { + defer close(watchCh) + + // Use a wait time in order to check if we should quit + // from time to time. + opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} + + for { + // Check if we should quit + select { + case <-stopCh: + return + default: + } + + // Get the key + pair, meta, err := kv.Get(key, opts) + if err != nil { + return + } + + // If LastIndex didn't change then it means `Get` returned + // because of the WaitTime and the key didn't changed. + if opts.WaitIndex == meta.LastIndex { + continue + } + opts.WaitIndex = meta.LastIndex + + // Return the value to the channel + // FIXME: What happens when a key is deleted? + if pair != nil { + watchCh <- &store.KVPair{ + Key: pair.Key, + Value: pair.Value, + LastIndex: pair.ModifyIndex, + } + } + } + }() + + return watchCh, nil +} + +// WatchTree watches for changes on a "directory" +// It returns a channel that will receive changes or pass +// on errors. Upon creating a watch, the current childs values +// will be sent to the channel .Providing a non-nil stopCh can +// be used to stop watching. +func (s *Consul) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { + kv := s.client.KV() + watchCh := make(chan []*store.KVPair) + + go func() { + defer close(watchCh) + + // Use a wait time in order to check if we should quit + // from time to time. + opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} + for { + // Check if we should quit + select { + case <-stopCh: + return + default: + } + + // Get all the childrens + pairs, meta, err := kv.List(directory, opts) + if err != nil { + return + } + + // If LastIndex didn't change then it means `Get` returned + // because of the WaitTime and the child keys didn't change. + if opts.WaitIndex == meta.LastIndex { + continue + } + opts.WaitIndex = meta.LastIndex + + // Return children KV pairs to the channel + kvpairs := []*store.KVPair{} + for _, pair := range pairs { + if pair.Key == directory { + continue + } + kvpairs = append(kvpairs, &store.KVPair{ + Key: pair.Key, + Value: pair.Value, + LastIndex: pair.ModifyIndex, + }) + } + watchCh <- kvpairs + } + }() + + return watchCh, nil +} + +// NewLock returns a handle to a lock struct which can +// be used to provide mutual exclusion on a key +func (s *Consul) NewLock(key string, options *store.LockOptions) (store.Locker, error) { + lockOpts := &api.LockOptions{ + Key: s.normalize(key), + } + + lock := &consulLock{} + + ttl := defaultLockTTL + + if options != nil { + // Set optional TTL on Lock + if options.TTL != 0 { + ttl = options.TTL + } + // Set optional value on Lock + if options.Value != nil { + lockOpts.Value = options.Value + } + } + + entry := &api.SessionEntry{ + Behavior: api.SessionBehaviorRelease, // Release the lock when the session expires + TTL: (ttl / 2).String(), // Consul multiplies the TTL by 2x + LockDelay: 1 * time.Millisecond, // Virtually disable lock delay + } + + // Create the key session + session, _, err := s.client.Session().Create(entry, nil) + if err != nil { + return nil, err + } + + // Place the session and renew chan on lock + lockOpts.Session = session + lock.renewCh = options.RenewLock + + l, err := s.client.LockOpts(lockOpts) + if err != nil { + return nil, err + } + + // Renew the session ttl lock periodically + s.renewLockSession(entry.TTL, session, options.RenewLock) + + lock.lock = l + return lock, nil +} + +// renewLockSession is used to renew a session Lock, it takes +// a stopRenew chan which is used to explicitely stop the session +// renew process. The renew routine never stops until a signal is +// sent to this channel. If deleting the session fails because the +// connection to the store is lost, it keeps trying to delete the +// session periodically until it can contact the store, this ensures +// that the lock is not maintained indefinitely which ensures liveness +// over safety for the lock when the store becomes unavailable. +func (s *Consul) renewLockSession(initialTTL string, id string, stopRenew chan struct{}) { + sessionDestroyAttempts := 0 + ttl, err := time.ParseDuration(initialTTL) + if err != nil { + return + } + go func() { + for { + select { + case <-time.After(ttl / 2): + entry, _, err := s.client.Session().Renew(id, nil) + if err != nil { + // If an error occurs, continue until the + // session gets destroyed explicitely or + // the session ttl times out + continue + } + if entry == nil { + return + } + + // Handle the server updating the TTL + ttl, _ = time.ParseDuration(entry.TTL) + + case <-stopRenew: + // Attempt a session destroy + _, err := s.client.Session().Destroy(id, nil) + if err == nil { + return + } + + if sessionDestroyAttempts >= MaxSessionDestroyAttempts { + return + } + + // We can't destroy the session because the store + // is unavailable, wait for the session renew period + sessionDestroyAttempts++ + time.Sleep(ttl / 2) + } + } + }() +} + +// Lock attempts to acquire the lock and blocks while +// doing so. It returns a channel that is closed if our +// lock is lost or if an error occurs +func (l *consulLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { + return l.lock.Lock(stopChan) +} + +// Unlock the "key". Calling unlock while +// not holding the lock will throw an error +func (l *consulLock) Unlock() error { + if l.renewCh != nil { + close(l.renewCh) + } + return l.lock.Unlock() +} + +// AtomicPut put a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { + + p := &api.KVPair{Key: s.normalize(key), Value: value, Flags: api.LockFlagValue} + + if previous == nil { + // Consul interprets ModifyIndex = 0 as new key. + p.ModifyIndex = 0 + } else { + p.ModifyIndex = previous.LastIndex + } + + ok, _, err := s.client.KV().CAS(p, nil) + if err != nil { + return false, nil, err + } + if !ok { + if previous == nil { + return false, nil, store.ErrKeyExists + } + return false, nil, store.ErrKeyModified + } + + pair, err := s.Get(key) + if err != nil { + return false, nil, err + } + + return true, pair, nil +} + +// AtomicDelete deletes a value at "key" if the key has not +// been modified in the meantime, throws an error if this is the case +func (s *Consul) AtomicDelete(key string, previous *store.KVPair) (bool, error) { + if previous == nil { + return false, store.ErrPreviousNotSpecified + } + + p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex, Flags: api.LockFlagValue} + + // Extra Get operation to check on the key + _, err := s.Get(key) + if err != nil && err == store.ErrKeyNotFound { + return false, err + } + + if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil { + return false, err + } else if !work { + return false, store.ErrKeyModified + } + + return true, nil +} + +// Close closes the client connection +func (s *Consul) Close() { + return +} diff --git a/vendor/github.com/docker/libkv/store/consul/consul_test.go b/vendor/github.com/docker/libkv/store/consul/consul_test.go new file mode 100644 index 00000000..5019494c --- /dev/null +++ b/vendor/github.com/docker/libkv/store/consul/consul_test.go @@ -0,0 +1,84 @@ +package consul + +import ( + "testing" + "time" + + "github.com/docker/libkv" + "github.com/docker/libkv/store" + "github.com/docker/libkv/testutils" + "github.com/stretchr/testify/assert" +) + +var ( + client = "localhost:8500" +) + +func makeConsulClient(t *testing.T) store.Store { + + kv, err := New( + []string{client}, + &store.Config{ + ConnectionTimeout: 3 * time.Second, + }, + ) + + if err != nil { + t.Fatalf("cannot create store: %v", err) + } + + return kv +} + +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore(store.CONSUL, []string{client}, nil) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*Consul); !ok { + t.Fatal("Error registering and initializing consul") + } +} + +func TestConsulStore(t *testing.T) { + kv := makeConsulClient(t) + lockKV := makeConsulClient(t) + ttlKV := makeConsulClient(t) + + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + testutils.RunTestWatch(t, kv) + testutils.RunTestLock(t, kv) + testutils.RunTestLockTTL(t, kv, lockKV) + testutils.RunTestTTL(t, kv, ttlKV) + testutils.RunCleanup(t, kv) +} + +func TestGetActiveSession(t *testing.T) { + kv := makeConsulClient(t) + + consul := kv.(*Consul) + + key := "foo" + value := []byte("bar") + + // Put the first key with the Ephemeral flag + err := kv.Put(key, value, &store.WriteOptions{TTL: 2 * time.Second}) + assert.NoError(t, err) + + // Session should not be empty + session, err := consul.getActiveSession(key) + assert.NoError(t, err) + assert.NotEqual(t, session, "") + + // Delete the key + err = kv.Delete(key) + assert.NoError(t, err) + + // Check the session again, it should return nothing + session, err = consul.getActiveSession(key) + assert.NoError(t, err) + assert.Equal(t, session, "") +} diff --git a/vendor/github.com/docker/libkv/store/helpers.go b/vendor/github.com/docker/libkv/store/helpers.go new file mode 100644 index 00000000..0fb74c9a --- /dev/null +++ b/vendor/github.com/docker/libkv/store/helpers.go @@ -0,0 +1,47 @@ +package store + +import ( + "strings" +) + +// CreateEndpoints creates a list of endpoints given the right scheme +func CreateEndpoints(addrs []string, scheme string) (entries []string) { + for _, addr := range addrs { + entries = append(entries, scheme+"://"+addr) + } + return entries +} + +// Normalize the key for each store to the form: +// +// /path/to/key +// +func Normalize(key string) string { + return "/" + join(SplitKey(key)) +} + +// GetDirectory gets the full directory part of +// the key to the form: +// +// /path/to/ +// +func GetDirectory(key string) string { + parts := SplitKey(key) + parts = parts[:len(parts)-1] + return "/" + join(parts) +} + +// SplitKey splits the key to extract path informations +func SplitKey(key string) (path []string) { + if strings.Contains(key, "/") { + path = strings.Split(key, "/") + } else { + path = []string{key} + } + return path +} + +// join the path parts with '/' +func join(parts []string) string { + return strings.Join(parts, "/") +} diff --git a/vendor/github.com/docker/libkv/store/mock/mock.go b/vendor/github.com/docker/libkv/store/mock/mock.go new file mode 100644 index 00000000..82a5b03b --- /dev/null +++ b/vendor/github.com/docker/libkv/store/mock/mock.go @@ -0,0 +1,113 @@ +package mock + +import ( + "github.com/docker/libkv/store" + "github.com/stretchr/testify/mock" +) + +// Mock store. Mocks all Store functions using testify.Mock +type Mock struct { + mock.Mock + + // Endpoints passed to InitializeMock + Endpoints []string + + // Options passed to InitializeMock + Options *store.Config +} + +// New creates a Mock store +func New(endpoints []string, options *store.Config) (store.Store, error) { + s := &Mock{} + s.Endpoints = endpoints + s.Options = options + return s, nil +} + +// Put mock +func (s *Mock) Put(key string, value []byte, opts *store.WriteOptions) error { + args := s.Mock.Called(key, value, opts) + return args.Error(0) +} + +// Get mock +func (s *Mock) Get(key string) (*store.KVPair, error) { + args := s.Mock.Called(key) + return args.Get(0).(*store.KVPair), args.Error(1) +} + +// Delete mock +func (s *Mock) Delete(key string) error { + args := s.Mock.Called(key) + return args.Error(0) +} + +// Exists mock +func (s *Mock) Exists(key string) (bool, error) { + args := s.Mock.Called(key) + return args.Bool(0), args.Error(1) +} + +// Watch mock +func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { + args := s.Mock.Called(key, stopCh) + return args.Get(0).(<-chan *store.KVPair), args.Error(1) +} + +// WatchTree mock +func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { + args := s.Mock.Called(prefix, stopCh) + return args.Get(0).(chan []*store.KVPair), args.Error(1) +} + +// NewLock mock +func (s *Mock) NewLock(key string, options *store.LockOptions) (store.Locker, error) { + args := s.Mock.Called(key, options) + return args.Get(0).(store.Locker), args.Error(1) +} + +// List mock +func (s *Mock) List(prefix string) ([]*store.KVPair, error) { + args := s.Mock.Called(prefix) + return args.Get(0).([]*store.KVPair), args.Error(1) +} + +// DeleteTree mock +func (s *Mock) DeleteTree(prefix string) error { + args := s.Mock.Called(prefix) + return args.Error(0) +} + +// AtomicPut mock +func (s *Mock) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) { + args := s.Mock.Called(key, value, previous, opts) + return args.Bool(0), args.Get(1).(*store.KVPair), args.Error(2) +} + +// AtomicDelete mock +func (s *Mock) AtomicDelete(key string, previous *store.KVPair) (bool, error) { + args := s.Mock.Called(key, previous) + return args.Bool(0), args.Error(1) +} + +// Lock mock implementation of Locker +type Lock struct { + mock.Mock +} + +// Lock mock +func (l *Lock) Lock(stopCh chan struct{}) (<-chan struct{}, error) { + args := l.Mock.Called(stopCh) + return args.Get(0).(<-chan struct{}), args.Error(1) +} + +// Unlock mock +func (l *Lock) Unlock() error { + args := l.Mock.Called() + return args.Error(0) +} + +// Close mock +func (s *Mock) Close() { + return +} diff --git a/vendor/github.com/docker/libkv/store/store.go b/vendor/github.com/docker/libkv/store/store.go new file mode 100644 index 00000000..7a4850c0 --- /dev/null +++ b/vendor/github.com/docker/libkv/store/store.go @@ -0,0 +1,132 @@ +package store + +import ( + "crypto/tls" + "errors" + "time" +) + +// Backend represents a KV Store Backend +type Backend string + +const ( + // CONSUL backend + CONSUL Backend = "consul" + // ETCD backend + ETCD Backend = "etcd" + // ZK backend + ZK Backend = "zk" + // BOLTDB backend + BOLTDB Backend = "boltdb" +) + +var ( + // ErrBackendNotSupported is thrown when the backend k/v store is not supported by libkv + ErrBackendNotSupported = errors.New("Backend storage not supported yet, please choose one of") + // ErrCallNotSupported is thrown when a method is not implemented/supported by the current backend + ErrCallNotSupported = errors.New("The current call is not supported with this backend") + // ErrNotReachable is thrown when the API cannot be reached for issuing common store operations + ErrNotReachable = errors.New("Api not reachable") + // ErrCannotLock is thrown when there is an error acquiring a lock on a key + ErrCannotLock = errors.New("Error acquiring the lock") + // ErrKeyModified is thrown during an atomic operation if the index does not match the one in the store + ErrKeyModified = errors.New("Unable to complete atomic operation, key modified") + // ErrKeyNotFound is thrown when the key is not found in the store during a Get operation + ErrKeyNotFound = errors.New("Key not found in store") + // ErrPreviousNotSpecified is thrown when the previous value is not specified for an atomic operation + ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation") + // ErrKeyExists is thrown when the previous value exists in the case of an AtomicPut + ErrKeyExists = errors.New("Previous K/V pair exists, cannot complete Atomic operation") +) + +// Config contains the options for a storage client +type Config struct { + ClientTLS *ClientTLSConfig + TLS *tls.Config + ConnectionTimeout time.Duration + Bucket string + PersistConnection bool + Username string + Password string +} + +// ClientTLSConfig contains data for a Client TLS configuration in the form +// the etcd client wants it. Eventually we'll adapt it for ZK and Consul. +type ClientTLSConfig struct { + CertFile string + KeyFile string + CACertFile string +} + +// Store represents the backend K/V storage +// Each store should support every call listed +// here. Or it couldn't be implemented as a K/V +// backend for libkv +type Store interface { + // Put a value at the specified key + Put(key string, value []byte, options *WriteOptions) error + + // Get a value given its key + Get(key string) (*KVPair, error) + + // Delete the value at the specified key + Delete(key string) error + + // Verify if a Key exists in the store + Exists(key string) (bool, error) + + // Watch for changes on a key + Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) + + // WatchTree watches for changes on child nodes under + // a given directory + WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error) + + // NewLock creates a lock for a given key. + // The returned Locker is not held and must be acquired + // with `.Lock`. The Value is optional. + NewLock(key string, options *LockOptions) (Locker, error) + + // List the content of a given prefix + List(directory string) ([]*KVPair, error) + + // DeleteTree deletes a range of keys under a given directory + DeleteTree(directory string) error + + // Atomic CAS operation on a single value. + // Pass previous = nil to create a new key. + AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) + + // Atomic delete of a single value + AtomicDelete(key string, previous *KVPair) (bool, error) + + // Close the store connection + Close() +} + +// KVPair represents {Key, Value, Lastindex} tuple +type KVPair struct { + Key string + Value []byte + LastIndex uint64 +} + +// WriteOptions contains optional request parameters +type WriteOptions struct { + IsDir bool + TTL time.Duration +} + +// LockOptions contains optional request parameters +type LockOptions struct { + Value []byte // Optional, value to associate with the lock + TTL time.Duration // Optional, expiration ttl associated with the lock + RenewLock chan struct{} // Optional, chan used to control and stop the session ttl renewal for the lock +} + +// Locker provides locking mechanism on top of the store. +// Similar to `sync.Lock` except it may return errors. +type Locker interface { + Lock(stopChan chan struct{}) (<-chan struct{}, error) + Unlock() error +} diff --git a/vendor/github.com/docker/libkv/testutils/utils.go b/vendor/github.com/docker/libkv/testutils/utils.go new file mode 100644 index 00000000..5385bac6 --- /dev/null +++ b/vendor/github.com/docker/libkv/testutils/utils.go @@ -0,0 +1,622 @@ +package testutils + +import ( + "fmt" + "testing" + "time" + + "github.com/docker/libkv/store" + "github.com/stretchr/testify/assert" +) + +// RunTestCommon tests the minimal required APIs which +// should be supported by all K/V backends +func RunTestCommon(t *testing.T, kv store.Store) { + testPutGetDeleteExists(t, kv) + testList(t, kv) + testDeleteTree(t, kv) +} + +// RunTestAtomic tests the Atomic operations by the K/V +// backends +func RunTestAtomic(t *testing.T, kv store.Store) { + testAtomicPut(t, kv) + testAtomicPutCreate(t, kv) + testAtomicPutWithSlashSuffixKey(t, kv) + testAtomicDelete(t, kv) +} + +// RunTestWatch tests the watch/monitor APIs supported +// by the K/V backends. +func RunTestWatch(t *testing.T, kv store.Store) { + testWatch(t, kv) + testWatchTree(t, kv) +} + +// RunTestLock tests the KV pair Lock/Unlock APIs supported +// by the K/V backends. +func RunTestLock(t *testing.T, kv store.Store) { + testLockUnlock(t, kv) +} + +// RunTestLockTTL tests the KV pair Lock with TTL APIs supported +// by the K/V backends. +func RunTestLockTTL(t *testing.T, kv store.Store, backup store.Store) { + testLockTTL(t, kv, backup) +} + +// RunTestTTL tests the TTL functionality of the K/V backend. +func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) { + testPutTTL(t, kv, backup) +} + +func testPutGetDeleteExists(t *testing.T, kv store.Store) { + // Get a not exist key should return ErrKeyNotFound + pair, err := kv.Get("testPutGetDelete_not_exist_key") + assert.Equal(t, store.ErrKeyNotFound, err) + + value := []byte("bar") + for _, key := range []string{ + "testPutGetDeleteExists", + "testPutGetDeleteExists/", + "testPutGetDeleteExists/testbar/", + "testPutGetDeleteExists/testbar/testfoobar", + } { + failMsg := fmt.Sprintf("Fail key %s", key) + + // Put the key + err = kv.Put(key, value, nil) + assert.NoError(t, err, failMsg) + + // Get should return the value and an incremented index + pair, err = kv.Get(key) + assert.NoError(t, err, failMsg) + if assert.NotNil(t, pair, failMsg) { + assert.NotNil(t, pair.Value, failMsg) + } + assert.Equal(t, pair.Value, value, failMsg) + assert.NotEqual(t, pair.LastIndex, 0, failMsg) + + // Exists should return true + exists, err := kv.Exists(key) + assert.NoError(t, err, failMsg) + assert.True(t, exists, failMsg) + + // Delete the key + err = kv.Delete(key) + assert.NoError(t, err, failMsg) + + // Get should fail + pair, err = kv.Get(key) + assert.Error(t, err, failMsg) + assert.Nil(t, pair, failMsg) + + // Exists should return false + exists, err = kv.Exists(key) + assert.NoError(t, err, failMsg) + assert.False(t, exists, failMsg) + } +} + +func testWatch(t *testing.T, kv store.Store) { + key := "testWatch" + value := []byte("world") + newValue := []byte("world!") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + stopCh := make(<-chan struct{}) + events, err := kv.Watch(key, stopCh) + assert.NoError(t, err) + assert.NotNil(t, events) + + // Update loop + go func() { + timeout := time.After(1 * time.Second) + tick := time.Tick(250 * time.Millisecond) + for { + select { + case <-timeout: + return + case <-tick: + err := kv.Put(key, newValue, nil) + if assert.NoError(t, err) { + continue + } + return + } + } + }() + + // Check for updates + eventCount := 1 + for { + select { + case event := <-events: + assert.NotNil(t, event) + if eventCount == 1 { + assert.Equal(t, event.Key, key) + assert.Equal(t, event.Value, value) + } else { + assert.Equal(t, event.Key, key) + assert.Equal(t, event.Value, newValue) + } + eventCount++ + // We received all the events we wanted to check + if eventCount >= 4 { + return + } + case <-time.After(4 * time.Second): + t.Fatal("Timeout reached") + return + } + } +} + +func testWatchTree(t *testing.T, kv store.Store) { + dir := "testWatchTree" + + node1 := "testWatchTree/node1" + value1 := []byte("node1") + + node2 := "testWatchTree/node2" + value2 := []byte("node2") + + node3 := "testWatchTree/node3" + value3 := []byte("node3") + + err := kv.Put(node1, value1, nil) + assert.NoError(t, err) + err = kv.Put(node2, value2, nil) + assert.NoError(t, err) + err = kv.Put(node3, value3, nil) + assert.NoError(t, err) + + stopCh := make(<-chan struct{}) + events, err := kv.WatchTree(dir, stopCh) + assert.NoError(t, err) + assert.NotNil(t, events) + + // Update loop + go func() { + timeout := time.After(500 * time.Millisecond) + for { + select { + case <-timeout: + err := kv.Delete(node3) + assert.NoError(t, err) + return + } + } + }() + + // Check for updates + eventCount := 1 + for { + select { + case event := <-events: + assert.NotNil(t, event) + // We received the Delete event on a child node + // Exit test successfully + if eventCount == 2 { + return + } + eventCount++ + case <-time.After(4 * time.Second): + t.Fatal("Timeout reached") + return + } + } +} + +func testAtomicPut(t *testing.T, kv store.Store) { + key := "testAtomicPut" + value := []byte("world") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // This CAS should fail: previous exists. + success, _, err := kv.AtomicPut(key, []byte("WORLD"), nil, nil) + assert.Error(t, err) + assert.False(t, success) + + // This CAS should succeed + success, _, err = kv.AtomicPut(key, []byte("WORLD"), pair, nil) + assert.NoError(t, err) + assert.True(t, success) + + // This CAS should fail, key exists. + pair.LastIndex = 6744 + success, _, err = kv.AtomicPut(key, []byte("WORLDWORLD"), pair, nil) + assert.Error(t, err) + assert.False(t, success) +} + +func testAtomicPutCreate(t *testing.T, kv store.Store) { + // Use a key in a new directory to ensure Stores will create directories + // that don't yet exist. + key := "testAtomicPutCreate/create" + value := []byte("putcreate") + + // AtomicPut the key, previous = nil indicates create. + success, _, err := kv.AtomicPut(key, value, nil, nil) + assert.NoError(t, err) + assert.True(t, success) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + + // Attempting to create again should fail. + success, _, err = kv.AtomicPut(key, value, nil, nil) + assert.Error(t, store.ErrKeyExists) + assert.False(t, success) + + // This CAS should succeed, since it has the value from Get() + success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil) + assert.NoError(t, err) + assert.True(t, success) +} + +func testAtomicPutWithSlashSuffixKey(t *testing.T, kv store.Store) { + k1 := "testAtomicPutWithSlashSuffixKey/key/" + success, _, err := kv.AtomicPut(k1, []byte{}, nil, nil) + assert.Nil(t, err) + assert.True(t, success) +} + +func testAtomicDelete(t *testing.T, kv store.Store) { + key := "testAtomicDelete" + value := []byte("world") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + tempIndex := pair.LastIndex + + // AtomicDelete should fail + pair.LastIndex = 6744 + success, err := kv.AtomicDelete(key, pair) + assert.Error(t, err) + assert.False(t, success) + + // AtomicDelete should succeed + pair.LastIndex = tempIndex + success, err = kv.AtomicDelete(key, pair) + assert.NoError(t, err) + assert.True(t, success) + + // Delete a non-existent key; should fail + success, err = kv.AtomicDelete(key, pair) + assert.Error(t, store.ErrKeyNotFound) + assert.False(t, success) +} + +func testLockUnlock(t *testing.T, kv store.Store) { + key := "testLockUnlock" + value := []byte("bar") + + // We should be able to create a new lock on key + lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second}) + assert.NoError(t, err) + assert.NotNil(t, lock) + + // Lock should successfully succeed or block + lockChan, err := lock.Lock(nil) + assert.NoError(t, err) + assert.NotNil(t, lockChan) + + // Get should work + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // Unlock should succeed + err = lock.Unlock() + assert.NoError(t, err) + + // Lock should succeed again + lockChan, err = lock.Lock(nil) + assert.NoError(t, err) + assert.NotNil(t, lockChan) + + // Get should work + pair, err = kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + err = lock.Unlock() + assert.NoError(t, err) +} + +func testLockTTL(t *testing.T, kv store.Store, otherConn store.Store) { + key := "testLockTTL" + value := []byte("bar") + + renewCh := make(chan struct{}) + + // We should be able to create a new lock on key + lock, err := otherConn.NewLock(key, &store.LockOptions{ + Value: value, + TTL: 2 * time.Second, + RenewLock: renewCh, + }) + assert.NoError(t, err) + assert.NotNil(t, lock) + + // Lock should successfully succeed + lockChan, err := lock.Lock(nil) + assert.NoError(t, err) + assert.NotNil(t, lockChan) + + // Get should work + pair, err := otherConn.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + time.Sleep(3 * time.Second) + + done := make(chan struct{}) + stop := make(chan struct{}) + + value = []byte("foobar") + + // Create a new lock with another connection + lock, err = kv.NewLock( + key, + &store.LockOptions{ + Value: value, + TTL: 3 * time.Second, + }, + ) + assert.NoError(t, err) + assert.NotNil(t, lock) + + // Lock should block, the session on the lock + // is still active and renewed periodically + go func(<-chan struct{}) { + _, _ = lock.Lock(stop) + done <- struct{}{} + }(done) + + select { + case _ = <-done: + t.Fatal("Lock succeeded on a key that is supposed to be locked by another client") + case <-time.After(4 * time.Second): + // Stop requesting the lock as we are blocked as expected + stop <- struct{}{} + break + } + + // Close the connection + otherConn.Close() + + // Force stop the session renewal for the lock + close(renewCh) + + // Let the session on the lock expire + time.Sleep(3 * time.Second) + locked := make(chan struct{}) + + // Lock should now succeed for the other client + go func(<-chan struct{}) { + lockChan, err = lock.Lock(nil) + assert.NoError(t, err) + assert.NotNil(t, lockChan) + locked <- struct{}{} + }(locked) + + select { + case _ = <-locked: + break + case <-time.After(4 * time.Second): + t.Fatal("Unable to take the lock, timed out") + } + + // Get should work with the new value + pair, err = kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + err = lock.Unlock() + assert.NoError(t, err) +} + +func testPutTTL(t *testing.T, kv store.Store, otherConn store.Store) { + firstKey := "testPutTTL" + firstValue := []byte("foo") + + secondKey := "second" + secondValue := []byte("bar") + + // Put the first key with the Ephemeral flag + err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{TTL: 2 * time.Second}) + assert.NoError(t, err) + + // Put a second key with the Ephemeral flag + err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{TTL: 2 * time.Second}) + assert.NoError(t, err) + + // Get on firstKey should work + pair, err := kv.Get(firstKey) + assert.NoError(t, err) + assert.NotNil(t, pair) + + // Get on secondKey should work + pair, err = kv.Get(secondKey) + assert.NoError(t, err) + assert.NotNil(t, pair) + + // Close the connection + otherConn.Close() + + // Let the session expire + time.Sleep(3 * time.Second) + + // Get on firstKey shouldn't work + pair, err = kv.Get(firstKey) + assert.Error(t, err) + assert.Nil(t, pair) + + // Get on secondKey shouldn't work + pair, err = kv.Get(secondKey) + assert.Error(t, err) + assert.Nil(t, pair) +} + +func testList(t *testing.T, kv store.Store) { + prefix := "testList" + + firstKey := "testList/first" + firstValue := []byte("first") + + secondKey := "testList/second" + secondValue := []byte("second") + + // Put the first key + err := kv.Put(firstKey, firstValue, nil) + assert.NoError(t, err) + + // Put the second key + err = kv.Put(secondKey, secondValue, nil) + assert.NoError(t, err) + + // List should work and return the two correct values + for _, parent := range []string{prefix, prefix + "/"} { + pairs, err := kv.List(parent) + assert.NoError(t, err) + if assert.NotNil(t, pairs) { + assert.Equal(t, len(pairs), 2) + } + + // Check pairs, those are not necessarily in Put order + for _, pair := range pairs { + if pair.Key == firstKey { + assert.Equal(t, pair.Value, firstValue) + } + if pair.Key == secondKey { + assert.Equal(t, pair.Value, secondValue) + } + } + } + + // List should fail: the key does not exist + pairs, err := kv.List("idontexist") + assert.Equal(t, store.ErrKeyNotFound, err) + assert.Nil(t, pairs) +} + +func testDeleteTree(t *testing.T, kv store.Store) { + prefix := "testDeleteTree" + + firstKey := "testDeleteTree/first" + firstValue := []byte("first") + + secondKey := "testDeleteTree/second" + secondValue := []byte("second") + + // Put the first key + err := kv.Put(firstKey, firstValue, nil) + assert.NoError(t, err) + + // Put the second key + err = kv.Put(secondKey, secondValue, nil) + assert.NoError(t, err) + + // Get should work on the first Key + pair, err := kv.Get(firstKey) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, firstValue) + assert.NotEqual(t, pair.LastIndex, 0) + + // Get should work on the second Key + pair, err = kv.Get(secondKey) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, secondValue) + assert.NotEqual(t, pair.LastIndex, 0) + + // Delete Values under directory `nodes` + err = kv.DeleteTree(prefix) + assert.NoError(t, err) + + // Get should fail on both keys + pair, err = kv.Get(firstKey) + assert.Error(t, err) + assert.Nil(t, pair) + + pair, err = kv.Get(secondKey) + assert.Error(t, err) + assert.Nil(t, pair) +} + +// RunCleanup cleans up keys introduced by the tests +func RunCleanup(t *testing.T, kv store.Store) { + for _, key := range []string{ + "testAtomicPutWithSlashSuffixKey", + "testPutGetDeleteExists", + "testWatch", + "testWatchTree", + "testAtomicPut", + "testAtomicPutCreate", + "testAtomicDelete", + "testLockUnlock", + "testLockTTL", + "testPutTTL", + "testList", + "testDeleteTree", + } { + err := kv.DeleteTree(key) + assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete tree key %s: %v", key, err)) + err = kv.Delete(key) + assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete key %s: %v", key, err)) + } +} -- cgit v1.2.3