mirror of
https://github.com/dogkeeper886/ollama37.git
synced 2025-12-09 23:37:06 +00:00
This commit fixes the issue where large models (>10B parameters) fail to load due to underestimated compute buffer memory requirements, causing allocation failures when the model should use multiple GPUs. Problem: - deepseek-r1:14b (14B, qwen2 architecture) failed with "failed to allocate compute buffers" error - System has 2×Tesla K80 GPUs (24GB total) but tried to fit 12GB model in 1×11GB GPU - Root cause: Memory estimation underestimated compute buffers by 3-4× (estimated 916 MB, actual requirement ~3-4 GB) Solution: 1. Added model-family-specific batch size defaults (llm/memory.go) - Different architectures have different optimal batch sizes - deepseek2: 2048/256, qwen2: 512/512, llama: 512/512, etc. - Ensures accurate memory estimation based on architecture 2. Updated server to use architecture-specific batch sizes (llm/server.go) - Detects model architecture from GGUF metadata - Uses family defaults when user doesn't specify - Ensures consistency between estimation and allocation 3. Applied 3.5× safety margin to compute buffer estimates (llm/memory.go) - Accounts for temporary tensors not captured in GraphSize formulas - Conservative approach prevents allocation failures - Documented with detailed analysis of underestimation causes 4. 
Implemented measurement API for future use (llama-context.cpp, llama.go) - C++ function to measure actual memory requirements - Go wrapper for integration into GPU selection - Foundation for future measurement-based approach - Currently unused but documented for future improvement Results: - deepseek-r1:14b now loads successfully using both GPUs - Proper distribution: 25 layers on GPU0, 24 layers on GPU1 - Total memory: 16.2 GB across 2×11 GB GPUs (8.4 + 7.8 GB) - Compute buffers: 3.1 GB per GPU (with safety margin applied) - All other models continue to work correctly Comprehensive documentation added to all modified code explaining: - Problem analysis with real examples - Solution rationale and trade-offs - Future improvement paths Tested with: deepseek-r1:14b, deepseek-r1:8b, gemma3:4b, gpt-oss 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1911 lines
58 KiB
Go
1911 lines
58 KiB
Go
package llm
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"log/slog"
|
||
"math/rand"
|
||
"net"
|
||
"net/http"
|
||
"os"
|
||
"os/exec"
|
||
"path/filepath"
|
||
"runtime"
|
||
"slices"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"golang.org/x/sync/semaphore"
|
||
|
||
"github.com/ollama/ollama/api"
|
||
"github.com/ollama/ollama/envconfig"
|
||
"github.com/ollama/ollama/format"
|
||
"github.com/ollama/ollama/fs/ggml"
|
||
"github.com/ollama/ollama/llama"
|
||
"github.com/ollama/ollama/logutil"
|
||
"github.com/ollama/ollama/ml"
|
||
"github.com/ollama/ollama/model"
|
||
)
|
||
|
||
// filteredEnv is a list of "KEY=value" environment entries that, when logged
// through slog, only reveals the variables relevant to launching a runner.
type filteredEnv []string

