 c88547ce71
			
		
	
	
	
	
	c88547ce71 Continues on from #19202. Following the addition of pprof labels we can now more easily understand the relationship between a goroutine and the request that spawned it. This PR takes advantage of the labels and adds a few others, then provides a mechanism for the monitoring page to query the pprof goroutine profile. The resulting binary profile is immediately piped into the Google library for parsing it, and stack traces are then formed for the goroutines. If a goroutine is within a context, or has been created from a goroutine within a process context, it will acquire the process description labels for that process. The goroutines are mapped to their associated pids, and any that do not have an associated pid are placed in a group at the bottom as unbound. In this way we should be able to more easily examine goroutines that have become stuck. A manager command `gitea manager processes` is also provided that can export the processes (with or without stacktraces) to the command line. Signed-off-by: Andrew Thornton <art27@cantab.net>
		
			
				
	
	
		
			424 lines
		
	
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			424 lines
		
	
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2019 The Gitea Authors. All rights reserved.
 | |
| // Use of this source code is governed by a MIT-style
 | |
| // license that can be found in the LICENSE file.
 | |
| 
 | |
| package log
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"runtime/pprof"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"code.gitea.io/gitea/modules/process"
 | |
| )
 | |
| 
 | |
| // Event represents a logging event
 | |
| type Event struct {
 | |
| 	level      Level
 | |
| 	msg        string
 | |
| 	caller     string
 | |
| 	filename   string
 | |
| 	line       int
 | |
| 	time       time.Time
 | |
| 	stacktrace string
 | |
| }
 | |
| 
 | |
| // EventLogger represents the behaviours of a logger
 | |
| type EventLogger interface {
 | |
| 	LogEvent(event *Event) error
 | |
| 	Close()
 | |
| 	Flush()
 | |
| 	GetLevel() Level
 | |
| 	GetStacktraceLevel() Level
 | |
| 	GetName() string
 | |
| 	ReleaseReopen() error
 | |
| }
 | |
| 
 | |
| // ChannelledLog represents a cached channel to a LoggerProvider
 | |
| type ChannelledLog struct {
 | |
| 	ctx            context.Context
 | |
| 	finished       context.CancelFunc
 | |
| 	name           string
 | |
| 	provider       string
 | |
| 	queue          chan *Event
 | |
| 	loggerProvider LoggerProvider
 | |
| 	flush          chan bool
 | |
| 	close          chan bool
 | |
| 	closed         chan bool
 | |
| }
 | |
| 
 | |
| // NewChannelledLog a new logger instance with given logger provider and config.
 | |
| func NewChannelledLog(parent context.Context, name, provider, config string, bufferLength int64) (*ChannelledLog, error) {
 | |
| 	if log, ok := providers[provider]; ok {
 | |
| 
 | |
| 		l := &ChannelledLog{
 | |
| 			queue:  make(chan *Event, bufferLength),
 | |
| 			flush:  make(chan bool),
 | |
| 			close:  make(chan bool),
 | |
| 			closed: make(chan bool),
 | |
| 		}
 | |
| 		l.loggerProvider = log()
 | |
| 		if err := l.loggerProvider.Init(config); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		l.name = name
 | |
| 		l.provider = provider
 | |
| 		l.ctx, _, l.finished = process.GetManager().AddTypedContext(parent, fmt.Sprintf("Logger: %s(%s)", l.name, l.provider), process.SystemProcessType, false)
 | |
| 		go l.Start()
 | |
| 		return l, nil
 | |
| 	}
 | |
| 	return nil, ErrUnknownProvider{provider}
 | |
| }
 | |
| 
 | |
| // Start processing the ChannelledLog
 | |
