Merge branch 'main' into drifkin/array-head-count-simple

Devon Rifkin (committed by GitHub), 2025-05-08 11:46:52 -07:00
156 changed files with 6327 additions and 3282 deletions

@@ -81,6 +81,10 @@ func InitScheduler(ctx context.Context) *Scheduler {
 // context must be canceled to decrement ref count and release the runner
 func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration *api.Duration) (chan *runnerRef, chan error) {
+	if opts.NumCtx < 4 {
+		opts.NumCtx = 4
+	}
+
 	req := &LlmRequest{
 		ctx:   c,
 		model: model,
@@ -110,11 +114,6 @@ func (s *Scheduler) Run(ctx context.Context) {
 	}()
 }
 
-const (
-	defaultContextLength  = 4096
-	smallGpuContextLength = 2048
-)
-
 func (s *Scheduler) processPending(ctx context.Context) {
 	for {
 		select {
@@ -148,6 +147,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
 				s.loadedMu.Unlock()
 				if runner != nil {
 					if runner.needsReload(ctx, pending) {
+						slog.Debug("reloading", "runner", runner)
 						runnerToExpire = runner
 					} else {
 						// Runner is usable, return it
@@ -167,17 +167,6 @@ func (s *Scheduler) processPending(ctx context.Context) {
 					gpus = s.getGpuFn()
 				}
 
-				if pending.origNumCtx == -1 {
-					if len(gpus) == 1 && gpus[0].Library != "cpu" && gpus[0].TotalMemory <= 4096*1024*1024 {
-						slog.Info("GPU is small, limiting default context window", "num_ctx", smallGpuContextLength)
-						pending.opts.NumCtx = smallGpuContextLength
-						pending.origNumCtx = smallGpuContextLength
-					} else {
-						pending.opts.NumCtx = defaultContextLength
-						pending.origNumCtx = defaultContextLength
-					}
-				}
-
 				if envconfig.MaxRunners() <= 0 {
 					// No user specified MaxRunners, so figure out what automatic setting to use
 					// If all GPUs have reliable free memory reporting, defaultModelsPerGPU * the number of GPUs
@@ -294,7 +283,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
 			}
 
 			// Trigger an expiration to unload once it's done
 			runnerToExpire.refMu.Lock()
-			slog.Debug("resetting model to expire immediately to make room", "modelPath", runnerToExpire.modelPath, "refCount", runnerToExpire.refCount)
+			slog.Debug("resetting model to expire immediately to make room", "runner", runnerToExpire, "refCount", runnerToExpire.refCount)
 			if runnerToExpire.expireTimer != nil {
 				runnerToExpire.expireTimer.Stop()
 				runnerToExpire.expireTimer = nil
@@ -307,13 +296,13 @@ func (s *Scheduler) processPending(ctx context.Context) {
 			// Wait for the unload to happen
 			// Note: at this point we're queueing up all incoming requests, even if they were for
 			// a different model that's loaded and not scheduled to be removed.
-			slog.Debug("waiting for pending requests to complete and unload to occur", "modelPath", runnerToExpire.modelPath)
+			slog.Debug("waiting for pending requests to complete and unload to occur", "runner", runnerToExpire)
 			select {
 			case <-ctx.Done():
 				slog.Debug("shutting down scheduler pending loop")
 				return
 			case <-s.unloadedCh:
-				slog.Debug("unload completed", "modelPath", runnerToExpire.modelPath)
+				slog.Debug("unload completed", "runner", runnerToExpire)
 				continue
 			}
 		}
@@ -343,16 +332,16 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
 			runner.refCount--
 			if runner.refCount <= 0 {
 				if runner.sessionDuration <= 0 {
-					slog.Debug("runner with zero duration has gone idle, expiring to unload", "modelPath", runner.modelPath)
+					slog.Debug("runner with zero duration has gone idle, expiring to unload", "runner", runner)
 					if runner.expireTimer != nil {
 						runner.expireTimer.Stop()
 						runner.expireTimer = nil
 					}
 					s.expiredCh <- runner
 				} else if runner.expireTimer == nil {
-					slog.Debug("runner with non-zero duration has gone idle, adding timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
+					slog.Debug("runner with non-zero duration has gone idle, adding timer", "runner", runner, "duration", runner.sessionDuration)
 					runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
-						slog.Debug("timer expired, expiring to unload", "modelPath", runner.modelPath)
+						slog.Debug("timer expired, expiring to unload", "runner", runner)
 						runner.refMu.Lock()
 						defer runner.refMu.Unlock()
 						if runner.expireTimer != nil {
@@ -363,18 +352,18 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
 					})
 					runner.expiresAt = time.Now().Add(runner.sessionDuration)
 				} else {
-					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
+					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "runner", runner, "duration", runner.sessionDuration)
 					runner.expireTimer.Reset(runner.sessionDuration)
 					runner.expiresAt = time.Now().Add(runner.sessionDuration)
 				}
 			}
-			slog.Debug("after processing request finished event", "modelPath", runner.modelPath, "refCount", runner.refCount)
+			slog.Debug("after processing request finished event", "runner", runner, "refCount", runner.refCount)
 			runner.refMu.Unlock()
 		case runner := <-s.expiredCh:
-			slog.Debug("runner expired event received", "modelPath", runner.modelPath)
+			slog.Debug("runner expired event received", "runner", runner)
 			runner.refMu.Lock()
 			if runner.refCount > 0 {
-				slog.Debug("expired event with positive ref count, retrying", "modelPath", runner.modelPath, "refCount", runner.refCount)
+				slog.Debug("expired event with positive ref count, retrying", "runner", runner, "refCount", runner.refCount)
 				go func(runner *runnerRef) {
 					// We can't unload yet, but want to as soon as the current request completes
 					// So queue up another expired event
@@ -386,17 +375,29 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
 			}
 
 			s.loadedMu.Lock()
-			slog.Debug("got lock to unload", "modelPath", runner.modelPath)
-			finished := runner.waitForVRAMRecovery()
-			runner.unload()
-			delete(s.loaded, runner.modelPath)
-			s.loadedMu.Unlock()
-			slog.Debug("runner released", "modelPath", runner.modelPath)
-			runner.refMu.Unlock()
-			<-finished
-			slog.Debug("sending an unloaded event", "modelPath", runner.modelPath)
-			s.unloadedCh <- struct{}{}
-
+			slog.Debug("got lock to unload expired event", "runner", runner)
+			runnerToUnload := s.loaded[runner.modelPath]
+			if runnerToUnload == nil {
+				// If runnerToUnload is nil, we already processed an event and
+				// unloaded it. This double unload can happen if the initial
+				// request is canceled and we're trying to load another model
+				// that requires this one to be evicted, or the settings change
+				// and require a reload
+				s.loadedMu.Unlock()
+				runner.refMu.Unlock()
+				slog.Debug("duplicate expired event, ignoring", "runner", runner)
+			} else {
+				slog.Debug("starting background wait for VRAM recovery", "runner", runner)
+				finished := runner.waitForVRAMRecovery()
+				runner.unload()
+				delete(s.loaded, runner.modelPath)
+				s.loadedMu.Unlock()
+				slog.Debug("runner terminated and removed from list, blocking for VRAM recovery", "runner", runner)
+				<-finished
+				runner.refMu.Unlock()
+				slog.Debug("sending an unloaded event", "runner", runner)
+				s.unloadedCh <- struct{}{}
+			}
 		}
 	}
 }
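
The guard above makes unload idempotent: an expired event can be delivered twice for the same runner (for example a canceled request plus an eviction), and only a runner still present in the loaded map is torn down. A minimal standalone sketch of that pattern, using toy types rather than the scheduler's real ones:

package main

import (
	"fmt"
	"sync"
)

type runner struct{ modelPath string }

type sched struct {
	loadedMu sync.Mutex
	loaded   map[string]*runner
}

// handleExpired tears a runner down at most once, even if the same
// expired event is delivered twice.
func (s *sched) handleExpired(r *runner) {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	if s.loaded[r.modelPath] == nil {
		fmt.Println("duplicate expired event, ignoring:", r.modelPath)
		return
	}
	delete(s.loaded, r.modelPath)
	fmt.Println("unloaded:", r.modelPath)
}

func main() {
	r := &runner{modelPath: "/models/foo"}
	s := &sched{loaded: map[string]*runner{r.modelPath: r}}
	s.handleExpired(r) // unloaded: /models/foo
	s.handleExpired(r) // duplicate expired event, ignoring: /models/foo
}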
@@ -418,7 +419,7 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm
 	pending.successCh <- runner
 	go func() {
 		<-pending.ctx.Done()
-		slog.Debug("context for request finished")
+		slog.Debug("context for request finished", "runner", runner)
 		finished <- pending
 	}()
 }
@@ -453,12 +454,19 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
 		estimatedVRAM:  llama.EstimatedVRAM(),
 		estimatedTotal: llama.EstimatedTotal(),
 		loading:        true,
-		refCount:       1,
+		pid:            llama.Pid(),
 	}
 
 	runner.numParallel = numParallel
-	runner.refMu.Lock()
+	runner.refMu.Lock() // hold lock until running or aborted
 	s.loadedMu.Lock()
+	if oldRunner, ok := s.loaded[req.model.ModelPath]; ok {
+		// Shouldn't happen, but safeguard against leaking a runner
+		slog.Warn("model was still loaded", "old_runner", oldRunner, "new_runner", runner)
+		oldRunner.refMu.Lock()
+		oldRunner.unload()
+		oldRunner.refMu.Unlock()
+	}
 	s.loaded[req.model.ModelPath] = runner
 	slog.Info("loaded runners", "count", len(s.loaded))
 	s.loadedMu.Unlock()
@@ -467,13 +475,16 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
 		defer runner.refMu.Unlock()
 		if err = llama.WaitUntilRunning(req.ctx); err != nil {
 			slog.Error("error loading llama server", "error", err)
-			runner.refCount--
 			req.errCh <- err
-			slog.Debug("triggering expiration for failed load", "model", runner.modelPath)
+			slog.Debug("triggering expiration for failed load", "runner", runner)
 			s.expiredCh <- runner
 			return
 		}
-		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
+		slog.Debug("finished setting up", "runner", runner)
+		if runner.pid < 0 {
+			runner.pid = llama.Pid()
+		}
+		runner.refCount++
 		runner.loading = false
 		go func() {
 			<-req.ctx.Done()
@@ -491,7 +502,12 @@ func (s *Scheduler) updateFreeSpace(allGpus discover.GpuInfoList) {
 	}
 	predMap := map[predKey]uint64{} // Sum up the total predicted usage per GPU for all runners
 	s.loadedMu.Lock()
+	runners := make([]*runnerRef, 0, len(s.loaded))
 	for _, r := range s.loaded {
+		runners = append(runners, r)
+	}
+	s.loadedMu.Unlock()
+	for _, r := range runners {
 		r.refMu.Lock()
 		if r.llama != nil {
 			for _, gpu := range allGpus {
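
The rewritten loop above snapshots the loaded runners under loadedMu and releases it before taking each runner's refMu, so the map lock and the per-runner locks are never held at the same time. A minimal sketch of that copy-then-iterate pattern, again with toy types standing in for the scheduler's:

package main

import (
	"fmt"
	"sync"
)

type item struct {
	mu   sync.Mutex
	vram uint64
}

type registry struct {
	mu     sync.Mutex
	loaded map[string]*item
}

// sumVRAM snapshots the map under its lock, then locks each item
// individually; the two locks never overlap.
func (r *registry) sumVRAM() uint64 {
	r.mu.Lock()
	items := make([]*item, 0, len(r.loaded))
	for _, it := range r.loaded {
		items = append(items, it)
	}
	r.mu.Unlock() // map lock released before any per-item lock is taken

	var total uint64
	for _, it := range items {
		it.mu.Lock()
		total += it.vram
		it.mu.Unlock()
	}
	return total
}

func main() {
	r := &registry{loaded: map[string]*item{
		"a": {vram: 1 << 30},
		"b": {vram: 2 << 30},
	}}
	fmt.Println(r.sumVRAM()) // 3221225472
}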
@@ -502,7 +518,6 @@ func (s *Scheduler) updateFreeSpace(allGpus discover.GpuInfoList) {
 		}
 		r.refMu.Unlock()
 	}
-	s.loadedMu.Unlock()
 
 	// Now that we've summed up all the GPU usage predictions across all the loaded runners, update the gpu list
 	for i := range allGpus {
@@ -549,12 +564,11 @@ func (s *Scheduler) filterGPUsWithoutLoadingModels(allGpus discover.GpuInfoList)
 
 // TODO consolidate sched_types.go
 type runnerRef struct {
-	refMu sync.Mutex
-	// refCond   sync.Cond // Signaled on transition from 1 -> 0 refCount
+	refMu    sync.Mutex
 	refCount uint // prevent unloading if > 0
-	// unloading bool // set to true when we are trying to unload the runner
 
 	llama llm.LlamaServer
+	pid   int
 	loading bool // True only during initial load, then false forever
 	gpus discover.GpuInfoList // Recorded at time of provisioning
 	estimatedVRAM uint64
@@ -639,6 +653,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 		(len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) ||
 		(runtime.GOOS == "windows" && runner.gpus[0].Library != "cuda") {
 		finished <- struct{}{}
+		slog.Debug("no need to wait for VRAM recovery", "runner", runner)
 		return finished
 	}
 	start := time.Now()
@@ -657,7 +672,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 		for {
 			<-ticker.C
 			if time.Now().After(expiresAt) {
-				slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "model", runner.modelPath)
+				slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "runner", runner)
 				finished <- struct{}{}
 			}
@@ -670,7 +685,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 			}
 			// If we're within ~80% of the estimated memory usage recovered, bail out
 			if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 {
-				slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "model", runner.modelPath)
+				slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "runner", runner)
 				finished <- struct{}{}
 				return
 			}
@@ -679,6 +694,33 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 	return finished
 }
 
+func (runner *runnerRef) LogValue() slog.Value {
+	if runner == nil {
+		return slog.StringValue("nil")
+	}
+	attrs := []slog.Attr{}
+	if runner.model != nil {
+		attrs = append(attrs, slog.String("name", runner.model.Name))
+	}
+	if len(runner.gpus) > 0 {
+		attrs = append(attrs,
+			slog.String("inference", runner.gpus[0].Library),
+			slog.Int("devices", len(runner.gpus)),
+		)
+	}
+	attrs = append(attrs,
+		slog.String("size", format.HumanBytes2(runner.estimatedTotal)),
+		slog.String("vram", format.HumanBytes2(runner.estimatedVRAM)),
+		slog.Int("parallel", runner.numParallel),
+		slog.Int("pid", runner.pid),
+		slog.String("model", runner.modelPath),
+	)
+	if runner.Options != nil {
+		attrs = append(attrs, slog.Int("num_ctx", runner.Options.NumCtx))
+	}
+	return slog.GroupValue(attrs...)
+}
+
 type ByDurationAndName []*runnerRef
 
 func (a ByDurationAndName) Len() int { return len(a) }
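
The LogValue method added above is what lets the rest of this diff replace "modelPath" log attributes with a single "runner" attribute: log/slog calls LogValue on any attribute value implementing slog.LogValuer and logs the returned group instead. A minimal standalone demonstration, with a toy type in place of runnerRef:

package main

import (
	"log/slog"
	"os"
)

type runner struct {
	pid       int
	modelPath string
}

// LogValue implements slog.LogValuer; slog resolves a *runner attribute
// value to this group at log time.
func (r *runner) LogValue() slog.Value {
	if r == nil {
		return slog.StringValue("nil")
	}
	return slog.GroupValue(
		slog.Int("pid", r.pid),
		slog.String("model", r.modelPath),
	)
}

func main() {
	log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
	r := &runner{pid: 1234, modelPath: "/models/foo"}
	// emits: ... msg="runner expired event received" runner.pid=1234 runner.model=/models/foo
	log.Debug("runner expired event received", "runner", r)
}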
@@ -801,12 +843,12 @@ func (s *Scheduler) findRunnerToUnload() *runnerRef {
 		rc := runner.refCount
 		runner.refMu.Unlock()
 		if rc == 0 {
-			slog.Debug("found an idle runner to unload")
+			slog.Debug("found an idle runner to unload", "runner", runner)
 			return runner
 		}
 	}
 
 	// None appear idle, just wait for the one with the shortest duration
-	slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList))
+	slog.Debug("no idle runners, picking the shortest duration", "runner_count", len(runnerList), "runner", runnerList[0])
 	return runnerList[0]
 }
@@ -823,8 +865,8 @@ func (s *Scheduler) unloadAllRunners() {
 
 func (s *Scheduler) expireRunner(model *Model) {
 	s.loadedMu.Lock()
+	defer s.loadedMu.Unlock()
 	runner, ok := s.loaded[model.ModelPath]
-	s.loadedMu.Unlock()
 	if ok {
 		runner.refMu.Lock()
 		runner.expiresAt = time.Now()