[Vendor] Update macaron-related dependencies (#13409)

* Vendor: update gitea.com/macaron/session to a177a270

* make vendor

* Vendor: update gitea.com/macaron/macaron to 0db5d458

* make vendor

* Vendor: update gitea.com/macaron/cache to 905232fb

* make vendor

* Vendor: update gitea.com/macaron/i18n to 4ca3dd0c

* make vendor

* Vendor: update gitea.com/macaron/gzip to efa5e847

* make vendor

* Vendor: update gitea.com/macaron/captcha to e8597820

* make vendor
6543 2020-11-03 07:04:09 +01:00 committed by GitHub
parent b687707014
commit 70ea2300ca
118 changed files with 14557 additions and 6115 deletions

14
vendor/github.com/couchbase/go-couchbase/.gitignore generated vendored Normal file

@@ -0,0 +1,14 @@
#*
*.6
*.a
*~
*.swp
/examples/basic/basic
/hello/hello
/populate/populate
/tools/view2go/view2go
/tools/loadfile/loadfile
gotags.files
TAGS
6.out
_*

5
vendor/github.com/couchbase/go-couchbase/.travis.yml generated vendored Normal file

@@ -0,0 +1,5 @@
language: go
install: go get -v -d ./... && go build -v ./...
script: go test -v ./...
go: 1.1.1

19
vendor/github.com/couchbase/go-couchbase/LICENSE generated vendored Normal file

@@ -0,0 +1,19 @@
Copyright (c) 2013 Couchbase, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

37
vendor/github.com/couchbase/go-couchbase/README.md generated vendored Normal file

@@ -0,0 +1,37 @@
# A smart client for couchbase in go
This is an *unofficial* version of a Couchbase Golang client. If you are
looking for the *official* Couchbase Golang client, please see
[CB-go](https://github.com/couchbaselabs/gocb).
This is an evolving package, but does provide a useful interface to a
[couchbase](http://www.couchbase.com/) server including all of the
pool/bucket discovery features, compatible key distribution with other
clients, and vbucket motion awareness so applications can continue to
operate during rebalances.
It also supports view querying with source-node randomization so you
don't hammer a single node with all the work.
## Install
go get github.com/couchbase/go-couchbase
## Example
c, err := couchbase.Connect("http://dev-couchbase.example.com:8091/")
if err != nil {
log.Fatalf("Error connecting: %v", err)
}
pool, err := c.GetPool("default")
if err != nil {
log.Fatalf("Error getting pool: %v", err)
}
bucket, err := pool.GetBucket("default")
if err != nil {
log.Fatalf("Error getting bucket: %v", err)
}
bucket.Set("someKey", 0, []string{"an", "example", "list"})

32
vendor/github.com/couchbase/go-couchbase/audit.go generated vendored Normal file

@@ -0,0 +1,32 @@
package couchbase
import ()
// Sample data:
// {"disabled":["12333", "22244"],"uid":"132492431","auditdEnabled":true,
// "disabledUsers":[{"name":"bill","domain":"local"},{"name":"bob","domain":"local"}],
// "logPath":"/Users/johanlarson/Library/Application Support/Couchbase/var/lib/couchbase/logs",
// "rotateInterval":86400,"rotateSize":20971520}
type AuditSpec struct {
Disabled []uint32 `json:"disabled"`
Uid string `json:"uid"`
AuditdEnabled bool `json:"auditdEnabled"`
DisabledUsers []AuditUser `json:"disabledUsers"`
LogPath string `json:"logPath"`
RotateInterval int64 `json:"rotateInterval"`
RotateSize int64 `json:"rotateSize"`
}
type AuditUser struct {
Name string `json:"name"`
Domain string `json:"domain"`
}
func (c *Client) GetAuditSpec() (*AuditSpec, error) {
ret := &AuditSpec{}
err := c.parseURLResponse("/settings/audit", ret)
if err != nil {
return nil, err
}
return ret, nil
}
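For orientation, a minimal sketch of calling GetAuditSpec through a Client obtained with couchbase.Connect as in the README example; the printed fields follow the AuditSpec struct above.
c, err := couchbase.Connect("http://dev-couchbase.example.com:8091/")
if err != nil {
    log.Fatalf("Error connecting: %v", err)
}
spec, err := c.GetAuditSpec()
if err != nil {
    log.Fatalf("Error fetching audit spec: %v", err)
}
fmt.Printf("auditing enabled: %v, log path: %s\n", spec.AuditdEnabled, spec.LogPath)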

1513
vendor/github.com/couchbase/go-couchbase/client.go generated vendored Normal file

File diff suppressed because it is too large

421
vendor/github.com/couchbase/go-couchbase/conn_pool.go generated vendored Normal file

@@ -0,0 +1,421 @@
package couchbase
import (
"crypto/tls"
"errors"
"sync/atomic"
"time"
"github.com/couchbase/gomemcached"
"github.com/couchbase/gomemcached/client"
"github.com/couchbase/goutils/logging"
)
// GenericMcdAuthHandler is a kind of AuthHandler that performs
// special auth exchange (like non-standard auth, possibly followed by
// select-bucket).
type GenericMcdAuthHandler interface {
AuthHandler
AuthenticateMemcachedConn(host string, conn *memcached.Client) error
}
// Error raised when a connection can't be retrieved from a pool.
var TimeoutError = errors.New("timeout waiting to build connection")
var errClosedPool = errors.New("the connection pool is closed")
var errNoPool = errors.New("no connection pool")
// Default timeout for retrieving a connection from the pool.
var ConnPoolTimeout = time.Hour * 24 * 30
// overflow connection closer cycle time
var ConnCloserInterval = time.Second * 30
// ConnPoolAvailWaitTime is the amount of time to wait for an existing
// connection from the pool before considering the creation of a new
// one.
var ConnPoolAvailWaitTime = time.Millisecond
type connectionPool struct {
host string
mkConn func(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error)
auth AuthHandler
connections chan *memcached.Client
createsem chan bool
bailOut chan bool
poolSize int
connCount uint64
inUse bool
encrypted bool
tlsConfig *tls.Config
bucket string
}
func newConnectionPool(host string, ah AuthHandler, closer bool, poolSize, poolOverflow int, tlsConfig *tls.Config, bucket string, encrypted bool) *connectionPool {
connSize := poolSize
if closer {
connSize += poolOverflow
}
rv := &connectionPool{
host: host,
connections: make(chan *memcached.Client, connSize),
createsem: make(chan bool, poolSize+poolOverflow),
mkConn: defaultMkConn,
auth: ah,
poolSize: poolSize,
bucket: bucket,
encrypted: encrypted,
}
if encrypted {
rv.tlsConfig = tlsConfig
}
if closer {
rv.bailOut = make(chan bool, 1)
go rv.connCloser()
}
return rv
}
// ConnPoolCallback is invoked whenever connections are acquired from a pool.
var ConnPoolCallback func(host string, source string, start time.Time, err error)
// Use regular in-the-clear connection if tlsConfig is nil.
// Use secure connection (TLS) if tlsConfig is set.
func defaultMkConn(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error) {
var features memcached.Features
var conn *memcached.Client
var err error
if tlsConfig == nil {
conn, err = memcached.Connect("tcp", host)
} else {
conn, err = memcached.ConnectTLS("tcp", host, tlsConfig)
}
if err != nil {
return nil, err
}
if DefaultTimeout > 0 {
conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
}
if TCPKeepalive == true {
conn.SetKeepAliveOptions(time.Duration(TCPKeepaliveInterval) * time.Second)
}
if EnableMutationToken == true {
features = append(features, memcached.FeatureMutationToken)
}
if EnableDataType == true {
features = append(features, memcached.FeatureDataType)
}
if EnableXattr == true {
features = append(features, memcached.FeatureXattr)
}
if EnableCollections {
features = append(features, memcached.FeatureCollections)
}
if len(features) > 0 {
res, err := conn.EnableFeatures(features)
if err != nil && isTimeoutError(err) {
conn.Close()
return nil, err
}
if err != nil || res.Status != gomemcached.SUCCESS {
logging.Warnf("Unable to enable features %v", err)
}
}
if gah, ok := ah.(GenericMcdAuthHandler); ok {
err = gah.AuthenticateMemcachedConn(host, conn)
if err != nil {
conn.Close()
return nil, err
}
if DefaultTimeout > 0 {
conn.SetDeadline(noDeadline)
}
return conn, nil
}
name, pass, bucket := ah.GetCredentials()
if bucket == "" {
// Authenticator does not know specific bucket.
bucket = bucketName
}
if name != "default" {
_, err = conn.Auth(name, pass)
if err != nil {
conn.Close()
return nil, err
}
// Select bucket (Required for cb_auth creds)
// Required when doing auth with _admin credentials
if bucket != "" && bucket != name {
_, err = conn.SelectBucket(bucket)
if err != nil {
conn.Close()
return nil, err
}
}
}
if DefaultTimeout > 0 {
conn.SetDeadline(noDeadline)
}
return conn, nil
}
func (cp *connectionPool) Close() (err error) {
defer func() {
if recover() != nil {
err = errors.New("connectionPool.Close error")
}
}()
if cp.bailOut != nil {
// defensively, we won't wait if the channel is full
select {
case cp.bailOut <- false:
default:
}
}
close(cp.connections)
for c := range cp.connections {
c.Close()
}
return
}
func (cp *connectionPool) Node() string {
return cp.host
}
func (cp *connectionPool) GetWithTimeout(d time.Duration) (rv *memcached.Client, err error) {
if cp == nil {
return nil, errNoPool
}
path := ""
if ConnPoolCallback != nil {
defer func(path *string, start time.Time) {
ConnPoolCallback(cp.host, *path, start, err)
}(&path, time.Now())
}
path = "short-circuit"
// short-circuit available connections.
select {
case rv, isopen := <-cp.connections:
if !isopen {
return nil, errClosedPool
}
atomic.AddUint64(&cp.connCount, 1)
return rv, nil
default:
}
t := time.NewTimer(ConnPoolAvailWaitTime)
defer t.Stop()
// Try to grab an available connection within 1ms
select {
case rv, isopen := <-cp.connections:
path = "avail1"
if !isopen {
return nil, errClosedPool
}
atomic.AddUint64(&cp.connCount, 1)
return rv, nil
case <-t.C:
// No connection came around in time, let's see
// whether we can get one or build a new one first.
t.Reset(d) // Reuse the timer for the full timeout.
select {
case rv, isopen := <-cp.connections:
path = "avail2"
if !isopen {
return nil, errClosedPool
}
atomic.AddUint64(&cp.connCount, 1)
return rv, nil
case cp.createsem <- true:
path = "create"
// Build a connection if we can't get a real one.
// This can potentially be an overflow connection, or
// a pooled connection.
rv, err := cp.mkConn(cp.host, cp.auth, cp.tlsConfig, cp.bucket)
if err != nil {
// On error, release our create hold
<-cp.createsem
} else {
atomic.AddUint64(&cp.connCount, 1)
}
return rv, err
case <-t.C:
return nil, ErrTimeout
}
}
}
func (cp *connectionPool) Get() (*memcached.Client, error) {
return cp.GetWithTimeout(ConnPoolTimeout)
}
func (cp *connectionPool) Return(c *memcached.Client) {
if c == nil {
return
}
if cp == nil {
c.Close()
return
}
if c.IsHealthy() {
defer func() {
if recover() != nil {
// This happens when the pool has already been
// closed and we're trying to return a
// connection to it anyway. Just close the
// connection.
c.Close()
}
}()
select {
case cp.connections <- c:
default:
<-cp.createsem
c.Close()
}
} else {
<-cp.createsem
c.Close()
}
}
// give the ability to discard a connection from a pool
// useful for ditching connections to the wrong node after a rebalance
func (cp *connectionPool) Discard(c *memcached.Client) {
<-cp.createsem
c.Close()
}
// asynchronous connection closer
func (cp *connectionPool) connCloser() {
var connCount uint64
t := time.NewTimer(ConnCloserInterval)
defer t.Stop()
for {
connCount = cp.connCount
// we don't exist anymore! bail out!
select {
case <-cp.bailOut:
return
case <-t.C:
}
t.Reset(ConnCloserInterval)
// no overflow connections open or sustained requests for connections
// nothing to do until the next cycle
if len(cp.connections) <= cp.poolSize ||
ConnCloserInterval/ConnPoolAvailWaitTime < time.Duration(cp.connCount-connCount) {
continue
}
// close overflow connections now that they are not needed
for c := range cp.connections {
select {
case <-cp.bailOut:
return
default:
}
// bail out if close did not work out
if !cp.connCleanup(c) {
return
}
if len(cp.connections) <= cp.poolSize {
break
}
}
}
}
// close connection with recovery on error
func (cp *connectionPool) connCleanup(c *memcached.Client) (rv bool) {
// just in case we are closing a connection after
// bailOut has been sent but we haven't yet read it
defer func() {
if recover() != nil {
rv = false
}
}()
rv = true
c.Close()
<-cp.createsem
return
}
func (cp *connectionPool) StartTapFeed(args *memcached.TapArguments) (*memcached.TapFeed, error) {
if cp == nil {
return nil, errNoPool
}
mc, err := cp.Get()
if err != nil {
return nil, err
}
// A connection can't be used after TAP; don't count it against the
// connection pool capacity
<-cp.createsem
return mc.StartTapFeed(*args)
}
const DEFAULT_WINDOW_SIZE = 20 * 1024 * 1024 // 20 Mb
func (cp *connectionPool) StartUprFeed(name string, sequence uint32, dcp_buffer_size uint32, data_chan_size int) (*memcached.UprFeed, error) {
if cp == nil {
return nil, errNoPool
}
mc, err := cp.Get()
if err != nil {
return nil, err
}
// A connection can't be used after it has been allocated to UPR;
// Don't count it against the connection pool capacity
<-cp.createsem
uf, err := mc.NewUprFeed()
if err != nil {
return nil, err
}
if err := uf.UprOpen(name, sequence, dcp_buffer_size); err != nil {
return nil, err
}
if err := uf.StartFeedWithConfig(data_chan_size); err != nil {
return nil, err
}
return uf, nil
}

288
vendor/github.com/couchbase/go-couchbase/ddocs.go generated vendored Normal file

@@ -0,0 +1,288 @@
package couchbase
import (
"bytes"
"encoding/json"
"fmt"
"github.com/couchbase/goutils/logging"
"io/ioutil"
"net/http"
)
// ViewDefinition represents a single view within a design document.
type ViewDefinition struct {
Map string `json:"map"`
Reduce string `json:"reduce,omitempty"`
}
// DDoc is the document body of a design document specifying a view.
type DDoc struct {
Language string `json:"language,omitempty"`
Views map[string]ViewDefinition `json:"views"`
}
// DDocsResult represents the result from listing the design
// documents.
type DDocsResult struct {
Rows []struct {
DDoc struct {
Meta map[string]interface{}
JSON DDoc
} `json:"doc"`
} `json:"rows"`
}
// GetDDocs lists all design documents
func (b *Bucket) GetDDocs() (DDocsResult, error) {
var ddocsResult DDocsResult
b.RLock()
pool := b.pool
uri := b.DDocs.URI
b.RUnlock()
// MB-23555 ephemeral buckets have no ddocs
if uri == "" {
return DDocsResult{}, nil
}
err := pool.client.parseURLResponse(uri, &ddocsResult)
if err != nil {
return DDocsResult{}, err
}
return ddocsResult, nil
}
func (b *Bucket) GetDDocWithRetry(docname string, into interface{}) error {
ddocURI := fmt.Sprintf("/%s/_design/%s", b.GetName(), docname)
err := b.parseAPIResponse(ddocURI, &into)
if err != nil {
return err
}
return nil
}
func (b *Bucket) GetDDocsWithRetry() (DDocsResult, error) {
var ddocsResult DDocsResult
b.RLock()
uri := b.DDocs.URI
b.RUnlock()
// MB-23555 ephemeral buckets have no ddocs
if uri == "" {
return DDocsResult{}, nil
}
err := b.parseURLResponse(uri, &ddocsResult)
if err != nil {
return DDocsResult{}, err
}
return ddocsResult, nil
}
func (b *Bucket) ddocURL(docname string) (string, error) {
u, err := b.randomBaseURL()
if err != nil {
return "", err
}
u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname)
return u.String(), nil
}
func (b *Bucket) ddocURLNext(nodeId int, docname string) (string, int, error) {
u, selected, err := b.randomNextURL(nodeId)
if err != nil {
return "", -1, err
}
u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname)
return u.String(), selected, nil
}
const ABS_MAX_RETRIES = 10
const ABS_MIN_RETRIES = 3
func (b *Bucket) getMaxRetries() (int, error) {
maxRetries := len(b.Nodes())
if maxRetries == 0 {
return 0, fmt.Errorf("No available Couch rest URLs")
}
if maxRetries > ABS_MAX_RETRIES {
maxRetries = ABS_MAX_RETRIES
} else if maxRetries < ABS_MIN_RETRIES {
maxRetries = ABS_MIN_RETRIES
}
return maxRetries, nil
}
// PutDDoc installs a design document.
func (b *Bucket) PutDDoc(docname string, value interface{}) error {
var Err error
maxRetries, err := b.getMaxRetries()
if err != nil {
return err
}
lastNode := START_NODE_ID
for retryCount := 0; retryCount < maxRetries; retryCount++ {
Err = nil
ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname)
if err != nil {
return err
}
lastNode = selectedNode
logging.Infof(" Trying with selected node %d", selectedNode)
j, err := json.Marshal(value)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", ddocU, bytes.NewReader(j))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */))
if err != nil {
return err
}
res, err := doHTTPRequest(req)
if err != nil {
return err
}
if res.StatusCode != 201 {
body, _ := ioutil.ReadAll(res.Body)
Err = fmt.Errorf("error installing view: %v / %s",
res.Status, body)
logging.Errorf(" Error in PutDDOC %v. Retrying...", Err)
res.Body.Close()
b.Refresh()
continue
}
res.Body.Close()
break
}
return Err
}
// GetDDoc retrieves a specific design doc.
func (b *Bucket) GetDDoc(docname string, into interface{}) error {
var Err error
var res *http.Response
maxRetries, err := b.getMaxRetries()
if err != nil {
return err
}
lastNode := START_NODE_ID
for retryCount := 0; retryCount < maxRetries; retryCount++ {
Err = nil
ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname)
if err != nil {
return err
}
lastNode = selectedNode
logging.Infof(" Trying with selected node %d", selectedNode)
req, err := http.NewRequest("GET", ddocU, nil)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */))
if err != nil {
return err
}
res, err = doHTTPRequest(req)
if err != nil {
return err
}
if res.StatusCode != 200 {
body, _ := ioutil.ReadAll(res.Body)
Err = fmt.Errorf("error reading view: %v / %s",
res.Status, body)
logging.Errorf(" Error in GetDDOC %v Retrying...", Err)
b.Refresh()
res.Body.Close()
continue
}
defer res.Body.Close()
break
}
if Err != nil {
return Err
}
d := json.NewDecoder(res.Body)
return d.Decode(into)
}
// DeleteDDoc removes a design document.
func (b *Bucket) DeleteDDoc(docname string) error {
var Err error
maxRetries, err := b.getMaxRetries()
if err != nil {
return err
}
lastNode := START_NODE_ID
for retryCount := 0; retryCount < maxRetries; retryCount++ {
Err = nil
ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname)
if err != nil {
return err
}
lastNode = selectedNode
logging.Infof(" Trying with selected node %d", selectedNode)
req, err := http.NewRequest("DELETE", ddocU, nil)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
err = maybeAddAuth(req, b.authHandler(false /* bucket not already locked */))
if err != nil {
return err
}
res, err := doHTTPRequest(req)
if err != nil {
return err
}
if res.StatusCode != 200 {
body, _ := ioutil.ReadAll(res.Body)
Err = fmt.Errorf("error deleting view : %v / %s", res.Status, body)
logging.Errorf(" Error in DeleteDDOC %v. Retrying ... ", Err)
b.Refresh()
res.Body.Close()
continue
}
res.Body.Close()
break
}
return Err
}
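A minimal sketch of the intended PutDDoc/GetDDoc usage, reusing the bucket from the README example; the "by_type" design-document name and the map function body are hypothetical.
ddoc := couchbase.DDoc{
    Language: "javascript",
    Views: map[string]couchbase.ViewDefinition{
        "by_type": {Map: "function (doc, meta) { emit(doc.type, null); }"},
    },
}
if err := bucket.PutDDoc("by_type", &ddoc); err != nil {
    log.Fatalf("Error installing design doc: %v", err)
}
var got couchbase.DDoc
if err := bucket.GetDDoc("by_type", &got); err != nil {
    log.Fatalf("Error reading design doc back: %v", err)
}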

3
vendor/github.com/couchbase/go-couchbase/go.mod generated vendored Normal file

@@ -0,0 +1,3 @@
module github.com/couchbase/go-couchbase
go 1.13

300
vendor/github.com/couchbase/go-couchbase/observe.go generated vendored Normal file

@@ -0,0 +1,300 @@
package couchbase
import (
"fmt"
"github.com/couchbase/goutils/logging"
"sync"
)
type PersistTo uint8
const (
PersistNone = PersistTo(0x00)
PersistMaster = PersistTo(0x01)
PersistOne = PersistTo(0x02)
PersistTwo = PersistTo(0x03)
PersistThree = PersistTo(0x04)
PersistFour = PersistTo(0x05)
)
type ObserveTo uint8
const (
ObserveNone = ObserveTo(0x00)
ObserveReplicateOne = ObserveTo(0x01)
ObserveReplicateTwo = ObserveTo(0x02)
ObserveReplicateThree = ObserveTo(0x03)
ObserveReplicateFour = ObserveTo(0x04)
)
type JobType uint8
const (
OBSERVE = JobType(0x00)
PERSIST = JobType(0x01)
)
type ObservePersistJob struct {
vb uint16
vbuuid uint64
hostname string
jobType JobType
failover uint8
lastPersistedSeqNo uint64
currentSeqNo uint64
resultChan chan *ObservePersistJob
errorChan chan *OPErrResponse
}
type OPErrResponse struct {
vb uint16
vbuuid uint64
err error
job *ObservePersistJob
}
var ObservePersistPool = NewPool(1024)
var OPJobChan = make(chan *ObservePersistJob, 1024)
var OPJobDone = make(chan bool)
var wg sync.WaitGroup
func (b *Bucket) StartOPPollers(maxWorkers int) {
for i := 0; i < maxWorkers; i++ {
go b.OPJobPoll()
wg.Add(1)
}
wg.Wait()
}
func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) {
numNodes := len(b.Nodes())
if int(nPersist) > numNodes || int(nObserve) > numNodes {
return fmt.Errorf("Not enough healthy nodes in the cluster")
}
if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas {
return fmt.Errorf("Not enough replicas in the cluster")
}
if EnableMutationToken == false {
return fmt.Errorf("Mutation Tokens not enabled ")
}
b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)}
return
}
func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) {
b.RLock()
ds := b.ds
b.RUnlock()
if ds == nil {
return
}
nj := 0 // total number of jobs
resultChan := make(chan *ObservePersistJob, 10)
errChan := make(chan *OPErrResponse, 10)
nodes := b.GetNodeList(vb)
if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) {
return fmt.Errorf("Not enough healthy nodes in the cluster"), false
}
logging.Infof("Node list %v", nodes)
if ds.Observe >= ObserveReplicateOne {
// create a job for each host
for i := ObserveReplicateOne; i < ds.Observe+1; i++ {
opJob := ObservePersistPool.Get()
opJob.vb = vb
opJob.vbuuid = vbuuid
opJob.jobType = OBSERVE
opJob.hostname = nodes[i]
opJob.resultChan = resultChan
opJob.errorChan = errChan
OPJobChan <- opJob
nj++
}
}
if ds.Persist >= PersistMaster {
for i := PersistMaster; i < ds.Persist+1; i++ {
opJob := ObservePersistPool.Get()
opJob.vb = vb
opJob.vbuuid = vbuuid
opJob.jobType = PERSIST
opJob.hostname = nodes[i]
opJob.resultChan = resultChan
opJob.errorChan = errChan
OPJobChan <- opJob
nj++
}
}
ok := true
for ok {
select {
case res := <-resultChan:
jobDone := false
if res.failover == 0 {
// no failover
if res.jobType == PERSIST {
if res.lastPersistedSeqNo >= seqNo {
jobDone = true
}
} else {
if res.currentSeqNo >= seqNo {
jobDone = true
}
}
if jobDone == true {
nj--
ObservePersistPool.Put(res)
} else {
// requeue this job
OPJobChan <- res
}
} else {
// Not currently handling failover scenarios TODO
nj--
ObservePersistPool.Put(res)
failover = true
}
if nj == 0 {
// done with all the jobs
ok = false
close(resultChan)
close(errChan)
}
case Err := <-errChan:
logging.Errorf("Error in Observe/Persist %v", Err.err)
err = fmt.Errorf("Error in Observe/Persist job %v", Err.err)
nj--
ObservePersistPool.Put(Err.job)
if nj == 0 {
close(resultChan)
close(errChan)
ok = false
}
}
}
return
}
func (b *Bucket) OPJobPoll() {
ok := true
for ok == true {
select {
case job := <-OPJobChan:
pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */)
if pool == nil {
errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname)
errRes.job = job
job.errorChan <- errRes
continue
}
conn, err := pool.Get()
if err != nil {
errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
errRes.err = fmt.Errorf("Unable to get connection from pool %v", err)
errRes.job = job
job.errorChan <- errRes
continue
}
res, err := conn.ObserveSeq(job.vb, job.vbuuid)
if err != nil {
errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
errRes.err = fmt.Errorf("Command failed %v", err)
errRes.job = job
job.errorChan <- errRes
continue
}
pool.Return(conn)
job.lastPersistedSeqNo = res.LastPersistedSeqNo
job.currentSeqNo = res.CurrentSeqNo
job.failover = res.Failover
job.resultChan <- job
case <-OPJobDone:
logging.Infof("Observe Persist Poller exitting")
ok = false
}
}
wg.Done()
}
func (b *Bucket) GetNodeList(vb uint16) []string {
vbm := b.VBServerMap()
if len(vbm.VBucketMap) < int(vb) {
logging.Infof("vbmap smaller than vblist")
return nil
}
nodes := make([]string, len(vbm.VBucketMap[vb]))
for i := 0; i < len(vbm.VBucketMap[vb]); i++ {
n := vbm.VBucketMap[vb][i]
if n < 0 {
continue
}
node := b.getMasterNode(n)
if len(node) > 1 {
nodes[i] = node
}
continue
}
return nodes
}
//pool of ObservePersist Jobs
type OPpool struct {
pool chan *ObservePersistJob
}
// NewPool creates a new pool of jobs
func NewPool(max int) *OPpool {
return &OPpool{
pool: make(chan *ObservePersistJob, max),
}
}
// Borrow a Client from the pool.
func (p *OPpool) Get() *ObservePersistJob {
var o *ObservePersistJob
select {
case o = <-p.pool:
default:
o = &ObservePersistJob{}
}
return o
}
// Return returns a Client to the pool.
func (p *OPpool) Put(o *ObservePersistJob) {
select {
case p.pool <- o:
default:
// let it go, let it go...
}
}

1746
vendor/github.com/couchbase/go-couchbase/pools.go generated vendored Normal file

File diff suppressed because it is too large

106
vendor/github.com/couchbase/go-couchbase/port_map.go generated vendored Normal file

@@ -0,0 +1,106 @@
package couchbase
/*
The goal here is to map a hostname:port combination to another hostname:port
combination. The original hostname:port gives the name and regular KV port
of a couchbase server. We want to determine the corresponding SSL KV port.
To do this, we have a pool services structure, as obtained from
the /pools/default/nodeServices API.
For a fully configured two-node system, the structure may look like this:
{"rev":32,"nodesExt":[
{"services":{"mgmt":8091,"mgmtSSL":18091,"fts":8094,"ftsSSL":18094,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"capiSSL":18092,"capi":8092,"kvSSL":11207,"projector":9999,"kv":11210,"moxi":11211},"hostname":"172.23.123.101"},
{"services":{"mgmt":8091,"mgmtSSL":18091,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"capiSSL":18092,"capi":8092,"kvSSL":11207,"projector":9999,"kv":11210,"moxi":11211,"n1ql":8093,"n1qlSSL":18093},"thisNode":true,"hostname":"172.23.123.102"}]}
In this case, note the "hostname" fields, and the "kv" and "kvSSL" fields.
For a single-node system, perhaps brought up for testing, the structure may look like this:
{"rev":66,"nodesExt":[
{"services":{"mgmt":8091,"mgmtSSL":18091,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"kv":11210,"kvSSL":11207,"capi":8092,"capiSSL":18092,"projector":9999,"n1ql":8093,"n1qlSSL":18093},"thisNode":true}],"clusterCapabilitiesVer":[1,0],"clusterCapabilities":{"n1ql":["enhancedPreparedStatements"]}}
Here, note that there is only a single entry in the "nodeExt" array and that it does not have a "hostname" field.
We will assume that either hostname fields are present, or there is only a single node.
*/
import (
"encoding/json"
"fmt"
"net"
"strconv"
)
func ParsePoolServices(jsonInput string) (*PoolServices, error) {
ps := &PoolServices{}
err := json.Unmarshal([]byte(jsonInput), ps)
return ps, err
}
// Accepts a "host:port" string representing the KV TCP port and the pools
// nodeServices payload and returns a host:port string representing the KV
// TLS port on the same node as the KV TCP port.
// Returns the original host:port in the case of local communication (services
// on the same node as the source).
func MapKVtoSSL(hostport string, ps *PoolServices) (string, bool, error) {
return MapKVtoSSLExt(hostport, ps, false)
}
func MapKVtoSSLExt(hostport string, ps *PoolServices, force bool) (string, bool, error) {
host, port, err := net.SplitHostPort(hostport)
if err != nil {
return "", false, fmt.Errorf("Unable to split hostport %s: %v", hostport, err)
}
portInt, err := strconv.Atoi(port)
if err != nil {
return "", false, fmt.Errorf("Unable to parse host/port combination %s: %v", hostport, err)
}
var ns *NodeServices
for i := range ps.NodesExt {
hostname := ps.NodesExt[i].Hostname
if len(hostname) != 0 && hostname != host {
/* If the hostname is the empty string, it means the node (and by extension
the cluster) is configured on the loopback. Further, it means that the client
should use whatever hostname it used to get the nodeServices information in
the first place to access the cluster. Thus, when the hostname is empty in
the nodeService entry we can assume that client will use the hostname it used
to access the KV TCP endpoint - and thus that it automatically "matches".
If hostname is not empty and doesn't match then we move to the next entry.
*/
continue
}
kvPort, found := ps.NodesExt[i].Services["kv"]
if !found {
/* not a node with a KV service */
continue
}
if kvPort == portInt {
ns = &(ps.NodesExt[i])
break
}
}
if ns == nil {
return "", false, fmt.Errorf("Unable to parse host/port combination %s: no matching node found among %d", hostport, len(ps.NodesExt))
}
kvSSL, found := ns.Services["kvSSL"]
if !found {
return "", false, fmt.Errorf("Unable to map host/port combination %s: target host has no kvSSL port listed", hostport)
}
//Don't encrypt for communication between local nodes
if !force && (len(ns.Hostname) == 0 || ns.ThisNode) {
return hostport, false, nil
}
ip := net.ParseIP(host)
if ip != nil && ip.To4() == nil && ip.To16() != nil { // IPv6 and not a FQDN
// Prefix and suffix square brackets as SplitHostPort removes them,
// see: https://golang.org/pkg/net/#SplitHostPort
host = "[" + host + "]"
}
return fmt.Sprintf("%s:%d", host, kvSSL), true, nil
}
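A hedged sketch of the mapping described in the comment at the top of this file: parse an abbreviated nodeServices payload and translate a KV host:port to its TLS counterpart; the addresses and ports are taken from the sample above.
nodeServicesJSON := `{"rev":32,"nodesExt":[
    {"services":{"kv":11210,"kvSSL":11207},"hostname":"172.23.123.101"},
    {"services":{"kv":11210,"kvSSL":11207},"hostname":"172.23.123.102"}]}`
ps, err := couchbase.ParsePoolServices(nodeServicesJSON)
if err != nil {
    log.Fatalf("Error parsing pool services: %v", err)
}
sslHostPort, encrypted, err := couchbase.MapKVtoSSL("172.23.123.101:11210", ps)
if err != nil {
    log.Fatalf("Error mapping KV port: %v", err)
}
fmt.Println(sslHostPort, encrypted) // expected: 172.23.123.101:11207 true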

228
vendor/github.com/couchbase/go-couchbase/streaming.go generated vendored Normal file

@@ -0,0 +1,228 @@
package couchbase
import (
"encoding/json"
"fmt"
"github.com/couchbase/goutils/logging"
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"time"
"unsafe"
)
// Bucket auto-updater gets the latest version of the bucket config from
// the server. If the configuration has changed, it updates the local
// bucket information. If the bucket has been deleted, it notifies anyone
// holding a reference to this bucket.
const MAX_RETRY_COUNT = 5
const DISCONNECT_PERIOD = 120 * time.Second
type NotifyFn func(bucket string, err error)
type StreamingFn func(bucket *Bucket)
// Use TCP keepalive to detect half close sockets
var updaterTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
}
var updaterHTTPClient = &http.Client{Transport: updaterTransport}
func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) {
var err error
var res *http.Response
for i := 0; i < HTTP_MAX_RETRY; i++ {
res, err = updaterHTTPClient.Do(req)
if err != nil && isHttpConnError(err) {
continue
}
break
}
if err != nil {
return nil, err
}
return res, err
}
func (b *Bucket) RunBucketUpdater(notify NotifyFn) {
b.RunBucketUpdater2(nil, notify)
}
func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) {
go func() {
err := b.UpdateBucket2(streamingFn)
if err != nil {
if notify != nil {
notify(b.GetName(), err)
}
logging.Errorf(" Bucket Updater exited with err %v", err)
}
}()
}
func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) {
if !bucketLocked {
b.Lock()
defer b.Unlock()
}
old := b.connPools
b.connPools = unsafe.Pointer(&with)
if old != nil {
for _, pool := range *(*[]*connectionPool)(old) {
if pool != nil && pool.inUse == false {
pool.Close()
}
}
}
return
}
func (b *Bucket) UpdateBucket() error {
return b.UpdateBucket2(nil)
}
func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
var failures int
var returnErr error
var poolServices PoolServices
for {
if failures == MAX_RETRY_COUNT {
logging.Errorf(" Maximum failures reached. Exiting loop...")
return fmt.Errorf("Max failures reached. Last Error %v", returnErr)
}
nodes := b.Nodes()
if len(nodes) < 1 {
return fmt.Errorf("No healthy nodes found")
}
startNode := rand.Intn(len(nodes))
node := nodes[(startNode)%len(nodes)]
streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreaming/%s", node.Hostname, uriAdj(b.GetName()))
logging.Infof(" Trying with %s", streamUrl)
req, err := http.NewRequest("GET", streamUrl, nil)
if err != nil {
return err
}
// Lock here to avoid having pool closed under us.
b.RLock()
err = maybeAddAuth(req, b.pool.client.ah)
b.RUnlock()
if err != nil {
return err
}
res, err := doHTTPRequestForUpdate(req)
if err != nil {
return err
}
if res.StatusCode != 200 {
bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod)
res.Body.Close()
returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod)
failures++
continue
}
dec := json.NewDecoder(res.Body)
tmpb := &Bucket{}
for {
err := dec.Decode(&tmpb)
if err != nil {
returnErr = err
res.Body.Close()
break
}
// if we got here, reset failure count
failures = 0
if b.pool.client.tlsConfig != nil {
poolServices, err = b.pool.client.GetPoolServices("default")
if err != nil {
returnErr = err
res.Body.Close()
break
}
}
b.Lock()
// mark all the old connection pools for deletion
pools := b.getConnPools(true /* already locked */)
for _, pool := range pools {
if pool != nil {
pool.inUse = false
}
}
newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
for i := range newcps {
// get the old connection pool and check if it is still valid
pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */)
if pool != nil && pool.inUse == false && pool.tlsConfig == b.pool.client.tlsConfig {
// if the hostname and index is unchanged then reuse this pool
newcps[i] = pool
pool.inUse = true
continue
}
// else create a new pool
var encrypted bool
hostport := tmpb.VBSMJson.ServerList[i]
if b.pool.client.tlsConfig != nil {
hostport, encrypted, err = MapKVtoSSL(hostport, &poolServices)
if err != nil {
b.Unlock()
return err
}
}
if b.ah != nil {
newcps[i] = newConnectionPool(hostport,
b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
} else {
newcps[i] = newConnectionPool(hostport,
b.authHandler(true /* bucket already locked */),
false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
}
}
b.replaceConnPools2(newcps, true /* bucket already locked */)
tmpb.ah = b.ah
b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
b.Unlock()
if streamingFn != nil {
streamingFn(tmpb)
}
logging.Debugf("Got new configuration for bucket %s", b.GetName())
}
// we are here because of an error
failures++
continue
}
return nil
}
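A minimal sketch of wiring up the auto-updater described in the comment at the top of this file, again assuming the bucket from the README example; the callback shape is the NotifyFn type defined above.
bucket.RunBucketUpdater(func(name string, err error) {
    // Called only when the updater gives up, e.g. because the bucket was deleted.
    log.Printf("bucket updater for %q stopped: %v", name, err)
})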

143
vendor/github.com/couchbase/go-couchbase/tap.go generated vendored Normal file

@@ -0,0 +1,143 @@
package couchbase
import (
"github.com/couchbase/gomemcached/client"
"github.com/couchbase/goutils/logging"
"sync"
"time"
)
const initialRetryInterval = 1 * time.Second
const maximumRetryInterval = 30 * time.Second
// A TapFeed streams mutation events from a bucket.
//
// Events from the bucket can be read from the channel 'C'. Remember
// to call Close() on it when you're done, unless its channel has
// closed itself already.
type TapFeed struct {
C <-chan memcached.TapEvent
bucket *Bucket
args *memcached.TapArguments
nodeFeeds []*memcached.TapFeed // The TAP feeds of the individual nodes
output chan memcached.TapEvent // Same as C but writeably-typed
wg sync.WaitGroup
quit chan bool
}
// StartTapFeed creates and starts a new Tap feed
func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) {
if args == nil {
defaultArgs := memcached.DefaultTapArguments()
args = &defaultArgs
}
feed := &TapFeed{
bucket: b,
args: args,
output: make(chan memcached.TapEvent, 10),
quit: make(chan bool),
}
go feed.run()
feed.C = feed.output
return feed, nil
}
// Goroutine that runs the feed
func (feed *TapFeed) run() {
retryInterval := initialRetryInterval
bucketOK := true
for {
// Connect to the TAP feed of each server node:
if bucketOK {
killSwitch, err := feed.connectToNodes()
if err == nil {
// Run until one of the sub-feeds fails:
select {
case <-killSwitch:
case <-feed.quit:
return
}
feed.closeNodeFeeds()
retryInterval = initialRetryInterval
}
}
// On error, try to refresh the bucket in case the list of nodes changed:
logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v",
feed.bucket.Name, retryInterval)
err := feed.bucket.Refresh()
bucketOK = err == nil
select {
case <-time.After(retryInterval):
case <-feed.quit:
return
}
if retryInterval *= 2; retryInterval > maximumRetryInterval {
retryInterval = maximumRetryInterval
}
}
}
func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) {
killSwitch = make(chan bool)
for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
var singleFeed *memcached.TapFeed
singleFeed, err = serverConn.StartTapFeed(feed.args)
if err != nil {
logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err)
feed.closeNodeFeeds()
return
}
feed.nodeFeeds = append(feed.nodeFeeds, singleFeed)
go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host)
feed.wg.Add(1)
}
return
}
// Goroutine that forwards Tap events from a single node's feed to the aggregate feed.
func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) {
defer feed.wg.Done()
for {
select {
case event, ok := <-singleFeed.C:
if !ok {
if singleFeed.Error != nil {
logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error)
}
killSwitch <- true
return
}
feed.output <- event
case <-feed.quit:
return
}
}
}
func (feed *TapFeed) closeNodeFeeds() {
for _, f := range feed.nodeFeeds {
f.Close()
}
feed.nodeFeeds = nil
}
// Close a Tap feed.
func (feed *TapFeed) Close() error {
select {
case <-feed.quit:
return nil
default:
}
feed.closeNodeFeeds()
close(feed.quit)
feed.wg.Wait()
close(feed.output)
return nil
}
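A hedged sketch of consuming a TAP feed as described above: passing nil selects memcached.DefaultTapArguments(), events arrive on C, and Close ends the feed; the Opcode and Key field names are assumptions about memcached.TapEvent.
feed, err := bucket.StartTapFeed(nil)
if err != nil {
    log.Fatalf("Error starting TAP feed: %v", err)
}
defer feed.Close()
for event := range feed.C {
    fmt.Printf("tap event: op=%v key=%s\n", event.Opcode, event.Key)
}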

399
vendor/github.com/couchbase/go-couchbase/upr.go generated vendored Normal file

@@ -0,0 +1,399 @@
package couchbase
import (
"log"
"sync"
"time"
"fmt"
"github.com/couchbase/gomemcached"
"github.com/couchbase/gomemcached/client"
"github.com/couchbase/goutils/logging"
)
// A UprFeed streams mutation events from a bucket.
//
// Events from the bucket can be read from the channel 'C'. Remember
// to call Close() on it when you're done, unless its channel has
// closed itself already.
type UprFeed struct {
C <-chan *memcached.UprEvent
bucket *Bucket
nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes
output chan *memcached.UprEvent // Same as C but writeably-typed
outputClosed bool
quit chan bool
name string // name of this UPR feed
sequence uint32 // sequence number for this feed
connected bool
killSwitch chan bool
closing bool
wg sync.WaitGroup
dcp_buffer_size uint32
data_chan_size int
}
// UprFeed from a single connection
type FeedInfo struct {
uprFeed *memcached.UprFeed // UPR feed handle
host string // hostname
connected bool // connected
quit chan bool // quit channel
}
type FailoverLog map[uint16]memcached.FailoverLog
// GetFailoverLogs gets the failover logs for a set of vbucket ids.
func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) {
// map vbids to their corresponding hosts
vbHostList := make(map[string][]uint16)
vbm := b.VBServerMap()
if len(vbm.VBucketMap) < len(vBuckets) {
return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
vbm.VBucketMap, vBuckets)
}
for _, vb := range vBuckets {
masterID := vbm.VBucketMap[vb][0]
master := b.getMasterNode(masterID)
if master == "" {
return nil, fmt.Errorf("No master found for vb %d", vb)
}
vbList := vbHostList[master]
if vbList == nil {
vbList = make([]uint16, 0)
}
vbList = append(vbList, vb)
vbHostList[master] = vbList
}
failoverLogMap := make(FailoverLog)
for _, serverConn := range b.getConnPools(false /* not already locked */) {
vbList := vbHostList[serverConn.host]
if vbList == nil {
continue
}
mc, err := serverConn.Get()
if err != nil {
logging.Infof("No Free connections for vblist %v", vbList)
return nil, fmt.Errorf("No Free connections for host %s",
serverConn.host)
}
// close the connection so that it doesn't get reused for upr data
// connection
defer mc.Close()
mc.SetDeadline(getDeadline(time.Time{}, DefaultTimeout))
failoverlogs, err := mc.UprGetFailoverLog(vbList)
if err != nil {
return nil, fmt.Errorf("Error getting failover log %s host %s",
err.Error(), serverConn.host)
}
for vb, log := range failoverlogs {
failoverLogMap[vb] = *log
}
}
return failoverLogMap, nil
}
func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) {
return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE)
}
// StartUprFeedWithConfig creates and starts a new UPR feed.
// No data will be sent on the channel unless vbuckets streams are requested
func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) {
feed := &UprFeed{
bucket: b,
output: make(chan *memcached.UprEvent, data_chan_size),
quit: make(chan bool),
nodeFeeds: make(map[string]*FeedInfo, 0),
name: name,
sequence: sequence,
killSwitch: make(chan bool),
dcp_buffer_size: dcp_buffer_size,
data_chan_size: data_chan_size,
}
err := feed.connectToNodes()
if err != nil {
return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error())
}
feed.connected = true
go feed.run()
feed.C = feed.output
return feed, nil
}
// UprRequestStream starts a stream for a vb on a feed
func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32,
vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
defer func() {
if r := recover(); r != nil {
log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket)
}
}()
vbm := feed.bucket.VBServerMap()
if len(vbm.VBucketMap) < int(vb) {
return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
vb, vbm.VBucketMap)
}
if int(vb) >= len(vbm.VBucketMap) {
return fmt.Errorf("Invalid vbucket id %d", vb)
}
masterID := vbm.VBucketMap[vb][0]
master := feed.bucket.getMasterNode(masterID)
if master == "" {
return fmt.Errorf("Master node not found for vbucket %d", vb)
}
singleFeed := feed.nodeFeeds[master]
if singleFeed == nil {
return fmt.Errorf("UprFeed for this host not found")
}
if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags,
vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil {
return err
}
return nil
}
// UprCloseStream ends a vbucket stream.
func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error {
defer func() {
if r := recover(); r != nil {
log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket)
}
}()
vbm := feed.bucket.VBServerMap()
if len(vbm.VBucketMap) < int(vb) {
return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
vb, vbm.VBucketMap)
}
if int(vb) >= len(vbm.VBucketMap) {
return fmt.Errorf("Invalid vbucket id %d", vb)
}
masterID := vbm.VBucketMap[vb][0]
master := feed.bucket.getMasterNode(masterID)
if master == "" {
return fmt.Errorf("Master node not found for vbucket %d", vb)
}
singleFeed := feed.nodeFeeds[master]
if singleFeed == nil {
return fmt.Errorf("UprFeed for this host not found")
}
if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil {
return err
}
return nil
}
// Goroutine that runs the feed
func (feed *UprFeed) run() {
retryInterval := initialRetryInterval
bucketOK := true
for {
// Connect to the UPR feed of each server node:
if bucketOK {
// Run until one of the sub-feeds fails:
select {
case <-feed.killSwitch:
case <-feed.quit:
return
}
//feed.closeNodeFeeds()
retryInterval = initialRetryInterval
}
if feed.closing == true {
// we have been asked to shut down
return
}
// On error, try to refresh the bucket in case the list of nodes changed:
logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v",
feed.bucket.Name, retryInterval)
if err := feed.bucket.Refresh(); err != nil {
// if we fail to refresh the bucket, exit the feed
// MB-14917
logging.Infof("Unable to refresh bucket %s ", err.Error())
close(feed.output)
feed.outputClosed = true
feed.closeNodeFeeds()
return
}
// this will only connect to nodes that are not connected or changed
// user will have to reconnect the stream
err := feed.connectToNodes()
if err != nil {
logging.Infof("Unable to connect to nodes..exit ")
close(feed.output)
feed.outputClosed = true
feed.closeNodeFeeds()
return
}
bucketOK = err == nil
select {
case <-time.After(retryInterval):
case <-feed.quit:
return
}
if retryInterval *= 2; retryInterval > maximumRetryInterval {
retryInterval = maximumRetryInterval
}
}
}
func (feed *UprFeed) connectToNodes() (err error) {
nodeCount := 0
for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
// this maybe a reconnection, so check if the connection to the node
// already exists. Connect only if the node is not found in the list
// or connected == false
nodeFeed := feed.nodeFeeds[serverConn.host]
if nodeFeed != nil && nodeFeed.connected == true {
continue
}
var singleFeed *memcached.UprFeed
var name string
if feed.name == "" {
name = "DefaultUprClient"
} else {
name = feed.name
}
singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size)
if err != nil {
logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err)
feed.closeNodeFeeds()
return
}
// add the node to the connection map
feedInfo := &FeedInfo{
uprFeed: singleFeed,
connected: true,
host: serverConn.host,
quit: make(chan bool),
}
feed.nodeFeeds[serverConn.host] = feedInfo
go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host)
feed.wg.Add(1)
nodeCount++
}
if nodeCount == 0 {
return fmt.Errorf("No connection to bucket")
}
return nil
}
// Goroutine that forwards Upr events from a single node's feed to the aggregate feed.
func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) {
singleFeed := nodeFeed.uprFeed
defer func() {
feed.wg.Done()
if r := recover(); r != nil {
//if feed is not closing, re-throw the panic
if feed.outputClosed != true && feed.closing != true {
panic(r)
} else {
logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully")
}
}
}()
for {
select {
case <-nodeFeed.quit:
nodeFeed.connected = false
return
case event, ok := <-singleFeed.C:
if !ok {
if singleFeed.Error != nil {
logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error)
}
killSwitch <- true
return
}
if feed.outputClosed == true {
// someone closed the node feed
logging.Infof("Node need closed, returning from forwardUprEvent")
return
}
feed.output <- event
if event.Status == gomemcached.NOT_MY_VBUCKET {
logging.Infof(" Got a not my vbucket error !! ")
if err := feed.bucket.Refresh(); err != nil {
logging.Errorf("Unable to refresh bucket %s ", err.Error())
feed.closeNodeFeeds()
return
}
// this will only connect to nodes that are not connected or changed
// user will have to reconnect the stream
if err := feed.connectToNodes(); err != nil {
logging.Errorf("Unable to connect to nodes %s", err.Error())
return
}
}
}
}
}
func (feed *UprFeed) closeNodeFeeds() {
for _, f := range feed.nodeFeeds {
logging.Infof(" Sending close to forwardUprEvent ")
close(f.quit)
f.uprFeed.Close()
}
feed.nodeFeeds = nil
}
// Close a Upr feed.
func (feed *UprFeed) Close() error {
select {
case <-feed.quit:
return nil
default:
}
feed.closing = true
feed.closeNodeFeeds()
close(feed.quit)
feed.wg.Wait()
if feed.outputClosed == false {
feed.outputClosed = true
close(feed.output)
}
return nil
}
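A hedged sketch of the UPR flow implied above: start a named feed, explicitly request a vbucket stream (no events arrive otherwise), then drain C; the feed name, opaque value, and zero vbuuid are placeholders, and a real caller would take the vbuuid from GetFailoverLogs.
feed, err := bucket.StartUprFeed("example-feed", 0)
if err != nil {
    log.Fatalf("Error starting UPR feed: %v", err)
}
defer feed.Close()
// Stream vbucket 0 from sequence 0 to the end; vbuuid 0 is a placeholder.
if err := feed.UprRequestStream(0, 0, 0, 0, 0, 0xFFFFFFFFFFFFFFFF, 0, 0); err != nil {
    log.Fatalf("Error requesting stream: %v", err)
}
for event := range feed.C {
    fmt.Printf("upr event: key=%s\n", event.Key)
}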

121
vendor/github.com/couchbase/go-couchbase/users.go generated vendored Normal file

@@ -0,0 +1,121 @@
package couchbase
import (
"bytes"
"fmt"
)
type User struct {
Name string
Id string
Domain string
Roles []Role
}
type Role struct {
Role string
BucketName string `json:"bucket_name"`
ScopeName string `json:"scope_name"`
CollectionName string `json:"collection_name"`
}
// Sample:
// {"role":"admin","name":"Admin","desc":"Can manage ALL cluster features including security.","ce":true}
// {"role":"query_select","bucket_name":"*","name":"Query Select","desc":"Can execute SELECT statement on bucket to retrieve data"}
type RoleDescription struct {
Role string
Name string
Desc string
Ce bool
BucketName string `json:"bucket_name"`
}
// Return user-role data, as parsed JSON.
// Sample:
// [{"id":"ivanivanov","name":"Ivan Ivanov","roles":[{"role":"cluster_admin"},{"bucket_name":"default","role":"bucket_admin"}]},
// {"id":"petrpetrov","name":"Petr Petrov","roles":[{"role":"replication_admin"}]}]
func (c *Client) GetUserRoles() ([]interface{}, error) {
ret := make([]interface{}, 0, 1)
err := c.parseURLResponse("/settings/rbac/users", &ret)
if err != nil {
return nil, err
}
// Get the configured administrator.
// Expected result: {"port":8091,"username":"Administrator"}
adminInfo := make(map[string]interface{}, 2)
err = c.parseURLResponse("/settings/web", &adminInfo)
if err != nil {
return nil, err
}
// Create a special entry for the configured administrator.
adminResult := map[string]interface{}{
"name": adminInfo["username"],
"id": adminInfo["username"],
"domain": "ns_server",
"roles": []interface{}{
map[string]interface{}{
"role": "admin",
},
},
}
// Add the configured administrator to the list of results.
ret = append(ret, adminResult)
return ret, nil
}
func (c *Client) GetUserInfoAll() ([]User, error) {
ret := make([]User, 0, 16)
err := c.parseURLResponse("/settings/rbac/users", &ret)
if err != nil {
return nil, err
}
return ret, nil
}
func rolesToParamFormat(roles []Role) string {
var buffer bytes.Buffer
for i, role := range roles {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(role.Role)
if role.BucketName != "" {
buffer.WriteString("[")
buffer.WriteString(role.BucketName)
buffer.WriteString("]")
}
}
return buffer.String()
}
func (c *Client) PutUserInfo(u *User) error {
params := map[string]interface{}{
"name": u.Name,
"roles": rolesToParamFormat(u.Roles),
}
var target string
switch u.Domain {
case "external":
target = "/settings/rbac/users/" + u.Id
case "local":
target = "/settings/rbac/users/local/" + u.Id
default:
return fmt.Errorf("Unknown user type: %s", u.Domain)
}
var ret string // PUT returns an empty string. We ignore it.
err := c.parsePutURLResponse(target, params, &ret)
return err
}
func (c *Client) GetRolesAll() ([]RoleDescription, error) {
ret := make([]RoleDescription, 0, 32)
err := c.parseURLResponse("/settings/rbac/roles", &ret)
if err != nil {
return nil, err
}
return ret, nil
}

49
vendor/github.com/couchbase/go-couchbase/util.go generated vendored Normal file

@@ -0,0 +1,49 @@
package couchbase
import (
"fmt"
"net/url"
"strings"
)
// CleanupHost returns the hostname with the given suffix removed.
func CleanupHost(h, commonSuffix string) string {
if strings.HasSuffix(h, commonSuffix) {
return h[:len(h)-len(commonSuffix)]
}
return h
}
// FindCommonSuffix returns the longest common suffix from the given
// strings.
func FindCommonSuffix(input []string) string {
rv := ""
if len(input) < 2 {
return ""
}
from := input
for i := len(input[0]); i > 0; i-- {
common := true
suffix := input[0][i:]
for _, s := range from {
if !strings.HasSuffix(s, suffix) {
common = false
break
}
}
if common {
rv = suffix
}
}
return rv
}
// ParseURL is a wrapper around url.Parse with some sanity-checking
func ParseURL(urlStr string) (result *url.URL, err error) {
result, err = url.Parse(urlStr)
if result != nil && result.Scheme == "" {
result = nil
err = fmt.Errorf("invalid URL <%s>", urlStr)
}
return
}

77
vendor/github.com/couchbase/go-couchbase/vbmap.go generated vendored Normal file

@@ -0,0 +1,77 @@
package couchbase
var crc32tab = []uint32{
0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d}
// VBHash finds the vbucket for the given key.
func (b *Bucket) VBHash(key string) uint32 {
crc := uint32(0xffffffff)
for x := 0; x < len(key); x++ {
crc = (crc >> 8) ^ crc32tab[(uint64(crc)^uint64(key[x]))&0xff]
}
vbm := b.VBServerMap()
return ((^crc) >> 16) & 0x7fff & (uint32(len(vbm.VBucketMap)) - 1)
}

231
vendor/github.com/couchbase/go-couchbase/views.go generated vendored Normal file

@@ -0,0 +1,231 @@
package couchbase
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"time"
)
// ViewRow represents a single result from a view.
//
// Doc is present only if include_docs was set on the request.
type ViewRow struct {
ID string
Key interface{}
Value interface{}
Doc *interface{}
}
// A ViewError is a node-specific error indicating a partial failure
// within a view result.
type ViewError struct {
From string
Reason string
}
func (ve ViewError) Error() string {
return "Node: " + ve.From + ", reason: " + ve.Reason
}
// ViewResult holds the entire result set from a view request,
// including the rows and the errors.
type ViewResult struct {
TotalRows int `json:"total_rows"`
Rows []ViewRow
Errors []ViewError
}
func (b *Bucket) randomBaseURL() (*url.URL, error) {
nodes := b.HealthyNodes()
if len(nodes) == 0 {
return nil, errors.New("no available couch rest URLs")
}
nodeNo := rand.Intn(len(nodes))
node := nodes[nodeNo]
b.RLock()
name := b.Name
pool := b.pool
b.RUnlock()
u, err := ParseURL(node.CouchAPIBase)
if err != nil {
return nil, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
name, nodeNo, node.CouchAPIBase, err)
} else if pool != nil {
u.User = pool.client.BaseURL.User
}
return u, err
}
const START_NODE_ID = -1
func (b *Bucket) randomNextURL(lastNode int) (*url.URL, int, error) {
nodes := b.HealthyNodes()
if len(nodes) == 0 {
return nil, -1, errors.New("no available couch rest URLs")
}
var nodeNo int
if lastNode == START_NODE_ID || lastNode >= len(nodes) {
// randomly select a node if the value of lastNode is invalid
nodeNo = rand.Intn(len(nodes))
} else {
// wrap around the node list
nodeNo = (lastNode + 1) % len(nodes)
}
b.RLock()
name := b.Name
pool := b.pool
b.RUnlock()
node := nodes[nodeNo]
u, err := ParseURL(node.CouchAPIBase)
if err != nil {
return nil, -1, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
name, nodeNo, node.CouchAPIBase, err)
} else if pool != nil {
u.User = pool.client.BaseURL.User
}
return u, nodeNo, err
}
// DocID is the document ID type for the startkey_docid parameter in
// views.
type DocID string
func qParam(k, v string) string {
format := `"%s"`
switch k {
case "startkey_docid", "endkey_docid", "stale":
format = "%s"
}
return fmt.Sprintf(format, v)
}
// ViewURL constructs a URL for a view with the given ddoc, view name,
// and parameters.
func (b *Bucket) ViewURL(ddoc, name string,
params map[string]interface{}) (string, error) {
u, err := b.randomBaseURL()
if err != nil {
return "", err
}
values := url.Values{}
for k, v := range params {
switch t := v.(type) {
case DocID:
values[k] = []string{string(t)}
case string:
values[k] = []string{qParam(k, t)}
case int:
values[k] = []string{fmt.Sprintf(`%d`, t)}
case bool:
values[k] = []string{fmt.Sprintf(`%v`, t)}
default:
b, err := json.Marshal(v)
if err != nil {
return "", fmt.Errorf("unsupported value-type %T in Query, "+
"json encoder said %v", t, err)
}
values[k] = []string{fmt.Sprintf(`%v`, string(b))}
}
}
if ddoc == "" && name == "_all_docs" {
u.Path = fmt.Sprintf("/%s/_all_docs", b.GetName())
} else {
u.Path = fmt.Sprintf("/%s/_design/%s/_view/%s", b.GetName(), ddoc, name)
}
u.RawQuery = values.Encode()
return u.String(), nil
}
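// exampleViewURL is a hypothetical sketch (not part of the vendored file)
// showing the kind of URL ViewURL builds. The "beers"/"by_name" design doc and
// view names are made up; the host comes from whichever healthy node is picked
// at random.
func exampleViewURL(b *Bucket) {
	u, err := b.ViewURL("beers", "by_name", map[string]interface{}{
		"limit":          5,
		"startkey_docid": DocID("abc"),
	})
	if err != nil {
		return
	}
	// Typically something like:
	// http://10.0.0.1:8092/<bucket>/_design/beers/_view/by_name?limit=5&startkey_docid=abc
	fmt.Println(u)
}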
// ViewCallback is called for each view invocation.
var ViewCallback func(ddoc, name string, start time.Time, err error)
// ViewCustom performs a view request that can map row values to a
// custom type.
//
// See the source to View for an example usage.
func (b *Bucket) ViewCustom(ddoc, name string, params map[string]interface{},
vres interface{}) (err error) {
if SlowServerCallWarningThreshold > 0 {
defer slowLog(time.Now(), "call to ViewCustom(%q, %q)", ddoc, name)
}
if ViewCallback != nil {
defer func(t time.Time) { ViewCallback(ddoc, name, t, err) }(time.Now())
}
u, err := b.ViewURL(ddoc, name, params)
if err != nil {
return err
}
req, err := http.NewRequest("GET", u, nil)
if err != nil {
return err
}
ah := b.authHandler(false /* bucket not yet locked */)
maybeAddAuth(req, ah)
res, err := doHTTPRequest(req)
if err != nil {
return fmt.Errorf("error starting view req at %v: %v", u, err)
}
defer res.Body.Close()
if res.StatusCode != 200 {
bod := make([]byte, 512)
l, _ := res.Body.Read(bod)
return fmt.Errorf("error executing view req at %v: %v - %s",
u, res.Status, bod[:l])
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
if err := json.Unmarshal(body, vres); err != nil {
return err
}
return nil
}
// View executes a view.
//
// The ddoc parameter is just the bare name of your design doc without
// the "_design/" prefix.
//
// Parameters are string keys with values that correspond to couchbase
// view parameters. Primitives should work fairly naturally (booleans,
// ints, strings, etc...) and other values will be JSON
// marshaled (useful for array indexing on view keys, for example).
//
// Example:
//
// res, err := bucket.View("myddoc", "myview", map[string]interface{}{
// "group_level": 2,
// "startkey_docid": []interface{}{"thing"},
// "endkey_docid": []interface{}{"thing", map[string]string{}},
// "stale": false,
// })
func (b *Bucket) View(ddoc, name string, params map[string]interface{}) (ViewResult, error) {
vres := ViewResult{}
if err := b.ViewCustom(ddoc, name, params, &vres); err != nil {
//error in accessing views. Retry once after a bucket refresh
b.Refresh()
return vres, b.ViewCustom(ddoc, name, params, &vres)
} else {
return vres, nil
}
}
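// exampleView is a hypothetical sketch (not part of the vendored file) showing
// a typical call to View and how its result is consumed. The design doc and
// view names are made up; ViewError entries report per-node partial failures,
// while err reports a failure of the request as a whole.
func exampleView(b *Bucket) {
	res, err := b.View("beers", "by_name", map[string]interface{}{
		"limit": 10,
		"stale": false,
	})
	if err != nil {
		fmt.Printf("view request failed: %v\n", err)
		return
	}
	for _, row := range res.Rows {
		fmt.Printf("id=%s key=%v value=%v\n", row.ID, row.Key, row.Value)
	}
	for _, ve := range res.Errors {
		fmt.Printf("partial failure: %v\n", ve)
	}
}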

View file

@ -45,7 +45,7 @@ type streamIdNonResumeScopeMeta struct {
}
func (c *CollectionsFilter) IsValid() error {
if c.UseManifestUid {
if c.UseManifestUid && c.UseStreamId {
return fmt.Errorf("Not implemented yet")
}
@ -99,8 +99,10 @@ func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error) {
case false:
switch c.UseManifestUid {
case true:
// TODO
return nil, fmt.Errorf("NotImplemented1")
filter := &nonStreamIdResumeScopeMeta{
ManifestId: fmt.Sprintf("%x", c.ManifestUid),
}
output = *filter
case false:
switch len(c.CollectionsList) > 0 {
case true:

View file

@ -19,8 +19,8 @@ import (
)
type ClientIface interface {
Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error)
Add(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
Auth(user, pass string) (*gomemcached.MCResponse, error)
AuthList() (*gomemcached.MCResponse, error)
AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
@ -30,44 +30,87 @@ type ClientIface interface {
CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)
CollectionEnabled() bool
Close() error
Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
Del(vb uint16, key string) (*gomemcached.MCResponse, error)
Decr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
EnableMutationToken() (*gomemcached.MCResponse, error)
EnableFeatures(features Features) (*gomemcached.MCResponse, error)
Get(vb uint16, key string) (*gomemcached.MCResponse, error)
Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error)
GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error)
GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error
GetCollectionsManifest() (*gomemcached.MCResponse, error)
GetFromCollection(vb uint16, cid uint32, key string) (*gomemcached.MCResponse, error)
GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error)
GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error)
GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error
GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error)
GetRandomDoc() (*gomemcached.MCResponse, error)
GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error)
GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error)
Hijack() io.ReadWriteCloser
Incr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
Incr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
Observe(vb uint16, key string) (result ObserveResult, err error)
ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
Receive() (*gomemcached.MCResponse, error)
ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
Set(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
SetKeepAliveOptions(interval time.Duration)
SetReadDeadline(t time.Time)
SetDeadline(t time.Time)
SelectBucket(bucket string) (*gomemcached.MCResponse, error)
SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error)
SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
Stats(key string) ([]StatValue, error)
StatsMap(key string) (map[string]string, error)
StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
Transmit(req *gomemcached.MCRequest) error
TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error
TransmitResponse(res *gomemcached.MCResponse) error
UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
// UprFeed Related
NewUprFeed() (*UprFeed, error)
NewUprFeedIface() (UprFeedIface, error)
NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
}
type ClientContext struct {
// Collection-based context
CollId uint32
// VB-state related context
// nil means not used in this context
VbState *VbStateType
}
type VbStateType uint8
const (
VbAlive VbStateType = 0x00
VbActive VbStateType = 0x01
VbReplica VbStateType = 0x02
VbPending VbStateType = 0x03
VbDead VbStateType = 0x04
)
func (context *ClientContext) InitExtras(req *gomemcached.MCRequest, client *Client) {
if req == nil || client == nil {
return
}
var bytesToAllocate int
switch req.Opcode {
case gomemcached.GET_ALL_VB_SEQNOS:
if context.VbState != nil {
bytesToAllocate += 4
}
if client.CollectionEnabled() {
if context.VbState == nil {
bytesToAllocate += 8
} else {
bytesToAllocate += 4
}
}
}
if bytesToAllocate > 0 {
req.Extras = make([]byte, bytesToAllocate)
}
}
const bufsize = 1024
@ -102,8 +145,8 @@ type Client struct {
hdrBuf []byte
featureMtx sync.RWMutex
sentHeloFeatures Features
collectionsEnabled uint32
deadline time.Time
}
var (
@ -156,7 +199,11 @@ func (c *Client) SetReadDeadline(t time.Time) {
}
func (c *Client) SetDeadline(t time.Time) {
if t.Equal(c.deadline) {
return
}
c.conn.SetDeadline(t)
c.deadline = t
}
// Wrap an existing transport.
@ -287,60 +334,103 @@ func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error) {
// Send a hello command to enable specific features
func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error) {
var payload []byte
collectionsEnabled := 0
for _, feature := range features {
if feature == FeatureCollections {
collectionsEnabled = 1
}
payload = append(payload, 0, 0)
binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature))
}
c.featureMtx.Lock()
c.sentHeloFeatures = features
c.featureMtx.Unlock()
return c.Send(&gomemcached.MCRequest{
rv, err := c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.HELLO,
Key: []byte("GoMemcached"),
Body: payload,
})
if err == nil && collectionsEnabled != 0 {
atomic.StoreUint32(&c.collectionsEnabled, uint32(collectionsEnabled))
}
return rv, err
}
// Sets collection info for a request
func (c *Client) setCollection(req *gomemcached.MCRequest, context ...*ClientContext) error {
req.CollIdLen = 0
collectionId := uint32(0)
if len(context) > 0 {
collectionId = context[0].CollId
}
// If a collection is specified, it must be the default collection for clients that have not enabled collections
if atomic.LoadUint32(&c.collectionsEnabled) == 0 {
if collectionId != 0 {
return fmt.Errorf("Client does not use collections but a collection was specified")
}
} else {
req.CollIdLen = binary.PutUvarint(req.CollId[:], uint64(collectionId))
}
return nil
}
func (c *Client) setVbSeqnoContext(req *gomemcached.MCRequest, context ...*ClientContext) error {
if len(context) == 0 || req == nil {
return nil
}
switch req.Opcode {
case gomemcached.GET_ALL_VB_SEQNOS:
if len(context) == 0 {
return nil
}
if len(req.Extras) == 0 {
context[0].InitExtras(req, c)
}
if context[0].VbState != nil {
binary.BigEndian.PutUint32(req.Extras, uint32(*(context[0].VbState)))
}
if c.CollectionEnabled() {
binary.BigEndian.PutUint32(req.Extras[4:8], context[0].CollId)
}
return nil
default:
return fmt.Errorf("setVbState Not supported for opcode: %v", req.Opcode.String())
}
}
// Get the value for a key.
func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error) {
return c.Send(&gomemcached.MCRequest{
func (c *Client) Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.GET,
VBucket: vb,
Key: []byte(key),
})
}
// Get the value for a key from a collection, identified by collection id.
func (c *Client) GetFromCollection(vb uint16, cid uint32, key string) (*gomemcached.MCResponse, error) {
keyBytes := []byte(key)
encodedCid := make([]byte, binary.MaxVarintLen32)
lenEncodedCid := binary.PutUvarint(encodedCid, uint64(cid))
encodedKey := make([]byte, 0, lenEncodedCid+len(keyBytes))
encodedKey = append(encodedKey, encodedCid[0:lenEncodedCid]...)
encodedKey = append(encodedKey, keyBytes...)
return c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.GET,
VBucket: vb,
Key: encodedKey,
})
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
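// exampleCollectionGet is a hypothetical sketch (not part of the vendored
// file) showing a collection-aware Get. It assumes the connection has already
// negotiated FeatureCollections via EnableFeatures and that cid was resolved
// earlier, e.g. with CollectionsGetCID; the vbucket and key are made-up values.
func exampleCollectionGet(c *Client, cid uint32) error {
	ctx := &ClientContext{CollId: cid}
	res, err := c.Get(512, "user::1234", ctx)
	if err != nil {
		return err
	}
	fmt.Printf("got %d body bytes\n", len(res.Body))
	return nil
}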
// Get the xattrs, doc value for the input key
func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error) {
func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
extraBuf, valueBuf := GetSubDocVal(subPaths)
res, err := c.Send(&gomemcached.MCRequest{
req := &gomemcached.MCRequest{
Opcode: gomemcached.SUBDOC_MULTI_LOOKUP,
VBucket: vb,
Key: []byte(key),
Extras: extraBuf,
Body: valueBuf,
})
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
res, err := c.Send(req)
if err != nil && IfResStatusError(res) {
return res, err
@ -376,48 +466,56 @@ func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcache
}
func (c *Client) CollectionEnabled() bool {
c.featureMtx.RLock()
defer c.featureMtx.RUnlock()
for _, feature := range c.sentHeloFeatures {
if feature == FeatureCollections {
return true
}
}
return false
return atomic.LoadUint32(&c.collectionsEnabled) > 0
}
// Get the value for a key, and update expiry
func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) {
func (c *Client) GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error) {
extraBuf := make([]byte, 4)
binary.BigEndian.PutUint32(extraBuf[0:], uint32(exp))
return c.Send(&gomemcached.MCRequest{
req := &gomemcached.MCRequest{
Opcode: gomemcached.GAT,
VBucket: vb,
Key: []byte(key),
Extras: extraBuf,
})
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// Get metadata for a key
func (c *Client) GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error) {
return c.Send(&gomemcached.MCRequest{
func (c *Client) GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.GET_META,
VBucket: vb,
Key: []byte(key),
})
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// Del deletes a key.
func (c *Client) Del(vb uint16, key string) (*gomemcached.MCResponse, error) {
return c.Send(&gomemcached.MCRequest{
func (c *Client) Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.DELETE,
VBucket: vb,
Key: []byte(key)})
Key: []byte(key),
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// Get a random document
func (c *Client) GetRandomDoc() (*gomemcached.MCResponse, error) {
func (c *Client) GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error) {
return c.Send(&gomemcached.MCRequest{
Opcode: 0xB6,
})
@ -522,8 +620,7 @@ func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error) {
}
func (c *Client) store(opcode gomemcached.CommandCode, vb uint16,
key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error) {
key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: opcode,
VBucket: vb,
@ -533,13 +630,16 @@ func (c *Client) store(opcode gomemcached.CommandCode, vb uint16,
Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
Body: body}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
return c.Send(req)
}
func (c *Client) storeCas(opcode gomemcached.CommandCode, vb uint16,
key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error) {
key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: opcode,
VBucket: vb,
@ -549,20 +649,29 @@ func (c *Client) storeCas(opcode gomemcached.CommandCode, vb uint16,
Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
Body: body}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
return c.Send(req)
}
// Incr increments the value at the given key.
func (c *Client) Incr(vb uint16, key string,
amt, def uint64, exp int) (uint64, error) {
amt, def uint64, exp int, context ...*ClientContext) (uint64, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.INCREMENT,
VBucket: vb,
Key: []byte(key),
Extras: make([]byte, 8+8+4),
}
err := c.setCollection(req, context...)
if err != nil {
return 0, err
}
binary.BigEndian.PutUint64(req.Extras[:8], amt)
binary.BigEndian.PutUint64(req.Extras[8:16], def)
binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
@ -577,14 +686,18 @@ func (c *Client) Incr(vb uint16, key string,
// Decr decrements the value at the given key.
func (c *Client) Decr(vb uint16, key string,
amt, def uint64, exp int) (uint64, error) {
amt, def uint64, exp int, context ...*ClientContext) (uint64, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.DECREMENT,
VBucket: vb,
Key: []byte(key),
Extras: make([]byte, 8+8+4),
}
err := c.setCollection(req, context...)
if err != nil {
return 0, err
}
binary.BigEndian.PutUint64(req.Extras[:8], amt)
binary.BigEndian.PutUint64(req.Extras[8:16], def)
binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
@ -599,24 +712,24 @@ func (c *Client) Decr(vb uint16, key string,
// Add a value for a key (store if not exists).
func (c *Client) Add(vb uint16, key string, flags int, exp int,
body []byte) (*gomemcached.MCResponse, error) {
return c.store(gomemcached.ADD, vb, key, flags, exp, body)
body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
return c.store(gomemcached.ADD, vb, key, flags, exp, body, context...)
}
// Set the value for a key.
func (c *Client) Set(vb uint16, key string, flags int, exp int,
body []byte) (*gomemcached.MCResponse, error) {
return c.store(gomemcached.SET, vb, key, flags, exp, body)
body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
return c.store(gomemcached.SET, vb, key, flags, exp, body, context...)
}
// SetCas set the value for a key with cas
func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64,
body []byte) (*gomemcached.MCResponse, error) {
return c.storeCas(gomemcached.SET, vb, key, flags, exp, cas, body)
body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
return c.storeCas(gomemcached.SET, vb, key, flags, exp, cas, body, context...)
}
// Append data to the value of a key.
func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error) {
func (c *Client) Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.APPEND,
VBucket: vb,
@ -625,11 +738,15 @@ func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResp
Opaque: 0,
Body: data}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// GetBulk gets keys in bulk
func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error {
func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error {
stopch := make(chan bool)
var wg sync.WaitGroup
@ -698,6 +815,10 @@ func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MC
Opcode: gomemcached.GET,
VBucket: vb,
}
err := c.setCollection(memcachedReqPkt, context...)
if err != nil {
return err
}
if len(subPaths) > 0 {
extraBuf, valueBuf := GetSubDocVal(subPaths)
@ -719,7 +840,7 @@ func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MC
} // End of Get request
// finally transmit a NOOP
err := c.Transmit(&gomemcached.MCRequest{
err = c.Transmit(&gomemcached.MCRequest{
Opcode: gomemcached.NOOP,
VBucket: vb,
Opaque: c.opaque,
@ -747,7 +868,10 @@ func GetSubDocVal(subPaths []string) (extraBuf, valueBuf []byte) {
}
// Xattr retrieval - subdoc multi get
extraBuf = append(extraBuf, uint8(0x04))
// Set the deleted flag only when the lookup is not just for the expiration time
if len(subPaths) != 1 || subPaths[0] != "$document.exptime" {
extraBuf = append(extraBuf, uint8(0x04))
}
valueBuf = make([]byte, num*4+totalBytesLen)
@ -1138,6 +1262,38 @@ func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]strin
return nil
}
// UprGetFailoverLog for given list of vbuckets.
func (mc *Client) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error) {
rq := &gomemcached.MCRequest{
Opcode: gomemcached.UPR_FAILOVERLOG,
Opaque: opaqueFailover,
}
failoverLogs := make(map[uint16]*FailoverLog)
for _, vBucket := range vb {
rq.VBucket = vBucket
if err := mc.Transmit(rq); err != nil {
return nil, err
}
res, err := mc.Receive()
if err != nil {
return nil, fmt.Errorf("failed to receive %s", err.Error())
} else if res.Opcode != gomemcached.UPR_FAILOVERLOG || res.Status != gomemcached.SUCCESS {
return nil, fmt.Errorf("unexpected #opcode %v", res.Opcode)
}
flog, err := parseFailoverLog(res.Body)
if err != nil {
return nil, fmt.Errorf("unable to parse failover logs for vb %d", vb)
}
failoverLogs[vBucket] = flog
}
return failoverLogs, nil
}
// Hijack exposes the underlying connection from this client.
//
// It also marks the connection as unhealthy since the client will
@ -1166,3 +1322,98 @@ func IfResStatusError(response *gomemcached.MCResponse) bool {
func (c *Client) Conn() io.ReadWriteCloser {
return c.conn
}
// Since the binary request supports only a single collection at a time, callers may
// invoke this multiple times in succession to get vbSeqnos for multiple collections.
// To avoid allocating a new map (and the resulting GC pressure) on every call, callers
// can pass in a non-nil map, which will be reused across calls.
// NOTE: If collections are enabled and no context is given, KV still returns stats for the default collection
func (c *Client) GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error) {
rq := &gomemcached.MCRequest{
Opcode: gomemcached.GET_ALL_VB_SEQNOS,
Opaque: opaqueGetSeqno,
}
err := c.setVbSeqnoContext(rq, context...)
if err != nil {
return vbSeqnoMap, err
}
err = c.Transmit(rq)
if err != nil {
return vbSeqnoMap, err
}
res, err := c.Receive()
if err != nil {
return vbSeqnoMap, fmt.Errorf("failed to receive: %v", err)
}
vbSeqnosList, err := parseGetSeqnoResp(res.Body)
if err != nil {
logging.Errorf("Unable to parse : err: %v\n", err)
return vbSeqnoMap, err
}
if vbSeqnoMap == nil {
vbSeqnoMap = make(map[uint16]uint64)
}
combineMapWithReturnedList(vbSeqnoMap, vbSeqnosList)
return vbSeqnoMap, nil
}
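// exampleGetAllVbSeqnos is a hypothetical sketch (not part of the vendored
// file) of the map-reuse pattern described above: one map is passed through
// successive calls, one per collection, so a new map is not allocated each
// time. The collection ids and the VbActive filter are made-up values.
func exampleGetAllVbSeqnos(c *Client) {
	seqnos := make(map[uint16]uint64)
	state := VbActive
	for _, cid := range []uint32{8, 9} {
		ctx := &ClientContext{CollId: cid, VbState: &state}
		if _, err := c.GetAllVbSeqnos(seqnos, ctx); err != nil {
			logging.Errorf("collection %v: %v", cid, err)
			continue
		}
		logging.Infof("collection %v: %v vbuckets reported", cid, len(seqnos))
	}
}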
func combineMapWithReturnedList(vbSeqnoMap map[uint16]uint64, list *VBSeqnos) {
if list == nil {
return
}
// If the map contains exactly the existing vbs in the list, no need to modify
needToCleanupMap := true
if len(vbSeqnoMap) == 0 {
needToCleanupMap = false
} else if len(vbSeqnoMap) == len(*list) {
needToCleanupMap = false
for _, pair := range *list {
_, vbExists := vbSeqnoMap[uint16(pair[0])]
if !vbExists {
needToCleanupMap = true
break
}
}
}
if needToCleanupMap {
var vbsToDelete []uint16
for vbInSeqnoMap := range vbSeqnoMap {
// If a vb in the seqno map doesn't exist in the returned list, need to clean up
// to ensure returning an accurate result
found := false
var vbno uint16
for _, pair := range *list {
vbno = uint16(pair[0])
if vbno == vbInSeqnoMap {
found = true
break
} else if vbno > vbInSeqnoMap {
// definitely not in the list
break
}
}
if !found {
vbsToDelete = append(vbsToDelete, vbInSeqnoMap)
}
}
for _, vbno := range vbsToDelete {
delete(vbSeqnoMap, vbno)
}
}
// Set the map with data from the list
for _, pair := range *list {
vbno := uint16(pair[0])
seqno := pair[1]
vbSeqnoMap[vbno] = seqno
}
}

View file

@ -89,6 +89,9 @@ type UprEvent struct {
// FailoverLog contains pairs of vbucket UUID and sequence number
type FailoverLog [][2]uint64
// VBSeqnos contains pairs of vbucket number and high sequence number
type VBSeqnos [][2]uint64
func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
event := &UprEvent{
Opcode: rq.Opcode,
@ -148,6 +151,8 @@ func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFrom
event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
} else if event.IsSystemEvent() {
event.PopulateEvent(rq.Extras)
} else if event.IsSeqnoAdv() {
event.PopulateSeqnoAdv(rq.Extras)
}
return event
@ -199,17 +204,31 @@ func (event *UprEvent) IsSystemEvent() bool {
return event.Opcode == gomemcached.DCP_SYSTEM_EVENT
}
func (event *UprEvent) IsSeqnoAdv() bool {
return event.Opcode == gomemcached.DCP_SEQNO_ADV
}
func (event *UprEvent) PopulateEvent(extras []byte) {
if len(extras) < dcpSystemEventExtraLen {
// Wrong length, don't parse
return
}
event.Seqno = binary.BigEndian.Uint64(extras[:8])
event.SystemEvent = SystemEventType(binary.BigEndian.Uint32(extras[8:12]))
var versionTemp uint16 = binary.BigEndian.Uint16(extras[12:14])
event.SysEventVersion = uint8(versionTemp >> 8)
}
func (event *UprEvent) PopulateSeqnoAdv(extras []byte) {
if len(extras) < dcpSeqnoAdvExtraLen {
// Wrong length, don't parse
return
}
event.Seqno = binary.BigEndian.Uint64(extras[:8])
}
func (event *UprEvent) GetSystemEventName() (string, error) {
switch event.SystemEvent {
case CollectionCreate:

View file

@ -20,9 +20,11 @@ const uprDeletetionExtraLen = 18
const uprDeletetionWithDeletionTimeExtraLen = 21
const uprSnapshotExtraLen = 20
const dcpSystemEventExtraLen = 13
const dcpSeqnoAdvExtraLen = 8
const bufferAckThreshold = 0.2
const opaqueOpen = 0xBEAF0001
const opaqueFailover = 0xDEADBEEF
const opaqueGetSeqno = 0xDEADBEEF
const uprDefaultNoopInterval = 120
// Counter on top of opaqueOpen that others can draw from for open and control msgs
@ -605,44 +607,6 @@ func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, featu
return
}
// UprGetFailoverLog for given list of vbuckets.
func (mc *Client) UprGetFailoverLog(
vb []uint16) (map[uint16]*FailoverLog, error) {
rq := &gomemcached.MCRequest{
Opcode: gomemcached.UPR_FAILOVERLOG,
Opaque: opaqueFailover,
}
var allFeaturesDisabled UprFeatures
if err := doUprOpen(mc, "FailoverLog", 0, allFeaturesDisabled); err != nil {
return nil, fmt.Errorf("UPR_OPEN Failed %s", err.Error())
}
failoverLogs := make(map[uint16]*FailoverLog)
for _, vBucket := range vb {
rq.VBucket = vBucket
if err := mc.Transmit(rq); err != nil {
return nil, err
}
res, err := mc.Receive()
if err != nil {
return nil, fmt.Errorf("failed to receive %s", err.Error())
} else if res.Opcode != gomemcached.UPR_FAILOVERLOG || res.Status != gomemcached.SUCCESS {
return nil, fmt.Errorf("unexpected #opcode %v", res.Opcode)
}
flog, err := parseFailoverLog(res.Body)
if err != nil {
return nil, fmt.Errorf("unable to parse failover logs for vb %d", vb)
}
failoverLogs[vBucket] = flog
}
return failoverLogs, nil
}
// UprRequestStream for a single vbucket.
func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32,
vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
@ -793,7 +757,6 @@ func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error {
}
func parseFailoverLog(body []byte) (*FailoverLog, error) {
if len(body)%16 != 0 {
err := fmt.Errorf("invalid body length %v, in failover-log", len(body))
return nil, err
@ -808,6 +771,24 @@ func parseFailoverLog(body []byte) (*FailoverLog, error) {
return &log, nil
}
func parseGetSeqnoResp(body []byte) (*VBSeqnos, error) {
// vbno of 2 bytes + seqno of 8 bytes
var entryLen int = 10
if len(body)%entryLen != 0 {
err := fmt.Errorf("invalid body length %v, in getVbSeqno", len(body))
return nil, err
}
vbSeqnos := make(VBSeqnos, len(body)/entryLen)
for i, j := 0, 0; i < len(body); i += entryLen {
vbno := binary.BigEndian.Uint16(body[i : i+2])
seqno := binary.BigEndian.Uint64(body[i+2 : i+10])
vbSeqnos[j] = [2]uint64{uint64(vbno), seqno}
j++
}
return &vbSeqnos, nil
}
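// exampleParseGetSeqnoResp is a hypothetical sketch (not part of the vendored
// file) of the 10-byte-per-entry layout parsed above: each entry is a 2-byte
// vbucket id followed by an 8-byte sequence number. The values are made up.
func exampleParseGetSeqnoResp() {
	body := make([]byte, 20)
	binary.BigEndian.PutUint16(body[0:2], 0)
	binary.BigEndian.PutUint64(body[2:10], 42)
	binary.BigEndian.PutUint16(body[10:12], 1)
	binary.BigEndian.PutUint64(body[12:20], 7)
	if vbSeqnos, err := parseGetSeqnoResp(body); err == nil {
		for _, pair := range *vbSeqnos {
			logging.Infof("vb %v -> high seqno %v", pair[0], pair[1])
		}
	}
}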
func handleStreamRequest(
res *gomemcached.MCResponse,
headerBuf []byte,
@ -987,6 +968,14 @@ loop:
break loop
}
event = makeUprEvent(pkt, stream, bytes)
case gomemcached.UPR_FAILOVERLOG:
logging.Infof("Failover log for vb %d received: %v", vb, pkt)
case gomemcached.DCP_SEQNO_ADV:
if stream == nil {
logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
break loop
}
event = makeUprEvent(pkt, stream, bytes)
default:
logging.Infof("Recived an unknown response for vbucket %d", vb)
}

View file

@ -74,6 +74,7 @@ const (
TAP_VBUCKET_SET = CommandCode(0x45) // Sets state of vbucket in receiver (used in takeover)
TAP_CHECKPOINT_START = CommandCode(0x46) // Notifies start of new checkpoint
TAP_CHECKPOINT_END = CommandCode(0x47) // Notifies end of checkpoint
GET_ALL_VB_SEQNOS = CommandCode(0x48) // Get current high sequence numbers from all vbuckets located on the server
UPR_OPEN = CommandCode(0x50) // Open a UPR connection with a name
UPR_ADDSTREAM = CommandCode(0x51) // Sent by ebucketMigrator to UPR Consumer
@ -102,18 +103,21 @@ const (
SUBDOC_MULTI_LOOKUP = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta.
DCP_SYSTEM_EVENT = CommandCode(0x5f) // A system event has occurred
DCP_SEQNO_ADV = CommandCode(0x64) // Sent when the vb seqno has advanced due to an unsubscribed event
)
// command codes that are counted toward DCP control buffer
// when DCP clients receive DCP messages with these command codes, they need to provide acknowledgement
var BufferedCommandCodeMap = map[CommandCode]bool{
SET_VBUCKET: true,
UPR_STREAMEND: true,
UPR_SNAPSHOT: true,
UPR_MUTATION: true,
UPR_DELETION: true,
UPR_EXPIRATION: true}
SET_VBUCKET: true,
UPR_STREAMEND: true,
UPR_SNAPSHOT: true,
UPR_MUTATION: true,
UPR_DELETION: true,
UPR_EXPIRATION: true,
DCP_SYSTEM_EVENT: true,
DCP_SEQNO_ADV: true,
}
// Status field for memcached response.
type Status uint16
@ -274,6 +278,8 @@ func init() {
CommandNames[SUBDOC_MULTI_LOOKUP] = "SUBDOC_MULTI_LOOKUP"
CommandNames[GET_COLLECTIONS_MANIFEST] = "GET_COLLECTIONS_MANIFEST"
CommandNames[COLLECTIONS_GET_CID] = "COLLECTIONS_GET_CID"
CommandNames[DCP_SYSTEM_EVENT] = "DCP_SYSTEM_EVENT"
CommandNames[DCP_SEQNO_ADV] = "DCP_SEQNO_ADV"
StatusNames = make(map[Status]string)
StatusNames[SUCCESS] = "SUCCESS"

View file

@ -11,6 +11,8 @@ import (
// The current limit, 20MB, is the size limit supported by ep-engine.
var MaxBodyLen = int(20 * 1024 * 1024)
const _BUFLEN = 256
// MCRequest is memcached Request
type MCRequest struct {
// The command being issued
@ -27,6 +29,10 @@ type MCRequest struct {
DataType uint8
// len() calls are expensive - cache this in case for collection
Keylen int
// Collection id for collection based operations
CollId [binary.MaxVarintLen32]byte
// Length of collection id
CollIdLen int
// Flexible Framing Extras
FramingExtras []FrameInfo
// Stored length of incoming framing extras
@ -34,8 +40,12 @@ type MCRequest struct {
}
// Size gives the number of bytes this request requires.
func (req *MCRequest) HdrSize() int {
return HDR_LEN + len(req.Extras) + req.CollIdLen + req.FramingElen + len(req.Key)
}
func (req *MCRequest) Size() int {
return HDR_LEN + len(req.Extras) + len(req.Key) + len(req.Body) + len(req.ExtMeta) + req.FramingElen
return req.HdrSize() + len(req.Body) + len(req.ExtMeta)
}
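// exampleCollectionRequestSize is a hypothetical sketch (not part of the
// vendored file) of how the collection id affects the wire layout: when
// CollIdLen is non-zero, the uvarint-encoded collection id is placed
// immediately before the key, and Size/HdrSize account for it. The opcode,
// vbucket, key and collection id are made-up values.
func exampleCollectionRequestSize() {
	req := &MCRequest{
		Opcode:  GET,
		VBucket: 12,
		Key:     []byte("user::1234"),
		Keylen:  len("user::1234"),
	}
	// Encode a made-up collection id (8) as a leading uvarint, mirroring what
	// the memcached client package does in setCollection.
	req.CollIdLen = binary.PutUvarint(req.CollId[:], 8)
	// The encoded request now carries header + extras + collection id + key + body.
	wire := req.Bytes()
	_ = wire
}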
// A debugging string representation of this request
@ -68,7 +78,7 @@ func (req *MCRequest) fillRegularHeaderBytes(data []byte) int {
data[pos] = byte(req.Opcode)
pos++
binary.BigEndian.PutUint16(data[pos:pos+2],
uint16(len(req.Key)))
uint16(req.CollIdLen+len(req.Key)))
pos += 2
// 4
@ -84,7 +94,7 @@ func (req *MCRequest) fillRegularHeaderBytes(data []byte) int {
// 8
binary.BigEndian.PutUint32(data[pos:pos+4],
uint32(len(req.Body)+len(req.Key)+len(req.Extras)+len(req.ExtMeta)))
uint32(len(req.Body)+req.CollIdLen+len(req.Key)+len(req.Extras)+len(req.ExtMeta)))
pos += 4
// 12
@ -97,15 +107,21 @@ func (req *MCRequest) fillRegularHeaderBytes(data []byte) int {
}
pos += 8
// 24 - extras
if len(req.Extras) > 0 {
copy(data[pos:pos+len(req.Extras)], req.Extras)
pos += len(req.Extras)
}
if len(req.Key) > 0 {
if req.CollIdLen > 0 {
copy(data[pos:pos+req.CollIdLen], req.CollId[:])
pos += req.CollIdLen
}
copy(data[pos:pos+len(req.Key)], req.Key)
pos += len(req.Key)
}
return pos
}
@ -132,7 +148,7 @@ func (req *MCRequest) fillFlexHeaderBytes(data []byte) (int, bool) {
data[0] = FLEX_MAGIC
data[1] = byte(req.Opcode)
data[2] = byte(req.FramingElen)
data[3] = byte(req.Keylen)
data[3] = byte(req.Keylen + req.CollIdLen)
elen := len(req.Extras)
data[4] = byte(elen)
if req.DataType != 0 {
@ -140,7 +156,7 @@ func (req *MCRequest) fillFlexHeaderBytes(data []byte) (int, bool) {
}
binary.BigEndian.PutUint16(data[6:8], req.VBucket)
binary.BigEndian.PutUint32(data[8:12],
uint32(len(req.Body)+req.Keylen+elen+len(req.ExtMeta)+req.FramingElen))
uint32(len(req.Body)+req.Keylen+req.CollIdLen+elen+len(req.ExtMeta)+req.FramingElen))
binary.BigEndian.PutUint32(data[12:16], req.Opaque)
if req.Cas != 0 {
binary.BigEndian.PutUint64(data[16:24], req.Cas)
@ -205,12 +221,27 @@ func (req *MCRequest) fillFlexHeaderBytes(data []byte) (int, bool) {
// Add keys
if req.Keylen > 0 {
if mergeMode {
var key []byte
var keylen int
if req.CollIdLen == 0 {
key = req.Key
keylen = req.Keylen
} else {
key = append(key, req.CollId[:req.CollIdLen]...)
key = append(key, req.Key...)
keylen = req.Keylen + req.CollIdLen
}
outputBytes = ShiftByteSliceRight4Bits(key)
data = Merge2HalfByteSlices(data, outputBytes)
pos += keylen
} else {
if req.CollIdLen > 0 {
copy(data[pos:pos+req.CollIdLen], req.CollId[:])
pos += req.CollIdLen
}
copy(data[pos:pos+req.Keylen], req.Key)
pos += req.Keylen
}
pos += req.Keylen
}
return pos, mergeMode
@ -227,7 +258,7 @@ func (req *MCRequest) FillHeaderBytes(data []byte) (int, bool) {
// HeaderBytes will return the wire representation of the request header
// (with the extras and key).
func (req *MCRequest) HeaderBytes() []byte {
data := make([]byte, HDR_LEN+len(req.Extras)+len(req.Key)+req.FramingElen)
data := make([]byte, HDR_LEN+len(req.Extras)+req.CollIdLen+len(req.Key)+req.FramingElen)
req.FillHeaderBytes(data)
@ -237,7 +268,11 @@ func (req *MCRequest) HeaderBytes() []byte {
// Bytes will return the wire representation of this request.
func (req *MCRequest) Bytes() []byte {
data := make([]byte, req.Size())
req.bytes(data)
return data
}
func (req *MCRequest) bytes(data []byte) {
pos, halfByteMode := req.FillHeaderBytes(data)
// TODO - the halfByteMode should be revisited for a more efficient
// way of doing things
@ -259,15 +294,19 @@ func (req *MCRequest) Bytes() []byte {
copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta)
}
}
return data
}
// Transmit will send this request message across a writer.
func (req *MCRequest) Transmit(w io.Writer) (n int, err error) {
if len(req.Body) < 128 {
n, err = w.Write(req.Bytes())
l := req.Size()
if l < _BUFLEN {
data := make([]byte, l)
req.bytes(data)
n, err = w.Write(data)
} else {
n, err = w.Write(req.HeaderBytes())
data := make([]byte, req.HdrSize())
req.FillHeaderBytes(data)
n, err = w.Write(data)
if err == nil {
m := 0
m, err = w.Write(req.Body)