Refine CPU load behavior with system memory visibility

This commit is contained in:
Daniel Hiltgen
2024-06-03 19:09:23 -07:00
parent 434dfe30c5
commit fc37c192ae
7 changed files with 211 additions and 98 deletions

View File

@@ -7,7 +7,6 @@ import (
"log/slog"
"reflect"
"runtime"
"slices"
"sort"
"strings"
"sync"
@@ -41,6 +40,7 @@ type Scheduler struct {
loadFn func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
getGpuFn func() gpu.GpuInfoList
getCpuFn func() gpu.GpuInfoList
}
var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
@@ -54,6 +54,7 @@ func InitScheduler(ctx context.Context) *Scheduler {
loaded: make(map[string]*runnerRef),
newServerFn: llm.NewLlamaServer,
getGpuFn: gpu.GetGPUInfo,
getCpuFn: gpu.GetCPUInfo,
}
sched.loadFn = sched.load
return sched
@@ -131,7 +132,12 @@ func (s *Scheduler) processPending(ctx context.Context) {
} else {
// Either no models are loaded or below envconfig.MaxRunners
// Get a refreshed GPU list
gpus := s.getGpuFn()
var gpus gpu.GpuInfoList
if pending.opts.NumGPU == 0 {
gpus = s.getCpuFn()
} else {
gpus = s.getGpuFn()
}
// Load model for fitting
ggml, err := llm.LoadModel(pending.model.ModelPath)
@@ -140,16 +146,22 @@ func (s *Scheduler) processPending(ctx context.Context) {
break
}
// If we're CPU only mode, just limit by envconfig.MaxRunners above
// TODO handle system memory exhaustion
if (len(gpus) == 1 && gpus[0].Library == "cpu") || pending.opts.NumGPU == 0 {
slog.Debug("cpu mode with existing models, loading")
s.loadFn(pending, ggml, gpus)
break
}
// No models loaded. Load the model but prefer the best fit.
if loadedCount == 0 {
// Evaluate if the model will fit in the available system memory, or if we should unload a model first
if len(gpus) == 1 && gpus[0].Library == "cpu" {
if loadedCount == 0 {
slog.Debug("cpu mode with first model, loading")
s.loadFn(pending, ggml, gpus)
break
}
runnerToExpire = s.maybeFindCPURunnerToUnload(pending, ggml, gpus)
if runnerToExpire == nil {
slog.Debug("cpu mode with available system memory or first model, loading")
s.loadFn(pending, ggml, gpus)
break
}
// else we need to expire a runner
} else if loadedCount == 0 {
// No models loaded. Load the model but prefer the best fit.
slog.Debug("loading first model", "model", pending.model.ModelPath)
g := pickBestFitGPUs(pending, ggml, gpus)
if g != nil {
@@ -159,16 +171,18 @@ func (s *Scheduler) processPending(ctx context.Context) {
break
}
// More than one loaded model, so we have to see if the new one fits
// Update free memory from currently loaded models
s.updateFreeSpace(gpus)
gpus = pickBestFitGPUs(pending, ggml, gpus)
if gpus != nil {
slog.Debug("new model fits with existing models, loading")
s.loadFn(pending, ggml, gpus)
break
if runnerToExpire == nil {
// More than one loaded model, so we have to see if the new one fits
// Update free memory from currently loaded models
s.updateFreeSpace(gpus)
gpus = pickBestFitGPUs(pending, ggml, gpus)
if gpus != nil {
slog.Debug("new model fits with existing models, loading")
s.loadFn(pending, ggml, gpus)
break
}
runnerToExpire = s.findRunnerToUnload()
}
runnerToExpire = s.findRunnerToUnload()
}
if runnerToExpire == nil {
@@ -368,17 +382,11 @@ func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
s.loadedMu.Lock()
for _, r := range s.loaded {
r.refMu.Lock()
gpuIDs := make([]string, 0, len(r.gpus))
if r.llama != nil {
// TODO this should be broken down by GPU instead of assuming uniform spread
estimatedVRAMPerGPU := r.llama.EstimatedVRAM() / uint64(len(r.gpus))
for _, gpu := range r.gpus {
gpuIDs = append(gpuIDs, gpu.ID)
}
for _, gpu := range allGpus {
if slices.Contains(gpuIDs, gpu.ID) {
predMap[predKey{gpu.Library, gpu.ID}] += estimatedVRAMPerGPU
}
// if slices.Contains(gpuIDs, gpu.ID) {
predMap[predKey{gpu.Library, gpu.ID}] += r.llama.EstimagedVRAMByGPU(gpu.ID)
// }
}
} else {
slog.Warn("unexpected nil runner reference, memory prediction may be incorrect")
@@ -489,7 +497,8 @@ func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
// CPU or Metal don't need checking, so no waiting required
// windows can page VRAM, only cuda currently can report accurate used vram usage
if (len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) ||
if len(runner.gpus) == 0 ||
(len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) ||
(runtime.GOOS == "windows" && runner.gpus[0].Library != "cuda") {
finished <- struct{}{}
return finished
@@ -624,3 +633,19 @@ func (s *Scheduler) unloadAllRunners() {
}
}
}
// If other runners are loaded, make sure the pending request will fit in system memory
// If not, pick a runner to unload, else return nil and the request can be loaded
func (s *Scheduler) maybeFindCPURunnerToUnload(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) *runnerRef {
slog.Debug("evaluating if CPU model load will fit in available system memory")
estimate := llm.EstimateGPULayers(gpus, ggml, req.model.ProjectorPaths, req.opts)
if estimate.TotalSize <= gpus[0].FreeMemory {
slog.Debug("cpu inference mode, model fits in available system memory", "model", format.HumanBytes2(estimate.TotalSize), "available", format.HumanBytes2(gpus[0].FreeMemory))
return nil
}
// TODO - optimization: try to find CPU only runners first, or partial offloads with enough in system memory to make room
return s.findRunnerToUnload()
}

View File

@@ -60,7 +60,7 @@ func TestLoad(t *testing.T) {
err := <-req.errCh
require.Contains(t, err.Error(), "this model may be incompatible")
server := &mockLlm{estimatedVRAM: 10}
server := &mockLlm{estimatedVRAM: 10, estimatedVRAMByGPU: map[string]uint64{}}
s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
return server, nil
}
@@ -146,7 +146,7 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
successCh: make(chan *runnerRef, 1),
errCh: make(chan error, 1),
}
scenario.srv = &mockLlm{estimatedVRAM: estimatedVRAM}
scenario.srv = &mockLlm{estimatedVRAM: estimatedVRAM, estimatedVRAMByGPU: map[string]uint64{"": estimatedVRAM}}
return scenario
}
@@ -182,6 +182,12 @@ func TestRequests(t *testing.T) {
g.FreeMemory = 12 * format.GigaByte
return []gpu.GpuInfo{g}
}
s.getCpuFn = func() gpu.GpuInfoList {
g := gpu.GpuInfo{Library: "cpu"}
g.TotalMemory = 32 * format.GigaByte
g.FreeMemory = 26 * format.GigaByte
return []gpu.GpuInfo{g}
}
s.newServerFn = scenario1a.newServer
slog.Info("scenario1a")
s.pendingReqCh <- scenario1a.req
@@ -420,7 +426,7 @@ func TestUseLoadedRunner(t *testing.T) {
sessionDuration: 2,
}
finished := make(chan *LlmRequest)
llm1 := &mockLlm{}
llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
r1 := &runnerRef{llama: llm1, sessionDuration: 1}
req.useLoadedRunner(r1, finished)
require.Equal(t, uint(1), r1.refCount)
@@ -453,8 +459,8 @@ func TestUpdateFreeSpace(t *testing.T) {
gpus[0].FreeMemory = 900
gpus[1].TotalMemory = 2000
gpus[1].FreeMemory = 1900
llm1 := &mockLlm{estimatedVRAM: 100}
llm2 := &mockLlm{estimatedVRAM: 200}
llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 50, "2": 50}}
llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 125, "2": 75}}
r1 := &runnerRef{llama: llm1, gpus: gpus}
r2 := &runnerRef{llama: llm2, gpus: gpus}
@@ -465,8 +471,8 @@ func TestUpdateFreeSpace(t *testing.T) {
s.loadedMu.Unlock()
s.updateFreeSpace(gpus)
require.Equal(t, uint64(850), gpus[0].FreeMemory)
require.Equal(t, uint64(1850), gpus[1].FreeMemory)
require.Equal(t, uint64(1000-50-125), gpus[0].FreeMemory)
require.Equal(t, uint64(2000-50-75), gpus[1].FreeMemory)
}
func TestFindRunnerToUnload(t *testing.T) {
@@ -493,7 +499,7 @@ func TestNeedsReload(t *testing.T) {
ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer done()
llm := &mockLlm{}
llm := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
do := api.DefaultOptions()
runner := &runnerRef{
model: &Model{AdapterPaths: []string{"adapter1"}, ProjectorPaths: []string{"projector1"}},
@@ -536,8 +542,8 @@ func TestUnloadAllRunners(t *testing.T) {
ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer done()
llm1 := &mockLlm{}
llm2 := &mockLlm{}
llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
s := InitScheduler(ctx)
s.unloadAllRunners()
@@ -555,7 +561,7 @@ func TestUnloadAllRunners(t *testing.T) {
}
func TestUnload(t *testing.T) {
llm1 := &mockLlm{}
llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
r1 := &runnerRef{llama: llm1}
r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}}
r1.unload()
@@ -565,19 +571,20 @@ func TestUnload(t *testing.T) {
}
type mockLlm struct {
pingResp error
waitResp error
completionResp error
embeddingResp []float64
embeddingRespErr error
tokenizeResp []int
tokenizeRespErr error
detokenizeResp string
detonekizeRespErr error
closeResp error
closeCalled bool
estimatedVRAM uint64
estimatedTotal uint64
pingResp error
waitResp error
completionResp error
embeddingResp []float64
embeddingRespErr error
tokenizeResp []int
tokenizeRespErr error
detokenizeResp string
detonekizeRespErr error
closeResp error
closeCalled bool
estimatedVRAM uint64
estimatedTotal uint64
estimatedVRAMByGPU map[string]uint64
}
func (s *mockLlm) Ping(ctx context.Context) error { return s.pingResp }
@@ -598,5 +605,6 @@ func (s *mockLlm) Close() error {
s.closeCalled = true
return s.closeResp
}
func (s *mockLlm) EstimatedVRAM() uint64 { return s.estimatedVRAM }
func (s *mockLlm) EstimatedTotal() uint64 { return s.estimatedTotal }
func (s *mockLlm) EstimatedVRAM() uint64 { return s.estimatedVRAM }
func (s *mockLlm) EstimatedTotal() uint64 { return s.estimatedTotal }
func (s *mockLlm) EstimagedVRAMByGPU(gpuid string) uint64 { return s.estimatedVRAMByGPU[gpuid] }