vm.go: Firecracker was launched with context.Background() instead of the incoming request ctx. A cancelled or timed-out VM creation request could not stop mid-flight Firecracker process spawning, leaving an orphaned process and leaked resources. Replace the four firecrackerCtx uses with ctx directly; the local variable is removed. guest_sessions.go / daemon.go: sessionControllers map was lazily initialized with a nil-check inside every mutating method. With d.mu held this isn't a data race, but the pattern is fragile — any new method that writes to the map without copying the guard can panic. Initialize the map once in Open() alongside the other daemon maps and channels, and remove the redundant nil-checks from setGuestSessionController and claimGuestSessionController.
1266 lines
44 KiB
Go
1266 lines
44 KiB
Go
package daemon
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"banger/internal/api"
|
|
"banger/internal/guest"
|
|
"banger/internal/model"
|
|
"banger/internal/sessionstream"
|
|
"banger/internal/system"
|
|
|
|
"golang.org/x/crypto/ssh"
|
|
)
|
|
|
|
const (
|
|
guestSessionBackendSSH = "ssh"
|
|
guestSessionAttachBackendNone = "none"
|
|
guestSessionAttachBackendSSHBridge = "ssh_rehydratable"
|
|
guestSessionAttachModeExclusive = "exclusive"
|
|
guestSessionTransportUnixSocket = "unix_socket"
|
|
guestSessionStateRoot = "/root/.local/state/banger/sessions"
|
|
guestSessionLogTailLine = 200
|
|
)
|
|
|
|
var guestSessionHostCommandOutputFunc = func(ctx context.Context, name string, args ...string) ([]byte, error) {
|
|
runner := system.NewRunner()
|
|
output, err := runner.Run(ctx, name, args...)
|
|
if err == nil {
|
|
return output, nil
|
|
}
|
|
command := strings.TrimSpace(strings.Join(append([]string{name}, args...), " "))
|
|
detail := strings.TrimSpace(string(output))
|
|
if detail == "" {
|
|
return output, fmt.Errorf("%s: %w", command, err)
|
|
}
|
|
return output, fmt.Errorf("%s: %w: %s", command, err, detail)
|
|
}
|
|
|
|
type guestSSHClient interface {
|
|
Close() error
|
|
RunScript(context.Context, string, io.Writer) error
|
|
RunScriptOutput(context.Context, string) ([]byte, error)
|
|
UploadFile(context.Context, string, os.FileMode, []byte, io.Writer) error
|
|
StreamTar(context.Context, string, string, io.Writer) error
|
|
StreamTarEntries(context.Context, string, []string, string, io.Writer) error
|
|
}
|
|
|
|
func (d *Daemon) waitForGuestSSH(ctx context.Context, address string, interval time.Duration) error {
|
|
if d != nil && d.guestWaitForSSH != nil {
|
|
return d.guestWaitForSSH(ctx, address, d.config.SSHKeyPath, interval)
|
|
}
|
|
return guest.WaitForSSH(ctx, address, d.config.SSHKeyPath, interval)
|
|
}
|
|
|
|
func (d *Daemon) dialGuest(ctx context.Context, address string) (guestSSHClient, error) {
|
|
if d != nil && d.guestDial != nil {
|
|
return d.guestDial(ctx, address, d.config.SSHKeyPath)
|
|
}
|
|
return guest.Dial(ctx, address, d.config.SSHKeyPath)
|
|
}
|
|
|
|
func (d *Daemon) waitForGuestSessionReadyHook(ctx context.Context, vm model.VMRecord, session model.GuestSession) (model.GuestSession, error) {
|
|
if d != nil && d.waitForGuestSessionReady != nil {
|
|
return d.waitForGuestSessionReady(ctx, vm, session)
|
|
}
|
|
return d.waitForGuestSessionReadyDefault(ctx, vm, session)
|
|
}
|
|
|
|
type guestSessionController struct {
|
|
stream *guest.StreamSession
|
|
streams []*guest.StreamSession
|
|
stdin io.WriteCloser
|
|
attachMu sync.Mutex
|
|
attach net.Conn
|
|
writeMu sync.Mutex
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
func (c *guestSessionController) setAttach(conn net.Conn) error {
|
|
c.attachMu.Lock()
|
|
defer c.attachMu.Unlock()
|
|
if c.attach != nil {
|
|
return errors.New("session already has an active attach")
|
|
}
|
|
c.attach = conn
|
|
return nil
|
|
}
|
|
|
|
func (c *guestSessionController) clearAttach(conn net.Conn) {
|
|
c.attachMu.Lock()
|
|
defer c.attachMu.Unlock()
|
|
if c.attach == conn {
|
|
c.attach = nil
|
|
}
|
|
}
|
|
|
|
func (c *guestSessionController) writeFrame(channel byte, payload []byte) {
|
|
c.attachMu.Lock()
|
|
conn := c.attach
|
|
c.attachMu.Unlock()
|
|
if conn == nil {
|
|
return
|
|
}
|
|
c.writeMu.Lock()
|
|
err := sessionstream.WriteFrame(conn, channel, payload)
|
|
c.writeMu.Unlock()
|
|
if err != nil {
|
|
_ = conn.Close()
|
|
c.clearAttach(conn)
|
|
}
|
|
}
|
|
|
|
func (c *guestSessionController) writeControl(message sessionstream.ControlMessage) {
|
|
c.attachMu.Lock()
|
|
conn := c.attach
|
|
c.attachMu.Unlock()
|
|
if conn == nil {
|
|
return
|
|
}
|
|
c.writeMu.Lock()
|
|
err := sessionstream.WriteControl(conn, message)
|
|
c.writeMu.Unlock()
|
|
if err != nil {
|
|
_ = conn.Close()
|
|
c.clearAttach(conn)
|
|
}
|
|
}
|
|
|
|
func (c *guestSessionController) close() error {
|
|
if c == nil {
|
|
return nil
|
|
}
|
|
var err error
|
|
c.closeOnce.Do(func() {
|
|
c.attachMu.Lock()
|
|
conn := c.attach
|
|
c.attach = nil
|
|
c.attachMu.Unlock()
|
|
if conn != nil {
|
|
err = errors.Join(err, conn.Close())
|
|
}
|
|
if c.stdin != nil {
|
|
err = errors.Join(err, c.stdin.Close())
|
|
}
|
|
if c.stream != nil {
|
|
err = errors.Join(err, c.stream.Close())
|
|
}
|
|
for _, stream := range c.streams {
|
|
if stream != nil {
|
|
err = errors.Join(err, stream.Close())
|
|
}
|
|
}
|
|
})
|
|
return err
|
|
}
|
|
|
|
type guestSessionStateSnapshot struct {
|
|
Status string
|
|
GuestPID int
|
|
ExitCode *int
|
|
Alive bool
|
|
LastError string
|
|
}
|
|
|
|
func (d *Daemon) StartGuestSession(ctx context.Context, params api.GuestSessionStartParams) (model.GuestSession, error) {
|
|
stdinMode := model.GuestSessionStdinMode(strings.TrimSpace(params.StdinMode))
|
|
if stdinMode == "" {
|
|
stdinMode = model.GuestSessionStdinClosed
|
|
}
|
|
if stdinMode != model.GuestSessionStdinClosed && stdinMode != model.GuestSessionStdinPipe {
|
|
return model.GuestSession{}, fmt.Errorf("unsupported stdin mode %q", params.StdinMode)
|
|
}
|
|
if strings.TrimSpace(params.Command) == "" {
|
|
return model.GuestSession{}, errors.New("session command is required")
|
|
}
|
|
var created model.GuestSession
|
|
_, err := d.withVMLockByRef(ctx, params.VMIDOrName, func(vm model.VMRecord) (model.VMRecord, error) {
|
|
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
return model.VMRecord{}, fmt.Errorf("vm %q is not running", vm.Name)
|
|
}
|
|
session, err := d.startGuestSessionLocked(ctx, vm, params, stdinMode)
|
|
if err != nil {
|
|
return model.VMRecord{}, err
|
|
}
|
|
created = session
|
|
return vm, nil
|
|
})
|
|
return created, err
|
|
}
|
|
|
|
func (d *Daemon) startGuestSessionLocked(ctx context.Context, vm model.VMRecord, params api.GuestSessionStartParams, stdinMode model.GuestSessionStdinMode) (model.GuestSession, error) {
|
|
id, err := model.NewID()
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
now := model.Now()
|
|
session := model.GuestSession{
|
|
ID: id,
|
|
VMID: vm.ID,
|
|
Name: defaultGuestSessionName(id, params.Command, params.Name),
|
|
Backend: guestSessionBackendSSH,
|
|
Command: params.Command,
|
|
Args: append([]string(nil), params.Args...),
|
|
CWD: strings.TrimSpace(params.CWD),
|
|
Env: cloneStringMap(params.Env),
|
|
StdinMode: stdinMode,
|
|
Status: model.GuestSessionStatusStarting,
|
|
GuestStateDir: guestSessionStateDir(id),
|
|
StdoutLogPath: guestSessionStdoutLogPath(id),
|
|
StderrLogPath: guestSessionStderrLogPath(id),
|
|
Tags: cloneStringMap(params.Tags),
|
|
Attachable: stdinMode == model.GuestSessionStdinPipe,
|
|
Reattachable: stdinMode == model.GuestSessionStdinPipe,
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}
|
|
if session.Attachable {
|
|
session.AttachBackend = guestSessionAttachBackendSSHBridge
|
|
session.AttachMode = guestSessionAttachModeExclusive
|
|
} else {
|
|
session.AttachBackend = guestSessionAttachBackendNone
|
|
}
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
fail := func(stage, message, rawLog string) (model.GuestSession, error) {
|
|
session = failGuestSessionLaunch(session, stage, message, rawLog)
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return session, nil
|
|
}
|
|
address := net.JoinHostPort(vm.Runtime.GuestIP, "22")
|
|
if err := d.waitForGuestSSH(ctx, address, 250*time.Millisecond); err != nil {
|
|
return fail("ssh_unavailable", fmt.Sprintf("guest ssh unavailable: %v", err), "")
|
|
}
|
|
client, err := d.dialGuest(ctx, address)
|
|
if err != nil {
|
|
return fail("dial_guest", fmt.Sprintf("dial guest ssh: %v", err), "")
|
|
}
|
|
defer client.Close()
|
|
var preflightLog bytes.Buffer
|
|
if err := client.RunScript(ctx, guestSessionCWDPreflightScript(session.CWD), &preflightLog); err != nil {
|
|
return fail("preflight_cwd", fmt.Sprintf("guest working directory is unavailable: %s", defaultGuestSessionCWD(session.CWD)), preflightLog.String())
|
|
}
|
|
preflightLog.Reset()
|
|
requiredCommands := normalizeGuestSessionRequiredCommands(params.Command, params.RequiredCommands)
|
|
if err := client.RunScript(ctx, guestSessionCommandPreflightScript(requiredCommands), &preflightLog); err != nil {
|
|
return fail("preflight_command", fmt.Sprintf("required guest command is unavailable: %s", strings.TrimSpace(preflightLog.String())), preflightLog.String())
|
|
}
|
|
var uploadLog bytes.Buffer
|
|
if err := client.UploadFile(ctx, guestSessionScriptPath(id), 0o755, []byte(guestSessionScript(session)), &uploadLog); err != nil {
|
|
return fail("upload_script", "upload guest session script failed", uploadLog.String())
|
|
}
|
|
var launchLog bytes.Buffer
|
|
launchScript := fmt.Sprintf("set -euo pipefail\nnohup bash %s >/dev/null 2>&1 </dev/null &\ndisown || true\n", guestShellQuote(guestSessionScriptPath(id)))
|
|
if err := client.RunScript(ctx, launchScript, &launchLog); err != nil {
|
|
return fail("launch", "launch guest session failed", launchLog.String())
|
|
}
|
|
readyCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
updated, err := d.waitForGuestSessionReadyHook(readyCtx, vm, session)
|
|
if err != nil {
|
|
return fail("ready_wait", "guest session did not report ready state", err.Error())
|
|
}
|
|
session = updated
|
|
if session.Status == model.GuestSessionStatusStarting {
|
|
session.Status = model.GuestSessionStatusRunning
|
|
session.StartedAt = model.Now()
|
|
session.UpdatedAt = model.Now()
|
|
}
|
|
session.LaunchStage = ""
|
|
session.LaunchMessage = ""
|
|
session.LaunchRawLog = ""
|
|
session.LastError = ""
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return session, nil
|
|
}
|
|
|
|
func (d *Daemon) GetGuestSession(ctx context.Context, params api.GuestSessionRefParams) (model.GuestSession, error) {
|
|
vm, err := d.FindVM(ctx, params.VMIDOrName)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return d.refreshGuestSession(ctx, vm, session)
|
|
}
|
|
|
|
func (d *Daemon) ListGuestSessions(ctx context.Context, params api.VMRefParams) ([]model.GuestSession, error) {
|
|
vm, err := d.FindVM(ctx, params.IDOrName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sessions, err := d.store.ListGuestSessionsByVM(ctx, vm.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for index := range sessions {
|
|
refreshed, refreshErr := d.refreshGuestSession(ctx, vm, sessions[index])
|
|
if refreshErr == nil {
|
|
sessions[index] = refreshed
|
|
}
|
|
}
|
|
return sessions, nil
|
|
}
|
|
|
|
func (d *Daemon) StopGuestSession(ctx context.Context, params api.GuestSessionRefParams) (model.GuestSession, error) {
|
|
return d.signalGuestSession(ctx, params, "TERM")
|
|
}
|
|
|
|
func (d *Daemon) KillGuestSession(ctx context.Context, params api.GuestSessionRefParams) (model.GuestSession, error) {
|
|
return d.signalGuestSession(ctx, params, "KILL")
|
|
}
|
|
|
|
func (d *Daemon) signalGuestSession(ctx context.Context, params api.GuestSessionRefParams, signal string) (model.GuestSession, error) {
|
|
vm, err := d.FindVM(ctx, params.VMIDOrName)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
session, _ = d.refreshGuestSession(ctx, vm, session)
|
|
if session.Status == model.GuestSessionStatusExited || session.Status == model.GuestSessionStatusFailed {
|
|
return session, nil
|
|
}
|
|
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
session.Status = model.GuestSessionStatusFailed
|
|
session.LastError = "vm is not running"
|
|
now := model.Now()
|
|
session.UpdatedAt = now
|
|
session.EndedAt = now
|
|
session.Attachable = false
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return session, nil
|
|
}
|
|
client, err := guest.Dial(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"), d.config.SSHKeyPath)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
defer client.Close()
|
|
var log bytes.Buffer
|
|
if err := client.RunScript(ctx, guestSessionSignalScript(session.ID, signal), &log); err != nil {
|
|
return model.GuestSession{}, formatGuestSessionStepError("signal guest session", err, log.String())
|
|
}
|
|
session.Status = model.GuestSessionStatusStopping
|
|
session.UpdatedAt = model.Now()
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return session, nil
|
|
}
|
|
|
|
func (d *Daemon) GuestSessionLogs(ctx context.Context, params api.GuestSessionLogsParams) (api.GuestSessionLogsResult, error) {
|
|
vm, err := d.FindVM(ctx, params.VMIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionLogsResult{}, err
|
|
}
|
|
session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionLogsResult{}, err
|
|
}
|
|
streamName := strings.TrimSpace(params.Stream)
|
|
if streamName == "" {
|
|
streamName = "stdout"
|
|
}
|
|
tailLines := params.TailLines
|
|
if tailLines <= 0 {
|
|
tailLines = guestSessionLogTailLine
|
|
}
|
|
path := session.StdoutLogPath
|
|
if streamName == "stderr" {
|
|
path = session.StderrLogPath
|
|
}
|
|
content, err := d.readGuestSessionLog(ctx, vm, session, streamName, tailLines)
|
|
if err != nil {
|
|
return api.GuestSessionLogsResult{}, err
|
|
}
|
|
return api.GuestSessionLogsResult{Session: session, Stream: streamName, Path: path, Content: content}, nil
|
|
}
|
|
|
|
func (d *Daemon) SendToGuestSession(ctx context.Context, params api.GuestSessionSendParams) (api.GuestSessionSendResult, error) {
|
|
vm, err := d.FindVM(ctx, params.VMIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionSendResult{}, err
|
|
}
|
|
session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionSendResult{}, err
|
|
}
|
|
if session.StdinMode != model.GuestSessionStdinPipe {
|
|
return api.GuestSessionSendResult{}, errors.New("session does not have a stdin pipe")
|
|
}
|
|
if session.Status != model.GuestSessionStatusRunning {
|
|
return api.GuestSessionSendResult{}, fmt.Errorf("session is not running (status=%s)", session.Status)
|
|
}
|
|
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
return api.GuestSessionSendResult{}, fmt.Errorf("vm %q is not running", vm.Name)
|
|
}
|
|
if len(params.Payload) == 0 {
|
|
return api.GuestSessionSendResult{Session: session}, nil
|
|
}
|
|
client, err := d.dialGuest(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"))
|
|
if err != nil {
|
|
return api.GuestSessionSendResult{}, fmt.Errorf("dial guest: %w", err)
|
|
}
|
|
defer client.Close()
|
|
tmpPath := fmt.Sprintf("/tmp/banger-send-%s.bin", session.ID[:8])
|
|
var uploadLog bytes.Buffer
|
|
if err := client.UploadFile(ctx, tmpPath, 0o600, params.Payload, &uploadLog); err != nil {
|
|
return api.GuestSessionSendResult{}, fmt.Errorf("upload payload: %w", err)
|
|
}
|
|
sendScript := fmt.Sprintf(
|
|
"set -euo pipefail\ncat %s >> %s\nrm -f %s\n",
|
|
guestShellQuote(tmpPath),
|
|
guestShellQuote(guestSessionStdinPipePath(session.ID)),
|
|
guestShellQuote(tmpPath),
|
|
)
|
|
var sendLog bytes.Buffer
|
|
if err := client.RunScript(ctx, sendScript, &sendLog); err != nil {
|
|
return api.GuestSessionSendResult{}, fmt.Errorf("send to session: %w: %s", err, strings.TrimSpace(sendLog.String()))
|
|
}
|
|
return api.GuestSessionSendResult{Session: session, BytesWritten: len(params.Payload)}, nil
|
|
}
|
|
|
|
func (d *Daemon) BeginGuestSessionAttach(ctx context.Context, params api.GuestSessionAttachBeginParams) (api.GuestSessionAttachBeginResult, error) {
|
|
vm, err := d.FindVM(ctx, params.VMIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
session, _ = d.refreshGuestSession(ctx, vm, session)
|
|
if !session.Attachable {
|
|
return api.GuestSessionAttachBeginResult{}, errors.New("session is not attachable")
|
|
}
|
|
controller := &guestSessionController{}
|
|
if !d.claimGuestSessionController(session.ID, controller) {
|
|
return api.GuestSessionAttachBeginResult{}, errors.New("session already has an active attach")
|
|
}
|
|
attachID, err := model.NewID()
|
|
if err != nil {
|
|
d.clearGuestSessionController(session.ID)
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
socketPath := filepath.Join(d.layout.RuntimeDir, "guest-session-attach-"+attachID[:12]+".sock")
|
|
_ = os.Remove(socketPath)
|
|
listener, err := net.Listen("unix", socketPath)
|
|
if err != nil {
|
|
d.clearGuestSessionController(session.ID)
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
if err := os.Chmod(socketPath, 0o600); err != nil {
|
|
_ = listener.Close()
|
|
_ = os.Remove(socketPath)
|
|
d.clearGuestSessionController(session.ID)
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
go d.serveGuestSessionAttach(session, controller, attachID, socketPath, listener)
|
|
return api.GuestSessionAttachBeginResult{
|
|
Session: session,
|
|
AttachID: attachID,
|
|
TransportKind: guestSessionTransportUnixSocket,
|
|
TransportTarget: socketPath,
|
|
SocketPath: socketPath,
|
|
StreamFormat: sessionstream.FormatV1,
|
|
}, nil
|
|
}
|
|
|
|
func (d *Daemon) setGuestSessionController(id string, controller *guestSessionController) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
d.sessionControllers[id] = controller
|
|
}
|
|
|
|
func (d *Daemon) claimGuestSessionController(id string, controller *guestSessionController) bool {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
if d.sessionControllers[id] != nil {
|
|
return false
|
|
}
|
|
d.sessionControllers[id] = controller
|
|
return true
|
|
}
|
|
|
|
func (d *Daemon) getGuestSessionController(id string) *guestSessionController {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
return d.sessionControllers[id]
|
|
}
|
|
|
|
func (d *Daemon) clearGuestSessionController(id string) *guestSessionController {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
controller := d.sessionControllers[id]
|
|
delete(d.sessionControllers, id)
|
|
return controller
|
|
}
|
|
|
|
func (d *Daemon) closeGuestSessionControllers() error {
|
|
d.mu.Lock()
|
|
controllers := make([]*guestSessionController, 0, len(d.sessionControllers))
|
|
for _, controller := range d.sessionControllers {
|
|
controllers = append(controllers, controller)
|
|
}
|
|
d.sessionControllers = nil
|
|
d.mu.Unlock()
|
|
var err error
|
|
for _, controller := range controllers {
|
|
err = errors.Join(err, controller.close())
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (d *Daemon) forwardGuestSessionOutput(_ string, controller *guestSessionController, channel byte, reader io.Reader) {
|
|
buffer := make([]byte, 32*1024)
|
|
for {
|
|
n, err := reader.Read(buffer)
|
|
if n > 0 {
|
|
controller.writeFrame(channel, buffer[:n])
|
|
}
|
|
if err != nil {
|
|
if !errors.Is(err, io.EOF) {
|
|
controller.writeControl(sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) waitForGuestSessionExit(id string, controller *guestSessionController, session model.GuestSession) {
|
|
err := controller.stream.Wait()
|
|
updated := session
|
|
updated.Attachable = false
|
|
now := model.Now()
|
|
updated.UpdatedAt = now
|
|
updated.EndedAt = now
|
|
if exitCode, ok := guestSessionExitCode(err); ok {
|
|
updated.ExitCode = &exitCode
|
|
if exitCode == 0 {
|
|
updated.Status = model.GuestSessionStatusExited
|
|
} else {
|
|
updated.Status = model.GuestSessionStatusFailed
|
|
}
|
|
}
|
|
if err != nil && updated.LastError == "" {
|
|
updated.LastError = err.Error()
|
|
}
|
|
if vm, getErr := d.store.GetVMByID(context.Background(), updated.VMID); getErr == nil {
|
|
if refreshed, refreshErr := d.refreshGuestSession(context.Background(), vm, updated); refreshErr == nil {
|
|
updated = refreshed
|
|
}
|
|
}
|
|
_ = d.store.UpsertGuestSession(context.Background(), updated)
|
|
controller.writeControl(sessionstream.ControlMessage{Type: "exit", ExitCode: updated.ExitCode})
|
|
_ = controller.close()
|
|
d.clearGuestSessionController(id)
|
|
}
|
|
|
|
func (d *Daemon) serveGuestSessionAttach(session model.GuestSession, controller *guestSessionController, _ string, socketPath string, listener net.Listener) {
|
|
defer func() {
|
|
_ = listener.Close()
|
|
_ = os.Remove(socketPath)
|
|
_ = controller.close()
|
|
d.clearGuestSessionController(session.ID)
|
|
}()
|
|
conn, err := listener.Accept()
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
if err := controller.setAttach(conn); err != nil {
|
|
_ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
return
|
|
}
|
|
defer controller.clearAttach(conn)
|
|
if err := d.attachGuestSessionBridge(session, controller); err != nil {
|
|
_ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
return
|
|
}
|
|
for {
|
|
channel, payload, err := sessionstream.ReadFrame(conn)
|
|
if err != nil {
|
|
return
|
|
}
|
|
switch channel {
|
|
case sessionstream.ChannelStdin:
|
|
if controller.stdin == nil {
|
|
continue
|
|
}
|
|
if _, err := controller.stdin.Write(payload); err != nil {
|
|
_ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
return
|
|
}
|
|
case sessionstream.ChannelControl:
|
|
message, err := sessionstream.ReadControl(payload)
|
|
if err != nil {
|
|
_ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
return
|
|
}
|
|
if message.Type == "eof" && controller.stdin != nil {
|
|
_ = controller.stdin.Close()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) attachGuestSessionBridge(session model.GuestSession, controller *guestSessionController) error {
|
|
vm, err := d.store.GetVMByID(context.Background(), session.VMID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
return fmt.Errorf("vm %q is not running", vm.Name)
|
|
}
|
|
address := net.JoinHostPort(vm.Runtime.GuestIP, "22")
|
|
stdinStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachInputCommand(session.ID))
|
|
if err != nil {
|
|
return fmt.Errorf("open guest session stdin stream: %w", err)
|
|
}
|
|
stdoutStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachTailCommand(session.StdoutLogPath))
|
|
if err != nil {
|
|
_ = stdinStream.Close()
|
|
return fmt.Errorf("open guest session stdout stream: %w", err)
|
|
}
|
|
stderrStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachTailCommand(session.StderrLogPath))
|
|
if err != nil {
|
|
_ = stdinStream.Close()
|
|
_ = stdoutStream.Close()
|
|
return fmt.Errorf("open guest session stderr stream: %w", err)
|
|
}
|
|
controller.streams = append(controller.streams, stdinStream, stdoutStream, stderrStream)
|
|
controller.stdin = stdinStream.Stdin()
|
|
go d.forwardGuestSessionOutput(session.ID, controller, sessionstream.ChannelStdout, stdoutStream.Stdout())
|
|
go d.forwardGuestSessionOutput(session.ID, controller, sessionstream.ChannelStderr, stderrStream.Stdout())
|
|
go d.watchGuestSessionAttach(session.ID, controller, session)
|
|
return nil
|
|
}
|
|
|
|
func (d *Daemon) openGuestSessionAttachStream(address, command string) (*guest.StreamSession, error) {
|
|
client, err := guest.Dial(context.Background(), address, d.config.SSHKeyPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
stream, err := client.StartCommand(context.Background(), command)
|
|
if err != nil {
|
|
_ = client.Close()
|
|
return nil, err
|
|
}
|
|
return stream, nil
|
|
}
|
|
|
|
func (d *Daemon) watchGuestSessionAttach(id string, controller *guestSessionController, session model.GuestSession) {
|
|
ticker := time.NewTicker(250 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
vm, err := d.store.GetVMByID(context.Background(), session.VMID)
|
|
if err != nil {
|
|
controller.writeControl(sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
_ = controller.close()
|
|
return
|
|
}
|
|
refreshed, err := d.refreshGuestSession(context.Background(), vm, session)
|
|
if err == nil {
|
|
session = refreshed
|
|
}
|
|
if session.Status == model.GuestSessionStatusExited || session.Status == model.GuestSessionStatusFailed {
|
|
controller.writeControl(sessionstream.ControlMessage{Type: "exit", ExitCode: session.ExitCode})
|
|
_ = controller.close()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) waitForGuestSessionReadyDefault(ctx context.Context, vm model.VMRecord, session model.GuestSession) (model.GuestSession, error) {
|
|
for {
|
|
updated, err := d.refreshGuestSession(ctx, vm, session)
|
|
if err == nil {
|
|
session = updated
|
|
if session.GuestPID != 0 || session.ExitCode != nil || session.Status == model.GuestSessionStatusRunning || session.Status == model.GuestSessionStatusFailed || session.Status == model.GuestSessionStatusExited {
|
|
return session, nil
|
|
}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return session, ctx.Err()
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) refreshGuestSession(ctx context.Context, vm model.VMRecord, session model.GuestSession) (model.GuestSession, error) {
|
|
if session.Status != model.GuestSessionStatusStarting && session.Status != model.GuestSessionStatusRunning && session.Status != model.GuestSessionStatusStopping {
|
|
return session, nil
|
|
}
|
|
snapshot, err := d.inspectGuestSessionState(ctx, vm, session)
|
|
if err != nil {
|
|
return session, err
|
|
}
|
|
original := session
|
|
applyGuestSessionSnapshot(&session, snapshot, vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath))
|
|
if guestSessionStateChanged(original, session) {
|
|
session.UpdatedAt = model.Now()
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return session, err
|
|
}
|
|
}
|
|
return session, nil
|
|
}
|
|
|
|
func applyGuestSessionSnapshot(session *model.GuestSession, snapshot guestSessionStateSnapshot, vmRunning bool) {
|
|
if session == nil {
|
|
return
|
|
}
|
|
if snapshot.GuestPID != 0 {
|
|
session.GuestPID = snapshot.GuestPID
|
|
}
|
|
if snapshot.LastError != "" {
|
|
session.LastError = snapshot.LastError
|
|
}
|
|
if snapshot.ExitCode != nil {
|
|
session.ExitCode = snapshot.ExitCode
|
|
session.Attachable = false
|
|
session.Reattachable = false
|
|
if session.StartedAt.IsZero() {
|
|
session.StartedAt = model.Now()
|
|
}
|
|
if session.EndedAt.IsZero() {
|
|
session.EndedAt = model.Now()
|
|
}
|
|
if *snapshot.ExitCode == 0 {
|
|
session.Status = model.GuestSessionStatusExited
|
|
} else {
|
|
session.Status = model.GuestSessionStatusFailed
|
|
}
|
|
return
|
|
}
|
|
if snapshot.Alive {
|
|
if session.StartedAt.IsZero() {
|
|
session.StartedAt = model.Now()
|
|
}
|
|
session.Status = model.GuestSessionStatusRunning
|
|
return
|
|
}
|
|
if !vmRunning && (session.Status == model.GuestSessionStatusStarting || session.Status == model.GuestSessionStatusRunning || session.Status == model.GuestSessionStatusStopping) {
|
|
session.Status = model.GuestSessionStatusFailed
|
|
session.Attachable = false
|
|
session.Reattachable = false
|
|
if session.LastError == "" {
|
|
session.LastError = "vm is not running"
|
|
}
|
|
if session.EndedAt.IsZero() {
|
|
session.EndedAt = model.Now()
|
|
}
|
|
return
|
|
}
|
|
if snapshot.Status == string(model.GuestSessionStatusRunning) {
|
|
if session.StartedAt.IsZero() {
|
|
session.StartedAt = model.Now()
|
|
}
|
|
session.Status = model.GuestSessionStatusRunning
|
|
}
|
|
if session.Status == model.GuestSessionStatusRunning && session.StdinMode == model.GuestSessionStdinPipe {
|
|
session.Attachable = true
|
|
session.Reattachable = true
|
|
if session.AttachBackend == "" {
|
|
session.AttachBackend = guestSessionAttachBackendSSHBridge
|
|
}
|
|
if session.AttachMode == "" {
|
|
session.AttachMode = guestSessionAttachModeExclusive
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) inspectGuestSessionState(ctx context.Context, vm model.VMRecord, session model.GuestSession) (guestSessionStateSnapshot, error) {
|
|
if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
client, err := guest.Dial(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"), d.config.SSHKeyPath)
|
|
if err != nil {
|
|
return guestSessionStateSnapshot{}, err
|
|
}
|
|
defer client.Close()
|
|
var output bytes.Buffer
|
|
if err := client.RunScript(ctx, guestSessionInspectScript(session.ID), &output); err != nil {
|
|
return guestSessionStateSnapshot{}, formatGuestSessionStepError("inspect guest session state", err, output.String())
|
|
}
|
|
return parseGuestSessionState(output.String())
|
|
}
|
|
return d.inspectGuestSessionStateFromWorkDisk(ctx, vm, session.ID)
|
|
}
|
|
|
|
func (d *Daemon) inspectGuestSessionStateFromWorkDisk(ctx context.Context, vm model.VMRecord, sessionID string) (guestSessionStateSnapshot, error) {
|
|
runner := d.runner
|
|
if runner == nil {
|
|
runner = system.NewRunner()
|
|
}
|
|
workMount, cleanup, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false)
|
|
if err != nil {
|
|
return guestSessionStateSnapshot{}, err
|
|
}
|
|
defer cleanup()
|
|
stateDir := filepath.Join(workMount, guestSessionRelativeStateDir(sessionID))
|
|
return inspectGuestSessionStateFromDir(stateDir)
|
|
}
|
|
|
|
func inspectGuestSessionStateFromDir(stateDir string) (guestSessionStateSnapshot, error) {
|
|
var snapshot guestSessionStateSnapshot
|
|
statusData, _ := os.ReadFile(filepath.Join(stateDir, "status"))
|
|
snapshot.Status = strings.TrimSpace(string(statusData))
|
|
pidData, _ := os.ReadFile(filepath.Join(stateDir, "pid"))
|
|
if pidValue, err := strconv.Atoi(strings.TrimSpace(string(pidData))); err == nil {
|
|
snapshot.GuestPID = pidValue
|
|
}
|
|
exitData, _ := os.ReadFile(filepath.Join(stateDir, "exit_code"))
|
|
if exitValue, err := strconv.Atoi(strings.TrimSpace(string(exitData))); err == nil {
|
|
snapshot.ExitCode = &exitValue
|
|
}
|
|
errorData, _ := os.ReadFile(filepath.Join(stateDir, "error"))
|
|
snapshot.LastError = strings.TrimSpace(string(errorData))
|
|
if snapshot.GuestPID != 0 {
|
|
snapshot.Alive = processAlive(snapshot.GuestPID)
|
|
}
|
|
return snapshot, nil
|
|
}
|
|
|
|
func (d *Daemon) readGuestSessionLog(ctx context.Context, vm model.VMRecord, session model.GuestSession, stream string, tailLines int) (string, error) {
|
|
if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
client, err := guest.Dial(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"), d.config.SSHKeyPath)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer client.Close()
|
|
path := session.StdoutLogPath
|
|
if stream == "stderr" {
|
|
path = session.StderrLogPath
|
|
}
|
|
var output bytes.Buffer
|
|
script := fmt.Sprintf("set -euo pipefail\nif [ -f %s ]; then tail -n %d %s; fi\n", guestShellQuote(path), tailLines, guestShellQuote(path))
|
|
if err := client.RunScript(ctx, script, &output); err != nil {
|
|
return "", formatGuestSessionStepError("read guest session log", err, output.String())
|
|
}
|
|
return output.String(), nil
|
|
}
|
|
runner := d.runner
|
|
if runner == nil {
|
|
runner = system.NewRunner()
|
|
}
|
|
workMount, cleanup, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer cleanup()
|
|
logPath := filepath.Join(workMount, guestSessionRelativeStateDir(session.ID), stream+".log")
|
|
return tailFileContent(logPath, tailLines)
|
|
}
|
|
|
|
func (d *Daemon) findGuestSession(ctx context.Context, vmID, idOrName string) (model.GuestSession, error) {
|
|
if strings.TrimSpace(idOrName) == "" {
|
|
return model.GuestSession{}, errors.New("session id or name is required")
|
|
}
|
|
if session, err := d.store.GetGuestSession(ctx, vmID, idOrName); err == nil {
|
|
return session, nil
|
|
}
|
|
sessions, err := d.store.ListGuestSessionsByVM(ctx, vmID)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
matches := make([]model.GuestSession, 0, 1)
|
|
for _, session := range sessions {
|
|
if strings.HasPrefix(session.ID, idOrName) || strings.HasPrefix(session.Name, idOrName) {
|
|
matches = append(matches, session)
|
|
}
|
|
}
|
|
switch len(matches) {
|
|
case 0:
|
|
return model.GuestSession{}, fmt.Errorf("session %q not found", idOrName)
|
|
case 1:
|
|
return matches[0], nil
|
|
default:
|
|
return model.GuestSession{}, fmt.Errorf("multiple sessions match %q", idOrName)
|
|
}
|
|
}
|
|
|
|
func guestSessionScript(session model.GuestSession) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
fmt.Fprintf(&script, "STATE_DIR=%s\n", guestShellQuote(session.GuestStateDir))
|
|
fmt.Fprintf(&script, "STDOUT_LOG=%s\n", guestShellQuote(session.StdoutLogPath))
|
|
fmt.Fprintf(&script, "STDERR_LOG=%s\n", guestShellQuote(session.StderrLogPath))
|
|
fmt.Fprintf(&script, "PID_FILE=%s\n", guestShellQuote(guestSessionPIDPath(session.ID)))
|
|
fmt.Fprintf(&script, "MONITOR_PID_FILE=%s\n", guestShellQuote(guestSessionMonitorPIDPath(session.ID)))
|
|
fmt.Fprintf(&script, "EXIT_FILE=%s\n", guestShellQuote(guestSessionExitCodePath(session.ID)))
|
|
fmt.Fprintf(&script, "STATUS_FILE=%s\n", guestShellQuote(guestSessionStatusPath(session.ID)))
|
|
fmt.Fprintf(&script, "ERROR_FILE=%s\n", guestShellQuote(guestSessionErrorPath(session.ID)))
|
|
fmt.Fprintf(&script, "STDIN_PIPE=%s\n", guestShellQuote(guestSessionStdinPipePath(session.ID)))
|
|
fmt.Fprintf(&script, "STDIN_KEEPALIVE_PID_FILE=%s\n", guestShellQuote(guestSessionStdinKeepalivePIDPath(session.ID)))
|
|
fmt.Fprintf(&script, "SESSION_CWD=%s\n", guestShellQuote(defaultGuestSessionCWD(session.CWD)))
|
|
script.WriteString("mkdir -p \"$STATE_DIR\"\n")
|
|
script.WriteString(": >\"$STDOUT_LOG\"\n")
|
|
script.WriteString(": >\"$STDERR_LOG\"\n")
|
|
script.WriteString("rm -f \"$EXIT_FILE\" \"$ERROR_FILE\" \"$STDIN_KEEPALIVE_PID_FILE\"\n")
|
|
if session.StdinMode == model.GuestSessionStdinPipe {
|
|
script.WriteString("rm -f \"$STDIN_PIPE\"\n")
|
|
script.WriteString("mkfifo -m 600 \"$STDIN_PIPE\"\n")
|
|
}
|
|
script.WriteString("printf '%s\\n' \"${BASHPID:-$$}\" >\"$MONITOR_PID_FILE\"\n")
|
|
script.WriteString("printf 'starting\\n' >\"$STATUS_FILE\"\n")
|
|
script.WriteString("cd \"$SESSION_CWD\"\n")
|
|
script.WriteString("exec > >(tee -a \"$STDOUT_LOG\") 2> >(tee -a \"$STDERR_LOG\" >&2)\n")
|
|
for _, line := range guestSessionEnvLines(session.Env) {
|
|
script.WriteString(line)
|
|
script.WriteByte('\n')
|
|
}
|
|
script.WriteString("COMMAND=(")
|
|
for _, value := range append([]string{session.Command}, session.Args...) {
|
|
script.WriteByte(' ')
|
|
script.WriteString(guestShellQuote(value))
|
|
}
|
|
script.WriteString(" )\n")
|
|
if session.StdinMode == model.GuestSessionStdinPipe {
|
|
script.WriteString("( while :; do sleep 3600; done ) >\"$STDIN_PIPE\" &\n")
|
|
script.WriteString("keepalive=$!\n")
|
|
script.WriteString("printf '%s\\n' \"$keepalive\" >\"$STDIN_KEEPALIVE_PID_FILE\"\n")
|
|
script.WriteString("\"${COMMAND[@]}\" <\"$STDIN_PIPE\" &\n")
|
|
} else {
|
|
script.WriteString("\"${COMMAND[@]}\" &\n")
|
|
}
|
|
script.WriteString("child=$!\n")
|
|
script.WriteString("printf '%s\\n' \"$child\" >\"$PID_FILE\"\n")
|
|
script.WriteString("printf 'running\\n' >\"$STATUS_FILE\"\n")
|
|
script.WriteString("wait \"$child\"\n")
|
|
script.WriteString("rc=$?\n")
|
|
if session.StdinMode == model.GuestSessionStdinPipe {
|
|
script.WriteString("if [ -f \"$STDIN_KEEPALIVE_PID_FILE\" ]; then kill \"$(cat \"$STDIN_KEEPALIVE_PID_FILE\")\" 2>/dev/null || true; fi\n")
|
|
}
|
|
script.WriteString("printf '%s\\n' \"$rc\" >\"$EXIT_FILE\"\n")
|
|
script.WriteString("if [ \"$rc\" -eq 0 ]; then printf 'exited\\n' >\"$STATUS_FILE\"; else printf 'failed\\n' >\"$STATUS_FILE\"; fi\n")
|
|
script.WriteString("exit \"$rc\"\n")
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionInspectScript(sessionID string) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
fmt.Fprintf(&script, "DIR=%s\n", guestShellQuote(guestSessionStateDir(sessionID)))
|
|
script.WriteString("status=''\n")
|
|
script.WriteString("pid=''\n")
|
|
script.WriteString("exit_code=''\n")
|
|
script.WriteString("last_error=''\n")
|
|
script.WriteString("alive=false\n")
|
|
script.WriteString("[ -f \"$DIR/status\" ] && status=\"$(cat \"$DIR/status\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/pid\" ] && pid=\"$(cat \"$DIR/pid\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/exit_code\" ] && exit_code=\"$(cat \"$DIR/exit_code\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/error\" ] && last_error=\"$(cat \"$DIR/error\")\"\n")
|
|
script.WriteString("if [ -n \"$pid\" ] && kill -0 \"$pid\" 2>/dev/null; then alive=true; fi\n")
|
|
script.WriteString("printf 'status=%s\\n' \"$status\"\n")
|
|
script.WriteString("printf 'pid=%s\\n' \"$pid\"\n")
|
|
script.WriteString("printf 'exit=%s\\n' \"$exit_code\"\n")
|
|
script.WriteString("printf 'alive=%s\\n' \"$alive\"\n")
|
|
script.WriteString("printf 'error=%s\\n' \"$last_error\"\n")
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionSignalScript(sessionID, signal string) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
fmt.Fprintf(&script, "DIR=%s\n", guestShellQuote(guestSessionStateDir(sessionID)))
|
|
fmt.Fprintf(&script, "SIGNAL=%s\n", guestShellQuote(signal))
|
|
script.WriteString("pid=''\n")
|
|
script.WriteString("monitor=''\n")
|
|
script.WriteString("keepalive=''\n")
|
|
script.WriteString("[ -f \"$DIR/pid\" ] && pid=\"$(cat \"$DIR/pid\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/monitor_pid\" ] && monitor=\"$(cat \"$DIR/monitor_pid\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/stdin_keepalive.pid\" ] && keepalive=\"$(cat \"$DIR/stdin_keepalive.pid\")\"\n")
|
|
script.WriteString("printf 'stopping\\n' >\"$DIR/status\"\n")
|
|
script.WriteString("if [ -n \"$pid\" ]; then kill -${SIGNAL} \"$pid\" 2>/dev/null || true; fi\n")
|
|
script.WriteString("if [ -n \"$monitor\" ]; then kill -${SIGNAL} \"$monitor\" 2>/dev/null || true; fi\n")
|
|
script.WriteString("if [ -n \"$keepalive\" ]; then kill -${SIGNAL} \"$keepalive\" 2>/dev/null || true; fi\n")
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionStateDir(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateRoot, id))
|
|
}
|
|
|
|
func guestSessionRelativeStateDir(id string) string {
|
|
return strings.TrimPrefix(guestSessionStateDir(id), "/root/")
|
|
}
|
|
|
|
func guestSessionScriptPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "run.sh"))
|
|
}
|
|
|
|
func guestSessionPIDPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "pid"))
|
|
}
|
|
|
|
func guestSessionMonitorPIDPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "monitor_pid"))
|
|
}
|
|
|
|
func guestSessionExitCodePath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "exit_code"))
|
|
}
|
|
|
|
func guestSessionStdinPipePath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "stdin.pipe"))
|
|
}
|
|
|
|
func guestSessionStdinKeepalivePIDPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "stdin_keepalive.pid"))
|
|
}
|
|
|
|
func guestSessionStatusPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "status"))
|
|
}
|
|
|
|
func guestSessionErrorPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "error"))
|
|
}
|
|
|
|
func guestSessionStdoutLogPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "stdout.log"))
|
|
}
|
|
|
|
func guestSessionStderrLogPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "stderr.log"))
|
|
}
|
|
|
|
func defaultGuestSessionName(id, command, explicit string) string {
|
|
if trimmed := strings.TrimSpace(explicit); trimmed != "" {
|
|
return trimmed
|
|
}
|
|
base := filepath.Base(strings.TrimSpace(command))
|
|
if base == "." || base == string(filepath.Separator) || base == "" {
|
|
base = "session"
|
|
}
|
|
return base + "-" + system.ShortID(id)
|
|
}
|
|
|
|
func defaultGuestSessionCWD(value string) string {
|
|
if trimmed := strings.TrimSpace(value); trimmed != "" {
|
|
return trimmed
|
|
}
|
|
return "/root"
|
|
}
|
|
|
|
func failGuestSessionLaunch(session model.GuestSession, stage, message, rawLog string) model.GuestSession {
|
|
now := model.Now()
|
|
session.Status = model.GuestSessionStatusFailed
|
|
session.LastError = strings.TrimSpace(message)
|
|
session.Attachable = false
|
|
session.Reattachable = false
|
|
session.LaunchStage = strings.TrimSpace(stage)
|
|
session.LaunchMessage = strings.TrimSpace(message)
|
|
session.LaunchRawLog = strings.TrimSpace(rawLog)
|
|
session.UpdatedAt = now
|
|
session.EndedAt = now
|
|
return session
|
|
}
|
|
|
|
func normalizeGuestSessionRequiredCommands(command string, extras []string) []string {
|
|
ordered := make([]string, 0, len(extras)+1)
|
|
seen := map[string]struct{}{}
|
|
appendValue := func(value string) {
|
|
trimmed := strings.TrimSpace(value)
|
|
if trimmed == "" {
|
|
return
|
|
}
|
|
if _, ok := seen[trimmed]; ok {
|
|
return
|
|
}
|
|
seen[trimmed] = struct{}{}
|
|
ordered = append(ordered, trimmed)
|
|
}
|
|
appendValue(command)
|
|
for _, extra := range extras {
|
|
appendValue(extra)
|
|
}
|
|
return ordered
|
|
}
|
|
|
|
func guestSessionCWDPreflightScript(cwd string) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
fmt.Fprintf(&script, "DIR=%s\n", guestShellQuote(defaultGuestSessionCWD(cwd)))
|
|
script.WriteString("if [ ! -d \"$DIR\" ]; then echo \"missing cwd: $DIR\"; exit 1; fi\n")
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionCommandPreflightScript(commands []string) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
script.WriteString("check_command() {\n")
|
|
script.WriteString(" cmd=\"$1\"\n")
|
|
script.WriteString(" case \"$cmd\" in\n")
|
|
script.WriteString(" */*) [ -x \"$cmd\" ] || { echo \"missing command: $cmd\"; exit 1; } ;;\n")
|
|
script.WriteString(" *) command -v \"$cmd\" >/dev/null 2>&1 || { echo \"missing command: $cmd\"; exit 1; } ;;\n")
|
|
script.WriteString(" esac\n")
|
|
script.WriteString("}\n")
|
|
for _, command := range commands {
|
|
fmt.Fprintf(&script, "check_command %s\n", guestShellQuote(command))
|
|
}
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionAttachInputCommand(sessionID string) string {
|
|
path := guestSessionStdinPipePath(sessionID)
|
|
return "bash -lc " + guestShellQuote(fmt.Sprintf("set -euo pipefail\n[ -p %s ] || mkfifo -m 600 %s\nexec cat > %s\n", guestShellQuote(path), guestShellQuote(path), guestShellQuote(path)))
|
|
}
|
|
|
|
func guestSessionAttachTailCommand(path string) string {
|
|
return "bash -lc " + guestShellQuote(fmt.Sprintf("set -euo pipefail\ntouch %s\nexec tail -n 0 -F %s 2>/dev/null\n", guestShellQuote(path), guestShellQuote(path)))
|
|
}
|
|
|
|
func guestSessionEnvLines(values map[string]string) []string {
|
|
if len(values) == 0 {
|
|
return nil
|
|
}
|
|
keys := make([]string, 0, len(values))
|
|
for key := range values {
|
|
keys = append(keys, key)
|
|
}
|
|
sort.Strings(keys)
|
|
lines := make([]string, 0, len(keys))
|
|
for _, key := range keys {
|
|
lines = append(lines, "export "+key+"="+guestShellQuote(values[key]))
|
|
}
|
|
return lines
|
|
}
|
|
|
|
func guestShellQuote(value string) string {
|
|
return "'" + strings.ReplaceAll(value, "'", `'"'"'`) + "'"
|
|
}
|
|
|
|
func parseGuestSessionState(raw string) (guestSessionStateSnapshot, error) {
|
|
var snapshot guestSessionStateSnapshot
|
|
scanner := bufio.NewScanner(strings.NewReader(raw))
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
key, value, ok := strings.Cut(line, "=")
|
|
if !ok {
|
|
continue
|
|
}
|
|
switch strings.TrimSpace(key) {
|
|
case "status":
|
|
snapshot.Status = strings.TrimSpace(value)
|
|
case "pid":
|
|
if pid, err := strconv.Atoi(strings.TrimSpace(value)); err == nil {
|
|
snapshot.GuestPID = pid
|
|
}
|
|
case "exit":
|
|
if exitCode, err := strconv.Atoi(strings.TrimSpace(value)); err == nil {
|
|
snapshot.ExitCode = &exitCode
|
|
}
|
|
case "alive":
|
|
snapshot.Alive = strings.TrimSpace(value) == "true"
|
|
case "error":
|
|
snapshot.LastError = strings.TrimSpace(value)
|
|
}
|
|
}
|
|
return snapshot, scanner.Err()
|
|
}
|
|
|
|
func guestSessionExitCode(err error) (int, bool) {
|
|
if err == nil {
|
|
return 0, true
|
|
}
|
|
var exitErr *ssh.ExitError
|
|
if errors.As(err, &exitErr) {
|
|
return exitErr.ExitStatus(), true
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
func cloneStringMap(values map[string]string) map[string]string {
|
|
if len(values) == 0 {
|
|
return nil
|
|
}
|
|
cloned := make(map[string]string, len(values))
|
|
for key, value := range values {
|
|
cloned[key] = value
|
|
}
|
|
return cloned
|
|
}
|
|
|
|
func tailFileContent(path string, lines int) (string, error) {
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return "", nil
|
|
}
|
|
return "", err
|
|
}
|
|
if lines <= 0 {
|
|
return string(data), nil
|
|
}
|
|
parts := strings.Split(string(data), "\n")
|
|
if len(parts) <= lines {
|
|
return string(data), nil
|
|
}
|
|
return strings.Join(parts[len(parts)-lines-1:], "\n"), nil
|
|
}
|
|
|
|
func processAlive(pid int) bool {
|
|
if pid <= 0 {
|
|
return false
|
|
}
|
|
return syscallKill(pid, syscall.Signal(0)) == nil
|
|
}
|
|
|
|
var syscallKill = func(pid int, signal os.Signal) error {
|
|
proc, err := os.FindProcess(pid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return proc.Signal(signal)
|
|
}
|
|
|
|
func formatGuestSessionStepError(action string, err error, log string) error {
|
|
log = strings.TrimSpace(log)
|
|
if log == "" {
|
|
return fmt.Errorf("%s: %w", action, err)
|
|
}
|
|
return fmt.Errorf("%s: %w: %s", action, err, log)
|
|
}
|
|
|
|
func guestSessionStateChanged(before, after model.GuestSession) bool {
|
|
if before.Status != after.Status || before.GuestPID != after.GuestPID || before.LastError != after.LastError || before.Attachable != after.Attachable || before.Reattachable != after.Reattachable || before.AttachBackend != after.AttachBackend || before.AttachMode != after.AttachMode || before.LaunchStage != after.LaunchStage || before.LaunchMessage != after.LaunchMessage || before.LaunchRawLog != after.LaunchRawLog {
|
|
return true
|
|
}
|
|
if before.StartedAt != after.StartedAt || before.EndedAt != after.EndedAt {
|
|
return true
|
|
}
|
|
switch {
|
|
case before.ExitCode == nil && after.ExitCode == nil:
|
|
return false
|
|
case before.ExitCode == nil || after.ExitCode == nil:
|
|
return true
|
|
default:
|
|
return *before.ExitCode != *after.ExitCode
|
|
}
|
|
}
|