daemon split (3/5): extract *WorkspaceService service
Third phase of splitting the daemon god-struct. WorkspaceService now
owns workspace.prepare / workspace.export plus the ssh-key +
git-identity + arbitrary-file sync that runs as part of VM start's
prepare_work_disk capability hook. workspaceLocks (the per-VM tar
serialisation set) lives on the service.
workspace.go and vm_authsync.go flipped receivers from *Daemon to
*WorkspaceService. The workspaceInspectRepo / workspaceImport test
seams moved onto the service as fields.
Peer-service dependencies go through narrow function-typed fields:
vmResolver, aliveChecker, waitGuestSSH, dialGuest, imageResolver,
imageWorkSeed, withVMLockByRef, beginOperation. WorkspaceService
never touches VMService / HostNetwork / ImageService directly —
only the exact operations the Daemon hands it at construction.
Daemon lazy-init helper workspaceSvc() mirrors the Phase 1/2
pattern. Test literals still write `&Daemon{store: db, runner: r}`
and get a wired workspace service for free. Tests that override the
inspect/import seams (workspace_test.go, ~4 sites) assign them on
d.workspaceSvc() instead of on the daemon literal.
Dispatch in daemon.go: vm.workspace.prepare and vm.workspace.export
now forward one-liners to d.workspaceSvc().
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
d7614a3b2b
commit
c0d456e734
8 changed files with 202 additions and 94 deletions
|
|
@ -203,13 +203,13 @@ func (workDiskCapability) PrepareHost(ctx context.Context, d *Daemon, vm *model.
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.ensureAuthorizedKeyOnWorkDisk(ctx, vm, image, prep); err != nil {
|
||||
if err := d.workspaceSvc().ensureAuthorizedKeyOnWorkDisk(ctx, vm, image, prep); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.ensureGitIdentityOnWorkDisk(ctx, vm); err != nil {
|
||||
if err := d.workspaceSvc().ensureGitIdentityOnWorkDisk(ctx, vm); err != nil {
|
||||
return err
|
||||
}
|
||||
return d.runFileSync(ctx, vm)
|
||||
return d.workspaceSvc().runFileSync(ctx, vm)
|
||||
}
|
||||
|
||||
func (workDiskCapability) AddDoctorChecks(_ context.Context, d *Daemon, report *system.Report) {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"banger/internal/buildinfo"
|
||||
"banger/internal/config"
|
||||
"banger/internal/daemon/opstate"
|
||||
ws "banger/internal/daemon/workspace"
|
||||
"banger/internal/model"
|
||||
"banger/internal/paths"
|
||||
"banger/internal/rpc"
|
||||
|
|
@ -48,19 +47,18 @@ type Daemon struct {
|
|||
// See internal/daemon/vm_handles.go — persistent durable state
|
||||
// lives in the store, this is rebuildable from a per-VM
|
||||
// handles.json scratch file and OS inspection.
|
||||
handles *handleCache
|
||||
net *HostNetwork
|
||||
img *ImageService
|
||||
closing chan struct{}
|
||||
once sync.Once
|
||||
pid int
|
||||
listener net.Listener
|
||||
vmCaps []vmCapability
|
||||
requestHandler func(context.Context, rpc.Request) rpc.Response
|
||||
guestWaitForSSH func(context.Context, string, string, time.Duration) error
|
||||
guestDial func(context.Context, string, string) (guestSSHClient, error)
|
||||
workspaceInspectRepo func(ctx context.Context, sourcePath, branchName, fromRef string) (ws.RepoSpec, error)
|
||||
workspaceImport func(ctx context.Context, client ws.GuestClient, spec ws.RepoSpec, guestPath string, mode model.WorkspacePrepareMode) error
|
||||
handles *handleCache
|
||||
net *HostNetwork
|
||||
img *ImageService
|
||||
ws *WorkspaceService
|
||||
closing chan struct{}
|
||||
once sync.Once
|
||||
pid int
|
||||
listener net.Listener
|
||||
vmCaps []vmCapability
|
||||
requestHandler func(context.Context, rpc.Request) rpc.Response
|
||||
guestWaitForSSH func(context.Context, string, string, time.Duration) error
|
||||
guestDial func(context.Context, string, string) (guestSSHClient, error)
|
||||
}
|
||||
|
||||
func Open(ctx context.Context) (d *Daemon, err error) {
|
||||
|
|
@ -427,14 +425,14 @@ func (d *Daemon) dispatch(ctx context.Context, req rpc.Request) rpc.Response {
|
|||
if err != nil {
|
||||
return rpc.NewError("bad_request", err.Error())
|
||||
}
|
||||
workspace, err := d.PrepareVMWorkspace(ctx, params)
|
||||
workspace, err := d.workspaceSvc().PrepareVMWorkspace(ctx, params)
|
||||
return marshalResultOrError(api.VMWorkspacePrepareResult{Workspace: workspace}, err)
|
||||
case "vm.workspace.export":
|
||||
params, err := rpc.DecodeParams[api.WorkspaceExportParams](req)
|
||||
if err != nil {
|
||||
return rpc.NewError("bad_request", err.Error())
|
||||
}
|
||||
result, err := d.ExportVMWorkspace(ctx, params)
|
||||
result, err := d.workspaceSvc().ExportVMWorkspace(ctx, params)
|
||||
return marshalResultOrError(result, err)
|
||||
case "image.list":
|
||||
images, err := d.store.ListImages(ctx)
|
||||
|
|
|
|||
|
|
@ -125,7 +125,7 @@ func TestEnsureAuthorizedKeyOnWorkDiskSkipsRepairForMatchingSeededFingerprint(t
|
|||
vm.Runtime.WorkDiskPath = filepath.Join(t.TempDir(), "root.ext4")
|
||||
image := model.Image{SeededSSHPublicKeyFingerprint: fingerprint}
|
||||
|
||||
if err := d.ensureAuthorizedKeyOnWorkDisk(context.Background(), &vm, image, workDiskPreparation{ClonedFromSeed: true}); err != nil {
|
||||
if err := d.workspaceSvc().ensureAuthorizedKeyOnWorkDisk(context.Background(), &vm, image, workDiskPreparation{ClonedFromSeed: true}); err != nil {
|
||||
t.Fatalf("ensureAuthorizedKeyOnWorkDisk: %v", err)
|
||||
}
|
||||
runner.assertExhausted()
|
||||
|
|
|
|||
|
|
@ -23,8 +23,8 @@ type gitIdentity struct {
|
|||
Email string
|
||||
}
|
||||
|
||||
func (d *Daemon) ensureAuthorizedKeyOnWorkDisk(ctx context.Context, vm *model.VMRecord, image model.Image, prep workDiskPreparation) error {
|
||||
fingerprint, err := guest.AuthorizedPublicKeyFingerprint(d.config.SSHKeyPath)
|
||||
func (s *WorkspaceService) ensureAuthorizedKeyOnWorkDisk(ctx context.Context, vm *model.VMRecord, image model.Image, prep workDiskPreparation) error {
|
||||
fingerprint, err := guest.AuthorizedPublicKeyFingerprint(s.config.SSHKeyPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("derive authorized ssh key fingerprint: %w", err)
|
||||
}
|
||||
|
|
@ -32,18 +32,18 @@ func (d *Daemon) ensureAuthorizedKeyOnWorkDisk(ctx context.Context, vm *model.VM
|
|||
vmCreateStage(ctx, "prepare_work_disk", "using seeded SSH access")
|
||||
return nil
|
||||
}
|
||||
publicKey, err := guest.AuthorizedPublicKey(d.config.SSHKeyPath)
|
||||
publicKey, err := guest.AuthorizedPublicKey(s.config.SSHKeyPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("derive authorized ssh key: %w", err)
|
||||
}
|
||||
vmCreateStage(ctx, "prepare_work_disk", "provisioning SSH access on work disk")
|
||||
workMount, cleanupWork, err := system.MountTempDir(ctx, d.runner, vm.Runtime.WorkDiskPath, false)
|
||||
workMount, cleanupWork, err := system.MountTempDir(ctx, s.runner, vm.Runtime.WorkDiskPath, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cleanupWork()
|
||||
|
||||
if err := d.flattenNestedWorkHome(ctx, workMount); err != nil {
|
||||
if err := flattenNestedWorkHome(ctx, s.runner, workMount); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -51,23 +51,23 @@ func (d *Daemon) ensureAuthorizedKeyOnWorkDisk(ctx context.Context, vm *model.VM
|
|||
// mounts at /root, which sshd inspects when StrictModes is on (the
|
||||
// default after the hardening drop-in). Any drift — owner != root,
|
||||
// group/other-writable — would make sshd silently reject the key.
|
||||
if err := normaliseHomeDirPerms(ctx, d.runner, workMount); err != nil {
|
||||
if err := normaliseHomeDirPerms(ctx, s.runner, workMount); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sshDir := filepath.Join(workMount, ".ssh")
|
||||
if _, err := d.runner.RunSudo(ctx, "mkdir", "-p", sshDir); err != nil {
|
||||
if _, err := s.runner.RunSudo(ctx, "mkdir", "-p", sshDir); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := d.runner.RunSudo(ctx, "chmod", "700", sshDir); err != nil {
|
||||
if _, err := s.runner.RunSudo(ctx, "chmod", "700", sshDir); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := d.runner.RunSudo(ctx, "chown", "0:0", sshDir); err != nil {
|
||||
if _, err := s.runner.RunSudo(ctx, "chown", "0:0", sshDir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
authorizedKeysPath := filepath.Join(sshDir, "authorized_keys")
|
||||
existing, err := d.runner.RunSudo(ctx, "cat", authorizedKeysPath)
|
||||
existing, err := s.runner.RunSudo(ctx, "cat", authorizedKeysPath)
|
||||
if err != nil {
|
||||
existing = nil
|
||||
}
|
||||
|
|
@ -89,12 +89,12 @@ func (d *Daemon) ensureAuthorizedKeyOnWorkDisk(ctx context.Context, vm *model.VM
|
|||
}
|
||||
defer os.Remove(tmpPath)
|
||||
|
||||
if _, err := d.runner.RunSudo(ctx, "install", "-m", "600", tmpPath, authorizedKeysPath); err != nil {
|
||||
if _, err := s.runner.RunSudo(ctx, "install", "-m", "600", tmpPath, authorizedKeysPath); err != nil {
|
||||
return err
|
||||
}
|
||||
if prep.ClonedFromSeed && image.Managed {
|
||||
vmCreateStage(ctx, "prepare_work_disk", "refreshing managed work seed")
|
||||
if err := d.imageSvc().refreshManagedWorkSeedFingerprint(ctx, image, fingerprint); err != nil {
|
||||
if err := s.imageWorkSeed(ctx, image, fingerprint); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -120,15 +120,15 @@ func normaliseHomeDirPerms(ctx context.Context, runner system.CommandRunner, wor
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) ensureGitIdentityOnWorkDisk(ctx context.Context, vm *model.VMRecord) error {
|
||||
runner := d.runner
|
||||
func (s *WorkspaceService) ensureGitIdentityOnWorkDisk(ctx context.Context, vm *model.VMRecord) error {
|
||||
runner := s.runner
|
||||
if runner == nil {
|
||||
runner = system.NewRunner()
|
||||
}
|
||||
|
||||
identity, err := resolveHostGlobalGitIdentity(ctx, runner)
|
||||
if err != nil {
|
||||
d.warnGitIdentitySyncSkipped(*vm, hostGlobalGitIdentitySource, err)
|
||||
s.warnGitIdentitySyncSkipped(*vm, hostGlobalGitIdentitySource, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -139,7 +139,7 @@ func (d *Daemon) ensureGitIdentityOnWorkDisk(ctx context.Context, vm *model.VMRe
|
|||
}
|
||||
defer cleanupWork()
|
||||
|
||||
if err := d.flattenNestedWorkHome(ctx, workMount); err != nil {
|
||||
if err := flattenNestedWorkHome(ctx, s.runner, workMount); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -155,12 +155,12 @@ func (d *Daemon) ensureGitIdentityOnWorkDisk(ctx context.Context, vm *model.VMRe
|
|||
// Directory entries: walked in Go — each file is installed with its
|
||||
// source permissions, each subdir is mkdir'd. The entry's `mode`
|
||||
// field is only honoured for file entries.
|
||||
func (d *Daemon) runFileSync(ctx context.Context, vm *model.VMRecord) error {
|
||||
if len(d.config.FileSync) == 0 {
|
||||
func (s *WorkspaceService) runFileSync(ctx context.Context, vm *model.VMRecord) error {
|
||||
if len(s.config.FileSync) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
runner := d.runner
|
||||
runner := s.runner
|
||||
if runner == nil {
|
||||
runner = system.NewRunner()
|
||||
}
|
||||
|
|
@ -183,7 +183,7 @@ func (d *Daemon) runFileSync(ctx context.Context, vm *model.VMRecord) error {
|
|||
}
|
||||
workMount = m
|
||||
cleanupWork = c
|
||||
if err := d.flattenNestedWorkHome(ctx, workMount); err != nil {
|
||||
if err := flattenNestedWorkHome(ctx, s.runner, workMount); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return workMount, nil
|
||||
|
|
@ -194,14 +194,14 @@ func (d *Daemon) runFileSync(ctx context.Context, vm *model.VMRecord) error {
|
|||
}
|
||||
}()
|
||||
|
||||
for _, entry := range d.config.FileSync {
|
||||
for _, entry := range s.config.FileSync {
|
||||
hostPath := expandHostPath(entry.Host, hostHome)
|
||||
guestRel := guestPathRelativeToRoot(entry.Guest)
|
||||
|
||||
info, err := os.Stat(hostPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
d.warnFileSyncSkipped(*vm, hostPath, err)
|
||||
s.warnFileSyncSkipped(*vm, hostPath, err)
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("file_sync: stat %s: %w", hostPath, err)
|
||||
|
|
@ -365,18 +365,18 @@ func writeGitIdentity(ctx context.Context, runner system.CommandRunner, gitConfi
|
|||
return err
|
||||
}
|
||||
|
||||
func (d *Daemon) warnFileSyncSkipped(vm model.VMRecord, hostPath string, err error) {
|
||||
if d.logger == nil || err == nil {
|
||||
func (s *WorkspaceService) warnFileSyncSkipped(vm model.VMRecord, hostPath string, err error) {
|
||||
if s.logger == nil || err == nil {
|
||||
return
|
||||
}
|
||||
d.logger.Warn("file_sync skipped", append(vmLogAttrs(vm), "host_path", hostPath, "error", err.Error())...)
|
||||
s.logger.Warn("file_sync skipped", append(vmLogAttrs(vm), "host_path", hostPath, "error", err.Error())...)
|
||||
}
|
||||
|
||||
func (d *Daemon) warnGitIdentitySyncSkipped(vm model.VMRecord, source string, err error) {
|
||||
if d.logger == nil || err == nil {
|
||||
func (s *WorkspaceService) warnGitIdentitySyncSkipped(vm model.VMRecord, source string, err error) {
|
||||
if s.logger == nil || err == nil {
|
||||
return
|
||||
}
|
||||
d.logger.Warn("guest git identity sync skipped", append(vmLogAttrs(vm), "source", source, "error", err.Error())...)
|
||||
s.logger.Warn("guest git identity sync skipped", append(vmLogAttrs(vm), "source", source, "error", err.Error())...)
|
||||
}
|
||||
|
||||
func mergeAuthorizedKey(existing, managed []byte) []byte {
|
||||
|
|
|
|||
|
|
@ -811,7 +811,7 @@ func TestEnsureAuthorizedKeyOnWorkDiskRepairsNestedRootLayout(t *testing.T) {
|
|||
vm := testVM("seed-repair", "image-seed-repair", "172.16.0.61")
|
||||
vm.Runtime.WorkDiskPath = workDiskDir
|
||||
|
||||
if err := d.ensureAuthorizedKeyOnWorkDisk(context.Background(), &vm, model.Image{}, workDiskPreparation{}); err != nil {
|
||||
if err := d.workspaceSvc().ensureAuthorizedKeyOnWorkDisk(context.Background(), &vm, model.Image{}, workDiskPreparation{}); err != nil {
|
||||
t.Fatalf("ensureAuthorizedKeyOnWorkDisk: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(filepath.Join(workDiskDir, "root")); !os.IsNotExist(err) {
|
||||
|
|
@ -848,7 +848,7 @@ func TestEnsureGitIdentityOnWorkDiskCopiesHostGlobalIdentity(t *testing.T) {
|
|||
vm := testVM("git-identity", "image-git-identity", "172.16.0.67")
|
||||
vm.Runtime.WorkDiskPath = workDiskDir
|
||||
|
||||
if err := d.ensureGitIdentityOnWorkDisk(context.Background(), &vm); err != nil {
|
||||
if err := d.workspaceSvc().ensureGitIdentityOnWorkDisk(context.Background(), &vm); err != nil {
|
||||
t.Fatalf("ensureGitIdentityOnWorkDisk: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -881,7 +881,7 @@ func TestEnsureGitIdentityOnWorkDiskPreservesExistingGuestConfig(t *testing.T) {
|
|||
vm := testVM("git-identity-preserve", "image-git-identity", "172.16.0.68")
|
||||
vm.Runtime.WorkDiskPath = workDiskDir
|
||||
|
||||
if err := d.ensureGitIdentityOnWorkDisk(context.Background(), &vm); err != nil {
|
||||
if err := d.workspaceSvc().ensureGitIdentityOnWorkDisk(context.Background(), &vm); err != nil {
|
||||
t.Fatalf("ensureGitIdentityOnWorkDisk: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -925,7 +925,7 @@ func TestEnsureGitIdentityOnWorkDiskWarnsAndSkipsWhenHostIdentityIncomplete(t *t
|
|||
vm := testVM("git-identity-missing", "image-git-identity", "172.16.0.69")
|
||||
vm.Runtime.WorkDiskPath = workDiskDir
|
||||
|
||||
if err := d.ensureGitIdentityOnWorkDisk(context.Background(), &vm); err != nil {
|
||||
if err := d.workspaceSvc().ensureGitIdentityOnWorkDisk(context.Background(), &vm); err != nil {
|
||||
t.Fatalf("ensureGitIdentityOnWorkDisk: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -951,7 +951,7 @@ func TestEnsureGitIdentityOnWorkDiskWarnsAndSkipsWhenHostIdentityIncomplete(t *t
|
|||
func TestRunFileSyncNoOpWhenConfigEmpty(t *testing.T) {
|
||||
d := &Daemon{runner: &filesystemRunner{t: t}}
|
||||
vm := testVM("no-sync", "image", "172.16.0.70")
|
||||
if err := d.runFileSync(context.Background(), &vm); err != nil {
|
||||
if err := d.workspaceSvc().runFileSync(context.Background(), &vm); err != nil {
|
||||
t.Fatalf("runFileSync: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -979,7 +979,7 @@ func TestRunFileSyncCopiesFile(t *testing.T) {
|
|||
}
|
||||
vm := testVM("sync-file", "image", "172.16.0.71")
|
||||
vm.Runtime.WorkDiskPath = workDisk
|
||||
if err := d.runFileSync(context.Background(), &vm); err != nil {
|
||||
if err := d.workspaceSvc().runFileSync(context.Background(), &vm); err != nil {
|
||||
t.Fatalf("runFileSync: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -1019,7 +1019,7 @@ func TestRunFileSyncRespectsCustomMode(t *testing.T) {
|
|||
}
|
||||
vm := testVM("sync-mode", "image", "172.16.0.72")
|
||||
vm.Runtime.WorkDiskPath = workDisk
|
||||
if err := d.runFileSync(context.Background(), &vm); err != nil {
|
||||
if err := d.workspaceSvc().runFileSync(context.Background(), &vm); err != nil {
|
||||
t.Fatalf("runFileSync: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -1054,7 +1054,7 @@ func TestRunFileSyncSkipsMissingHostPath(t *testing.T) {
|
|||
}
|
||||
vm := testVM("sync-missing", "image", "172.16.0.73")
|
||||
vm.Runtime.WorkDiskPath = workDisk
|
||||
if err := d.runFileSync(context.Background(), &vm); err != nil {
|
||||
if err := d.workspaceSvc().runFileSync(context.Background(), &vm); err != nil {
|
||||
t.Fatalf("runFileSync: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -1093,7 +1093,7 @@ func TestRunFileSyncOverwritesExistingGuestFile(t *testing.T) {
|
|||
}
|
||||
vm := testVM("sync-overwrite", "image", "172.16.0.74")
|
||||
vm.Runtime.WorkDiskPath = workDisk
|
||||
if err := d.runFileSync(context.Background(), &vm); err != nil {
|
||||
if err := d.workspaceSvc().runFileSync(context.Background(), &vm); err != nil {
|
||||
t.Fatalf("runFileSync: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -1135,7 +1135,7 @@ func TestRunFileSyncCopiesDirectoryRecursively(t *testing.T) {
|
|||
}
|
||||
vm := testVM("sync-dir", "image", "172.16.0.75")
|
||||
vm.Runtime.WorkDiskPath = workDisk
|
||||
if err := d.runFileSync(context.Background(), &vm); err != nil {
|
||||
if err := d.workspaceSvc().runFileSync(context.Background(), &vm); err != nil {
|
||||
t.Fatalf("runFileSync: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,42 +17,42 @@ import (
|
|||
// workspaceInspectRepoHook + workspaceImportHook dispatch through the
|
||||
// per-instance Daemon seams when set, falling back to the real
|
||||
// workspace package implementations. Keeping the fallbacks here (as
|
||||
// opposed to always requiring callers to populate d.workspaceInspectRepo
|
||||
// opposed to always requiring callers to populate s.workspaceInspectRepo
|
||||
// in a constructor) lets tests selectively override one hook without
|
||||
// having to wire both.
|
||||
func (d *Daemon) workspaceInspectRepoHook(ctx context.Context, sourcePath, branchName, fromRef string) (ws.RepoSpec, error) {
|
||||
if d != nil && d.workspaceInspectRepo != nil {
|
||||
return d.workspaceInspectRepo(ctx, sourcePath, branchName, fromRef)
|
||||
func (s *WorkspaceService) workspaceInspectRepoHook(ctx context.Context, sourcePath, branchName, fromRef string) (ws.RepoSpec, error) {
|
||||
if s != nil && s.workspaceInspectRepo != nil {
|
||||
return s.workspaceInspectRepo(ctx, sourcePath, branchName, fromRef)
|
||||
}
|
||||
return ws.InspectRepo(ctx, sourcePath, branchName, fromRef)
|
||||
}
|
||||
|
||||
func (d *Daemon) workspaceImportHook(ctx context.Context, client ws.GuestClient, spec ws.RepoSpec, guestPath string, mode model.WorkspacePrepareMode) error {
|
||||
if d != nil && d.workspaceImport != nil {
|
||||
return d.workspaceImport(ctx, client, spec, guestPath, mode)
|
||||
func (s *WorkspaceService) workspaceImportHook(ctx context.Context, client ws.GuestClient, spec ws.RepoSpec, guestPath string, mode model.WorkspacePrepareMode) error {
|
||||
if s != nil && s.workspaceImport != nil {
|
||||
return s.workspaceImport(ctx, client, spec, guestPath, mode)
|
||||
}
|
||||
return ws.ImportRepoToGuest(ctx, client, spec, guestPath, mode)
|
||||
}
|
||||
|
||||
func (d *Daemon) ExportVMWorkspace(ctx context.Context, params api.WorkspaceExportParams) (api.WorkspaceExportResult, error) {
|
||||
func (s *WorkspaceService) ExportVMWorkspace(ctx context.Context, params api.WorkspaceExportParams) (api.WorkspaceExportResult, error) {
|
||||
guestPath := strings.TrimSpace(params.GuestPath)
|
||||
if guestPath == "" {
|
||||
guestPath = "/root/repo"
|
||||
}
|
||||
vm, err := d.FindVM(ctx, params.IDOrName)
|
||||
vm, err := s.vmResolver(ctx, params.IDOrName)
|
||||
if err != nil {
|
||||
return api.WorkspaceExportResult{}, err
|
||||
}
|
||||
if !d.vmAlive(vm) {
|
||||
if !s.aliveChecker(vm) {
|
||||
return api.WorkspaceExportResult{}, fmt.Errorf("vm %q is not running", vm.Name)
|
||||
}
|
||||
// Serialise with any in-flight workspace.prepare on the same VM so
|
||||
// we never snapshot a half-streamed tar. Does not block vm stop /
|
||||
// delete / restart — those only take the VM mutex.
|
||||
unlock := d.workspaceLocks.lock(vm.ID)
|
||||
unlock := s.workspaceLocks.lock(vm.ID)
|
||||
defer unlock()
|
||||
|
||||
client, err := d.dialGuest(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"))
|
||||
client, err := s.dialGuest(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"))
|
||||
if err != nil {
|
||||
return api.WorkspaceExportResult{}, fmt.Errorf("dial guest: %w", err)
|
||||
}
|
||||
|
|
@ -120,7 +120,7 @@ func exportScript(guestPath, diffRef, diffFlag string) string {
|
|||
)
|
||||
}
|
||||
|
||||
func (d *Daemon) PrepareVMWorkspace(ctx context.Context, params api.VMWorkspacePrepareParams) (model.WorkspacePrepareResult, error) {
|
||||
func (s *WorkspaceService) PrepareVMWorkspace(ctx context.Context, params api.VMWorkspacePrepareParams) (model.WorkspacePrepareResult, error) {
|
||||
mode, err := ws.ParsePrepareMode(params.Mode)
|
||||
if err != nil {
|
||||
return model.WorkspacePrepareResult{}, err
|
||||
|
|
@ -142,8 +142,8 @@ func (d *Daemon) PrepareVMWorkspace(ctx context.Context, params api.VMWorkspaceP
|
|||
// and snapshot the fields we need (IP, PID, api sock). Release it
|
||||
// before any SSH or tar I/O so this slow operation cannot block
|
||||
// vm stop / vm delete / vm restart on the same VM.
|
||||
vm, err := d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) {
|
||||
if !d.vmAlive(vm) {
|
||||
vm, err := s.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) {
|
||||
if !s.aliveChecker(vm) {
|
||||
return model.VMRecord{}, fmt.Errorf("vm %q is not running", vm.Name)
|
||||
}
|
||||
return vm, nil
|
||||
|
|
@ -157,17 +157,17 @@ func (d *Daemon) PrepareVMWorkspace(ctx context.Context, params api.VMWorkspaceP
|
|||
// block lifecycle ops. If the VM gets stopped or deleted mid-
|
||||
// flight, the SSH dial or stream will fail naturally; ctx
|
||||
// cancellation propagates through.
|
||||
unlock := d.workspaceLocks.lock(vm.ID)
|
||||
unlock := s.workspaceLocks.lock(vm.ID)
|
||||
defer unlock()
|
||||
|
||||
return d.prepareVMWorkspaceGuestIO(ctx, vm, strings.TrimSpace(params.SourcePath), guestPath, branchName, fromRef, mode, params.ReadOnly)
|
||||
return s.prepareVMWorkspaceGuestIO(ctx, vm, strings.TrimSpace(params.SourcePath), guestPath, branchName, fromRef, mode, params.ReadOnly)
|
||||
}
|
||||
|
||||
// prepareVMWorkspaceGuestIO performs the actual guest-side work:
|
||||
// inspect the local repo, dial SSH, stream the tar, optionally chmod
|
||||
// readonly. It is called without holding the VM mutex.
|
||||
func (d *Daemon) prepareVMWorkspaceGuestIO(ctx context.Context, vm model.VMRecord, sourcePath, guestPath, branchName, fromRef string, mode model.WorkspacePrepareMode, readOnly bool) (model.WorkspacePrepareResult, error) {
|
||||
spec, err := d.workspaceInspectRepoHook(ctx, sourcePath, branchName, fromRef)
|
||||
func (s *WorkspaceService) prepareVMWorkspaceGuestIO(ctx context.Context, vm model.VMRecord, sourcePath, guestPath, branchName, fromRef string, mode model.WorkspacePrepareMode, readOnly bool) (model.WorkspacePrepareResult, error) {
|
||||
spec, err := s.workspaceInspectRepoHook(ctx, sourcePath, branchName, fromRef)
|
||||
if err != nil {
|
||||
return model.WorkspacePrepareResult{}, err
|
||||
}
|
||||
|
|
@ -175,15 +175,15 @@ func (d *Daemon) prepareVMWorkspaceGuestIO(ctx context.Context, vm model.VMRecor
|
|||
return model.WorkspacePrepareResult{}, fmt.Errorf("workspace mode %q does not support git submodules in %s (%s); use --mode full_copy", mode, spec.RepoRoot, strings.Join(spec.Submodules, ", "))
|
||||
}
|
||||
address := net.JoinHostPort(vm.Runtime.GuestIP, "22")
|
||||
if err := d.waitForGuestSSH(ctx, address, 250*time.Millisecond); err != nil {
|
||||
if err := s.waitGuestSSH(ctx, address, 250*time.Millisecond); err != nil {
|
||||
return model.WorkspacePrepareResult{}, fmt.Errorf("guest ssh unavailable: %w", err)
|
||||
}
|
||||
client, err := d.dialGuest(ctx, address)
|
||||
client, err := s.dialGuest(ctx, address)
|
||||
if err != nil {
|
||||
return model.WorkspacePrepareResult{}, fmt.Errorf("dial guest ssh: %w", err)
|
||||
}
|
||||
defer client.Close()
|
||||
if err := d.workspaceImportHook(ctx, client, spec, guestPath, mode); err != nil {
|
||||
if err := s.workspaceImportHook(ctx, client, spec, guestPath, mode); err != nil {
|
||||
return model.WorkspacePrepareResult{}, err
|
||||
}
|
||||
if readOnly {
|
||||
|
|
|
|||
110
internal/daemon/workspace_service.go
Normal file
110
internal/daemon/workspace_service.go
Normal file
|
|
@ -0,0 +1,110 @@
|
|||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
ws "banger/internal/daemon/workspace"
|
||||
"banger/internal/model"
|
||||
"banger/internal/paths"
|
||||
"banger/internal/store"
|
||||
"banger/internal/system"
|
||||
)
|
||||
|
||||
// WorkspaceService owns workspace.prepare / workspace.export plus the
|
||||
// ssh-key + git-identity sync that runs as part of VM start's
|
||||
// prepare_work_disk capability hook. The workspaceLocks set lives here
|
||||
// so its scope (serialise concurrent tar imports on the same VM) is
|
||||
// obvious at the field definition.
|
||||
//
|
||||
// The inspect/import test seams are per-service fields so tests inject
|
||||
// fakes without mutating package-level state.
|
||||
type WorkspaceService struct {
|
||||
runner system.CommandRunner
|
||||
logger *slog.Logger
|
||||
config model.DaemonConfig
|
||||
layout paths.Layout
|
||||
store *store.Store
|
||||
|
||||
// workspaceLocks serialises concurrent workspace.prepare /
|
||||
// workspace.export on the same VM. Separate from vmLocks so slow
|
||||
// guest I/O doesn't block lifecycle ops.
|
||||
workspaceLocks vmLockSet
|
||||
|
||||
// Peer-service access via narrow function-typed dependencies.
|
||||
// WorkspaceService doesn't hold pointers to the full VMService or
|
||||
// HostNetwork; it only sees the exact operations it needs.
|
||||
vmResolver func(ctx context.Context, idOrName string) (model.VMRecord, error)
|
||||
aliveChecker func(vm model.VMRecord) bool
|
||||
waitGuestSSH func(ctx context.Context, address string, interval time.Duration) error
|
||||
dialGuest func(ctx context.Context, address string) (guestSSHClient, error)
|
||||
imageResolver func(ctx context.Context, idOrName string) (model.Image, error)
|
||||
imageWorkSeed func(ctx context.Context, image model.Image, fingerprint string) error
|
||||
withVMLockByRef func(ctx context.Context, idOrName string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error)
|
||||
|
||||
beginOperation func(name string, attrs ...any) *operationLog
|
||||
|
||||
// Test seams.
|
||||
workspaceInspectRepo func(ctx context.Context, sourcePath, branchName, fromRef string) (ws.RepoSpec, error)
|
||||
workspaceImport func(ctx context.Context, client ws.GuestClient, spec ws.RepoSpec, guestPath string, mode model.WorkspacePrepareMode) error
|
||||
}
|
||||
|
||||
type workspaceServiceDeps struct {
|
||||
runner system.CommandRunner
|
||||
logger *slog.Logger
|
||||
config model.DaemonConfig
|
||||
layout paths.Layout
|
||||
store *store.Store
|
||||
vmResolver func(ctx context.Context, idOrName string) (model.VMRecord, error)
|
||||
aliveChecker func(vm model.VMRecord) bool
|
||||
waitGuestSSH func(ctx context.Context, address string, interval time.Duration) error
|
||||
dialGuest func(ctx context.Context, address string) (guestSSHClient, error)
|
||||
imageResolver func(ctx context.Context, idOrName string) (model.Image, error)
|
||||
imageWorkSeed func(ctx context.Context, image model.Image, fingerprint string) error
|
||||
withVMLockByRef func(ctx context.Context, idOrName string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error)
|
||||
beginOperation func(name string, attrs ...any) *operationLog
|
||||
}
|
||||
|
||||
func newWorkspaceService(deps workspaceServiceDeps) *WorkspaceService {
|
||||
return &WorkspaceService{
|
||||
runner: deps.runner,
|
||||
logger: deps.logger,
|
||||
config: deps.config,
|
||||
layout: deps.layout,
|
||||
store: deps.store,
|
||||
vmResolver: deps.vmResolver,
|
||||
aliveChecker: deps.aliveChecker,
|
||||
waitGuestSSH: deps.waitGuestSSH,
|
||||
dialGuest: deps.dialGuest,
|
||||
imageResolver: deps.imageResolver,
|
||||
imageWorkSeed: deps.imageWorkSeed,
|
||||
withVMLockByRef: deps.withVMLockByRef,
|
||||
beginOperation: deps.beginOperation,
|
||||
}
|
||||
}
|
||||
|
||||
// workspaceSvc is Daemon's lazy-init getter. Mirrors hostNet() /
|
||||
// imageSvc() so test literals like &Daemon{store: db, runner: r, ...}
|
||||
// still get a functional WorkspaceService without spelling one out.
|
||||
func (d *Daemon) workspaceSvc() *WorkspaceService {
|
||||
if d.ws != nil {
|
||||
return d.ws
|
||||
}
|
||||
d.ws = newWorkspaceService(workspaceServiceDeps{
|
||||
runner: d.runner,
|
||||
logger: d.logger,
|
||||
config: d.config,
|
||||
layout: d.layout,
|
||||
store: d.store,
|
||||
vmResolver: d.FindVM,
|
||||
aliveChecker: d.vmAlive,
|
||||
waitGuestSSH: d.waitForGuestSSH,
|
||||
dialGuest: d.dialGuest,
|
||||
imageResolver: d.FindImage,
|
||||
imageWorkSeed: d.imageSvc().refreshManagedWorkSeedFingerprint,
|
||||
withVMLockByRef: d.withVMLockByRef,
|
||||
beginOperation: d.beginOperation,
|
||||
})
|
||||
return d.ws
|
||||
}
|
||||
|
|
@ -96,7 +96,7 @@ func TestExportVMWorkspace_HappyPath(t *testing.T) {
|
|||
upsertDaemonVM(t, ctx, d.store, vm)
|
||||
d.setVMHandlesInMemory(vm.ID, model.VMHandles{PID: firecracker.Process.Pid})
|
||||
|
||||
result, err := d.ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
result, err := d.workspaceSvc().ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
IDOrName: vm.Name,
|
||||
GuestPath: "/root/repo",
|
||||
})
|
||||
|
|
@ -158,7 +158,7 @@ func TestExportVMWorkspace_WithBaseCommit(t *testing.T) {
|
|||
d.setVMHandlesInMemory(vm.ID, model.VMHandles{PID: firecracker.Process.Pid})
|
||||
|
||||
const prepareCommit = "abc1234deadbeef"
|
||||
result, err := d.ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
result, err := d.workspaceSvc().ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
IDOrName: vm.Name,
|
||||
BaseCommit: prepareCommit,
|
||||
})
|
||||
|
|
@ -204,7 +204,7 @@ func TestExportVMWorkspace_BaseCommitFallsBackToHEAD(t *testing.T) {
|
|||
upsertDaemonVM(t, ctx, d.store, vm)
|
||||
d.setVMHandlesInMemory(vm.ID, model.VMHandles{PID: firecracker.Process.Pid})
|
||||
|
||||
result, err := d.ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
result, err := d.workspaceSvc().ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
IDOrName: vm.Name,
|
||||
BaseCommit: "", // omitted
|
||||
})
|
||||
|
|
@ -244,7 +244,7 @@ func TestExportVMWorkspace_NoChanges(t *testing.T) {
|
|||
upsertDaemonVM(t, ctx, d.store, vm)
|
||||
d.setVMHandlesInMemory(vm.ID, model.VMHandles{PID: firecracker.Process.Pid})
|
||||
|
||||
result, err := d.ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
result, err := d.workspaceSvc().ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
IDOrName: vm.Name,
|
||||
})
|
||||
if err != nil {
|
||||
|
|
@ -284,7 +284,7 @@ func TestExportVMWorkspace_DefaultGuestPath(t *testing.T) {
|
|||
d.setVMHandlesInMemory(vm.ID, model.VMHandles{PID: firecracker.Process.Pid})
|
||||
|
||||
// GuestPath omitted — should default to /root/repo.
|
||||
result, err := d.ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
result, err := d.workspaceSvc().ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
IDOrName: vm.Name,
|
||||
})
|
||||
if err != nil {
|
||||
|
|
@ -307,7 +307,7 @@ func TestExportVMWorkspace_VMNotRunning(t *testing.T) {
|
|||
upsertDaemonVM(t, ctx, d.store, vm)
|
||||
// VM is stopped — no handle seed; vmAlive must return false.
|
||||
|
||||
_, err := d.ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
_, err := d.workspaceSvc().ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
IDOrName: vm.Name,
|
||||
})
|
||||
if err == nil || !strings.Contains(err.Error(), "not running") {
|
||||
|
|
@ -343,7 +343,7 @@ func TestExportVMWorkspace_MultipleChangedFiles(t *testing.T) {
|
|||
upsertDaemonVM(t, ctx, d.store, vm)
|
||||
d.setVMHandlesInMemory(vm.ID, model.VMHandles{PID: firecracker.Process.Pid})
|
||||
|
||||
result, err := d.ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
result, err := d.workspaceSvc().ExportVMWorkspace(ctx, api.WorkspaceExportParams{
|
||||
IDOrName: vm.Name,
|
||||
})
|
||||
if err != nil {
|
||||
|
|
@ -398,10 +398,10 @@ func TestPrepareVMWorkspace_ReleasesVMLockDuringGuestIO(t *testing.T) {
|
|||
// Import blocks until we say go.
|
||||
importStarted := make(chan struct{})
|
||||
releaseImport := make(chan struct{})
|
||||
d.workspaceInspectRepo = func(context.Context, string, string, string) (workspace.RepoSpec, error) {
|
||||
d.workspaceSvc().workspaceInspectRepo = func(context.Context, string, string, string) (workspace.RepoSpec, error) {
|
||||
return workspace.RepoSpec{RepoName: "fake", RepoRoot: "/tmp/fake"}, nil
|
||||
}
|
||||
d.workspaceImport = func(context.Context, workspace.GuestClient, workspace.RepoSpec, string, model.WorkspacePrepareMode) error {
|
||||
d.workspaceSvc().workspaceImport = func(context.Context, workspace.GuestClient, workspace.RepoSpec, string, model.WorkspacePrepareMode) error {
|
||||
close(importStarted)
|
||||
<-releaseImport
|
||||
return nil
|
||||
|
|
@ -410,7 +410,7 @@ func TestPrepareVMWorkspace_ReleasesVMLockDuringGuestIO(t *testing.T) {
|
|||
// Kick off prepare in a goroutine. It will block inside the import.
|
||||
prepareDone := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := d.PrepareVMWorkspace(ctx, api.VMWorkspacePrepareParams{
|
||||
_, err := d.workspaceSvc().PrepareVMWorkspace(ctx, api.VMWorkspacePrepareParams{
|
||||
IDOrName: vm.Name,
|
||||
SourcePath: "/tmp/fake",
|
||||
})
|
||||
|
|
@ -480,7 +480,7 @@ func TestPrepareVMWorkspace_SerialisesConcurrentPreparesOnSameVM(t *testing.T) {
|
|||
upsertDaemonVM(t, ctx, d.store, vm)
|
||||
d.setVMHandlesInMemory(vm.ID, model.VMHandles{PID: firecracker.Process.Pid})
|
||||
|
||||
d.workspaceInspectRepo = func(context.Context, string, string, string) (workspace.RepoSpec, error) {
|
||||
d.workspaceSvc().workspaceInspectRepo = func(context.Context, string, string, string) (workspace.RepoSpec, error) {
|
||||
return workspace.RepoSpec{RepoName: "fake", RepoRoot: "/tmp/fake"}, nil
|
||||
}
|
||||
|
||||
|
|
@ -488,7 +488,7 @@ func TestPrepareVMWorkspace_SerialisesConcurrentPreparesOnSameVM(t *testing.T) {
|
|||
var active int32
|
||||
var maxObserved int32
|
||||
release := make(chan struct{})
|
||||
d.workspaceImport = func(context.Context, workspace.GuestClient, workspace.RepoSpec, string, model.WorkspacePrepareMode) error {
|
||||
d.workspaceSvc().workspaceImport = func(context.Context, workspace.GuestClient, workspace.RepoSpec, string, model.WorkspacePrepareMode) error {
|
||||
n := atomic.AddInt32(&active, 1)
|
||||
for {
|
||||
prev := atomic.LoadInt32(&maxObserved)
|
||||
|
|
@ -505,7 +505,7 @@ func TestPrepareVMWorkspace_SerialisesConcurrentPreparesOnSameVM(t *testing.T) {
|
|||
done := make(chan error, n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func() {
|
||||
_, err := d.PrepareVMWorkspace(ctx, api.VMWorkspacePrepareParams{
|
||||
_, err := d.workspaceSvc().PrepareVMWorkspace(ctx, api.VMWorkspacePrepareParams{
|
||||
IDOrName: vm.Name,
|
||||
SourcePath: "/tmp/fake",
|
||||
})
|
||||
|
|
@ -567,7 +567,7 @@ func TestExportVMWorkspace_DoesNotMutateRealIndex(t *testing.T) {
|
|||
upsertDaemonVM(t, ctx, d.store, vm)
|
||||
d.setVMHandlesInMemory(vm.ID, model.VMHandles{PID: firecracker.Process.Pid})
|
||||
|
||||
if _, err := d.ExportVMWorkspace(ctx, api.WorkspaceExportParams{IDOrName: vm.Name}); err != nil {
|
||||
if _, err := d.workspaceSvc().ExportVMWorkspace(ctx, api.WorkspaceExportParams{IDOrName: vm.Name}); err != nil {
|
||||
t.Fatalf("ExportVMWorkspace: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue