Guest session cwd and command preflight helpers were emitting literal `\\n` separators, so the guest shell saw malformed one-line scripts and could fail `preflight_cwd` even when `/root/repo` already existed. Replace those builders with real newlines, and fix the nearby attach helper commands that were making the same mistake. Add a small daemon guest-SSH seam so workspace preparation and session start can share a fake backend in tests, then cover the regression with an end-to-end daemon test for `PrepareVMWorkspace` followed by `StartGuestSession` on `/root/repo`. Validation: `GOCACHE=/tmp/banger-gocache go test ./internal/daemon` and `GOCACHE=/tmp/banger-gocache go test ./...`.
1227 lines
42 KiB
Go
1227 lines
42 KiB
Go
package daemon
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"banger/internal/api"
|
|
"banger/internal/guest"
|
|
"banger/internal/model"
|
|
"banger/internal/sessionstream"
|
|
"banger/internal/system"
|
|
|
|
"golang.org/x/crypto/ssh"
|
|
)
|
|
|
|
// Identifiers and defaults shared by the guest session helpers in this file.
const (
	// guestSessionBackendSSH is recorded as the Backend of sessions started here.
	guestSessionBackendSSH = "ssh"

	// Attach backend markers: "none" for sessions that are not attachable and
	// "ssh_rehydratable" for sessions that are bridged over SSH streams.
	guestSessionAttachBackendNone      = "none"
	guestSessionAttachBackendSSHBridge = "ssh_rehydratable"

	// guestSessionAttachModeExclusive is the attach mode recorded for
	// attachable sessions.
	guestSessionAttachModeExclusive = "exclusive"

	// guestSessionTransportUnixSocket is the transport kind reported when an
	// attach is handed out over a unix socket.
	guestSessionTransportUnixSocket = "unix_socket"

	// guestSessionStateRoot is a guest-side path; presumably the parent of the
	// per-session state directories (its users are outside this view).
	guestSessionStateRoot = "/root/.local/state/banger/sessions"

	// guestSessionLogTailLine is the number of log lines returned when the
	// caller does not ask for a positive tail length.
	guestSessionLogTailLine = 200
)
|
|
|
|
var guestSessionHostCommandOutputFunc = func(ctx context.Context, name string, args ...string) ([]byte, error) {
|
|
runner := system.NewRunner()
|
|
output, err := runner.Run(ctx, name, args...)
|
|
if err == nil {
|
|
return output, nil
|
|
}
|
|
command := strings.TrimSpace(strings.Join(append([]string{name}, args...), " "))
|
|
detail := strings.TrimSpace(string(output))
|
|
if detail == "" {
|
|
return output, fmt.Errorf("%s: %w", command, err)
|
|
}
|
|
return output, fmt.Errorf("%s: %w: %s", command, err, detail)
|
|
}
|
|
|
|
// guestSSHClient is the subset of the guest SSH client that the daemon relies
// on. Keeping it as an interface lets dialGuest hand back either the real
// client or a substitute supplied through the Daemon's guestDial hook.
type guestSSHClient interface {
	// Close releases the client.
	Close() error
	// RunScript runs script on the guest, writing its log output to log.
	RunScript(ctx context.Context, script string, log io.Writer) error
	// UploadFile writes data to path on the guest with the given mode.
	UploadFile(ctx context.Context, path string, mode os.FileMode, data []byte, log io.Writer) error
	// StreamTar and StreamTarEntries are not called in this part of the file;
	// see their implementation for the meaning of each argument.
	StreamTar(context.Context, string, string, io.Writer) error
	StreamTarEntries(context.Context, string, []string, string, io.Writer) error
}
|
|
|
|
func (d *Daemon) waitForGuestSSH(ctx context.Context, address string, interval time.Duration) error {
|
|
if d != nil && d.guestWaitForSSH != nil {
|
|
return d.guestWaitForSSH(ctx, address, d.config.SSHKeyPath, interval)
|
|
}
|
|
return guest.WaitForSSH(ctx, address, d.config.SSHKeyPath, interval)
|
|
}
|
|
|
|
func (d *Daemon) dialGuest(ctx context.Context, address string) (guestSSHClient, error) {
|
|
if d != nil && d.guestDial != nil {
|
|
return d.guestDial(ctx, address, d.config.SSHKeyPath)
|
|
}
|
|
return guest.Dial(ctx, address, d.config.SSHKeyPath)
|
|
}
|
|
|
|
func (d *Daemon) waitForGuestSessionReadyHook(ctx context.Context, vm model.VMRecord, session model.GuestSession) (model.GuestSession, error) {
|
|
if d != nil && d.waitForGuestSessionReady != nil {
|
|
return d.waitForGuestSessionReady(ctx, vm, session)
|
|
}
|
|
return d.waitForGuestSessionReadyDefault(ctx, vm, session)
|
|
}
|
|
|
|
type guestSessionController struct {
|
|
stream *guest.StreamSession
|
|
streams []*guest.StreamSession
|
|
stdin io.WriteCloser
|
|
attachMu sync.Mutex
|
|
attach net.Conn
|
|
writeMu sync.Mutex
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
func (c *guestSessionController) setAttach(conn net.Conn) error {
|
|
c.attachMu.Lock()
|
|
defer c.attachMu.Unlock()
|
|
if c.attach != nil {
|
|
return errors.New("session already has an active attach")
|
|
}
|
|
c.attach = conn
|
|
return nil
|
|
}
|
|
|
|
func (c *guestSessionController) clearAttach(conn net.Conn) {
|
|
c.attachMu.Lock()
|
|
defer c.attachMu.Unlock()
|
|
if c.attach == conn {
|
|
c.attach = nil
|
|
}
|
|
}
|
|
|
|
func (c *guestSessionController) writeFrame(channel byte, payload []byte) {
|
|
c.attachMu.Lock()
|
|
conn := c.attach
|
|
c.attachMu.Unlock()
|
|
if conn == nil {
|
|
return
|
|
}
|
|
c.writeMu.Lock()
|
|
err := sessionstream.WriteFrame(conn, channel, payload)
|
|
c.writeMu.Unlock()
|
|
if err != nil {
|
|
_ = conn.Close()
|
|
c.clearAttach(conn)
|
|
}
|
|
}
|
|
|
|
func (c *guestSessionController) writeControl(message sessionstream.ControlMessage) {
|
|
c.attachMu.Lock()
|
|
conn := c.attach
|
|
c.attachMu.Unlock()
|
|
if conn == nil {
|
|
return
|
|
}
|
|
c.writeMu.Lock()
|
|
err := sessionstream.WriteControl(conn, message)
|
|
c.writeMu.Unlock()
|
|
if err != nil {
|
|
_ = conn.Close()
|
|
c.clearAttach(conn)
|
|
}
|
|
}
|
|
|
|
func (c *guestSessionController) close() error {
|
|
if c == nil {
|
|
return nil
|
|
}
|
|
var err error
|
|
c.closeOnce.Do(func() {
|
|
c.attachMu.Lock()
|
|
conn := c.attach
|
|
c.attach = nil
|
|
c.attachMu.Unlock()
|
|
if conn != nil {
|
|
err = errors.Join(err, conn.Close())
|
|
}
|
|
if c.stdin != nil {
|
|
err = errors.Join(err, c.stdin.Close())
|
|
}
|
|
if c.stream != nil {
|
|
err = errors.Join(err, c.stream.Close())
|
|
}
|
|
for _, stream := range c.streams {
|
|
if stream != nil {
|
|
err = errors.Join(err, stream.Close())
|
|
}
|
|
}
|
|
})
|
|
return err
|
|
}
|
|
|
|
// guestSessionStateSnapshot is the session state as read from the guest's
// state files, before it is applied to the stored session record.
type guestSessionStateSnapshot struct {
	// Status is the trimmed content of the session's status file.
	Status string
	// GuestPID is the recorded process id; zero when none was recorded.
	GuestPID int
	// ExitCode is nil until an exit code has been recorded.
	ExitCode *int
	// Alive reports whether the recorded process was seen alive.
	Alive bool
	// LastError is the trimmed content of the session's error file.
	LastError string
}
|
|
|
|
func (d *Daemon) StartGuestSession(ctx context.Context, params api.GuestSessionStartParams) (model.GuestSession, error) {
|
|
stdinMode := model.GuestSessionStdinMode(strings.TrimSpace(params.StdinMode))
|
|
if stdinMode == "" {
|
|
stdinMode = model.GuestSessionStdinClosed
|
|
}
|
|
if stdinMode != model.GuestSessionStdinClosed && stdinMode != model.GuestSessionStdinPipe {
|
|
return model.GuestSession{}, fmt.Errorf("unsupported stdin mode %q", params.StdinMode)
|
|
}
|
|
if strings.TrimSpace(params.Command) == "" {
|
|
return model.GuestSession{}, errors.New("session command is required")
|
|
}
|
|
var created model.GuestSession
|
|
_, err := d.withVMLockByRef(ctx, params.VMIDOrName, func(vm model.VMRecord) (model.VMRecord, error) {
|
|
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
return model.VMRecord{}, fmt.Errorf("vm %q is not running", vm.Name)
|
|
}
|
|
session, err := d.startGuestSessionLocked(ctx, vm, params, stdinMode)
|
|
if err != nil {
|
|
return model.VMRecord{}, err
|
|
}
|
|
created = session
|
|
return vm, nil
|
|
})
|
|
return created, err
|
|
}
|
|
|
|
func (d *Daemon) startGuestSessionLocked(ctx context.Context, vm model.VMRecord, params api.GuestSessionStartParams, stdinMode model.GuestSessionStdinMode) (model.GuestSession, error) {
|
|
id, err := model.NewID()
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
now := model.Now()
|
|
session := model.GuestSession{
|
|
ID: id,
|
|
VMID: vm.ID,
|
|
Name: defaultGuestSessionName(id, params.Command, params.Name),
|
|
Backend: guestSessionBackendSSH,
|
|
Command: params.Command,
|
|
Args: append([]string(nil), params.Args...),
|
|
CWD: strings.TrimSpace(params.CWD),
|
|
Env: cloneStringMap(params.Env),
|
|
StdinMode: stdinMode,
|
|
Status: model.GuestSessionStatusStarting,
|
|
GuestStateDir: guestSessionStateDir(id),
|
|
StdoutLogPath: guestSessionStdoutLogPath(id),
|
|
StderrLogPath: guestSessionStderrLogPath(id),
|
|
Tags: cloneStringMap(params.Tags),
|
|
Attachable: stdinMode == model.GuestSessionStdinPipe,
|
|
Reattachable: stdinMode == model.GuestSessionStdinPipe,
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}
|
|
if session.Attachable {
|
|
session.AttachBackend = guestSessionAttachBackendSSHBridge
|
|
session.AttachMode = guestSessionAttachModeExclusive
|
|
} else {
|
|
session.AttachBackend = guestSessionAttachBackendNone
|
|
}
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
fail := func(stage, message, rawLog string) (model.GuestSession, error) {
|
|
session = failGuestSessionLaunch(session, stage, message, rawLog)
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return session, nil
|
|
}
|
|
address := net.JoinHostPort(vm.Runtime.GuestIP, "22")
|
|
if err := d.waitForGuestSSH(ctx, address, 250*time.Millisecond); err != nil {
|
|
return fail("ssh_unavailable", fmt.Sprintf("guest ssh unavailable: %v", err), "")
|
|
}
|
|
client, err := d.dialGuest(ctx, address)
|
|
if err != nil {
|
|
return fail("dial_guest", fmt.Sprintf("dial guest ssh: %v", err), "")
|
|
}
|
|
defer client.Close()
|
|
var preflightLog bytes.Buffer
|
|
if err := client.RunScript(ctx, guestSessionCWDPreflightScript(session.CWD), &preflightLog); err != nil {
|
|
return fail("preflight_cwd", fmt.Sprintf("guest working directory is unavailable: %s", defaultGuestSessionCWD(session.CWD)), preflightLog.String())
|
|
}
|
|
preflightLog.Reset()
|
|
requiredCommands := normalizeGuestSessionRequiredCommands(params.Command, params.RequiredCommands)
|
|
if err := client.RunScript(ctx, guestSessionCommandPreflightScript(requiredCommands), &preflightLog); err != nil {
|
|
return fail("preflight_command", fmt.Sprintf("required guest command is unavailable: %s", strings.TrimSpace(preflightLog.String())), preflightLog.String())
|
|
}
|
|
var uploadLog bytes.Buffer
|
|
if err := client.UploadFile(ctx, guestSessionScriptPath(id), 0o755, []byte(guestSessionScript(session)), &uploadLog); err != nil {
|
|
return fail("upload_script", "upload guest session script failed", uploadLog.String())
|
|
}
|
|
var launchLog bytes.Buffer
|
|
launchScript := fmt.Sprintf("set -euo pipefail\nnohup bash %s >/dev/null 2>&1 </dev/null &\ndisown || true\n", guestShellQuote(guestSessionScriptPath(id)))
|
|
if err := client.RunScript(ctx, launchScript, &launchLog); err != nil {
|
|
return fail("launch", "launch guest session failed", launchLog.String())
|
|
}
|
|
readyCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
updated, err := d.waitForGuestSessionReadyHook(readyCtx, vm, session)
|
|
if err != nil {
|
|
return fail("ready_wait", "guest session did not report ready state", err.Error())
|
|
}
|
|
session = updated
|
|
if session.Status == model.GuestSessionStatusStarting {
|
|
session.Status = model.GuestSessionStatusRunning
|
|
session.StartedAt = model.Now()
|
|
session.UpdatedAt = model.Now()
|
|
}
|
|
session.LaunchStage = ""
|
|
session.LaunchMessage = ""
|
|
session.LaunchRawLog = ""
|
|
session.LastError = ""
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return session, nil
|
|
}
|
|
|
|
func (d *Daemon) GetGuestSession(ctx context.Context, params api.GuestSessionRefParams) (model.GuestSession, error) {
|
|
vm, err := d.FindVM(ctx, params.VMIDOrName)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return d.refreshGuestSession(ctx, vm, session)
|
|
}
|
|
|
|
func (d *Daemon) ListGuestSessions(ctx context.Context, params api.VMRefParams) ([]model.GuestSession, error) {
|
|
vm, err := d.FindVM(ctx, params.IDOrName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sessions, err := d.store.ListGuestSessionsByVM(ctx, vm.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for index := range sessions {
|
|
refreshed, refreshErr := d.refreshGuestSession(ctx, vm, sessions[index])
|
|
if refreshErr == nil {
|
|
sessions[index] = refreshed
|
|
}
|
|
}
|
|
return sessions, nil
|
|
}
|
|
|
|
func (d *Daemon) StopGuestSession(ctx context.Context, params api.GuestSessionRefParams) (model.GuestSession, error) {
|
|
return d.signalGuestSession(ctx, params, "TERM")
|
|
}
|
|
|
|
func (d *Daemon) KillGuestSession(ctx context.Context, params api.GuestSessionRefParams) (model.GuestSession, error) {
|
|
return d.signalGuestSession(ctx, params, "KILL")
|
|
}
|
|
|
|
func (d *Daemon) signalGuestSession(ctx context.Context, params api.GuestSessionRefParams, signal string) (model.GuestSession, error) {
|
|
vm, err := d.FindVM(ctx, params.VMIDOrName)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
session, _ = d.refreshGuestSession(ctx, vm, session)
|
|
if session.Status == model.GuestSessionStatusExited || session.Status == model.GuestSessionStatusFailed {
|
|
return session, nil
|
|
}
|
|
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
session.Status = model.GuestSessionStatusFailed
|
|
session.LastError = "vm is not running"
|
|
now := model.Now()
|
|
session.UpdatedAt = now
|
|
session.EndedAt = now
|
|
session.Attachable = false
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return session, nil
|
|
}
|
|
client, err := guest.Dial(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"), d.config.SSHKeyPath)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
defer client.Close()
|
|
var log bytes.Buffer
|
|
if err := client.RunScript(ctx, guestSessionSignalScript(session.ID, signal), &log); err != nil {
|
|
return model.GuestSession{}, formatGuestSessionStepError("signal guest session", err, log.String())
|
|
}
|
|
session.Status = model.GuestSessionStatusStopping
|
|
session.UpdatedAt = model.Now()
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
return session, nil
|
|
}
|
|
|
|
func (d *Daemon) GuestSessionLogs(ctx context.Context, params api.GuestSessionLogsParams) (api.GuestSessionLogsResult, error) {
|
|
vm, err := d.FindVM(ctx, params.VMIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionLogsResult{}, err
|
|
}
|
|
session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionLogsResult{}, err
|
|
}
|
|
streamName := strings.TrimSpace(params.Stream)
|
|
if streamName == "" {
|
|
streamName = "stdout"
|
|
}
|
|
tailLines := params.TailLines
|
|
if tailLines <= 0 {
|
|
tailLines = guestSessionLogTailLine
|
|
}
|
|
path := session.StdoutLogPath
|
|
if streamName == "stderr" {
|
|
path = session.StderrLogPath
|
|
}
|
|
content, err := d.readGuestSessionLog(ctx, vm, session, streamName, tailLines)
|
|
if err != nil {
|
|
return api.GuestSessionLogsResult{}, err
|
|
}
|
|
return api.GuestSessionLogsResult{Session: session, Stream: streamName, Path: path, Content: content}, nil
|
|
}
|
|
|
|
func (d *Daemon) BeginGuestSessionAttach(ctx context.Context, params api.GuestSessionAttachBeginParams) (api.GuestSessionAttachBeginResult, error) {
|
|
vm, err := d.FindVM(ctx, params.VMIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName)
|
|
if err != nil {
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
session, _ = d.refreshGuestSession(ctx, vm, session)
|
|
if !session.Attachable {
|
|
return api.GuestSessionAttachBeginResult{}, errors.New("session is not attachable")
|
|
}
|
|
controller := &guestSessionController{}
|
|
if !d.claimGuestSessionController(session.ID, controller) {
|
|
return api.GuestSessionAttachBeginResult{}, errors.New("session already has an active attach")
|
|
}
|
|
attachID, err := model.NewID()
|
|
if err != nil {
|
|
d.clearGuestSessionController(session.ID)
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
socketPath := filepath.Join(d.layout.RuntimeDir, "guest-session-attach-"+attachID[:12]+".sock")
|
|
_ = os.Remove(socketPath)
|
|
listener, err := net.Listen("unix", socketPath)
|
|
if err != nil {
|
|
d.clearGuestSessionController(session.ID)
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
if err := os.Chmod(socketPath, 0o600); err != nil {
|
|
_ = listener.Close()
|
|
_ = os.Remove(socketPath)
|
|
d.clearGuestSessionController(session.ID)
|
|
return api.GuestSessionAttachBeginResult{}, err
|
|
}
|
|
go d.serveGuestSessionAttach(session, controller, attachID, socketPath, listener)
|
|
return api.GuestSessionAttachBeginResult{
|
|
Session: session,
|
|
AttachID: attachID,
|
|
TransportKind: guestSessionTransportUnixSocket,
|
|
TransportTarget: socketPath,
|
|
SocketPath: socketPath,
|
|
StreamFormat: sessionstream.FormatV1,
|
|
}, nil
|
|
}
|
|
|
|
func (d *Daemon) setGuestSessionController(id string, controller *guestSessionController) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
if d.sessionControllers == nil {
|
|
d.sessionControllers = make(map[string]*guestSessionController)
|
|
}
|
|
d.sessionControllers[id] = controller
|
|
}
|
|
|
|
func (d *Daemon) claimGuestSessionController(id string, controller *guestSessionController) bool {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
if d.sessionControllers == nil {
|
|
d.sessionControllers = make(map[string]*guestSessionController)
|
|
}
|
|
if d.sessionControllers[id] != nil {
|
|
return false
|
|
}
|
|
d.sessionControllers[id] = controller
|
|
return true
|
|
}
|
|
|
|
func (d *Daemon) getGuestSessionController(id string) *guestSessionController {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
return d.sessionControllers[id]
|
|
}
|
|
|
|
func (d *Daemon) clearGuestSessionController(id string) *guestSessionController {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
controller := d.sessionControllers[id]
|
|
delete(d.sessionControllers, id)
|
|
return controller
|
|
}
|
|
|
|
func (d *Daemon) closeGuestSessionControllers() error {
|
|
d.mu.Lock()
|
|
controllers := make([]*guestSessionController, 0, len(d.sessionControllers))
|
|
for _, controller := range d.sessionControllers {
|
|
controllers = append(controllers, controller)
|
|
}
|
|
d.sessionControllers = nil
|
|
d.mu.Unlock()
|
|
var err error
|
|
for _, controller := range controllers {
|
|
err = errors.Join(err, controller.close())
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (d *Daemon) forwardGuestSessionOutput(_ string, controller *guestSessionController, channel byte, reader io.Reader) {
|
|
buffer := make([]byte, 32*1024)
|
|
for {
|
|
n, err := reader.Read(buffer)
|
|
if n > 0 {
|
|
controller.writeFrame(channel, buffer[:n])
|
|
}
|
|
if err != nil {
|
|
if !errors.Is(err, io.EOF) {
|
|
controller.writeControl(sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) waitForGuestSessionExit(id string, controller *guestSessionController, session model.GuestSession) {
|
|
err := controller.stream.Wait()
|
|
updated := session
|
|
updated.Attachable = false
|
|
now := model.Now()
|
|
updated.UpdatedAt = now
|
|
updated.EndedAt = now
|
|
if exitCode, ok := guestSessionExitCode(err); ok {
|
|
updated.ExitCode = &exitCode
|
|
if exitCode == 0 {
|
|
updated.Status = model.GuestSessionStatusExited
|
|
} else {
|
|
updated.Status = model.GuestSessionStatusFailed
|
|
}
|
|
}
|
|
if err != nil && updated.LastError == "" {
|
|
updated.LastError = err.Error()
|
|
}
|
|
if vm, getErr := d.store.GetVMByID(context.Background(), updated.VMID); getErr == nil {
|
|
if refreshed, refreshErr := d.refreshGuestSession(context.Background(), vm, updated); refreshErr == nil {
|
|
updated = refreshed
|
|
}
|
|
}
|
|
_ = d.store.UpsertGuestSession(context.Background(), updated)
|
|
controller.writeControl(sessionstream.ControlMessage{Type: "exit", ExitCode: updated.ExitCode})
|
|
_ = controller.close()
|
|
d.clearGuestSessionController(id)
|
|
}
|
|
|
|
func (d *Daemon) serveGuestSessionAttach(session model.GuestSession, controller *guestSessionController, _ string, socketPath string, listener net.Listener) {
|
|
defer func() {
|
|
_ = listener.Close()
|
|
_ = os.Remove(socketPath)
|
|
_ = controller.close()
|
|
d.clearGuestSessionController(session.ID)
|
|
}()
|
|
conn, err := listener.Accept()
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
if err := controller.setAttach(conn); err != nil {
|
|
_ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
return
|
|
}
|
|
defer controller.clearAttach(conn)
|
|
if err := d.attachGuestSessionBridge(session, controller); err != nil {
|
|
_ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
return
|
|
}
|
|
for {
|
|
channel, payload, err := sessionstream.ReadFrame(conn)
|
|
if err != nil {
|
|
return
|
|
}
|
|
switch channel {
|
|
case sessionstream.ChannelStdin:
|
|
if controller.stdin == nil {
|
|
continue
|
|
}
|
|
if _, err := controller.stdin.Write(payload); err != nil {
|
|
_ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
return
|
|
}
|
|
case sessionstream.ChannelControl:
|
|
message, err := sessionstream.ReadControl(payload)
|
|
if err != nil {
|
|
_ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
return
|
|
}
|
|
if message.Type == "eof" && controller.stdin != nil {
|
|
_ = controller.stdin.Close()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) attachGuestSessionBridge(session model.GuestSession, controller *guestSessionController) error {
|
|
vm, err := d.store.GetVMByID(context.Background(), session.VMID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
return fmt.Errorf("vm %q is not running", vm.Name)
|
|
}
|
|
address := net.JoinHostPort(vm.Runtime.GuestIP, "22")
|
|
stdinStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachInputCommand(session.ID))
|
|
if err != nil {
|
|
return fmt.Errorf("open guest session stdin stream: %w", err)
|
|
}
|
|
stdoutStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachTailCommand(session.StdoutLogPath))
|
|
if err != nil {
|
|
_ = stdinStream.Close()
|
|
return fmt.Errorf("open guest session stdout stream: %w", err)
|
|
}
|
|
stderrStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachTailCommand(session.StderrLogPath))
|
|
if err != nil {
|
|
_ = stdinStream.Close()
|
|
_ = stdoutStream.Close()
|
|
return fmt.Errorf("open guest session stderr stream: %w", err)
|
|
}
|
|
controller.streams = append(controller.streams, stdinStream, stdoutStream, stderrStream)
|
|
controller.stdin = stdinStream.Stdin()
|
|
go d.forwardGuestSessionOutput(session.ID, controller, sessionstream.ChannelStdout, stdoutStream.Stdout())
|
|
go d.forwardGuestSessionOutput(session.ID, controller, sessionstream.ChannelStderr, stderrStream.Stdout())
|
|
go d.watchGuestSessionAttach(session.ID, controller, session)
|
|
return nil
|
|
}
|
|
|
|
func (d *Daemon) openGuestSessionAttachStream(address, command string) (*guest.StreamSession, error) {
|
|
client, err := guest.Dial(context.Background(), address, d.config.SSHKeyPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
stream, err := client.StartCommand(context.Background(), command)
|
|
if err != nil {
|
|
_ = client.Close()
|
|
return nil, err
|
|
}
|
|
return stream, nil
|
|
}
|
|
|
|
func (d *Daemon) watchGuestSessionAttach(id string, controller *guestSessionController, session model.GuestSession) {
|
|
ticker := time.NewTicker(250 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
vm, err := d.store.GetVMByID(context.Background(), session.VMID)
|
|
if err != nil {
|
|
controller.writeControl(sessionstream.ControlMessage{Type: "error", Error: err.Error()})
|
|
_ = controller.close()
|
|
return
|
|
}
|
|
refreshed, err := d.refreshGuestSession(context.Background(), vm, session)
|
|
if err == nil {
|
|
session = refreshed
|
|
}
|
|
if session.Status == model.GuestSessionStatusExited || session.Status == model.GuestSessionStatusFailed {
|
|
controller.writeControl(sessionstream.ControlMessage{Type: "exit", ExitCode: session.ExitCode})
|
|
_ = controller.close()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) waitForGuestSessionReadyDefault(ctx context.Context, vm model.VMRecord, session model.GuestSession) (model.GuestSession, error) {
|
|
for {
|
|
updated, err := d.refreshGuestSession(ctx, vm, session)
|
|
if err == nil {
|
|
session = updated
|
|
if session.GuestPID != 0 || session.ExitCode != nil || session.Status == model.GuestSessionStatusRunning || session.Status == model.GuestSessionStatusFailed || session.Status == model.GuestSessionStatusExited {
|
|
return session, nil
|
|
}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return session, ctx.Err()
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) refreshGuestSession(ctx context.Context, vm model.VMRecord, session model.GuestSession) (model.GuestSession, error) {
|
|
if session.Status != model.GuestSessionStatusStarting && session.Status != model.GuestSessionStatusRunning && session.Status != model.GuestSessionStatusStopping {
|
|
return session, nil
|
|
}
|
|
snapshot, err := d.inspectGuestSessionState(ctx, vm, session)
|
|
if err != nil {
|
|
return session, err
|
|
}
|
|
original := session
|
|
applyGuestSessionSnapshot(&session, snapshot, vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath))
|
|
if guestSessionStateChanged(original, session) {
|
|
session.UpdatedAt = model.Now()
|
|
if err := d.store.UpsertGuestSession(ctx, session); err != nil {
|
|
return session, err
|
|
}
|
|
}
|
|
return session, nil
|
|
}
|
|
|
|
func applyGuestSessionSnapshot(session *model.GuestSession, snapshot guestSessionStateSnapshot, vmRunning bool) {
|
|
if session == nil {
|
|
return
|
|
}
|
|
if snapshot.GuestPID != 0 {
|
|
session.GuestPID = snapshot.GuestPID
|
|
}
|
|
if snapshot.LastError != "" {
|
|
session.LastError = snapshot.LastError
|
|
}
|
|
if snapshot.ExitCode != nil {
|
|
session.ExitCode = snapshot.ExitCode
|
|
session.Attachable = false
|
|
session.Reattachable = false
|
|
if session.StartedAt.IsZero() {
|
|
session.StartedAt = model.Now()
|
|
}
|
|
if session.EndedAt.IsZero() {
|
|
session.EndedAt = model.Now()
|
|
}
|
|
if *snapshot.ExitCode == 0 {
|
|
session.Status = model.GuestSessionStatusExited
|
|
} else {
|
|
session.Status = model.GuestSessionStatusFailed
|
|
}
|
|
return
|
|
}
|
|
if snapshot.Alive {
|
|
if session.StartedAt.IsZero() {
|
|
session.StartedAt = model.Now()
|
|
}
|
|
session.Status = model.GuestSessionStatusRunning
|
|
return
|
|
}
|
|
if !vmRunning && (session.Status == model.GuestSessionStatusStarting || session.Status == model.GuestSessionStatusRunning || session.Status == model.GuestSessionStatusStopping) {
|
|
session.Status = model.GuestSessionStatusFailed
|
|
session.Attachable = false
|
|
session.Reattachable = false
|
|
if session.LastError == "" {
|
|
session.LastError = "vm is not running"
|
|
}
|
|
if session.EndedAt.IsZero() {
|
|
session.EndedAt = model.Now()
|
|
}
|
|
return
|
|
}
|
|
if snapshot.Status == string(model.GuestSessionStatusRunning) {
|
|
if session.StartedAt.IsZero() {
|
|
session.StartedAt = model.Now()
|
|
}
|
|
session.Status = model.GuestSessionStatusRunning
|
|
}
|
|
if session.Status == model.GuestSessionStatusRunning && session.StdinMode == model.GuestSessionStdinPipe {
|
|
session.Attachable = true
|
|
session.Reattachable = true
|
|
if session.AttachBackend == "" {
|
|
session.AttachBackend = guestSessionAttachBackendSSHBridge
|
|
}
|
|
if session.AttachMode == "" {
|
|
session.AttachMode = guestSessionAttachModeExclusive
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) inspectGuestSessionState(ctx context.Context, vm model.VMRecord, session model.GuestSession) (guestSessionStateSnapshot, error) {
|
|
if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
client, err := guest.Dial(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"), d.config.SSHKeyPath)
|
|
if err != nil {
|
|
return guestSessionStateSnapshot{}, err
|
|
}
|
|
defer client.Close()
|
|
var output bytes.Buffer
|
|
if err := client.RunScript(ctx, guestSessionInspectScript(session.ID), &output); err != nil {
|
|
return guestSessionStateSnapshot{}, formatGuestSessionStepError("inspect guest session state", err, output.String())
|
|
}
|
|
return parseGuestSessionState(output.String())
|
|
}
|
|
return d.inspectGuestSessionStateFromWorkDisk(ctx, vm, session.ID)
|
|
}
|
|
|
|
func (d *Daemon) inspectGuestSessionStateFromWorkDisk(ctx context.Context, vm model.VMRecord, sessionID string) (guestSessionStateSnapshot, error) {
|
|
runner := d.runner
|
|
if runner == nil {
|
|
runner = system.NewRunner()
|
|
}
|
|
workMount, cleanup, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false)
|
|
if err != nil {
|
|
return guestSessionStateSnapshot{}, err
|
|
}
|
|
defer cleanup()
|
|
stateDir := filepath.Join(workMount, guestSessionRelativeStateDir(sessionID))
|
|
return inspectGuestSessionStateFromDir(stateDir)
|
|
}
|
|
|
|
func inspectGuestSessionStateFromDir(stateDir string) (guestSessionStateSnapshot, error) {
|
|
var snapshot guestSessionStateSnapshot
|
|
statusData, _ := os.ReadFile(filepath.Join(stateDir, "status"))
|
|
snapshot.Status = strings.TrimSpace(string(statusData))
|
|
pidData, _ := os.ReadFile(filepath.Join(stateDir, "pid"))
|
|
if pidValue, err := strconv.Atoi(strings.TrimSpace(string(pidData))); err == nil {
|
|
snapshot.GuestPID = pidValue
|
|
}
|
|
exitData, _ := os.ReadFile(filepath.Join(stateDir, "exit_code"))
|
|
if exitValue, err := strconv.Atoi(strings.TrimSpace(string(exitData))); err == nil {
|
|
snapshot.ExitCode = &exitValue
|
|
}
|
|
errorData, _ := os.ReadFile(filepath.Join(stateDir, "error"))
|
|
snapshot.LastError = strings.TrimSpace(string(errorData))
|
|
if snapshot.GuestPID != 0 {
|
|
snapshot.Alive = processAlive(snapshot.GuestPID)
|
|
}
|
|
return snapshot, nil
|
|
}
|
|
|
|
func (d *Daemon) readGuestSessionLog(ctx context.Context, vm model.VMRecord, session model.GuestSession, stream string, tailLines int) (string, error) {
|
|
if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
|
|
client, err := guest.Dial(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"), d.config.SSHKeyPath)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer client.Close()
|
|
path := session.StdoutLogPath
|
|
if stream == "stderr" {
|
|
path = session.StderrLogPath
|
|
}
|
|
var output bytes.Buffer
|
|
script := fmt.Sprintf("set -euo pipefail\nif [ -f %s ]; then tail -n %d %s; fi\n", guestShellQuote(path), tailLines, guestShellQuote(path))
|
|
if err := client.RunScript(ctx, script, &output); err != nil {
|
|
return "", formatGuestSessionStepError("read guest session log", err, output.String())
|
|
}
|
|
return output.String(), nil
|
|
}
|
|
runner := d.runner
|
|
if runner == nil {
|
|
runner = system.NewRunner()
|
|
}
|
|
workMount, cleanup, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer cleanup()
|
|
logPath := filepath.Join(workMount, guestSessionRelativeStateDir(session.ID), stream+".log")
|
|
return tailFileContent(logPath, tailLines)
|
|
}
|
|
|
|
func (d *Daemon) findGuestSession(ctx context.Context, vmID, idOrName string) (model.GuestSession, error) {
|
|
if strings.TrimSpace(idOrName) == "" {
|
|
return model.GuestSession{}, errors.New("session id or name is required")
|
|
}
|
|
if session, err := d.store.GetGuestSession(ctx, vmID, idOrName); err == nil {
|
|
return session, nil
|
|
}
|
|
sessions, err := d.store.ListGuestSessionsByVM(ctx, vmID)
|
|
if err != nil {
|
|
return model.GuestSession{}, err
|
|
}
|
|
matches := make([]model.GuestSession, 0, 1)
|
|
for _, session := range sessions {
|
|
if strings.HasPrefix(session.ID, idOrName) || strings.HasPrefix(session.Name, idOrName) {
|
|
matches = append(matches, session)
|
|
}
|
|
}
|
|
switch len(matches) {
|
|
case 0:
|
|
return model.GuestSession{}, fmt.Errorf("session %q not found", idOrName)
|
|
case 1:
|
|
return matches[0], nil
|
|
default:
|
|
return model.GuestSession{}, fmt.Errorf("multiple sessions match %q", idOrName)
|
|
}
|
|
}
|
|
|
|
func guestSessionScript(session model.GuestSession) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
fmt.Fprintf(&script, "STATE_DIR=%s\n", guestShellQuote(session.GuestStateDir))
|
|
fmt.Fprintf(&script, "STDOUT_LOG=%s\n", guestShellQuote(session.StdoutLogPath))
|
|
fmt.Fprintf(&script, "STDERR_LOG=%s\n", guestShellQuote(session.StderrLogPath))
|
|
fmt.Fprintf(&script, "PID_FILE=%s\n", guestShellQuote(guestSessionPIDPath(session.ID)))
|
|
fmt.Fprintf(&script, "MONITOR_PID_FILE=%s\n", guestShellQuote(guestSessionMonitorPIDPath(session.ID)))
|
|
fmt.Fprintf(&script, "EXIT_FILE=%s\n", guestShellQuote(guestSessionExitCodePath(session.ID)))
|
|
fmt.Fprintf(&script, "STATUS_FILE=%s\n", guestShellQuote(guestSessionStatusPath(session.ID)))
|
|
fmt.Fprintf(&script, "ERROR_FILE=%s\n", guestShellQuote(guestSessionErrorPath(session.ID)))
|
|
fmt.Fprintf(&script, "STDIN_PIPE=%s\n", guestShellQuote(guestSessionStdinPipePath(session.ID)))
|
|
fmt.Fprintf(&script, "STDIN_KEEPALIVE_PID_FILE=%s\n", guestShellQuote(guestSessionStdinKeepalivePIDPath(session.ID)))
|
|
fmt.Fprintf(&script, "SESSION_CWD=%s\n", guestShellQuote(defaultGuestSessionCWD(session.CWD)))
|
|
script.WriteString("mkdir -p \"$STATE_DIR\"\n")
|
|
script.WriteString(": >\"$STDOUT_LOG\"\n")
|
|
script.WriteString(": >\"$STDERR_LOG\"\n")
|
|
script.WriteString("rm -f \"$EXIT_FILE\" \"$ERROR_FILE\" \"$STDIN_KEEPALIVE_PID_FILE\"\n")
|
|
if session.StdinMode == model.GuestSessionStdinPipe {
|
|
script.WriteString("rm -f \"$STDIN_PIPE\"\n")
|
|
script.WriteString("mkfifo -m 600 \"$STDIN_PIPE\"\n")
|
|
}
|
|
script.WriteString("printf '%s\\n' \"${BASHPID:-$$}\" >\"$MONITOR_PID_FILE\"\n")
|
|
script.WriteString("printf 'starting\\n' >\"$STATUS_FILE\"\n")
|
|
script.WriteString("cd \"$SESSION_CWD\"\n")
|
|
script.WriteString("exec > >(tee -a \"$STDOUT_LOG\") 2> >(tee -a \"$STDERR_LOG\" >&2)\n")
|
|
for _, line := range guestSessionEnvLines(session.Env) {
|
|
script.WriteString(line)
|
|
script.WriteByte('\n')
|
|
}
|
|
script.WriteString("COMMAND=(")
|
|
for _, value := range append([]string{session.Command}, session.Args...) {
|
|
script.WriteByte(' ')
|
|
script.WriteString(guestShellQuote(value))
|
|
}
|
|
script.WriteString(" )\n")
|
|
if session.StdinMode == model.GuestSessionStdinPipe {
|
|
script.WriteString("( while :; do sleep 3600; done ) >\"$STDIN_PIPE\" &\n")
|
|
script.WriteString("keepalive=$!\n")
|
|
script.WriteString("printf '%s\\n' \"$keepalive\" >\"$STDIN_KEEPALIVE_PID_FILE\"\n")
|
|
script.WriteString("\"${COMMAND[@]}\" <\"$STDIN_PIPE\" &\n")
|
|
} else {
|
|
script.WriteString("\"${COMMAND[@]}\" &\n")
|
|
}
|
|
script.WriteString("child=$!\n")
|
|
script.WriteString("printf '%s\\n' \"$child\" >\"$PID_FILE\"\n")
|
|
script.WriteString("printf 'running\\n' >\"$STATUS_FILE\"\n")
|
|
script.WriteString("wait \"$child\"\n")
|
|
script.WriteString("rc=$?\n")
|
|
if session.StdinMode == model.GuestSessionStdinPipe {
|
|
script.WriteString("if [ -f \"$STDIN_KEEPALIVE_PID_FILE\" ]; then kill \"$(cat \"$STDIN_KEEPALIVE_PID_FILE\")\" 2>/dev/null || true; fi\n")
|
|
}
|
|
script.WriteString("printf '%s\\n' \"$rc\" >\"$EXIT_FILE\"\n")
|
|
script.WriteString("if [ \"$rc\" -eq 0 ]; then printf 'exited\\n' >\"$STATUS_FILE\"; else printf 'failed\\n' >\"$STATUS_FILE\"; fi\n")
|
|
script.WriteString("exit \"$rc\"\n")
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionInspectScript(sessionID string) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
fmt.Fprintf(&script, "DIR=%s\n", guestShellQuote(guestSessionStateDir(sessionID)))
|
|
script.WriteString("status=''\n")
|
|
script.WriteString("pid=''\n")
|
|
script.WriteString("exit_code=''\n")
|
|
script.WriteString("last_error=''\n")
|
|
script.WriteString("alive=false\n")
|
|
script.WriteString("[ -f \"$DIR/status\" ] && status=\"$(cat \"$DIR/status\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/pid\" ] && pid=\"$(cat \"$DIR/pid\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/exit_code\" ] && exit_code=\"$(cat \"$DIR/exit_code\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/error\" ] && last_error=\"$(cat \"$DIR/error\")\"\n")
|
|
script.WriteString("if [ -n \"$pid\" ] && kill -0 \"$pid\" 2>/dev/null; then alive=true; fi\n")
|
|
script.WriteString("printf 'status=%s\\n' \"$status\"\n")
|
|
script.WriteString("printf 'pid=%s\\n' \"$pid\"\n")
|
|
script.WriteString("printf 'exit=%s\\n' \"$exit_code\"\n")
|
|
script.WriteString("printf 'alive=%s\\n' \"$alive\"\n")
|
|
script.WriteString("printf 'error=%s\\n' \"$last_error\"\n")
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionSignalScript(sessionID, signal string) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
fmt.Fprintf(&script, "DIR=%s\n", guestShellQuote(guestSessionStateDir(sessionID)))
|
|
fmt.Fprintf(&script, "SIGNAL=%s\n", guestShellQuote(signal))
|
|
script.WriteString("pid=''\n")
|
|
script.WriteString("monitor=''\n")
|
|
script.WriteString("keepalive=''\n")
|
|
script.WriteString("[ -f \"$DIR/pid\" ] && pid=\"$(cat \"$DIR/pid\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/monitor_pid\" ] && monitor=\"$(cat \"$DIR/monitor_pid\")\"\n")
|
|
script.WriteString("[ -f \"$DIR/stdin_keepalive.pid\" ] && keepalive=\"$(cat \"$DIR/stdin_keepalive.pid\")\"\n")
|
|
script.WriteString("printf 'stopping\\n' >\"$DIR/status\"\n")
|
|
script.WriteString("if [ -n \"$pid\" ]; then kill -${SIGNAL} \"$pid\" 2>/dev/null || true; fi\n")
|
|
script.WriteString("if [ -n \"$monitor\" ]; then kill -${SIGNAL} \"$monitor\" 2>/dev/null || true; fi\n")
|
|
script.WriteString("if [ -n \"$keepalive\" ]; then kill -${SIGNAL} \"$keepalive\" 2>/dev/null || true; fi\n")
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionStateDir(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateRoot, id))
|
|
}
|
|
|
|
func guestSessionRelativeStateDir(id string) string {
|
|
return strings.TrimPrefix(guestSessionStateDir(id), "/root/")
|
|
}
|
|
|
|
func guestSessionScriptPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "run.sh"))
|
|
}
|
|
|
|
func guestSessionPIDPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "pid"))
|
|
}
|
|
|
|
func guestSessionMonitorPIDPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "monitor_pid"))
|
|
}
|
|
|
|
func guestSessionExitCodePath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "exit_code"))
|
|
}
|
|
|
|
func guestSessionStdinPipePath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "stdin.pipe"))
|
|
}
|
|
|
|
func guestSessionStdinKeepalivePIDPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "stdin_keepalive.pid"))
|
|
}
|
|
|
|
func guestSessionStatusPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "status"))
|
|
}
|
|
|
|
func guestSessionErrorPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "error"))
|
|
}
|
|
|
|
func guestSessionStdoutLogPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "stdout.log"))
|
|
}
|
|
|
|
func guestSessionStderrLogPath(id string) string {
|
|
return filepath.ToSlash(filepath.Join(guestSessionStateDir(id), "stderr.log"))
|
|
}
|
|
|
|
func defaultGuestSessionName(id, command, explicit string) string {
|
|
if trimmed := strings.TrimSpace(explicit); trimmed != "" {
|
|
return trimmed
|
|
}
|
|
base := filepath.Base(strings.TrimSpace(command))
|
|
if base == "." || base == string(filepath.Separator) || base == "" {
|
|
base = "session"
|
|
}
|
|
return base + "-" + system.ShortID(id)
|
|
}
|
|
|
|
// defaultGuestSessionCWD returns value with surrounding whitespace removed,
// or "/root" when value is blank.
func defaultGuestSessionCWD(value string) string {
	const fallback = "/root"

	cwd := strings.TrimSpace(value)
	if cwd == "" {
		return fallback
	}
	return cwd
}
|
|
|
|
func failGuestSessionLaunch(session model.GuestSession, stage, message, rawLog string) model.GuestSession {
|
|
now := model.Now()
|
|
session.Status = model.GuestSessionStatusFailed
|
|
session.LastError = strings.TrimSpace(message)
|
|
session.Attachable = false
|
|
session.Reattachable = false
|
|
session.LaunchStage = strings.TrimSpace(stage)
|
|
session.LaunchMessage = strings.TrimSpace(message)
|
|
session.LaunchRawLog = strings.TrimSpace(rawLog)
|
|
session.UpdatedAt = now
|
|
session.EndedAt = now
|
|
return session
|
|
}
|
|
|
|
// normalizeGuestSessionRequiredCommands returns command followed by extras
// with each entry trimmed, blank entries dropped and duplicates removed,
// keeping the first occurrence of every name. The result is never nil.
func normalizeGuestSessionRequiredCommands(command string, extras []string) []string {
	capacity := len(extras) + 1
	result := make([]string, 0, capacity)
	known := make(map[string]struct{}, capacity)

	candidates := append([]string{command}, extras...)
	for _, candidate := range candidates {
		name := strings.TrimSpace(candidate)
		if name == "" {
			continue
		}
		if _, dup := known[name]; dup {
			continue
		}
		known[name] = struct{}{}
		result = append(result, name)
	}
	return result
}
|
|
|
|
func guestSessionCWDPreflightScript(cwd string) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
fmt.Fprintf(&script, "DIR=%s\n", guestShellQuote(defaultGuestSessionCWD(cwd)))
|
|
script.WriteString("if [ ! -d \"$DIR\" ]; then echo \"missing cwd: $DIR\"; exit 1; fi\n")
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionCommandPreflightScript(commands []string) string {
|
|
var script strings.Builder
|
|
script.WriteString("set -euo pipefail\n")
|
|
script.WriteString("check_command() {\n")
|
|
script.WriteString(" cmd=\"$1\"\n")
|
|
script.WriteString(" case \"$cmd\" in\n")
|
|
script.WriteString(" */*) [ -x \"$cmd\" ] || { echo \"missing command: $cmd\"; exit 1; } ;;\n")
|
|
script.WriteString(" *) command -v \"$cmd\" >/dev/null 2>&1 || { echo \"missing command: $cmd\"; exit 1; } ;;\n")
|
|
script.WriteString(" esac\n")
|
|
script.WriteString("}\n")
|
|
for _, command := range commands {
|
|
fmt.Fprintf(&script, "check_command %s\n", guestShellQuote(command))
|
|
}
|
|
return script.String()
|
|
}
|
|
|
|
func guestSessionAttachInputCommand(sessionID string) string {
|
|
path := guestSessionStdinPipePath(sessionID)
|
|
return "bash -lc " + guestShellQuote(fmt.Sprintf("set -euo pipefail\n[ -p %s ] || mkfifo -m 600 %s\nexec cat > %s\n", guestShellQuote(path), guestShellQuote(path), guestShellQuote(path)))
|
|
}
|
|
|
|
func guestSessionAttachTailCommand(path string) string {
|
|
return "bash -lc " + guestShellQuote(fmt.Sprintf("set -euo pipefail\ntouch %s\nexec tail -n 0 -F %s 2>/dev/null\n", guestShellQuote(path), guestShellQuote(path)))
|
|
}
|
|
|
|
func guestSessionEnvLines(values map[string]string) []string {
|
|
if len(values) == 0 {
|
|
return nil
|
|
}
|
|
keys := make([]string, 0, len(values))
|
|
for key := range values {
|
|
keys = append(keys, key)
|
|
}
|
|
sort.Strings(keys)
|
|
lines := make([]string, 0, len(keys))
|
|
for _, key := range keys {
|
|
lines = append(lines, "export "+key+"="+guestShellQuote(values[key]))
|
|
}
|
|
return lines
|
|
}
|
|
|
|
// guestShellQuote wraps value in single quotes for use as one shell word.
// Each embedded single quote is replaced by the sequence '"'"' (close the
// quote, emit a double-quoted single quote, reopen the quote).
func guestShellQuote(value string) string {
	const escapedQuote = `'"'"'`

	segments := strings.Split(value, "'")
	return "'" + strings.Join(segments, escapedQuote) + "'"
}
|
|
|
|
func parseGuestSessionState(raw string) (guestSessionStateSnapshot, error) {
|
|
var snapshot guestSessionStateSnapshot
|
|
scanner := bufio.NewScanner(strings.NewReader(raw))
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
key, value, ok := strings.Cut(line, "=")
|
|
if !ok {
|
|
continue
|
|
}
|
|
switch strings.TrimSpace(key) {
|
|
case "status":
|
|
snapshot.Status = strings.TrimSpace(value)
|
|
case "pid":
|
|
if pid, err := strconv.Atoi(strings.TrimSpace(value)); err == nil {
|
|
snapshot.GuestPID = pid
|
|
}
|
|
case "exit":
|
|
if exitCode, err := strconv.Atoi(strings.TrimSpace(value)); err == nil {
|
|
snapshot.ExitCode = &exitCode
|
|
}
|
|
case "alive":
|
|
snapshot.Alive = strings.TrimSpace(value) == "true"
|
|
case "error":
|
|
snapshot.LastError = strings.TrimSpace(value)
|
|
}
|
|
}
|
|
return snapshot, scanner.Err()
|
|
}
|
|
|
|
func guestSessionExitCode(err error) (int, bool) {
|
|
if err == nil {
|
|
return 0, true
|
|
}
|
|
var exitErr *ssh.ExitError
|
|
if errors.As(err, &exitErr) {
|
|
return exitErr.ExitStatus(), true
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
// cloneStringMap returns an independent copy of values, or nil when values
// is nil or empty.
func cloneStringMap(values map[string]string) map[string]string {
	size := len(values)
	if size == 0 {
		return nil
	}

	out := make(map[string]string, size)
	for k := range values {
		out[k] = values[k]
	}
	return out
}
|
|
|
|
// tailFileContent returns the last lines lines of the file at path.
//
// A missing file yields an empty string and no error; any other read error
// is returned. When lines is zero or negative, or the file holds no more
// than lines lines, the whole content is returned unchanged. A trailing
// newline is treated as the terminator of the last line rather than as an
// extra empty line, so the result has exactly lines lines whether or not
// the file ends with a newline, and the terminator is kept when present.
func tailFileContent(path string, lines int) (string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return "", nil
		}
		return "", err
	}

	content := string(data)
	if lines <= 0 {
		return content, nil
	}

	// Split on the content without its final terminator so the line count is
	// not inflated by the empty element a trailing "\n" would produce.
	body := strings.TrimSuffix(content, "\n")
	terminated := len(body) != len(content)

	parts := strings.Split(body, "\n")
	if len(parts) <= lines {
		return content, nil
	}

	tail := strings.Join(parts[len(parts)-lines:], "\n")
	if terminated {
		tail += "\n"
	}
	return tail, nil
}
|
|
|
|
func processAlive(pid int) bool {
|
|
if pid <= 0 {
|
|
return false
|
|
}
|
|
return syscallKill(pid, syscall.Signal(0)) == nil
|
|
}
|
|
|
|
// syscallKill looks up the process with the given pid and sends it signal.
// It is a package-level variable rather than a plain function, which allows
// it to be reassigned.
var syscallKill = func(pid int, signal os.Signal) error {
	target, lookupErr := os.FindProcess(pid)
	if lookupErr != nil {
		return lookupErr
	}
	return target.Signal(signal)
}
|
|
|
|
// formatGuestSessionStepError wraps err with the name of the failed action.
// When log contains anything besides whitespace, its trimmed text is
// appended after the wrapped error. The result always wraps err.
func formatGuestSessionStepError(action string, err error, log string) error {
	if detail := strings.TrimSpace(log); detail != "" {
		return fmt.Errorf("%s: %w: %s", action, err, detail)
	}
	return fmt.Errorf("%s: %w", action, err)
}
|
|
|
|
func guestSessionStateChanged(before, after model.GuestSession) bool {
|
|
if before.Status != after.Status || before.GuestPID != after.GuestPID || before.LastError != after.LastError || before.Attachable != after.Attachable || before.Reattachable != after.Reattachable || before.AttachBackend != after.AttachBackend || before.AttachMode != after.AttachMode || before.LaunchStage != after.LaunchStage || before.LaunchMessage != after.LaunchMessage || before.LaunchRawLog != after.LaunchRawLog {
|
|
return true
|
|
}
|
|
if before.StartedAt != after.StartedAt || before.EndedAt != after.EndedAt {
|
|
return true
|
|
}
|
|
switch {
|
|
case before.ExitCode == nil && after.ExitCode == nil:
|
|
return false
|
|
case before.ExitCode == nil || after.ExitCode == nil:
|
|
return true
|
|
default:
|
|
return *before.ExitCode != *after.ExitCode
|
|
}
|
|
}
|