Fix recovery middleware to render gitea style page. (#13857)

* Some changes to fix recovery

* Move Recovery to middlewares

* Remove trace code

* Fix lint

* add session middleware and remove dependent on macaron for sso

* Fix panic 500 page rendering

* Fix bugs

* Fix fmt

* Fix vendor

* recover unnecessary change

* Fix lint and addd some comments about the copied codes.

* Use util.StatDir instead of com.StatDir

Co-authored-by: 6543 <6543@obermui.de>
This commit is contained in:
Lunny Xiao 2021-01-05 21:05:40 +08:00 committed by GitHub
parent 126c9331d6
commit 15a475b7db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
75 changed files with 5233 additions and 307 deletions

View file

@ -4,3 +4,4 @@
*.swp
/gocache/gocache
c.out
.idea

View file

@ -17,11 +17,7 @@ type CollectionsFilter struct {
ScopeId uint32
}
type nonStreamIdNonResumeScopeMeta struct {
ScopeId string `json:"scope"`
}
type nonStreamIdResumeScopeMeta struct {
type nonStreamIdNonCollectionsMeta struct {
ManifestId string `json:"uid"`
}
@ -29,7 +25,7 @@ type nonStreamIdNonResumeCollectionsMeta struct {
CollectionsList []string `json:"collections"`
}
type nonStreamIdResumeCollectionsMeta struct {
type nonStreamIdCollectionsMeta struct {
ManifestId string `json:"uid"`
CollectionsList []string `json:"collections"`
}
@ -99,10 +95,19 @@ func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error) {
case false:
switch c.UseManifestUid {
case true:
filter := &nonStreamIdResumeScopeMeta{
ManifestId: fmt.Sprintf("%x", c.ManifestUid),
switch len(c.CollectionsList) > 0 {
case true:
filter := &nonStreamIdCollectionsMeta{
ManifestId: fmt.Sprintf("%x", c.ManifestUid),
CollectionsList: c.outputCollectionsFilterColList(),
}
output = *filter
case false:
filter := &nonStreamIdNonCollectionsMeta{
ManifestId: fmt.Sprintf("%x", c.ManifestUid),
}
output = *filter
}
output = *filter
case false:
switch len(c.CollectionsList) > 0 {
case true:
@ -111,7 +116,7 @@ func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error) {
}
output = *filter
case false:
output = nonStreamIdNonResumeScopeMeta{ScopeId: c.outputScopeId()}
return nil, fmt.Errorf("Specifying scopeID must require the use of streamId")
}
}
}

View file

@ -375,6 +375,25 @@ func (c *Client) setCollection(req *gomemcached.MCRequest, context ...*ClientCon
return nil
}
// Sets collection info in extras
func (c *Client) setExtrasCollection(req *gomemcached.MCRequest, context ...*ClientContext) error {
collectionId := uint32(0)
if len(context) > 0 {
collectionId = context[0].CollId
}
// if the optional collection is specified, it must be default for clients that haven't turned on collections
if atomic.LoadUint32(&c.collectionsEnabled) == 0 {
if collectionId != 0 {
return fmt.Errorf("Client does not use collections but a collection was specified")
}
} else {
req.Extras = make([]byte, 4)
binary.BigEndian.PutUint32(req.Extras, collectionId)
}
return nil
}
func (c *Client) setVbSeqnoContext(req *gomemcached.MCRequest, context ...*ClientContext) error {
if len(context) == 0 || req == nil {
return nil
@ -516,9 +535,14 @@ func (c *Client) Del(vb uint16, key string, context ...*ClientContext) (*gomemca
// Get a random document
func (c *Client) GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error) {
return c.Send(&gomemcached.MCRequest{
req := &gomemcached.MCRequest{
Opcode: 0xB6,
})
}
err := c.setExtrasCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// AuthList lists SASL auth mechanisms.

View file

@ -83,7 +83,8 @@ type UprEvent struct {
SystemEvent SystemEventType // Only valid if IsSystemEvent() is true
SysEventVersion uint8 // Based on the version, the way Extra bytes is parsed is different
ValueLen int // Cache it to avoid len() calls for performance
CollectionId uint64 // Valid if Collection is in use
CollectionId uint32 // Valid if Collection is in use
StreamId *uint16 // Nil if not in use
}
// FailoverLog containing vvuid and sequnce number
@ -103,7 +104,7 @@ func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFrom
DataType: rq.DataType,
ValueLen: len(rq.Body),
SystemEvent: InvalidSysEvent,
CollectionId: math.MaxUint64,
CollectionId: math.MaxUint32,
}
event.PopulateFieldsBasedOnStreamType(rq, stream.StreamType)
@ -153,6 +154,8 @@ func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFrom
event.PopulateEvent(rq.Extras)
} else if event.IsSeqnoAdv() {
event.PopulateSeqnoAdv(rq.Extras)
} else if event.IsOsoSnapshot() {
event.PopulateOso(rq.Extras)
}
return event
@ -160,6 +163,15 @@ func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFrom
func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType) {
switch streamType {
case CollectionsStreamId:
for _, extra := range rq.FramingExtras {
streamId, streamIdErr := extra.GetStreamId()
if streamIdErr == nil {
event.StreamId = &streamId
}
}
// After parsing streamID, still need to populate regular collectionID
fallthrough
case CollectionsNonStreamId:
switch rq.Opcode {
// Only these will have CID encoded within the key
@ -167,15 +179,12 @@ func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest,
gomemcached.UPR_DELETION,
gomemcached.UPR_EXPIRATION:
uleb128 := Uleb128(rq.Key)
result, bytesShifted := uleb128.ToUint64(rq.Keylen)
result, bytesShifted := uleb128.ToUint32(rq.Keylen)
event.CollectionId = result
event.Key = rq.Key[bytesShifted:]
default:
event.Key = rq.Key
}
case CollectionsStreamId:
// TODO - not implemented
fallthrough
case NonCollectionStream:
// Let default behavior be legacy stream type
fallthrough
@ -208,6 +217,10 @@ func (event *UprEvent) IsSeqnoAdv() bool {
return event.Opcode == gomemcached.DCP_SEQNO_ADV
}
func (event *UprEvent) IsOsoSnapshot() bool {
return event.Opcode == gomemcached.DCP_OSO_SNAPSHOT
}
func (event *UprEvent) PopulateEvent(extras []byte) {
if len(extras) < dcpSystemEventExtraLen {
// Wrong length, don't parse
@ -229,6 +242,14 @@ func (event *UprEvent) PopulateSeqnoAdv(extras []byte) {
event.Seqno = binary.BigEndian.Uint64(extras[:8])
}
func (event *UprEvent) PopulateOso(extras []byte) {
if len(extras) < dcpOsoExtraLen {
// Wrong length, don't parse
return
}
event.Flags = binary.BigEndian.Uint32(extras[:4])
}
func (event *UprEvent) GetSystemEventName() (string, error) {
switch event.SystemEvent {
case CollectionCreate:
@ -345,15 +366,32 @@ func (event *UprEvent) GetMaxTTL() (uint32, error) {
}
}
// Only if error is nil:
// Returns true if event states oso begins
// Return false if event states oso ends
func (event *UprEvent) GetOsoBegin() (bool, error) {
if !event.IsOsoSnapshot() {
return false, ErrorInvalidOp
}
if event.Flags == 1 {
return true, nil
} else if event.Flags == 2 {
return false, nil
} else {
return false, ErrorInvalidOp
}
}
type Uleb128 []byte
func (u Uleb128) ToUint64(cachedLen int) (result uint64, bytesShifted int) {
func (u Uleb128) ToUint32(cachedLen int) (result uint32, bytesShifted int) {
var shift uint = 0
for curByte := 0; curByte < cachedLen; curByte++ {
oneByte := u[curByte]
last7Bits := 0x7f & oneByte
result |= uint64(last7Bits) << shift
result |= uint32(last7Bits) << shift
bytesShifted++
if oneByte&0x80 == 0 {
break

View file

@ -26,6 +26,7 @@ const opaqueOpen = 0xBEAF0001
const opaqueFailover = 0xDEADBEEF
const opaqueGetSeqno = 0xDEADBEEF
const uprDefaultNoopInterval = 120
const dcpOsoExtraLen = 4
// Counter on top of opaqueOpen that others can draw from for open and control msgs
var opaqueOpenCtrlWell uint32 = opaqueOpen
@ -117,6 +118,7 @@ type UprFeatures struct {
DcpPriority PriorityType
EnableExpiry bool
EnableStreamId bool
EnableOso bool
}
/**
@ -601,6 +603,20 @@ func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, featu
activatedFeatures.EnableStreamId = true
}
if features.EnableOso {
rq := &gomemcached.MCRequest{
Opcode: gomemcached.UPR_CONTROL,
Key: []byte("enable_out_of_order_snapshots"),
Body: []byte("true"),
Opaque: getUprOpenCtrlOpaque(),
}
err = sendMcRequestSync(feed.conn, rq)
if err != nil {
return
}
activatedFeatures.EnableOso = true
}
// everything is ok so far, set upr feed to open state
feed.activatedFeatures = activatedFeatures
feed.setOpen()
@ -976,6 +992,12 @@ loop:
break loop
}
event = makeUprEvent(pkt, stream, bytes)
case gomemcached.DCP_OSO_SNAPSHOT:
if stream == nil {
logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
break loop
}
event = makeUprEvent(pkt, stream, bytes)
default:
logging.Infof("Recived an unknown response for vbucket %d", vb)
}

3
vendor/github.com/couchbase/gomemcached/go.mod generated vendored Normal file
View file

@ -0,0 +1,3 @@
module github.com/couchbase/gomemcached
go 1.13

View file

@ -104,6 +104,7 @@ const (
DCP_SYSTEM_EVENT = CommandCode(0x5f) // A system event has occurred
DCP_SEQNO_ADV = CommandCode(0x64) // Sent when the vb seqno has advanced due to an unsubscribed event
DCP_OSO_SNAPSHOT = CommandCode(0x65) // Marks the begin and end of out-of-sequence-number stream
)
// command codes that are counted toward DCP control buffer
@ -117,6 +118,7 @@ var BufferedCommandCodeMap = map[CommandCode]bool{
UPR_EXPIRATION: true,
DCP_SYSTEM_EVENT: true,
DCP_SEQNO_ADV: true,
DCP_OSO_SNAPSHOT: true,
}
// Status field for memcached response.
@ -156,6 +158,9 @@ const (
SUBDOC_PATH_NOT_FOUND = Status(0xc0)
SUBDOC_BAD_MULTI = Status(0xcc)
SUBDOC_MULTI_PATH_FAILURE_DELETED = Status(0xd3)
// Not a Memcached status
UNKNOWN_STATUS = Status(0xffff)
)
// for log redaction
@ -174,6 +179,10 @@ var isFatal = map[Status]bool{
EACCESS: true,
ENOMEM: true,
NOT_SUPPORTED: true,
// consider statuses coming from outside couchbase (eg OS errors) as fatal for the connection
// as there might be unread data left over on the wire
UNKNOWN_STATUS: true,
}
// the producer/consumer bit in dcp flags

View file

@ -38,7 +38,7 @@ func (res *MCResponse) Error() string {
}
func errStatus(e error) Status {
status := Status(0xffff)
status := UNKNOWN_STATUS
if res, ok := e.(*MCResponse); ok {
status = res.Status
}