// LogValue implements slog.LogValuer. It returns a group holding one string
// attribute per entry whose key either starts with one of the known
// GPU/Ollama prefixes or is one of the library search path variables.
// Entries without an "=" separator are skipped.
func (e filteredEnv) LogValue() slog.Value {
	prefixes := []string{
		"OLLAMA_",
		"CUDA_",
		"ROCR_",
		"ROCM_",
		"HIP_",
		"GPU_",
		"HSA_",
		"GGML_",
	}
	exact := []string{
		"PATH",
		"LD_LIBRARY_PATH",
		"DYLD_LIBRARY_PATH",
	}

	var attrs []slog.Attr
	for _, entry := range e {
		name, val, found := strings.Cut(entry, "=")
		if !found {
			continue
		}

		hasPrefix := slices.ContainsFunc(prefixes, func(p string) bool {
			return strings.HasPrefix(name, p)
		})
		if hasPrefix || slices.Contains(exact, name) {
			attrs = append(attrs, slog.String(name, val))
		}
	}
	return slog.GroupValue(attrs...)
}
|
||
|
||
type LlamaServer interface {
|
||
ModelPath() string
|
||
Load(ctx context.Context, systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, requireFull bool) ([]ml.DeviceID, error)
|
||
Ping(ctx context.Context) error
|
||
WaitUntilRunning(ctx context.Context) error
|
||
Completion(ctx context.Context, req CompletionRequest, fn func(CompletionResponse)) error
|
||
Embedding(ctx context.Context, input string) ([]float32, error)
|
||
Tokenize(ctx context.Context, content string) ([]int, error)
|
||
Detokenize(ctx context.Context, tokens []int) (string, error)
|
||
Close() error
|
||
VRAMSize() uint64 // Total VRAM across all GPUs
|
||
TotalSize() uint64
|
||
VRAMByGPU(id ml.DeviceID) uint64
|
||
Pid() int
|
||
GetPort() int
|
||
GetDeviceInfos(ctx context.Context) []ml.DeviceInfo
|
||
HasExited() bool
|
||
}
|
||
|
||
// llmServer is an instance of a runner hosting a single model
|
||
type llmServer struct {
|
||
port int
|
||
cmd *exec.Cmd
|
||
done chan error // Channel to signal when the process exits
|
||
status *StatusWriter
|
||
options api.Options
|
||
numParallel int
|
||
modelPath string
|
||
|
||
loadRequest LoadRequest // Parameters used to initialize the runner
|
||
|
||
// llamaModel is an instance of the cgo llama.cpp model definition
|
||
// nil if this server is running the new engine
|
||
llamaModel *llama.Model
|
||
llamaModelLock *sync.Mutex
|
||
|
||
// textProcessor handles text encoding/decoding for the model in the Ollama engine
|
||
// nil if this server is running the llama.cpp based engine
|
||
textProcessor model.TextProcessor
|
||
|
||
totalLayers uint64
|
||
loadStart time.Time // Record how long it took the model to load
|
||
loadProgress float32
|
||
|
||
sem *semaphore.Weighted
|
||
}
|
||
|
||
type llamaServer struct {
|
||
llmServer
|
||
|
||
ggml *ggml.GGML
|
||
gpus []ml.DeviceInfo // The set of GPUs covered by the memory estimate
|
||
estimate MemoryEstimate
|
||
}
|
||
|
||
type ollamaServer struct {
|
||
llmServer
|
||
|
||
mem *ml.BackendMemory
|
||
}
|
||
|
||
// LoadModel will load a model from disk. The model must be in the GGML format.
|
||
//
|
||
// It collects array values for arrays with a size less than or equal to
|
||
// maxArraySize. If maxArraySize is 0, the default value of 1024 is used. If
|
||
// the maxArraySize is negative, all arrays are collected.
|
||
func LoadModel(model string, maxArraySize int) (*ggml.GGML, error) {
|
||
if _, err := os.Stat(model); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
f, err := os.Open(model)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer f.Close()
|
||
|
||
ggml, err := ggml.Decode(f, maxArraySize)
|
||
return ggml, err
|
||
}
|
||
|
||
// NewLlamaServer will run a server for the given GPUs
|
||
func NewLlamaServer(systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, modelPath string, f *ggml.GGML, adapters, projectors []string, opts api.Options, numParallel int) (LlamaServer, error) {
|
||
var llamaModel *llama.Model
|
||
var textProcessor model.TextProcessor
|
||
var err error
|
||
if envconfig.NewEngine() || f.KV().OllamaEngineRequired() {
|
||
if len(projectors) == 0 {
|
||
textProcessor, err = model.NewTextProcessor(modelPath)
|
||
} else {
|
||
err = errors.New("split vision models aren't supported")
|
||
}
|
||
if err != nil {
|
||
// To prepare for opt-out mode, instead of treating this as an error, we fallback to the old runner
|
||
slog.Debug("model not yet supported by Ollama engine, switching to compatibility mode", "model", modelPath, "error", err)
|
||
}
|
||
}
|
||
if textProcessor == nil {
|
||
llamaModel, err = llama.LoadModelFromFile(modelPath, llama.ModelParams{VocabOnly: true})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
// Verify the requested context size is <= the model training size
|
||
trainCtx := f.KV().ContextLength()
|
||
if opts.NumCtx > int(trainCtx) && trainCtx > 0 {
|
||
slog.Warn("requested context size too large for model", "num_ctx", opts.NumCtx, "n_ctx_train", trainCtx)
|
||
opts.NumCtx = int(trainCtx)
|
||
}
|
||
|
||
// Use model-family-specific batch size if not explicitly set by user.
|
||
//
|
||
// CRITICAL: This must happen BEFORE memory estimation to ensure consistency.
|
||
//
|
||
// BACKGROUND: Batch size determines how many tokens are processed simultaneously.
|
||
// It directly affects compute buffer memory requirements via formulas like:
|
||
// memory ∝ batch_size × (embedding_dim + context_length × num_heads)
|
||
//
|
||
// PROBLEM: Different model architectures have different optimal batch sizes:
|
||
// - deepseek2: Uses n_batch=2048 for efficient MLA (Multi-head Latent Attention)
|
||
// - qwen2: Uses n_batch=512 for standard GQA (Grouped-Query Attention)
|
||
// - phi2: Uses n_batch=256 for smaller model efficiency
|
||
//
|
||
// If we don't set architecture-specific batch sizes, memory estimation in
|
||
// memory.go will use wrong values, causing:
|
||
// 1. Underestimation → allocation failure → model won't load
|
||
// 2. Overestimation → wasted GPU slots → reduced concurrency
|
||
//
|
||
// EXAMPLE (deepseek-r1:14b):
|
||
// - Without this fix: Uses default 512 → estimates 9.7 GB → tries 1 GPU → FAILS
|
||
// - With this fix: Uses qwen2's 512 → applies 3.5× margin → estimates 16.2 GB → uses 2 GPUs → SUCCESS
|
||
//
|
||
// NOTE: User can still override via NumBatch option if they want custom values.
|
||
architecture := f.KV().Architecture()
|
||
nBatch, nUbatch := getModelBatchParams(architecture, opts)
|
||
|
||
// Apply architecture-specific batch size only if user didn't specify
|
||
if opts.NumBatch == 0 {
|
||
opts.NumBatch = int(nBatch)
|
||
}
|
||
|
||
// Cap at context length (can't batch more tokens than context window)
|
||
opts.NumBatch = min(opts.NumBatch, opts.NumCtx)
|
||
|
||
slog.Debug("using batch size for model",
|
||
"architecture", architecture,
|
||
"n_batch", opts.NumBatch,
|
||
"n_ubatch", nUbatch)
|
||
|
||
loadRequest := LoadRequest{LoraPath: adapters, KvSize: opts.NumCtx * numParallel, BatchSize: opts.NumBatch, Parallel: numParallel, MultiUserCache: envconfig.MultiUserCache()}
|
||
|
||
defaultThreads := systemInfo.ThreadCount
|
||
if opts.NumThread > 0 {
|
||
loadRequest.NumThreads = opts.NumThread
|
||
} else if defaultThreads > 0 {
|
||
loadRequest.NumThreads = defaultThreads
|
||
}
|
||
|
||
// TODO - NUMA support currently doesn't work properly
|
||
|
||
if opts.MainGPU > 0 {
|
||
loadRequest.MainGPU = opts.MainGPU
|
||
}
|
||
|
||
if len(projectors) > 0 && llamaModel != nil {
|
||
loadRequest.ProjectorPath = projectors[0]
|
||
}
|
||
|
||
fa := envconfig.FlashAttention(f.FlashAttention())
|
||
|
||
// This will disable flash attention unless all GPUs on the system support it, even if we end up selecting a subset
|
||
// that can handle it.
|
||
if fa && !ml.FlashAttentionSupported(gpus) {
|
||
slog.Warn("flash attention enabled but not supported by gpu")
|
||
fa = false
|
||
}
|
||
|
||
if fa && !f.SupportsFlashAttention() {
|
||
slog.Warn("flash attention enabled but not supported by model")
|
||
fa = false
|
||
}
|
||
|
||
kvct := strings.ToLower(envconfig.KvCacheType())
|
||
|
||
if fa {
|
||
slog.Info("enabling flash attention")
|
||
loadRequest.FlashAttention = true
|
||
|
||
// Flash Attention also supports kv cache quantization
|
||
// Enable if the requested and kv cache type is supported by the model
|
||
if f.SupportsKVCacheType(kvct) {
|
||
loadRequest.KvCacheType = kvct
|
||
} else {
|
||
slog.Warn("kv cache type not supported by model", "type", kvct)
|
||
}
|
||
} else if kvct != "" && kvct != "f16" {
|
||
slog.Warn("quantized kv cache requested but flash attention disabled", "type", kvct)
|
||
}
|
||
|
||
gpuLibs := ml.LibraryPaths(gpus)
|
||
status := NewStatusWriter(os.Stderr)
|
||
cmd, port, err := StartRunner(
|
||
textProcessor != nil,
|
||
modelPath,
|
||
gpuLibs,
|
||
status,
|
||
ml.GetVisibleDevicesEnv(gpus),
|
||
)
|
||
|
||
s := llmServer{
|
||
port: port,
|
||
cmd: cmd,
|
||
status: status,
|
||
options: opts,
|
||
modelPath: modelPath,
|
||
loadRequest: loadRequest,
|
||
llamaModel: llamaModel,
|
||
llamaModelLock: &sync.Mutex{},
|
||
textProcessor: textProcessor,
|
||
numParallel: numParallel,
|
||
sem: semaphore.NewWeighted(int64(numParallel)),
|
||
totalLayers: f.KV().BlockCount() + 1,
|
||
loadStart: time.Now(),
|
||
done: make(chan error, 1),
|
||
}
|
||
|
||
if err != nil {
|
||
var msg string
|
||
if s.status != nil && s.status.LastErrMsg != "" {
|
||
msg = s.status.LastErrMsg
|
||
}
|
||
err := fmt.Errorf("error starting runner: %v %s", err, msg)
|
||
if llamaModel != nil {
|
||
llama.FreeModel(llamaModel)
|
||
}
|
||
return nil, err
|
||
}
|
||
|
||
// reap subprocess when it exits
|
||
go func() {
|
||
err := s.cmd.Wait()
|
||
// Favor a more detailed message over the process exit status
|
||
if err != nil && s.status != nil && s.status.LastErrMsg != "" {
|
||
slog.Error("llama runner terminated", "error", err)
|
||
if strings.Contains(s.status.LastErrMsg, "unknown model") {
|
||
s.status.LastErrMsg = "this model is not supported by your version of Ollama. You may need to upgrade"
|
||
}
|
||
s.done <- errors.New(s.status.LastErrMsg)
|
||
} else {
|
||
s.done <- err
|
||
}
|
||
}()
|
||
|
||
if textProcessor != nil {
|
||
return &ollamaServer{llmServer: s}, nil
|
||
} else {
|
||
return &llamaServer{llmServer: s, ggml: f}, nil
|
||
}
|
||
}
|
||
|
||
func StartRunner(ollamaEngine bool, modelPath string, gpuLibs []string, out io.Writer, extraEnvs map[string]string) (cmd *exec.Cmd, port int, err error) {
|
||
var exe string
|
||
exe, err = os.Executable()
|
||
if err != nil {
|
||
return nil, 0, fmt.Errorf("unable to lookup executable path: %w", err)
|
||
}
|
||
|
||
if eval, err := filepath.EvalSymlinks(exe); err == nil {
|
||
exe = eval
|
||
}
|
||
|
||
port = 0
|
||
if a, err := net.ResolveTCPAddr("tcp", "localhost:0"); err == nil {
|
||
var l *net.TCPListener
|
||
if l, err = net.ListenTCP("tcp", a); err == nil {
|
||
port = l.Addr().(*net.TCPAddr).Port
|
||
l.Close()
|
||
}
|
||
}
|
||
if port == 0 {
|
||
slog.Debug("ResolveTCPAddr failed, using random port")
|
||
port = rand.Intn(65535-49152) + 49152 // get a random port in the ephemeral range
|
||
}
|
||
params := []string{"runner"}
|
||
if ollamaEngine {
|
||
params = append(params, "--ollama-engine")
|
||
}
|
||
if modelPath != "" {
|
||
params = append(params, "--model", modelPath)
|
||
}
|
||
params = append(params, "--port", strconv.Itoa(port))
|
||
|
||
var pathEnv string
|
||
switch runtime.GOOS {
|
||
case "windows":
|
||
pathEnv = "PATH"
|
||
case "darwin":
|
||
pathEnv = "DYLD_LIBRARY_PATH"
|
||
default:
|
||
pathEnv = "LD_LIBRARY_PATH"
|
||
}
|
||
|
||
// Note: we always put our dependency paths first
|
||
// since these are the exact version we compiled/linked against
|
||
libraryPaths := append([]string{}, gpuLibs...)
|
||
if libraryPath, ok := os.LookupEnv(pathEnv); ok {
|
||
libraryPaths = append(libraryPaths, filepath.SplitList(libraryPath)...)
|
||
}
|
||
|
||
cmd = exec.Command(exe, params...)
|
||
|
||
cmd.Env = os.Environ()
|
||
|
||
if out != nil {
|
||
stdout, err := cmd.StdoutPipe()
|
||
if err != nil {
|
||
return nil, 0, fmt.Errorf("failed to spawn server stdout pipe: %w", err)
|
||
}
|
||
stderr, err := cmd.StderrPipe()
|
||
if err != nil {
|
||
return nil, 0, fmt.Errorf("failed to spawn server stderr pipe: %w", err)
|
||
}
|
||
go func() {
|
||
io.Copy(out, stdout) //nolint:errcheck
|
||
}()
|
||
go func() {
|
||
io.Copy(out, stderr) //nolint:errcheck
|
||
}()
|
||
}
|
||
cmd.SysProcAttr = LlamaServerSysProcAttr
|
||
|
||
// Always filter down the set of GPUs in case there are any unsupported devices that might crash
|
||
pathEnvVal := strings.Join(libraryPaths, string(filepath.ListSeparator))
|
||
|
||
// Update or add the path variable with our adjusted version
|
||
pathNeeded := true
|
||
ollamaPathNeeded := true
|
||
extraEnvsDone := map[string]bool{}
|
||
for k := range extraEnvs {
|
||
extraEnvsDone[k] = false
|
||
}
|
||
for i := range cmd.Env {
|
||
cmp := strings.SplitN(cmd.Env[i], "=", 2)
|
||
if strings.EqualFold(cmp[0], pathEnv) {
|
||
cmd.Env[i] = pathEnv + "=" + pathEnvVal
|
||
pathNeeded = false
|
||
} else if strings.EqualFold(cmp[0], "OLLAMA_LIBRARY_PATH") {
|
||
cmd.Env[i] = "OLLAMA_LIBRARY_PATH=" + strings.Join(gpuLibs, string(filepath.ListSeparator))
|
||
ollamaPathNeeded = false
|
||
} else if len(extraEnvs) != 0 {
|
||
for k, v := range extraEnvs {
|
||
if strings.EqualFold(cmp[0], k) {
|
||
cmd.Env[i] = k + "=" + v
|
||
extraEnvsDone[k] = true
|
||
}
|
||
}
|
||
}
|
||
}
|
||
if pathNeeded {
|
||
cmd.Env = append(cmd.Env, pathEnv+"="+pathEnvVal)
|
||
}
|
||
if ollamaPathNeeded {
|
||
cmd.Env = append(cmd.Env, "OLLAMA_LIBRARY_PATH="+strings.Join(gpuLibs, string(filepath.ListSeparator)))
|
||
}
|
||
for k, done := range extraEnvsDone {
|
||
if !done {
|
||
cmd.Env = append(cmd.Env, k+"="+extraEnvs[k])
|
||
}
|
||
}
|
||
|
||
slog.Info("starting runner", "cmd", cmd)
|
||
slog.Debug("subprocess", "", filteredEnv(cmd.Env))
|
||
|
||
if err = cmd.Start(); err != nil {
|
||
return nil, 0, err
|
||
}
|
||
err = nil
|
||
return
|
||
}
|
||
|
||
func (s *llmServer) ModelPath() string {
|
||
return s.modelPath
|
||
}
|
||
|
||
// LoadOperation selects how far a load request should progress in the runner.
type LoadOperation int

// The order of these constants is significant because we iterate over the
// operations. They should be in order of increasingly loading the model.
const (
	LoadOperationFit    LoadOperation = iota // Return memory requirements but do not allocate
	LoadOperationAlloc                       // Allocate memory but do not load the weights
	LoadOperationCommit                      // Load weights - further changes cannot be made after this
	LoadOperationClose                       // Close model and free memory
)

// String returns the lower-case name of the operation ("fit", "alloc",
// "commit" or "close"), or "unknown" for any other value.
func (o LoadOperation) String() string {
	names := [...]string{
		LoadOperationFit:    "fit",
		LoadOperationAlloc:  "alloc",
		LoadOperationCommit: "commit",
		LoadOperationClose:  "close",
	}
	if o < 0 || int(o) >= len(names) {
		return "unknown"
	}
	return names[o]
}
|
||
|
||
type LoadRequest struct {
|
||
Operation LoadOperation
|
||
|
||
LoraPath []string
|
||
Parallel int
|
||
BatchSize int
|
||
FlashAttention bool
|
||
KvSize int
|
||
KvCacheType string
|
||
NumThreads int
|
||
GPULayers ml.GPULayersList
|
||
MultiUserCache bool
|
||
|
||
// Legacy fields - not used with the Ollama engine
|
||
ProjectorPath string
|
||
MainGPU int
|
||
UseMmap bool
|
||
}
|
||
|
||
type LoadResponse struct {
|
||
Success bool
|
||
Memory ml.BackendMemory
|
||
}
|
||
|
||
// ErrLoadRequiredFull is returned by Load when the model cannot be fully
// loaded with the memory that is currently available.
var ErrLoadRequiredFull = errors.New("unable to load full model on GPU")
|
||
|
||
func (s *llamaServer) Load(ctx context.Context, systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, requireFull bool) ([]ml.DeviceID, error) {
|
||
systemTotalMemory := systemInfo.TotalMemory
|
||
systemFreeMemory := systemInfo.FreeMemory
|
||
systemSwapFreeMemory := systemInfo.FreeSwap
|
||
slog.Info("system memory", "total", format.HumanBytes2(systemTotalMemory), "free", format.HumanBytes2(systemFreeMemory), "free_swap", format.HumanBytes2(systemSwapFreeMemory))
|
||
|
||
if len(gpus) == 0 || s.options.NumGPU == 0 {
|
||
if !verifyCPUFit(s.ggml, s.modelPath, []string{s.loadRequest.ProjectorPath}, s.loadRequest.LoraPath, s.options, systemInfo, s.numParallel) {
|
||
slog.Info("model requires more memory than is currently available, evicting a model to make space", "estimate", s.estimate)
|
||
return nil, fmt.Errorf("model requires more system memory than is currently available %w", ErrLoadRequiredFull)
|
||
}
|
||
} else {
|
||
g := pickBestFullFitByLibrary(s.ggml, s.modelPath, []string{s.loadRequest.ProjectorPath}, s.loadRequest.LoraPath, s.options, gpus, s.numParallel)
|
||
if g == nil {
|
||
if !requireFull {
|
||
g = pickBestPartialFitByLibrary(s.ggml, []string{s.loadRequest.ProjectorPath}, s.loadRequest.LoraPath, s.options, gpus, s.numParallel)
|
||
} else {
|
||
slog.Info("model requires more memory than is currently available, evicting a model to make space", "estimate", s.estimate)
|
||
return nil, ErrLoadRequiredFull
|
||
}
|
||
}
|
||
gpus = g
|
||
}
|
||
|
||
s.estimate = estimateGPULayers(gpus, s.ggml, []string{s.loadRequest.ProjectorPath}, s.options, s.numParallel)
|
||
|
||
if len(gpus) >= 1 {
|
||
switch {
|
||
case s.options.NumGPU == 0:
|
||
gpus = []ml.DeviceInfo{}
|
||
case gpus[0].Library == "Metal" && s.estimate.VRAMSize > systemInfo.TotalMemory:
|
||
// disable partial offloading when model is greater than total system memory as this
|
||
// can lead to locking up the system
|
||
s.options.NumGPU = 0
|
||
gpus = []ml.DeviceInfo{}
|
||
case gpus[0].Library != "Metal" && s.estimate.Layers == 0:
|
||
// Don't bother loading into the GPU if no layers can fit
|
||
gpus = []ml.DeviceInfo{}
|
||
case s.options.NumGPU < 0 && s.estimate.Layers > 0:
|
||
s.options.NumGPU = s.estimate.Layers
|
||
}
|
||
} else {
|
||
s.options.NumGPU = 0
|
||
}
|
||
|
||
// On linux and windows, over-allocating CPU memory will almost always result in an error
|
||
// Darwin has fully dynamic swap so has no direct concept of free swap space
|
||
if runtime.GOOS != "darwin" {
|
||
systemMemoryRequired := s.estimate.TotalSize - s.estimate.VRAMSize
|
||
available := systemInfo.FreeMemory + systemInfo.FreeSwap
|
||
if systemMemoryRequired > available {
|
||
slog.Warn("model request too large for system", "requested", format.HumanBytes2(systemMemoryRequired), "available", format.HumanBytes2(available), "total", format.HumanBytes2(systemInfo.TotalMemory), "free", format.HumanBytes2(systemInfo.FreeMemory), "swap", format.HumanBytes2(systemInfo.FreeSwap))
|
||
return nil, fmt.Errorf("model requires more system memory (%s) than is available (%s)", format.HumanBytes2(systemMemoryRequired), format.HumanBytes2(available))
|
||
}
|
||
}
|
||
|
||
slog.Info("offload", "", s.estimate)
|
||
|
||
s.gpus = gpus
|
||
s.loadRequest.GPULayers = createGPULayers(s.estimate, s.ggml, gpus, s.options.NumGPU)
|
||
|
||
// Mmap is only supported on the llama engine
|
||
if s.textProcessor == nil {
|
||
s.loadRequest.UseMmap = true
|
||
|
||
// mmap has issues with partial offloading on metal
|
||
for _, g := range gpus {
|
||
if g.Library == "Metal" &&
|
||
uint64(s.options.NumGPU) > 0 &&
|
||
uint64(s.options.NumGPU) < s.ggml.KV().BlockCount()+1 {
|
||
s.options.UseMMap = new(bool)
|
||
*s.options.UseMMap = false
|
||
}
|
||
}
|
||
|
||
// Windows CUDA should not use mmap for best performance
|
||
// Linux with a model larger than free space, mmap leads to thrashing
|
||
// For CPU loads we want the memory to be allocated, not FS cache
|
||
if (runtime.GOOS == "windows" && len(gpus) > 0 && gpus[0].Library == "CUDA" && s.options.UseMMap == nil) ||
|
||
(runtime.GOOS == "linux" && systemInfo.FreeMemory < s.estimate.TotalSize && s.options.UseMMap == nil) ||
|
||
(len(gpus) == 0 && s.options.UseMMap == nil) ||
|
||
(len(gpus) > 0 && gpus[0].Library == "Vulkan" && s.options.UseMMap == nil) ||
|
||
(s.options.UseMMap != nil && !*s.options.UseMMap) {
|
||
s.loadRequest.UseMmap = false
|
||
}
|
||
}
|
||
|
||
if err := s.waitUntilRunnerLaunched(ctx); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
resp, err := s.initModel(ctx, s.loadRequest, LoadOperationCommit)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// On the Ollama engine, we can print out a summary of the memory allocations.
|
||
// We don't have this for the llama engine but it does something similar itself.
|
||
if s.textProcessor != nil {
|
||
resp.Memory.Log(slog.LevelInfo)
|
||
}
|
||
|
||
if !resp.Success {
|
||
slog.Warn("failed to allocate memory for model", "memory", resp.Memory)
|
||
return nil, errors.New("failed to allocate memory for model")
|
||
}
|
||
|
||
// The llama engine does its memory allocations together with model loading, so we
|
||
// need to wait until it is done to ensure that we have accurate memory data before
|
||
// loading the next model
|
||
if s.textProcessor == nil {
|
||
return uniqueDeviceIDs(s.loadRequest.GPULayers), s.WaitUntilRunning(ctx)
|
||
} else {
|
||
return uniqueDeviceIDs(s.loadRequest.GPULayers), nil
|
||
}
|
||
}
|
||
|
||
// createGPULayers maps from the tensor splits assigned by the memory estimates to explicit assignment
|
||
// of particular layers onto GPUs
|
||
func createGPULayers(estimate MemoryEstimate, ggml *ggml.GGML, gpus []ml.DeviceInfo, numGPU int) ml.GPULayersList {
|
||
if numGPU <= 0 || len(gpus) == 0 {
|
||
return nil
|
||
}
|
||
|
||
gpuLayers := make(ml.GPULayersList, len(gpus))
|
||
for i := range gpuLayers {
|
||
gpuLayers[i].DeviceID = gpus[i].DeviceID
|
||
}
|
||
|
||
var sum float32
|
||
splits := make([]float32, len(estimate.TensorSplit))
|
||
// cumulative sum of all splits
|
||
for i := range splits {
|
||
sum += float32(estimate.TensorSplit[i])
|
||
splits[i] = sum
|
||
}
|
||
|
||
if sum <= 0 {
|
||
return nil
|
||
}
|
||
|
||
// normalize splits
|
||
for i := range splits {
|
||
splits[i] /= sum
|
||
}
|
||
|
||
blocks := int(ggml.KV().BlockCount())
|
||
gpuRangeStart := max(0, blocks-numGPU)
|
||
gpuRangeStop := min(gpuRangeStart+numGPU, blocks+1)
|
||
for i := range blocks + 1 {
|
||
if i < gpuRangeStart || i >= gpuRangeStop {
|
||
continue
|
||
}
|
||
|
||
index := slices.IndexFunc(splits, func(f float32) bool { return float32(i-gpuRangeStart)/float32(gpuRangeStop-gpuRangeStart) < f })
|
||
if index < 0 || index >= len(gpus) {
|
||
continue
|
||
}
|
||
|
||
gpuLayers[index].Layers = append(gpuLayers[index].Layers, i)
|
||
}
|
||
|
||
return gpuLayers
|
||
}
|
||
|
||
// Load finds the optimal layout of layers to offload on GPUs based on no initial information about the size of the model
|
||
// It does this by:
|
||
// 1. Assigning the full model to the GPU with the largest available free memory
|
||
// 2. Attempting to allocate the layout and receiving the memory requirements in response
|
||
// 3. Creating a new layout based on the updated memory information
|
||
// 4. Going back to step 2 and looping until we either stabilize on a particular layout or discover that we have entered a cycle
|
||
//
|
||
// This process is repeated for higher levels of loading the model (fit, allocate, commit). The earlier levels are quicker,
|
||
// allowing for faster iteration, but may return less information.
|
||
//
|
||
// Returns the list of GPU IDs that were used in the final allocation on success
|
||
func (s *ollamaServer) Load(ctx context.Context, systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, requireFull bool) ([]ml.DeviceID, error) {
|
||
var success bool
|
||
defer func() {
|
||
if !success {
|
||
s.initModel(ctx, LoadRequest{}, LoadOperationClose)
|
||
}
|
||
if s.mem != nil {
|
||
s.mem.Log(slog.LevelInfo)
|
||
}
|
||
}()
|
||
|
||
slog.Info("loading model", "model layers", s.totalLayers, "requested", s.options.NumGPU)
|
||
|
||
systemTotalMemory := systemInfo.TotalMemory
|
||
systemFreeMemory := systemInfo.FreeMemory
|
||
systemSwapFreeMemory := systemInfo.FreeSwap
|
||
slog.Info("system memory", "total", format.HumanBytes2(systemTotalMemory), "free", format.HumanBytes2(systemFreeMemory), "free_swap", format.HumanBytes2(systemSwapFreeMemory))
|
||
|
||
for _, gpu := range gpus {
|
||
available := gpu.FreeMemory - envconfig.GpuOverhead() - gpu.MinimumMemory()
|
||
if gpu.FreeMemory < envconfig.GpuOverhead()+gpu.MinimumMemory() {
|
||
available = 0
|
||
}
|
||
slog.Info("gpu memory", "id", gpu.ID, "library", gpu.Library,
|
||
"available", format.HumanBytes2(available),
|
||
"free", format.HumanBytes2(gpu.FreeMemory),
|
||
"minimum", format.HumanBytes2(gpu.MinimumMemory()),
|
||
"overhead", format.HumanBytes2(envconfig.GpuOverhead()))
|
||
}
|
||
|
||
pastAllocations := make(map[uint64]struct{})
|
||
var backoff float32
|
||
|
||
gpuLayers, err := s.createLayout(systemInfo, gpus, s.mem, requireFull, backoff)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if err := s.waitUntilRunnerLaunched(ctx); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
nextOperation:
|
||
for operation := LoadOperationFit; operation < LoadOperationCommit; operation++ {
|
||
nextLoad:
|
||
for {
|
||
s.loadRequest.GPULayers = gpuLayers
|
||
resp, err := s.initModel(ctx, s.loadRequest, operation)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
resp.Memory.Log(slog.LevelDebug)
|
||
slog.Debug("memory", "success", resp.Success, "required", resp.Memory)
|
||
|
||
pastAllocations[gpuLayers.Hash()] = struct{}{}
|
||
s.mem = &resp.Memory
|
||
|
||
for {
|
||
newGPULayers, err := s.createLayout(systemInfo, gpus, s.mem, requireFull, backoff)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
slog.Debug("new layout created", "layers", newGPULayers)
|
||
|
||
// We get additional memory information over time, which will reduce the number of
|
||
// layers that can fit, so fewer layers is actually better. As long as we haven't seen
|
||
// this layout before and it doesn't have more layers than the last one, we can keep
|
||
// trying to see if we can do better.
|
||
if _, ok := pastAllocations[newGPULayers.Hash()]; !ok && newGPULayers.Sum() <= gpuLayers.Sum() {
|
||
gpuLayers = newGPULayers
|
||
continue nextLoad
|
||
}
|
||
|
||
// If we are looping around a few different layouts due to graphs moving off and on
|
||
// GPUs, make sure that we try out the intermediate states. For example, if we are
|
||
// looping between offloading 39 and 41 layers, we should also check 40.
|
||
//
|
||
// This switches strategies to force an incremental number of layers to be offloaded
|
||
// and checking the memory layout. If the allocation succeeds and creating a new layout
|
||
// without forcing offload yields the same or greater number of layers offloaded, then
|
||
// the trial is successful.
|
||
//
|
||
// This alternate strategy does not introduce the possibility of loops with the overall
|
||
// state machine, as it exits this code block either with a successful result, moving
|
||
// to the next operation or the original number of layers offloaded.
|
||
if s.options.NumGPU < 0 && newGPULayers.Sum()-gpuLayers.Sum() > 1 {
|
||
for i := newGPULayers.Sum() - 1; i >= gpuLayers.Sum(); i-- {
|
||
slog.Debug("exploring intermediate layers", "layer", i)
|
||
|
||
s.options.NumGPU = i
|
||
newGPULayers, err = s.createLayout(systemInfo, gpus, s.mem, requireFull, backoff)
|
||
s.options.NumGPU = -1
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
slog.Debug("new layout created", "layers", newGPULayers)
|
||
|
||
s.loadRequest.GPULayers = newGPULayers
|
||
resp, err = s.initModel(ctx, s.loadRequest, operation)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
resp.Memory.Log(slog.LevelDebug)
|
||
slog.Debug("memory", "success", resp.Success, "required", resp.Memory)
|
||
|
||
if resp.Success {
|
||
verifyGPULayers, err := s.createLayout(systemInfo, gpus, &resp.Memory, requireFull, backoff)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
slog.Debug("verifying layout", "layers", verifyGPULayers)
|
||
|
||
if newGPULayers.Sum() <= verifyGPULayers.Sum() {
|
||
gpuLayers = newGPULayers
|
||
|
||
// Since we are going backwards (increasing the number of layers), ensure that
|
||
// we can come back down if needed
|
||
clear(pastAllocations)
|
||
|
||
continue nextOperation
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// If we generated a layout a second time or go backwards, then we've converged. Use the last
|
||
// layout before the repeat, which is already allocated.
|
||
if resp.Success {
|
||
continue nextOperation
|
||
}
|
||
|
||
if s.options.NumGPU >= 0 {
|
||
return nil, fmt.Errorf("memory layout cannot be allocated with num_gpu = %v", s.options.NumGPU)
|
||
}
|
||
|
||
// Memory allocation failed even though we created a layout that we thought should
|
||
// fit in available memory. This could happen if either our free memory reports
|
||
// are incorrect or if available memory is changing between layout and allocation
|
||
// time. Apply a backoff to try to find the real amount of available space.
|
||
if backoff > 1 {
|
||
slog.Warn("memory layout cannot be allocated", "memory", resp.Memory)
|
||
return nil, errors.New("memory layout cannot be allocated")
|
||
} else {
|
||
backoff += 0.1
|
||
}
|
||
|
||
slog.Info("model layout did not fit, applying backoff", "backoff", fmt.Sprintf("%.2f", backoff))
|
||
}
|
||
}
|
||
}
|
||
|
||
s.loadRequest.GPULayers = gpuLayers
|
||
resp, err := s.initModel(ctx, s.loadRequest, LoadOperationCommit)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
success = resp.Success
|
||
s.mem = &resp.Memory
|
||
|
||
if !success {
|
||
slog.Warn("failed to commit memory for model", "memory", resp.Memory)
|
||
return nil, errors.New("failed to commit memory for model")
|
||
}
|
||
|
||
return uniqueDeviceIDs(gpuLayers), nil
|
||
}
|
||
|
||
func uniqueDeviceIDs(gpuLayers ml.GPULayersList) []ml.DeviceID {
|
||
devices := []ml.DeviceID{}
|
||
for _, layer := range gpuLayers {
|
||
new := true
|
||
for _, ID := range devices {
|
||
if layer.DeviceID == ID {
|
||
new = false
|
||
break
|
||
}
|
||
}
|
||
if new {
|
||
devices = append(devices, layer.DeviceID)
|
||
}
|
||
}
|
||
return devices
|
||
}
|
||
|
||
// createLayout uses the current best view of memory requirements and creates a layout of model layers on GPUs.
|
||
// It does this by:
|
||
// - Calculating how much space each layer requires
|
||
// - Calculating how much space each GPU has available for layers, based on free memory and space occupied by the graph
|
||
// - Assigning layers
|
||
// - Ensuring that we don't exceed limits, such as requirements about partial offloading or system memory
|
||
func (s *ollamaServer) createLayout(systemInfo ml.SystemInfo, systemGPUs []ml.DeviceInfo, memory *ml.BackendMemory, requireFull bool, backoff float32) (ml.GPULayersList, error) {
|
||
if memory == nil {
|
||
memory = &ml.BackendMemory{CPU: ml.DeviceMemory{
|
||
Weights: make([]uint64, s.totalLayers),
|
||
Cache: make([]uint64, s.totalLayers),
|
||
}}
|
||
}
|
||
gpuLayers, layers, err := s.buildLayout(systemGPUs, memory, requireFull, backoff)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
err = s.verifyLayout(systemInfo, memory, requireFull, gpuLayers, layers)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return gpuLayers, nil
|
||
}
|
||
|
||
func (s *ollamaServer) buildLayout(systemGPUs []ml.DeviceInfo, memory *ml.BackendMemory, requireFull bool, backoff float32) (ml.GPULayersList, []uint64, error) {
|
||
gpus := append(make([]ml.DeviceInfo, 0, len(systemGPUs)), systemGPUs...)
|
||
sort.Sort(sort.Reverse(ml.ByFreeMemory(gpus)))
|
||
|
||
layers := make([]uint64, len(memory.CPU.Weights))
|
||
for i := range layers {
|
||
for j := range memory.GPUs {
|
||
layers[i] += memory.GPUs[j].Weights[i]
|
||
layers[i] += memory.GPUs[j].Cache[i]
|
||
}
|
||
layers[i] += memory.CPU.Weights[i]
|
||
layers[i] += memory.CPU.Cache[i]
|
||
logutil.Trace("layer to assign", "layer", i, "size", format.HumanBytes2(layers[i]))
|
||
}
|
||
|
||
gpuLayers := ml.GPULayersList{}
|
||
for _, gl := range ml.ByLibrary(gpus) {
|
||
// If a GPU already has a graph allocated on it, then we should continue to use it.
|
||
// Otherwise, we lose information that we got from previous allocations, which can
|
||
// cause cycling. Plus, we get more information about required allocation from each
|
||
// iteration, so it doesn't make sense that a later iteration would use fewer GPUs.
|
||
lastUsedGPU := 0
|
||
for i := range gl {
|
||
found := false
|
||
for j := range memory.GPUs {
|
||
if gl[i].DeviceID == memory.GPUs[j].DeviceID {
|
||
if memory.GPUs[j].Graph != 0 {
|
||
lastUsedGPU = i
|
||
}
|
||
|
||
reserved := uint64(float32(gl[i].FreeMemory)*backoff) + gl[i].MinimumMemory() + envconfig.GpuOverhead() + memory.GPUs[j].Graph
|
||
if gl[i].FreeMemory > reserved {
|
||
gl[i].FreeMemory -= reserved
|
||
} else {
|
||
gl[i].FreeMemory = 0
|
||
}
|
||
|
||
slog.Debug("available gpu", "id", gl[i].ID, "library", gl[i].Library,
|
||
"available layer vram", format.HumanBytes2(gl[i].FreeMemory),
|
||
"backoff", fmt.Sprintf("%.2f", backoff), "minimum", format.HumanBytes2(gl[i].MinimumMemory()),
|
||
"overhead", format.HumanBytes2(envconfig.GpuOverhead()),
|
||
"graph", format.HumanBytes2(memory.GPUs[j].Graph))
|
||
|
||
found = true
|
||
break
|
||
}
|
||
}
|
||
if !found {
|
||
// The runner doesn't report seeing this GPU
|
||
gl[i].FreeMemory = 0
|
||
}
|
||
}
|
||
|
||
libraryGpuLayers := assignLayers(layers, gl, requireFull, s.options.NumGPU, lastUsedGPU)
|
||
if libraryGpuLayers.Sum() > gpuLayers.Sum() {
|
||
gpuLayers = libraryGpuLayers
|
||
}
|
||
}
|
||
return gpuLayers, layers, nil
|
||
}
|
||
|
||
// verifyLayout ensures that we don't exceed limits, such as requirements about partial offloading or system memory
|
||
func (s *ollamaServer) verifyLayout(systemInfo ml.SystemInfo, memory *ml.BackendMemory, requireFull bool, gpuLayers ml.GPULayersList, layers []uint64) error {
|
||
// These sizes will only increase as we go through additional iterations and get additional information.
|
||
cpuSize := memory.InputWeights + memory.CPU.Graph
|
||
var vramSize uint64
|
||
for _, gl := range gpuLayers {
|
||
for _, gpu := range memory.GPUs {
|
||
if gl.DeviceID == gpu.DeviceID {
|
||
vramSize += gpu.Graph
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
nextLayer:
|
||
for i := range layers {
|
||
for _, g := range gpuLayers {
|
||
for _, gl := range g.Layers {
|
||
if i == gl {
|
||
vramSize += layers[i]
|
||
continue nextLayer
|
||
}
|
||
}
|
||
}
|
||
cpuSize += layers[i]
|
||
}
|
||
|
||
if requireFull {
|
||
if gpuLayers.Sum() < len(layers) && (s.options.NumGPU < 0 || gpuLayers.Sum() < s.options.NumGPU) {
|
||
return ErrLoadRequiredFull
|
||
}
|
||
|
||
if cpuSize > systemInfo.FreeMemory {
|
||
return ErrLoadRequiredFull
|
||
}
|
||
}
|
||
|
||
// On linux and windows, over-allocating CPU memory will almost always result in an error
|
||
// Darwin has fully dynamic swap so has no direct concept of free swap space
|
||
if runtime.GOOS != "darwin" {
|
||
available := systemInfo.FreeMemory + systemInfo.FreeSwap
|
||
if cpuSize > available {
|
||
slog.Warn("model request too large for system", "requested", format.HumanBytes2(cpuSize), "available", format.HumanBytes2(available), "total", format.HumanBytes2(systemInfo.TotalMemory), "free", format.HumanBytes2(systemInfo.FreeMemory), "swap", format.HumanBytes2(systemInfo.FreeSwap))
|
||
return fmt.Errorf("model requires more system memory (%s) than is available (%s)", format.HumanBytes2(cpuSize), format.HumanBytes2(available))
|
||
}
|
||
} else {
|
||
if vramSize > systemInfo.TotalMemory {
|
||
// disable partial offloading when model is greater than total system memory as this
|
||
// can lead to locking up the system
|
||
s.options.NumGPU = 0
|
||
gpuLayers = ml.GPULayersList{}
|
||
}
|
||
}
|
||
|
||
if gpuLayers.Sum() == 0 {
|
||
slog.Debug("insufficient VRAM to load any model layers")
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// assignLayers packs the maximum number of layers onto the smallest set of GPUs and comes up with a layer assignment
|
||
func assignLayers(layers []uint64, gpus []ml.DeviceInfo, requireFull bool, requestedLayers int, lastUsedGPU int) (gpuLayers ml.GPULayersList) {
|
||
// If we can't fit everything then prefer offloading layers other than the output layer
|
||
for range 2 {
|
||
// requestedLayers may be -1 if nothing was requested
|
||
requestedLayers = min(len(layers), requestedLayers)
|
||
|
||
if !envconfig.SchedSpread() {
|
||
for i := lastUsedGPU; i < len(gpus); i++ {
|
||
// Try to pack things into as few GPUs as possible
|
||
forceRequest := i == len(gpus)-1 && !requireFull
|
||
gpuLayers = findBestFit(layers, gpus[:i+1], requestedLayers, forceRequest)
|
||
if gpuLayers.Sum() == len(layers) || gpuLayers.Sum() == requestedLayers {
|
||
break
|
||
}
|
||
}
|
||
} else {
|
||
gpuLayers = findBestFit(layers, gpus, requestedLayers, !requireFull)
|
||
}
|
||
|
||
// We only stop if we've gotten all of the layers - even if we got requestedLayers, we still
|
||
// might want to try dropping the output layer.
|
||
if gpuLayers.Sum() == len(layers) {
|
||
return gpuLayers
|
||
}
|
||
|
||
layers = layers[:len(layers)-1]
|
||
}
|
||
|
||
return gpuLayers
|
||
}
|
||
|
||
// findBestFit binary searches to find the smallest capacity factor that can fit
|
||
// the max number of layers. The capacity factor is multiplied by the free space on
|
||
// each GPU and a small one will force even balancing.
|
||
func findBestFit(layers []uint64, gpus []ml.DeviceInfo, requestedLayers int, forceRequest bool) (gpuLayers ml.GPULayersList) {
|
||
var high float32 = 1
|
||
var low float32 = 0
|
||
|
||
// If we need to fulfill the requested number of layers, pretend we have almost infinite VRAM
|
||
if requestedLayers >= 0 && forceRequest {
|
||
high = 1000
|
||
}
|
||
|
||
bestAssignments := greedyFit(layers, gpus, high, requestedLayers)
|
||
maxNumGPU := bestAssignments.Sum()
|
||
if maxNumGPU == 0 {
|
||
return bestAssignments
|
||
}
|
||
|
||
for high-low > 1e-6 {
|
||
mid := (low + high) / 2
|
||
assignments := greedyFit(layers, gpus, mid, requestedLayers)
|
||
if assignments.Sum() == maxNumGPU {
|
||
high = mid
|
||
bestAssignments = assignments
|
||
} else {
|
||
low = mid
|
||
}
|
||
}
|
||
return bestAssignments
|
||
}
|
||
|
||
// greedyFit assigns layers incrementally to GPUs, spilling over as each runs out of free space
|
||
func greedyFit(layers []uint64, gpus []ml.DeviceInfo, capacity float32, requestedLayers int) (gpuLayers ml.GPULayersList) {
|
||
device := len(gpus) - 1
|
||
gpuLayers = ml.GPULayersList{{DeviceID: gpus[device].DeviceID}}
|
||
freeSpace := uint64(float32(gpus[device].FreeMemory) * capacity)
|
||
for i := len(layers) - 1; i >= 0; i-- {
|
||
if requestedLayers >= 0 && len(layers)-1-i >= requestedLayers {
|
||
break
|
||
}
|
||
|
||
for {
|
||
if layers[i] <= freeSpace {
|
||
gpuLayers[0].Layers = append([]int{i}, gpuLayers[0].Layers...)
|
||
freeSpace -= layers[i]
|
||
break
|
||
}
|
||
|
||
device--
|
||
if device < 0 {
|
||
return gpuLayers
|
||
}
|
||
gpuLayers = append(ml.GPULayersList{{DeviceID: gpus[device].DeviceID}}, gpuLayers...)
|
||
freeSpace = uint64(float32(gpus[device].FreeMemory) * capacity)
|
||
}
|
||
}
|
||
return gpuLayers
|
||
}
|
||
|
||
// waitUntilRunnerLaunched sleeps until the runner subprocess is alive enough
|
||
// to respond to status requests
|
||
func (s *llmServer) waitUntilRunnerLaunched(ctx context.Context) error {
|
||
for {
|
||
_, err := s.getServerStatus(ctx)
|
||
if err == nil {
|
||
break
|
||
}
|
||
|
||
t := time.NewTimer(10 * time.Millisecond)
|
||
select {
|
||
case <-t.C:
|
||
continue
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// initModel sends a load request to the runner based on the request operation (fit, alloc, commit)
|
||
// and parameters
|
||
func (s *llmServer) initModel(ctx context.Context, req LoadRequest, operation LoadOperation) (*LoadResponse, error) {
|
||
req.Operation = operation
|
||
|
||
data, err := json.Marshal(req)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("error marshaling load data: %w", err)
|
||
}
|
||
|
||
r, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/load", s.port), bytes.NewBuffer(data))
|
||
if err != nil {
|
||
return nil, fmt.Errorf("error creating load request: %w", err)
|
||
}
|
||
r.Header.Set("Content-Type", "application/json")
|
||
|
||
resp, err := http.DefaultClient.Do(r)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("do load request: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
body, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("read load request: %w", err)
|
||
}
|
||
|
||
if resp.StatusCode >= 400 {
|
||
log.Printf("llm load error: %s", body)
|
||
return nil, fmt.Errorf("%s", body)
|
||
}
|
||
|
||
var llmResp LoadResponse
|
||
if err := json.Unmarshal(body, &llmResp); err != nil {
|
||
return nil, fmt.Errorf("load unmarshal encode response: %w", err)
|
||
}
|
||
|
||
return &llmResp, nil
|
||
}
|
||
|
||
// ServerStatus is the state reported by the runner subprocess.
type ServerStatus int

const ( // iota is reset to 0
	ServerStatusReady ServerStatus = iota
	ServerStatusNoSlotsAvailable
	ServerStatusLaunched
	ServerStatusLoadingModel
	ServerStatusNotResponding
	ServerStatusError
)

// String returns a human-readable description of s. ServerStatusError and any
// value outside the declared constants are described as "llm server error".
func (s ServerStatus) String() string {
	descriptions := [...]string{
		ServerStatusReady:            "llm server ready",
		ServerStatusNoSlotsAvailable: "llm busy - no slots available",
		ServerStatusLaunched:         "llm server launched",
		ServerStatusLoadingModel:     "llm server loading model",
		ServerStatusNotResponding:    "llm server not responding",
	}

	if s >= 0 && int(s) < len(descriptions) {
		return descriptions[s]
	}
	return "llm server error"
}
|
||
|
||
type ServerStatusResponse struct {
|
||
Status ServerStatus `json:"status"`
|
||
Progress float32 `json:"progress"`
|
||
}
|
||
|
||
func (s *llmServer) getServerStatus(ctx context.Context) (ServerStatus, error) {
|
||
// Fail fast if its exited
|
||
if s.cmd.ProcessState != nil {
|
||
msg := ""
|
||
if s.status != nil && s.status.LastErrMsg != "" {
|
||
msg = s.status.LastErrMsg
|
||
}
|
||
if s.cmd.ProcessState.ExitCode() == -1 {
|
||
// Most likely a signal killed it, log some more details to try to help troubleshoot
|
||
slog.Warn("llama runner process no longer running", "sys", s.cmd.ProcessState.Sys(), "string", s.cmd.ProcessState)
|
||
}
|
||
return ServerStatusError, fmt.Errorf("llama runner process no longer running: %d %s", s.cmd.ProcessState.ExitCode(), msg)
|
||
}
|
||
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", s.port), nil)
|
||
if err != nil {
|
||
return ServerStatusError, fmt.Errorf("error creating GET request: %v", err)
|
||
}
|
||
req.Header.Set("Content-Type", "application/json")
|
||
|
||
resp, err := http.DefaultClient.Do(req)
|
||
if err != nil {
|
||
if errors.Is(err, context.DeadlineExceeded) {
|
||
return ServerStatusNotResponding, errors.New("server not responding")
|
||
}
|
||
if strings.Contains(err.Error(), "connection refused") {
|
||
return ServerStatusNotResponding, errors.New("connection refused")
|
||
}
|
||
return ServerStatusError, fmt.Errorf("health resp: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
body, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
return ServerStatusError, fmt.Errorf("read health request: %w", err)
|
||
}
|
||
|
||
var ssr ServerStatusResponse
|
||
if err := json.Unmarshal(body, &ssr); err != nil {
|
||
return ServerStatusError, fmt.Errorf("health unmarshal encode response: %w", err)
|
||
}
|
||
|
||
switch ssr.Status {
|
||
case ServerStatusLoadingModel:
|
||
s.loadProgress = ssr.Progress
|
||
return ssr.Status, nil
|
||
case ServerStatusLaunched, ServerStatusReady, ServerStatusNoSlotsAvailable:
|
||
return ssr.Status, nil
|
||
default:
|
||
return ssr.Status, fmt.Errorf("server error: %+v", ssr)
|
||
}
|
||
}
|
||
|
||
// getServerStatusRetry will retry if ServerStatusNoSlotsAvailable is received
|
||
func (s *llmServer) getServerStatusRetry(ctx context.Context) (ServerStatus, error) {
|
||
var retries int
|
||
for {
|
||
status, err := s.getServerStatus(ctx)
|
||
if err != nil {
|
||
return status, err
|
||
}
|
||
|
||
if status == ServerStatusNoSlotsAvailable {
|
||
if retries >= 10 {
|
||
return status, fmt.Errorf("no slots available after %d retries", retries)
|
||
}
|
||
|
||
time.Sleep(5 * time.Millisecond)
|
||
retries++
|
||
continue
|
||
}
|
||
|
||
return status, nil
|
||
}
|
||
}
|
||
|
||
func (s *llmServer) Ping(ctx context.Context) error {
|
||
_, err := s.getServerStatus(ctx)
|
||
if err != nil {
|
||
slog.Debug("server unhealthy", "error", err)
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
|
||
stallDuration := envconfig.LoadTimeout() // If no progress happens
|
||
stallTimer := time.Now().Add(stallDuration) // give up if we stall
|
||
|
||
slog.Info("waiting for llama runner to start responding")
|
||
var lastStatus ServerStatus = -1
|
||
fullyLoaded := false
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
slog.Warn("client connection closed before server finished loading, aborting load")
|
||
return fmt.Errorf("timed out waiting for llama runner to start: %w", ctx.Err())
|
||
case err := <-s.done:
|
||
return fmt.Errorf("llama runner process has terminated: %w", err)
|
||
default:
|
||
}
|
||
if time.Now().After(stallTimer) {
|
||
// timeout
|
||
msg := ""
|
||
if s.status != nil && s.status.LastErrMsg != "" {
|
||
msg = s.status.LastErrMsg
|
||
}
|
||
return fmt.Errorf("timed out waiting for llama runner to start - progress %0.2f - %s", s.loadProgress, msg)
|
||
}
|
||
if s.cmd.ProcessState != nil {
|
||
msg := ""
|
||
if s.status != nil && s.status.LastErrMsg != "" {
|
||
msg = s.status.LastErrMsg
|
||
}
|
||
return fmt.Errorf("llama runner process no longer running: %d %s", s.cmd.ProcessState.ExitCode(), msg)
|
||
}
|
||
ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
|
||
defer cancel()
|
||
priorProgress := s.loadProgress
|
||
status, _ := s.getServerStatus(ctx)
|
||
if lastStatus != status && status != ServerStatusReady {
|
||
// Only log on status changes
|
||
slog.Info("waiting for server to become available", "status", status)
|
||
}
|
||
switch status {
|
||
case ServerStatusReady:
|
||
slog.Info(fmt.Sprintf("llama runner started in %0.2f seconds", time.Since(s.loadStart).Seconds()))
|
||
return nil
|
||
default:
|
||
lastStatus = status
|
||
// Reset the timer as long as we're making forward progress on the load
|
||
if priorProgress != s.loadProgress {
|
||
slog.Debug(fmt.Sprintf("model load progress %0.2f", s.loadProgress))
|
||
stallTimer = time.Now().Add(stallDuration)
|
||
} else if !fullyLoaded && int(s.loadProgress*100.0) >= 100 {
|
||
slog.Debug("model load completed, waiting for server to become available", "status", status)
|
||
stallTimer = time.Now().Add(stallDuration)
|
||
fullyLoaded = true
|
||
}
|
||
time.Sleep(time.Millisecond * 250)
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *llmServer) Pid() int {
|
||
if s.cmd != nil && s.cmd.Process != nil {
|
||
return s.cmd.Process.Pid
|
||
}
|
||
return -1
|
||
}
|
||
|
||
func (s *llmServer) GetPort() int {
|
||
return s.port
|
||
}
|
||
|
||
func (s *llmServer) HasExited() bool {
|
||
if s.cmd != nil && s.cmd.ProcessState != nil && s.cmd.ProcessState.ExitCode() >= 0 {
|
||
return true
|
||
}
|
||
return false
|
||
}
|
||
|
||
// grammarJSON is the grammar assigned to a completion request whose format is
// "json". Its root rule is a JSON object.
//
// NOTE(review): the stray "|" / "||" lines that were interleaved with the rules
// have been removed; they were not part of any rule. The indentation of
// continuation lines could not be recovered — presumably it is not significant
// to the grammar parser, confirm against the upstream file.
var grammarJSON = `
root ::= object
value ::= object | array | string | number | ("true" | "false" | "null") ws
object ::=
"{" ws (
string ":" ws value
("," ws string ":" ws value)*
)? ws "}"
array ::=
"[" ws (
value
("," ws value)*
)? ws "]"
string ::=
"\"" (
[^"\\\x7F\x00-\x1F] |
"\\" (["\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]) # escapes
)* "\""
number ::= ("-"? ([0-9] | [1-9] [0-9]*)) ("." [0-9]+)? ([eE] [-+]? [0-9]+)?
# Optional space: by convention, applied in this grammar after literal chars when allowed
ws ::= ([ \t\n] ws)?
`
|
||
|
||
// maxBufferSize is both the initial capacity and the maximum size of the
// scanner buffer used to read streamed completion responses line by line.
const maxBufferSize = 512 * format.KiloByte
|
||
|
||
// ImageData is an image attached to a completion request.
type ImageData struct {
	// Data holds the raw image bytes.
	Data []byte `json:"data"`
	// ID is the numeric identifier carried with the image.
	ID int `json:"id"`
}
|
||
|
||
type CompletionRequest struct {
|
||
Prompt string
|
||
Format json.RawMessage
|
||
Images []ImageData
|
||
Options *api.Options
|
||
|
||
Grammar string // set before sending the request to the subprocess
|
||
Shift bool
|
||
Truncate bool
|
||
}
|
||
|
||
// DoneReason represents the reason why a completion response is done
type DoneReason int

const (
	// DoneReasonStop indicates the completion stopped naturally
	DoneReasonStop DoneReason = iota
	// DoneReasonLength indicates the completion stopped due to length limits
	DoneReasonLength
	// DoneReasonConnectionClosed indicates the completion stopped due to the connection being closed
	DoneReasonConnectionClosed
)

// String returns "stop" or "length" for the corresponding reasons and the
// empty string for every other value, including DoneReasonConnectionClosed.
func (d DoneReason) String() string {
	names := [...]string{
		DoneReasonStop:   "stop",
		DoneReasonLength: "length",
	}

	if d >= 0 && int(d) < len(names) {
		return names[d]
	}
	return "" // closed
}
|
||
|
||
type CompletionResponse struct {
|
||
Content string `json:"content"`
|
||
DoneReason DoneReason `json:"done_reason"`
|
||
Done bool `json:"done"`
|
||
PromptEvalCount int `json:"prompt_eval_count"`
|
||
PromptEvalDuration time.Duration `json:"prompt_eval_duration"`
|
||
EvalCount int `json:"eval_count"`
|
||
EvalDuration time.Duration `json:"eval_duration"`
|
||
}
|
||
|
||
// Completion is the bridge between Go and the runner subprocess
|
||
//
|
||
// This function sends an HTTP POST request to the runner subprocess
|
||
// and streams back the generated tokens in real-time.
|
||
//
|
||
// Flow:
|
||
// 1. Validate and prepare request (grammar, options)
|
||
// 2. Acquire semaphore (limit concurrent requests to runner)
|
||
// 3. Send HTTP POST to http://127.0.0.1:<port>/completion
|
||
// 4. Runner subprocess receives request and starts generation loop
|
||
// 5. Stream back CompletionResponse objects (one per token or batch)
|
||
// 6. Call callback function fn() for each response chunk
|
||
//
|
||
// Parameters:
|
||
// - ctx: Context (for cancellation)
|
||
// - req: CompletionRequest (prompt, images, options)
|
||
// - fn: Callback function called for each generated token/chunk
|
||
//
|
||
// Returns:
|
||
// - error: If request fails or is cancelled
|
||
func (s *llmServer) Completion(ctx context.Context, req CompletionRequest, fn func(CompletionResponse)) error {
|
||
slog.Debug("completion request", "images", len(req.Images), "prompt", len(req.Prompt), "format", string(req.Format))
|
||
logutil.Trace("completion request", "prompt", req.Prompt)
|
||
|
||
// Handle JSON output format constraints
|
||
// If user requests JSON output, apply grammar to constrain generation
|
||
if len(req.Format) > 0 {
|
||
switch string(req.Format) {
|
||
case `null`, `""`:
|
||
// Field was set, but "missing" a value. We accept
|
||
// these as "not set".
|
||
break
|
||
case `"json"`:
|
||
// Use built-in JSON grammar
|
||
req.Grammar = grammarJSON
|
||
default:
|
||
if req.Format[0] != '{' {
|
||
return fmt.Errorf("invalid format: %q; expected \"json\" or a valid JSON Schema object", req.Format)
|
||
}
|
||
|
||
// User provided a JSON schema - convert to GBNF grammar
|
||
// This constrains the model to only generate valid JSON matching the schema
|
||
g := llama.SchemaToGrammar(req.Format)
|
||
if g == nil {
|
||
return fmt.Errorf("invalid JSON schema in format")
|
||
}
|
||
req.Grammar = string(g)
|
||
}
|
||
}
|
||
|
||
if req.Options == nil {
|
||
opts := api.DefaultOptions()
|
||
req.Options = &opts
|
||
}
|
||
|
||
// Acquire semaphore to limit concurrent requests
|
||
// The runner subprocess can only handle numParallel requests at once
|
||
// (each parallel slot requires separate KV cache allocation)
|
||
if err := s.sem.Acquire(ctx, 1); err != nil {
|
||
if errors.Is(err, context.Canceled) {
|
||
slog.Info("aborting completion request due to client closing the connection")
|
||
} else {
|
||
slog.Error("Failed to acquire semaphore", "error", err)
|
||
}
|
||
return err
|
||
}
|
||
defer s.sem.Release(1)
|
||
|
||
// put an upper limit on num_predict to avoid the model running on forever
|
||
if req.Options.NumPredict < 0 || req.Options.NumPredict > 10*s.options.NumCtx {
|
||
req.Options.NumPredict = 10 * s.options.NumCtx
|
||
}
|
||
|
||
// Wait for runner subprocess to be ready
|
||
// The subprocess may still be loading weights into memory
|
||
status, err := s.getServerStatusRetry(ctx)
|
||
if err != nil {
|
||
return err
|
||
} else if status != ServerStatusReady {
|
||
return fmt.Errorf("unexpected server status: %s", status)
|
||
}
|
||
|
||
// Marshal CompletionRequest to JSON
|
||
// Use SetEscapeHTML(false) to avoid escaping special characters
|
||
buffer := &bytes.Buffer{}
|
||
enc := json.NewEncoder(buffer)
|
||
enc.SetEscapeHTML(false)
|
||
|
||
if err := enc.Encode(req); err != nil {
|
||
return fmt.Errorf("failed to marshal data: %v", err)
|
||
}
|
||
|
||
// *** SEND HTTP POST TO RUNNER SUBPROCESS ***
|
||
// This is IPC (Inter-Process Communication) between:
|
||
// - Parent: Ollama server (this process)
|
||
// - Child: Runner subprocess (spawned by scheduler)
|
||
//
|
||
// The runner listens on 127.0.0.1:<random_port> with endpoints:
|
||
// - POST /completion - text generation (this call)
|
||
// - POST /embedding - generate embeddings
|
||
// - POST /tokenize - tokenize text
|
||
// - GET /health - health check
|
||
endpoint := fmt.Sprintf("http://127.0.0.1:%d/completion", s.port)
|
||
serverReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, buffer)
|
||
if err != nil {
|
||
return fmt.Errorf("error creating POST request: %v", err)
|
||
}
|
||
serverReq.Header.Set("Content-Type", "application/json")
|
||
|
||
// Execute HTTP request to runner subprocess
|
||
res, err := http.DefaultClient.Do(serverReq)
|
||
if err != nil && errors.Is(err, context.Canceled) {
|
||
// client closed connection
|
||
return err
|
||
} else if err != nil {
|
||
slog.Error("post predict", "error", err)
|
||
return errors.New("model runner has unexpectedly stopped, this may be due to resource limitations or an internal error, check ollama server logs for details")
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.StatusCode >= 400 {
|
||
bodyBytes, err := io.ReadAll(res.Body)
|
||
if err != nil {
|
||
return fmt.Errorf("failed reading llm error response: %w", err)
|
||
}
|
||
log.Printf("llm predict error: %s", bodyBytes)
|
||
return api.StatusError{StatusCode: res.StatusCode, ErrorMessage: strings.TrimSpace(string(bodyBytes))}
|
||
}
|
||
|
||
// *** STREAM RESPONSE BACK FROM RUNNER ***
|
||
// The runner subprocess streams generated tokens back line-by-line
|
||
// Each line is a JSON CompletionResponse object
|
||
//
|
||
// Response flow:
|
||
// 1. Runner tokenizes prompt
|
||
// 2. Runner creates inference batch
|
||
// 3. For each generation step:
|
||
// a. Call context.Decode() -> C.llama_decode() -> CUDA kernel
|
||
// b. Get logits from model output
|
||
// c. Apply sampling (temperature, top_p, top_k)
|
||
// d. Select next token
|
||
// e. Stream CompletionResponse{"content": "token_text", "done": false}
|
||
// 4. When complete, send final response with "done": true and metrics
|
||
scanner := bufio.NewScanner(res.Body)
|
||
buf := make([]byte, 0, maxBufferSize)
|
||
scanner.Buffer(buf, maxBufferSize)
|
||
|
||
// keep track of the last token generated, this is used to abort if the model starts looping
|
||
var lastToken string
|
||
var tokenRepeat int
|
||
|
||
for scanner.Scan() {
|
||
select {
|
||
case <-ctx.Done():
|
||
// This handles the request cancellation
|
||
return ctx.Err()
|
||
default:
|
||
line := scanner.Bytes()
|
||
if len(line) == 0 {
|
||
continue
|
||
}
|
||
|
||
// Handle Server-Sent Events format (optional "data: " prefix)
|
||
evt, ok := bytes.CutPrefix(line, []byte("data: "))
|
||
if !ok {
|
||
evt = line
|
||
}
|
||
|
||
// Parse CompletionResponse from JSON
|
||
// Fields:
|
||
// - Content: generated token(s) as string
|
||
// - Done: true if generation complete
|
||
// - DoneReason: "stop", "length", or "connection_closed"
|
||
// - PromptEvalCount: tokens in prompt
|
||
// - PromptEvalDuration: time to process prompt
|
||
// - EvalCount: tokens generated
|
||
// - EvalDuration: time to generate tokens
|
||
var c CompletionResponse
|
||
if err := json.Unmarshal(evt, &c); err != nil {
|
||
return fmt.Errorf("error unmarshalling llm prediction response: %v", err)
|
||
}
|
||
|
||
// Detect infinite loops (model repeating same token)
|
||
switch {
|
||
case strings.TrimSpace(c.Content) == lastToken:
|
||
tokenRepeat++
|
||
default:
|
||
lastToken = strings.TrimSpace(c.Content)
|
||
tokenRepeat = 0
|
||
}
|
||
|
||
// 30 picked as an arbitrary max token repeat limit, modify as needed
|
||
if tokenRepeat > 30 {
|
||
slog.Debug("prediction aborted, token repeat limit reached")
|
||
return ctx.Err()
|
||
}
|
||
|
||
// Call callback function for each generated token
|
||
if c.Content != "" {
|
||
fn(CompletionResponse{
|
||
Content: c.Content,
|
||
})
|
||
}
|
||
|
||
// Final response includes all metrics
|
||
if c.Done {
|
||
fn(c)
|
||
return nil
|
||
}
|
||
}
|
||
}
|
||
|
||
if err := scanner.Err(); err != nil {
|
||
if strings.Contains(err.Error(), "unexpected EOF") || strings.Contains(err.Error(), "forcibly closed") {
|
||
s.Close()
|
||
var msg string
|
||
if s.status != nil && s.status.LastErrMsg != "" {
|
||
msg = s.status.LastErrMsg
|
||
} else {
|
||
msg = err.Error()
|
||
}
|
||
return fmt.Errorf("an error was encountered while running the model: %s", msg)
|
||
}
|
||
|
||
return fmt.Errorf("error reading llm response: %v", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// EmbeddingRequest is the JSON body posted to the runner's /embedding endpoint.
type EmbeddingRequest struct {
	// Content is the input text to embed.
	Content string `json:"content"`
}
|
||
|
||
// EmbeddingResponse is the JSON body decoded from the runner's reply to an
// embedding request.
type EmbeddingResponse struct {
	// Embedding is the embedding vector returned for the input.
	Embedding []float32 `json:"embedding"`
}
|
||
|
||
func (s *llmServer) Embedding(ctx context.Context, input string) ([]float32, error) {
|
||
logutil.Trace("embedding request", "input", input)
|
||
|
||
if err := s.sem.Acquire(ctx, 1); err != nil {
|
||
if errors.Is(err, context.Canceled) {
|
||
slog.Info("aborting embedding request due to client closing the connection")
|
||
} else {
|
||
slog.Error("Failed to acquire semaphore", "error", err)
|
||
}
|
||
return nil, err
|
||
}
|
||
defer s.sem.Release(1)
|
||
|
||
// Make sure the server is ready
|
||
status, err := s.getServerStatusRetry(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
} else if status != ServerStatusReady {
|
||
return nil, fmt.Errorf("unexpected server status: %s", status)
|
||
}
|
||
|
||
data, err := json.Marshal(EmbeddingRequest{Content: input})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("error marshaling embed data: %w", err)
|
||
}
|
||
|
||
r, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/embedding", s.port), bytes.NewBuffer(data))
|
||
if err != nil {
|
||
return nil, fmt.Errorf("error creating embed request: %w", err)
|
||
}
|
||
r.Header.Set("Content-Type", "application/json")
|
||
|
||
resp, err := http.DefaultClient.Do(r)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("do embedding request: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
body, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("error reading embed response: %w", err)
|
||
}
|
||
|
||
if resp.StatusCode >= 400 {
|
||
log.Printf("llm embedding error: %s", body)
|
||
return nil, fmt.Errorf("%s", body)
|
||
}
|
||
|
||
var e EmbeddingResponse
|
||
if err := json.Unmarshal(body, &e); err != nil {
|
||
return nil, fmt.Errorf("unmarshal tokenize response: %w", err)
|
||
}
|
||
|
||
return e.Embedding, nil
|
||
}
|
||
|
||
// TokenizeRequest holds the text content of a tokenize request.
type TokenizeRequest struct {
	Content string `json:"content"`
}
|
||
|
||
// TokenizeResponse holds the token IDs of a tokenize response.
type TokenizeResponse struct {
	Tokens []int `json:"tokens"`
}
|
||
|
||
func (s *llmServer) Tokenize(ctx context.Context, content string) ([]int, error) {
|
||
s.llamaModelLock.Lock()
|
||
defer s.llamaModelLock.Unlock()
|
||
|
||
if s.llamaModel != nil {
|
||
return s.llamaModel.Tokenize(content, false, true)
|
||
}
|
||
if s.textProcessor != nil {
|
||
tokens, err := s.textProcessor.Encode(content, false)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
toks := make([]int, len(tokens))
|
||
for i, t := range tokens {
|
||
toks[i] = int(t)
|
||
}
|
||
return toks, nil
|
||
}
|
||
// not reached
|
||
return nil, fmt.Errorf("no tokenizer configured")
|
||
}
|
||
|
||
// DetokenizeRequest is the JSON body for a detokenize call: the token IDs to
// convert back to text are carried under the "tokens" key.
type DetokenizeRequest struct {
	Tokens []int `json:"tokens"`
}
|
||
|
||
// DetokenizeResponse is the JSON body of a detokenize reply: the reconstructed
// text is carried under the "content" key.
type DetokenizeResponse struct {
	Content string `json:"content"`
}
|
||
|
||
func (s *llmServer) Detokenize(ctx context.Context, tokens []int) (string, error) {
|
||
s.llamaModelLock.Lock()
|
||
defer s.llamaModelLock.Unlock()
|
||
|
||
if s.llamaModel != nil {
|
||
var resp string
|
||
for _, token := range tokens {
|
||
resp += s.llamaModel.TokenToPiece(token)
|
||
}
|
||
return resp, nil
|
||
}
|
||
if s.textProcessor != nil {
|
||
toks := make([]int32, len(tokens))
|
||
for i, t := range tokens {
|
||
toks[i] = int32(t)
|
||
}
|
||
content, err := s.textProcessor.Decode(toks)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return content, nil
|
||
}
|
||
// not reached
|
||
return "", fmt.Errorf("no tokenizer configured")
|
||
}
|
||
|
||
func (s *llmServer) Close() error {
|
||
s.llamaModelLock.Lock()
|
||
if s.llamaModel != nil {
|
||
llama.FreeModel(s.llamaModel)
|
||
s.llamaModel = nil
|
||
}
|
||
s.llamaModelLock.Unlock()
|
||
|
||
if s.cmd != nil {
|
||
slog.Debug("stopping llama server", "pid", s.Pid())
|
||
if err := s.cmd.Process.Kill(); err != nil {
|
||
return err
|
||
}
|
||
// if ProcessState is already populated, Wait already completed, no need to wait again
|
||
if s.cmd.ProcessState == nil {
|
||
slog.Debug("waiting for llama server to exit", "pid", s.Pid())
|
||
<-s.done
|
||
}
|
||
|
||
slog.Debug("llama server stopped", "pid", s.Pid())
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *llamaServer) VRAMSize() uint64 {
|
||
return s.estimate.VRAMSize
|
||
}
|
||
|
||
func (s *llamaServer) TotalSize() uint64 {
|
||
return s.estimate.TotalSize
|
||
}
|
||
|
||
func (s *llamaServer) VRAMByGPU(id ml.DeviceID) uint64 {
|
||
for i, gpu := range s.gpus {
|
||
if gpu.DeviceID == id {
|
||
if i < len(s.estimate.GPUSizes) {
|
||
return s.estimate.GPUSizes[i]
|
||
}
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
|
||
func (s *llamaServer) GetDeviceInfos(ctx context.Context) []ml.DeviceInfo {
|
||
slog.Debug("llamarunner free vram reporting not supported")
|
||
return nil
|
||
}
|
||
|
||
func (s *ollamaServer) VRAMSize() uint64 {
|
||
if s.mem == nil {
|
||
return 0
|
||
}
|
||
|
||
var mem uint64
|
||
|
||
for _, g := range s.mem.GPUs {
|
||
mem += g.Size()
|
||
}
|
||
|
||
// Some elements are always on CPU. However, if we have allocated all layers
|
||
// on the GPU then include the CPU components as well, to represent complete offloading.
|
||
noCPULayers := true
|
||
for i := range s.mem.CPU.Weights {
|
||
if s.mem.CPU.Weights[i] != 0 || s.mem.CPU.Cache[i] != 0 {
|
||
noCPULayers = false
|
||
break
|
||
}
|
||
}
|
||
if noCPULayers {
|
||
mem += s.mem.InputWeights
|
||
mem += s.mem.CPU.Graph
|
||
}
|
||
|
||
return mem
|
||
}
|
||
|
||
func (s *ollamaServer) TotalSize() uint64 {
|
||
if s.mem == nil {
|
||
return 0
|
||
}
|
||
|
||
mem := s.mem.InputWeights
|
||
mem += s.mem.CPU.Size()
|
||
for _, g := range s.mem.GPUs {
|
||
mem += g.Size()
|
||
}
|
||
|
||
return mem
|
||
}
|
||
|
||
func (s *ollamaServer) VRAMByGPU(id ml.DeviceID) uint64 {
|
||
if s.mem == nil {
|
||
return 0
|
||
}
|
||
|
||
for _, g := range s.mem.GPUs {
|
||
if g.DeviceID == id {
|
||
return g.Size()
|
||
}
|
||
}
|
||
|
||
return 0
|
||
}
|
||
|
||
func (s *ollamaServer) GetDeviceInfos(ctx context.Context) []ml.DeviceInfo {
|
||
devices, err := ml.GetDevicesFromRunner(ctx, s)
|
||
if err != nil {
|
||
if s.cmd != nil && s.cmd.ProcessState == nil {
|
||
// Still running but hit an error, log
|
||
slog.Debug("failure refreshing GPU information", "error", err)
|
||
}
|
||
// else no longer running so suppress logging as a failure is expected
|
||
}
|
||
return devices
|
||
}
|