diff --git a/ml/backend/ggml/ggml/src/ggml-cuda/ggml-cuda.cu b/ml/backend/ggml/ggml/src/ggml-cuda/ggml-cuda.cu
index 0241d09a..8c13f119 100644
--- a/ml/backend/ggml/ggml/src/ggml-cuda/ggml-cuda.cu
+++ b/ml/backend/ggml/ggml/src/ggml-cuda/ggml-cuda.cu
@@ -598,41 +598,106 @@ static enum ggml_status ggml_backend_cuda_buffer_init_tensor(ggml_backend_buffer
 static void ggml_backend_cuda_buffer_memset_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor, uint8_t value, size_t offset, size_t size) {
     ggml_backend_cuda_buffer_context * ctx = (ggml_backend_cuda_buffer_context *)buffer->context;
 
-    ggml_cuda_set_device(ctx->device);
-    CUDA_CHECK(cudaMemsetAsync((char *)tensor->data + offset, value, size, cudaStreamPerThread));
-    CUDA_CHECK(cudaStreamSynchronize(cudaStreamPerThread));
+    // Device context must be set correctly - critical for model functionality
+    CUDA_CHECK(cudaSetDevice(ctx->device));
+
+    cudaError_t memset_result = cudaMemsetAsync((char *)tensor->data + offset, value, size, cudaStreamPerThread);
+    if (memset_result != cudaSuccess) {
+        GGML_LOG_ERROR("cudaMemsetAsync failed on device %d: %s\n",
+                       ctx->device, cudaGetErrorString(memset_result));
+        cudaGetLastError(); // Clear error state
+        return;
+    }
+
+    cudaError_t sync_result = cudaStreamSynchronize(cudaStreamPerThread);
+    if (sync_result != cudaSuccess) {
+        GGML_LOG_ERROR("Stream synchronization failed on device %d in buffer_memset_tensor: %s\n",
+                       ctx->device, cudaGetErrorString(sync_result));
+        cudaGetLastError(); // Clear error state
+        return;
+    }
 }
 
 static void ggml_backend_cuda_buffer_set_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor, const void * data, size_t offset, size_t size) {
     ggml_backend_cuda_buffer_context * ctx = (ggml_backend_cuda_buffer_context *)buffer->context;
 
-    ggml_cuda_set_device(ctx->device);
-    CUDA_CHECK(cudaMemcpyAsync((char *)tensor->data + offset, data, size, cudaMemcpyHostToDevice, cudaStreamPerThread));
-    CUDA_CHECK(cudaStreamSynchronize(cudaStreamPerThread));
+    // Device context must be set correctly - critical for model functionality
+    CUDA_CHECK(cudaSetDevice(ctx->device));
+
+    cudaError_t copy_result = cudaMemcpyAsync((char *)tensor->data + offset, data, size, cudaMemcpyHostToDevice, cudaStreamPerThread);
+    if (copy_result != cudaSuccess) {
+        GGML_LOG_ERROR("cudaMemcpyAsync failed on device %d: %s\n",
+                       ctx->device, cudaGetErrorString(copy_result));
+        cudaGetLastError(); // Clear error state
+        return;
+    }
+
+    cudaError_t sync_result = cudaStreamSynchronize(cudaStreamPerThread);
+    if (sync_result != cudaSuccess) {
+        GGML_LOG_ERROR("Stream synchronization failed on device %d in buffer_set_tensor: %s\n",
+                       ctx->device, cudaGetErrorString(sync_result));
+        cudaGetLastError(); // Clear error state
+        return;
+    }
 }
 
 static void ggml_backend_cuda_buffer_get_tensor(ggml_backend_buffer_t buffer, const ggml_tensor * tensor, void * data, size_t offset, size_t size) {
     ggml_backend_cuda_buffer_context * ctx = (ggml_backend_cuda_buffer_context *)buffer->context;
 
-    ggml_cuda_set_device(ctx->device);
-    CUDA_CHECK(cudaMemcpyAsync(data, (const char *)tensor->data + offset, size, cudaMemcpyDeviceToHost, cudaStreamPerThread));
-    CUDA_CHECK(cudaStreamSynchronize(cudaStreamPerThread));
+    // Device context must be set correctly - critical for model functionality
+    CUDA_CHECK(cudaSetDevice(ctx->device));
+
+    cudaError_t copy_result = cudaMemcpyAsync(data, (const char *)tensor->data + offset, size, cudaMemcpyDeviceToHost, cudaStreamPerThread);
+    if (copy_result != cudaSuccess) {
+        GGML_LOG_ERROR("cudaMemcpyAsync failed on device %d: %s\n",
+                       ctx->device, cudaGetErrorString(copy_result));
+        cudaGetLastError(); // Clear error state
+        return;
+    }
+
+    cudaError_t sync_result = cudaStreamSynchronize(cudaStreamPerThread);
+    if (sync_result != cudaSuccess) {
+        GGML_LOG_ERROR("Stream synchronization failed on device %d in buffer_get_tensor: %s\n",
+                       ctx->device, cudaGetErrorString(sync_result));
+        cudaGetLastError(); // Clear error state
+        return;
+    }
 }
 
 static bool ggml_backend_cuda_buffer_cpy_tensor(ggml_backend_buffer_t buffer, const ggml_tensor * src, ggml_tensor * dst) {
     if (ggml_backend_buffer_is_cuda(src->buffer)) {
         ggml_backend_cuda_buffer_context * src_ctx = (ggml_backend_cuda_buffer_context *)src->buffer->context;
         ggml_backend_cuda_buffer_context * dst_ctx = (ggml_backend_cuda_buffer_context *)dst->buffer->context;
+
+        // Device context must be set correctly - critical for model functionality
+        CUDA_CHECK(cudaSetDevice(dst_ctx->device));
+
+        cudaError_t copy_result;
         if (src_ctx->device == dst_ctx->device) {
-            CUDA_CHECK(cudaMemcpyAsync(dst->data, src->data, ggml_nbytes(src), cudaMemcpyDeviceToDevice, cudaStreamPerThread));
+            copy_result = cudaMemcpyAsync(dst->data, src->data, ggml_nbytes(src), cudaMemcpyDeviceToDevice, cudaStreamPerThread);
         } else {
 #ifdef GGML_CUDA_NO_PEER_COPY
             return false;
 #else
-            CUDA_CHECK(cudaMemcpyPeerAsync(dst->data, dst_ctx->device, src->data, src_ctx->device, ggml_nbytes(src), cudaStreamPerThread));
+            copy_result = cudaMemcpyPeerAsync(dst->data, dst_ctx->device, src->data, src_ctx->device, ggml_nbytes(src), cudaStreamPerThread);
 #endif
         }
-        CUDA_CHECK(cudaStreamSynchronize(cudaStreamPerThread));
+
+        if (copy_result != cudaSuccess) {
+            GGML_LOG_ERROR("cudaMemcpy%sAsync failed (src device %d -> dst device %d): %s\n",
+                           (src_ctx->device == dst_ctx->device) ? "" : "Peer",
+                           src_ctx->device, dst_ctx->device, cudaGetErrorString(copy_result));
+            cudaGetLastError(); // Clear error state
+            return false;
+        }
+
+        cudaError_t sync_result = cudaStreamSynchronize(cudaStreamPerThread);
+        if (sync_result != cudaSuccess) {
+            GGML_LOG_ERROR("Stream synchronization failed on device %d in buffer_cpy_tensor: %s\n",
+                           dst_ctx->device, cudaGetErrorString(sync_result));
+            cudaGetLastError(); // Clear error state
+            return false;
+        }
         return true;
     }
     return false;
@@ -2503,6 +2568,18 @@ static bool ggml_backend_cuda_cpy_tensor_async(ggml_backend_t backend_src, ggml_
 static void ggml_backend_cuda_synchronize(ggml_backend_t backend) {
     ggml_backend_cuda_context * cuda_ctx = (ggml_backend_cuda_context *)backend->context;
 
+    // Device context must be set correctly - critical for model functionality
+    CUDA_CHECK(cudaSetDevice(cuda_ctx->device));
+
+    // Check if stream is still valid before synchronization
+    cudaError_t query_result = cudaStreamQuery(cuda_ctx->stream());
+    if (query_result != cudaSuccess && query_result != cudaErrorNotReady) {
+        GGML_LOG_ERROR("Stream validation failed on device %d: %s\n",
+                       cuda_ctx->device, cudaGetErrorString(query_result));
+        return;
+    }
+
+    // Use CUDA_CHECK for inference operations - we want to crash on errors, not silently fail
     CUDA_CHECK(cudaStreamSynchronize(cuda_ctx->stream()));
 
     GGML_UNUSED(backend);
diff --git a/server/sched.go b/server/sched.go
index 2842bb3a..f556bc19 100644
--- a/server/sched.go
+++ b/server/sched.go
@@ -246,18 +246,96 @@ func (s *Scheduler) processPending(ctx context.Context) {
 
 						// Update free memory from currently loaded models
 						s.updateFreeSpace(availGpus)
+
+						// Check if this model requires multiple GPUs (Tesla K80 fix)
+						// If so, we need to ensure ALL required GPUs are clear of other models
 						fitGpus := pickBestFullFitByLibrary(pending, ggml, availGpus, &numParallel)
 						if fitGpus != nil {
-							slog.Debug("new model fits with existing models, loading")
-							s.loadFn(pending, ggml, fitGpus, numParallel)
-							break
+							// Check if this is a multi-GPU model request
+							if len(fitGpus) > 1 {
+								slog.Debug("multi-GPU model detected, checking for conflicts",
+									"target_model", pending.model.ModelPath,
+									"gpu_count", len(fitGpus))
+								// Check if any of the target GPUs have loaded models
+								hasConflict := false
+								s.loadedMu.Lock()
+								for _, loadedRunner := range s.loaded {
+									if loadedRunner.loading {
+										slog.Debug("skipping loading model", "model", loadedRunner.modelPath)
+										continue // Skip models that are still loading
+									}
+									slog.Debug("checking loaded model for conflicts",
+										"loaded_model", loadedRunner.modelPath,
+										"loaded_gpus", len(loadedRunner.gpus))
+									// Check if any loaded model is using any of our target GPUs
+									for _, targetGpu := range fitGpus {
+										for _, loadedGpu := range loadedRunner.gpus {
+											if targetGpu.ID == loadedGpu.ID {
+												slog.Warn("multi-GPU model conflicts with loaded model",
+													"target_model", pending.model.ModelPath,
+													"loaded_model", loadedRunner.modelPath,
+													"conflicting_gpu", targetGpu.ID)
+												hasConflict = true
+												break
+											}
+										}
+										if hasConflict {
+											break
+										}
+									}
+									if hasConflict {
+										break
+									}
+								}
+								s.loadedMu.Unlock()
+
+								if hasConflict {
+									// Check if conflicting models are still active (have refCount > 0)
+									conflictingRunner := s.findConflictingRunnerToUnload(fitGpus)
+									if conflictingRunner != nil {
+										conflictingRunner.refMu.Lock()
+										isActive := conflictingRunner.refCount > 0
+										conflictingRunner.refMu.Unlock()
+
+										if isActive {
+											// Conflicting model is still processing, delay this request
+											slog.Warn("conflicting model is still active, delaying multi-GPU request",
+												"conflicting_model", conflictingRunner.modelPath,
+												"target_model", pending.model.ModelPath)
+											go func() {
+												time.Sleep(s.reschedDelay)
+												s.pendingReqCh <- pending
+											}()
+											break
+										} else {
+											// Conflicting model is idle, can unload it
+											slog.Warn("found idle conflicting runner to unload",
+												"runner", conflictingRunner.modelPath,
+												"refCount", conflictingRunner.refCount)
+											runnerToExpire = conflictingRunner
+											slog.Warn("setting runnerToExpire to trigger unload", "runner", runnerToExpire.modelPath)
+											// Don't break here - let the normal flow handle the unload
+										}
+									} else {
+										slog.Error("failed to find conflicting runner despite detecting conflict!")
+									}
+								} else {
+									slog.Debug("no conflicts detected for multi-GPU model")
+								}
+							}
+
+							if runnerToExpire == nil {
+								slog.Debug("new model fits with existing models, loading")
+								s.loadFn(pending, ggml, fitGpus, numParallel)
+								break
+							}
 						}
 
 						// We couldn't find a set of GPUs to fully load the new
 						// model. If no other models are loading (both GPU lists
 						// are the same) then we need to unload another model to
 						// make room
-						if len(availGpus) < len(gpus) {
+						if runnerToExpire == nil && len(availGpus) < len(gpus) {
 							// There are other requests pending, and this one
 							// needs more time, so put it on the back of the
 							// queue so that we might satisfy other pending
@@ -271,25 +349,32 @@ func (s *Scheduler) processPending(ctx context.Context) {
 							}()
 							break
 						}
-						runnerToExpire = s.findRunnerToUnload()
+						if runnerToExpire == nil {
+							runnerToExpire = s.findRunnerToUnload()
+						}
 					}
 				}
 
+				slog.Warn("exited model selection, checking runnerToExpire", "runnerToExpire", runnerToExpire != nil)
 				if runnerToExpire == nil {
 					// Shouildn't happen
 					slog.Error("runner to expire was nil!")
 					continue
 				}
 				// Trigger an expiration to unload once it's done
+				slog.Warn("attempting to unload runner", "runner", runnerToExpire.modelPath)
 				runnerToExpire.refMu.Lock()
-				slog.Debug("resetting model to expire immediately to make room", "runner", runnerToExpire, "refCount", runnerToExpire.refCount)
+				slog.Warn("resetting model to expire immediately to make room", "runner", runnerToExpire.modelPath, "refCount", runnerToExpire.refCount)
 				if runnerToExpire.expireTimer != nil {
 					runnerToExpire.expireTimer.Stop()
 					runnerToExpire.expireTimer = nil
 				}
 				runnerToExpire.sessionDuration = 0
 				if runnerToExpire.refCount <= 0 {
+					slog.Warn("sending idle runner to expired channel", "runner", runnerToExpire.modelPath)
 					s.expiredCh <- runnerToExpire
+				} else {
+					slog.Warn("runner still has references, waiting for refCount to reach 0", "runner", runnerToExpire.modelPath, "refCount", runnerToExpire.refCount)
 				}
 				runnerToExpire.refMu.Unlock()
 				// Wait for the unload to happen
@@ -301,7 +386,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
 				slog.Debug("shutting down scheduler pending loop")
 				return
 			case <-s.unloadedCh:
-				slog.Debug("unload completed", "runner", runnerToExpire)
+				slog.Warn("unload completed, retrying model load", "runner", runnerToExpire)
 				continue
 			}
 		}
@@ -830,6 +915,34 @@ func pickBestPartialFitByLibrary(req *LlmRequest, f *ggml.GGML, gpus discover.Gp
 	return byLibrary[bestFit]
 }
 
+// findConflictingRunnerToUnload finds a specific runner that conflicts with target GPUs
+func (s *Scheduler) findConflictingRunnerToUnload(targetGpus discover.GpuInfoList) *runnerRef {
+	s.loadedMu.Lock()
+	defer s.loadedMu.Unlock()
+
+	// Find the first loaded model that uses any of our target GPUs
+	for _, loadedRunner := range s.loaded {
+		if loadedRunner.loading {
+			continue // Skip models that are still loading
+		}
+
+		// Check if this loaded model is using any of our target GPUs
+		for _, targetGpu := range targetGpus {
+			for _, loadedGpu := range loadedRunner.gpus {
+				if targetGpu.ID == loadedGpu.ID {
+					slog.Debug("found conflicting runner using GPU",
+						"runner", loadedRunner.modelPath,
+						"gpu", targetGpu.ID)
+					return loadedRunner
+				}
+			}
+		}
+	}
+
+	slog.Debug("no conflicting runner found for target GPUs")
+	return nil
+}
+
 // findRunnerToUnload finds a runner to unload to make room for a new model
 func (s *Scheduler) findRunnerToUnload() *runnerRef {
 	s.loadedMu.Lock()