// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"forgejo.org/models/dbfs"
	"forgejo.org/modules/log"
	"forgejo.org/modules/storage"
	"forgejo.org/modules/zstd"

	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
	"google.golang.org/protobuf/types/known/timestamppb"
)

const (
	MaxLineSize = 64 * 1024
	DBFSPrefix  = "actions_log/"

	timeFormat     = "2006-01-02T15:04:05.0000000Z07:00"
	defaultBufSize = MaxLineSize
)

// WriteLogs appends logs to a DBFS file for temporary storage.
// It doesn't respect the file format in the filename like ".zst", since it's difficult to reopen a closed compressed file and append new content.
// Why doesn't it store logs in object storage directly? Because it's not efficient to append content to object storage.
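// It returns the number of bytes written for each row, including the trailing newline.
//
// Illustrative caller-side bookkeeping (variable names here are hypothetical):
//
//	ns, err := WriteLogs(ctx, logFilename, offset, rows)
//	if err == nil {
//		for _, n := range ns {
//			rowOffsets = append(rowOffsets, offset)
//			offset += int64(n)
//		}
//	}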
func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) {
	flag := os.O_WRONLY
	if offset == 0 {
		// Create file only if offset is 0, or it could result in content holes if the file doesn't exist.
		flag |= os.O_CREATE
	}
	name := DBFSPrefix + filename
	f, err := dbfs.OpenFile(ctx, name, flag)
	if err != nil {
		return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err)
	}
	defer f.Close()

	stat, err := f.Stat()
	if err != nil {
		return nil, fmt.Errorf("dbfs Stat %q: %w", name, err)
	}
	if stat.Size() < offset {
		// If the size is less than offset, refuse to write, or it could result in content holes.
		// However, if the size is greater than offset, we can still write to overwrite the content.
		return nil, fmt.Errorf("size of %q is less than offset", name)
	}

	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return nil, fmt.Errorf("dbfs Seek %q: %w", name, err)
	}

	writer := bufio.NewWriterSize(f, defaultBufSize)

	ns := make([]int, 0, len(rows))
	for _, row := range rows {
		n, err := writer.WriteString(FormatLog(row.Time.AsTime(), row.Content) + "\n")
		if err != nil {
			return nil, err
		}
		ns = append(ns, n)
	}

	if err := writer.Flush(); err != nil {
		return nil, err
	}
	return ns, nil
}

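// ReadLogs reads log rows from the given log file, starting at the given byte offset.
// It returns at most limit rows; a negative limit means no limit.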
func ReadLogs(ctx context.Context, inStorage bool, filename string, offset, limit int64) ([]*runnerv1.LogRow, error) {
	f, err := OpenLogs(ctx, inStorage, filename)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return nil, fmt.Errorf("file seek: %w", err)
	}

	scanner := bufio.NewScanner(f)
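	// Size the scanner buffer to hold the longest possible line:
	// a formatted timestamp, the separating space, and up to MaxLineSize bytes of content.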
	maxLineSize := len(timeFormat) + MaxLineSize + 1
	scanner.Buffer(make([]byte, maxLineSize), maxLineSize)

	var rows []*runnerv1.LogRow
	for scanner.Scan() && (int64(len(rows)) < limit || limit < 0) {
		t, c, err := ParseLog(scanner.Text())
		if err != nil {
			return nil, fmt.Errorf("parse log %q: %w", scanner.Text(), err)
		}
		rows = append(rows, &runnerv1.LogRow{
			Time:    timestamppb.New(t),
			Content: c,
		})
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("ReadLogs scan: %w", err)
	}

	return rows, nil
}

const (
	// logZstdBlockSize is the block size for zstd compression.
	// A 128KB block size keeps the compression ratio close to that of regular zstd compression.
	// It also means each read from the underlying object storage will be at least 128KB*(compression ratio).
	// The compression ratio is about 30% for text files, so the actual read size is about 38KB, which should be acceptable.
	logZstdBlockSize = 128 * 1024 // 128KB
)

// TransferLogs transfers logs from DBFS to object storage.
// It happens when the file is complete and no more logs will be appended.
// It respects the file format in the filename like ".zst", and compresses the content if needed.
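// It returns a cleanup function that removes the DBFS copy of the file; it is up to the
// caller to decide when to invoke it.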
func TransferLogs(ctx context.Context, filename string) (func(), error) {
	name := DBFSPrefix + filename
	remove := func() {
		if err := dbfs.Remove(ctx, name); err != nil {
			log.Warn("dbfs remove %q: %v", name, err)
		}
	}
	f, err := dbfs.Open(ctx, name)
	if err != nil {
		return nil, fmt.Errorf("dbfs open %q: %w", name, err)
	}
	defer f.Close()

	var reader io.Reader = f
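	// For ".zst" targets, stream the DBFS content through a seekable zstd writer via an
	// io.Pipe: the goroutine below compresses into one end while the Save call below reads
	// from the other, so the compressed output never needs to be buffered in full.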
	if strings.HasSuffix(filename, ".zst") {
		r, w := io.Pipe()
		reader = r
		zstdWriter, err := zstd.NewSeekableWriter(w, logZstdBlockSize)
		if err != nil {
			return nil, fmt.Errorf("zstd NewSeekableWriter: %w", err)
		}
		go func() {
			defer func() {
				_ = w.CloseWithError(zstdWriter.Close())
			}()
			if _, err := io.Copy(zstdWriter, f); err != nil {
				_ = w.CloseWithError(err)
				return
			}
		}()
	}

	if _, err := storage.Actions.Save(filename, reader, -1); err != nil {
		return nil, fmt.Errorf("storage save %q: %w", filename, err)
	}
	return remove, nil
}

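// RemoveLogs deletes the log file, from DBFS if inStorage is false, or from object storage otherwise.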
func RemoveLogs(ctx context.Context, inStorage bool, filename string) error {
	if !inStorage {
		name := DBFSPrefix + filename
		err := dbfs.Remove(ctx, name)
		if err != nil {
			return fmt.Errorf("dbfs remove %q: %w", name, err)
		}
		return nil
	}
	err := storage.Actions.Delete(filename)
	if err != nil {
		return fmt.Errorf("storage delete %q: %w", filename, err)
	}
	return nil
}

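// OpenLogs opens the log file for reading, from DBFS if inStorage is false, or from object
// storage otherwise. In object storage, files with a ".zst" suffix are wrapped in a seekable
// zstd reader, so callers can still seek by offsets into the decompressed content.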
func OpenLogs(ctx context.Context, inStorage bool, filename string) (io.ReadSeekCloser, error) {
	if !inStorage {
		name := DBFSPrefix + filename
		f, err := dbfs.Open(ctx, name)
		if err != nil {
			return nil, fmt.Errorf("dbfs open %q: %w", name, err)
		}
		return f, nil
	}

	f, err := storage.Actions.Open(filename)
	if err != nil {
		return nil, fmt.Errorf("storage open %q: %w", filename, err)
	}

	var reader io.ReadSeekCloser = f
	if strings.HasSuffix(filename, ".zst") {
		r, err := zstd.NewSeekableReader(f)
		if err != nil {
			return nil, fmt.Errorf("zstd NewSeekableReader: %w", err)
		}
		reader = r
	}

	return reader, nil
}

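// FormatLog serializes a single log row as "<UTC timestamp> <content>" using timeFormat.
// Content longer than MaxLineSize is truncated.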
func FormatLog(timestamp time.Time, content string) string {
	// Content shouldn't contain a newline, since it would break log indexes; other control characters are safe.
	content = strings.ReplaceAll(content, "\n", `\n`)
	if len(content) > MaxLineSize {
		content = content[:MaxLineSize]
	}
	return fmt.Sprintf("%s %s", timestamp.UTC().Format(timeFormat), content)
}

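// ParseLog is the inverse of FormatLog: it splits a stored line at the first space into a
// timestamp and the remaining content.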
func ParseLog(in string) (time.Time, string, error) {
	index := strings.IndexRune(in, ' ')
	if index < 0 {
		return time.Time{}, "", fmt.Errorf("invalid log: %q", in)
	}
	timestamp, err := time.Parse(timeFormat, in[:index])
	if err != nil {
		return time.Time{}, "", err
	}
	return timestamp, in[index+1:], nil
}