| func (l *ChannelledLog) Start() {
 | |
| 	pprof.SetGoroutineLabels(l.ctx)
 | |
| 	defer l.finished()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case event, ok := <-l.queue:
 | |
| 			if !ok {
 | |
| 				l.closeLogger()
 | |
| 				return
 | |
| 			}
 | |
| 			l.loggerProvider.LogEvent(event)
 | |
| 		case _, ok := <-l.flush:
 | |
| 			if !ok {
 | |
| 				l.closeLogger()
 | |
| 				return
 | |
| 			}
 | |
| 			l.loggerProvider.Flush()
 | |
| 		case <-l.close:
 | |
| 			l.closeLogger()
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // LogEvent logs an event to this ChannelledLog
 | |
| func (l *ChannelledLog) LogEvent(event *Event) error {
 | |
| 	select {
 | |
| 	case l.queue <- event:
 | |
| 		return nil
 | |
| 	case <-time.After(60 * time.Second):
 | |
| 		// We're blocked!
 | |
| 		return ErrTimeout{
 | |
| 			Name:     l.name,
 | |
| 			Provider: l.provider,
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *ChannelledLog) closeLogger() {
 | |
| 	l.loggerProvider.Flush()
 | |
| 	l.loggerProvider.Close()
 | |
| 	l.closed <- true
 | |
| }
 | |
| 
 | |
| // Close this ChannelledLog
 | |
| func (l *ChannelledLog) Close() {
 | |
| 	l.close <- true
 | |
| 	<-l.closed
 | |
| }
 | |
| 
 | |
| // Flush this ChannelledLog
 | |
| func (l *ChannelledLog) Flush() {
 | |
| 	l.flush <- true
 | |
| }
 | |
| 
 | |
| // ReleaseReopen this ChannelledLog
 | |
| func (l *ChannelledLog) ReleaseReopen() error {
 | |
| 	return l.loggerProvider.ReleaseReopen()
 | |
| }
 | |
| 
 | |
| // GetLevel gets the level of this ChannelledLog
 | |
| func (l *ChannelledLog) GetLevel() Level {
 | |
| 	return l.loggerProvider.GetLevel()
 | |
| }
 | |
| 
 | |
| // GetStacktraceLevel gets the level of this ChannelledLog
 | |
| func (l *ChannelledLog) GetStacktraceLevel() Level {
 | |
| 	return l.loggerProvider.GetStacktraceLevel()
 | |
| }
 | |
| 
 | |
| // GetName returns the name of this ChannelledLog
 | |
| func (l *ChannelledLog) GetName() string {
 | |
| 	return l.name
 | |
| }
 | |
| 
 | |
| // MultiChannelledLog represents a cached channel to a LoggerProvider
 | |
| type MultiChannelledLog struct {
 | |
| 	ctx             context.Context
 | |
| 	finished        context.CancelFunc
 | |
| 	name            string
 | |
| 	bufferLength    int64
 | |
| 	queue           chan *Event
 | |
| 	rwmutex         sync.RWMutex
 | |
| 	loggers         map[string]EventLogger
 | |
| 	flush           chan bool
 | |
| 	close           chan bool
 | |
| 	started         bool
 | |
| 	level           Level
 | |
| 	stacktraceLevel Level
 | |
| 	closed          chan bool
 | |
| 	paused          chan bool
 | |
| }
 | |
| 
 | |
| // NewMultiChannelledLog a new logger instance with given logger provider and config.
 | |
| func NewMultiChannelledLog(name string, bufferLength int64) *MultiChannelledLog {
 | |
| 	ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), fmt.Sprintf("Logger: %s", name), process.SystemProcessType, false)
 | |
| 
 | |
| 	m := &MultiChannelledLog{
 | |
| 		ctx:             ctx,
 | |
| 		finished:        finished,
 | |
| 		name:            name,
 | |
| 		queue:           make(chan *Event, bufferLength),
 | |
| 		flush:           make(chan bool),
 | |
| 		bufferLength:    bufferLength,
 | |
| 		loggers:         make(map[string]EventLogger),
 | |
| 		level:           NONE,
 | |
| 		stacktraceLevel: NONE,
 | |
| 		close:           make(chan bool),
 | |
| 		closed:          make(chan bool),
 | |
| 		paused:          make(chan bool),
 | |
| 	}
 | |
| 	return m
 | |
| }
 | |
| 
 | |
| // AddLogger adds a logger to this MultiChannelledLog
 | |
| func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
 | |
| 	m.rwmutex.Lock()
 | |
| 	name := logger.GetName()
 | |
| 	if _, has := m.loggers[name]; has {
 | |
| 		m.rwmutex.Unlock()
 | |
| 		return ErrDuplicateName{name}
 | |
| 	}
 | |
| 	m.loggers[name] = logger
 | |
| 	if logger.GetLevel() < m.level {
 | |
| 		m.level = logger.GetLevel()
 | |
| 	}
 | |
| 	if logger.GetStacktraceLevel() < m.stacktraceLevel {
 | |
| 		m.stacktraceLevel = logger.GetStacktraceLevel()
 | |
| 	}
 | |
| 	m.rwmutex.Unlock()
 | |
| 	go m.Start()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // DelLogger removes a sub logger from this MultiChannelledLog
 | |
| // NB: If you delete the last sublogger this logger will simply drop
 | |
| // log events
 | |
| func (m *MultiChannelledLog) DelLogger(name string) bool {
 | |
| 	m.rwmutex.Lock()
 | |
| 	logger, has := m.loggers[name]
 | |
| 	if !has {
 | |
| 		m.rwmutex.Unlock()
 | |
| 		return false
 | |
| 	}
 | |
| 	delete(m.loggers, name)
 | |
| 	m.internalResetLevel()
 | |
| 	m.rwmutex.Unlock()
 | |
| 	logger.Flush()
 | |
| 	logger.Close()
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // GetEventLogger returns a sub logger from this MultiChannelledLog
 | |
| func (m *MultiChannelledLog) GetEventLogger(name string) EventLogger {
 | |
| 	m.rwmutex.RLock()
 | |
| 	defer m.rwmutex.RUnlock()
 | |
| 	return m.loggers[name]
 | |
| }
 | |
| 
 | |
| // GetEventProvider returns a sub logger provider content from this MultiChannelledLog
 | |
| func (m *MultiChannelledLog) GetLoggerProviderContent(name string) (string, error) {
 | |
| 	channelledLogger := m.GetEventLogger(name).(*ChannelledLog)
 | |
| 	return channelledLogger.loggerProvider.Content()
 | |
| }
 | |
| 
 | |
| // GetEventLoggerNames returns a list of names
 | |
| func (m *MultiChannelledLog) GetEventLoggerNames() []string {
 | |
| 	m.rwmutex.RLock()
 | |
| 	defer m.rwmutex.RUnlock()
 | |
| 	var keys []string
 | |
| 	for k := range m.loggers {
 | |
| 		keys = append(keys, k)
 | |
| 	}
 | |
| 	return keys
 | |
| }
 | |
| 
 | |
| func (m *MultiChannelledLog) closeLoggers() {
 | |
| 	m.rwmutex.Lock()
 | |
| 	for _, logger := range m.loggers {
 | |
| 		logger.Flush()
 | |
| 		logger.Close()
 | |
| 	}
 | |
| 	m.rwmutex.Unlock()
 | |
| 	m.closed <- true
 | |
| }
 | |
| 
 | |
| // Pause pauses this Logger
 | |
| func (m *MultiChannelledLog) Pause() {
 | |
| 	m.paused <- true
 | |
| }
 | |
| 
 | |
| // Resume resumes this Logger
 | |
| func (m *MultiChannelledLog) Resume() {
 | |
| 	m.paused <- false
 | |
| }
 | |
| 
 | |
| // ReleaseReopen causes this logger to tell its subloggers to release and reopen
 | |
| func (m *MultiChannelledLog) ReleaseReopen() error {
 | |
| 	m.rwmutex.Lock()
 | |
| 	defer m.rwmutex.Unlock()
 | |
| 	var accumulatedErr error
 | |
| 	for _, logger := range m.loggers {
 | |
| 		if err := logger.ReleaseReopen(); err != nil {
 | |
| 			if accumulatedErr == nil {
 | |
| 				accumulatedErr = fmt.Errorf("Error whilst reopening: %s Error: %v", logger.GetName(), err)
 | |
| 			} else {
 | |
| 				accumulatedErr = fmt.Errorf("Error whilst reopening: %s Error: %v & %v", logger.GetName(), err, accumulatedErr)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return accumulatedErr
 | |
| }
 | |
| 
 | |
| // Start processing the MultiChannelledLog
 | |
| func (m *MultiChannelledLog) Start() {
 | |
| 	m.rwmutex.Lock()
 | |
| 	if m.started {
 | |
| 		m.rwmutex.Unlock()
 | |
| 		return
 | |
| 	}
 | |
| 	pprof.SetGoroutineLabels(m.ctx)
 | |
| 	defer m.finished()
 | |
| 
 | |
| 	m.started = true
 | |
| 	m.rwmutex.Unlock()
 | |
| 	paused := false
 | |
| 	for {
 | |
| 		if paused {
 | |
| 			select {
 | |
| 			case paused = <-m.paused:
 | |
| 				if !paused {
 | |
| 					m.ResetLevel()
 | |
| 				}
 | |
| 			case _, ok := <-m.flush:
 | |
| 				if !ok {
 | |
| 					m.closeLoggers()
 | |
| 					return
 | |
| 				}
 | |
| 				m.rwmutex.RLock()
 | |
| 				for _, logger := range m.loggers {
 | |
| 					logger.Flush()
 | |
| 				}
 | |
| 				m.rwmutex.RUnlock()
 | |
| 			case <-m.close:
 | |
| 				m.closeLoggers()
 | |
| 				return
 | |
| 			}
 | |
| 			continue
 | |
| 		}
 | |
| 		select {
 | |
| 		case paused = <-m.paused:
 | |
| 			if paused && m.level < INFO {
 | |
| 				m.level = INFO
 | |
| 			}
 | |
| 		case event, ok := <-m.queue:
 | |
| 			if !ok {
 | |
| 				m.closeLoggers()
 | |
| 				return
 | |
| 			}
 | |
| 			m.rwmutex.RLock()
 | |
| 			for _, logger := range m.loggers {
 | |
| 				err := logger.LogEvent(event)
 | |
| 				if err != nil {
 | |
| 					fmt.Println(err)
 | |
| 				}
 | |
| 			}
 | |
| 			m.rwmutex.RUnlock()
 | |
| 		case _, ok := <-m.flush:
 | |
| 			if !ok {
 | |
| 				m.closeLoggers()
 | |
| 				return
 | |
| 			}
 | |
| 			m.rwmutex.RLock()
 | |
| 			for _, logger := range m.loggers {
 | |
| 				logger.Flush()
 | |
| 			}
 | |
| 			m.rwmutex.RUnlock()
 | |
| 		case <-m.close:
 | |
| 			m.closeLoggers()
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // LogEvent logs an event to this MultiChannelledLog
 | |
| func (m *MultiChannelledLog) LogEvent(event *Event) error {
 | |
| 	select {
 | |
| 	case m.queue <- event:
 | |
| 		return nil
 | |
| 	case <-time.After(100 * time.Millisecond):
 | |
| 		// We're blocked!
 | |
| 		return ErrTimeout{
 | |
| 			Name:     m.name,
 | |
| 			Provider: "MultiChannelledLog",
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Close this MultiChannelledLog
 | |
| func (m *MultiChannelledLog) Close() {
 | |
| 	m.close <- true
 | |
| 	<-m.closed
 | |
| }
 | |
| 
 | |
| // Flush this ChannelledLog
 | |
| func (m *MultiChannelledLog) Flush() {
 | |
| 	m.flush <- true
 | |
| }
 | |
| 
 | |
| // GetLevel gets the level of this MultiChannelledLog
 | |
| func (m *MultiChannelledLog) GetLevel() Level {
 | |
| 	m.rwmutex.RLock()
 | |
| 	defer m.rwmutex.RUnlock()
 | |
| 	return m.level
 | |
| }
 | |
| 
 | |
| // GetStacktraceLevel gets the level of this MultiChannelledLog
 | |
| func (m *MultiChannelledLog) GetStacktraceLevel() Level {
 | |
| 	m.rwmutex.RLock()
 | |
| 	defer m.rwmutex.RUnlock()
 | |
| 	return m.stacktraceLevel
 | |
| }
 | |
| 
 | |
| func (m *MultiChannelledLog) internalResetLevel() Level {
 | |
| 	m.level = NONE
 | |
| 	for _, logger := range m.loggers {
 | |
| 		level := logger.GetLevel()
 | |
| 		if level < m.level {
 | |
| 			m.level = level
 | |
| 		}
 | |
| 		level = logger.GetStacktraceLevel()
 | |
| 		if level < m.stacktraceLevel {
 | |
| 			m.stacktraceLevel = level
 | |
| 		}
 | |
| 	}
 | |
| 	return m.level
 | |
| }
 | |
| 
 | |
| // ResetLevel will reset the level of this MultiChannelledLog
 | |
| func (m *MultiChannelledLog) ResetLevel() Level {
 | |
| 	m.rwmutex.Lock()
 | |
| 	defer m.rwmutex.Unlock()
 | |
| 	return m.internalResetLevel()
 | |
| }
 | |
| 
 | |
| // GetName gets the name of this MultiChannelledLog
 | |
| func (m *MultiChannelledLog) GetName() string {
 | |
| 	return m.name
 | |
| }
 |