concurrent uploads

This commit is contained in:
Michael Yang
2023-10-09 10:24:27 -07:00
parent 3a1ed9ff70
commit 4e09aab8b9
4 changed files with 280 additions and 202 deletions

View File

@@ -981,46 +981,7 @@ func PushModel(ctx context.Context, name string, regOpts *RegistryOptions, fn fu
layers = append(layers, &manifest.Config)
for _, layer := range layers {
exists, err := checkBlobExistence(ctx, mp, layer.Digest, regOpts)
if err != nil {
return err
}
if exists {
fn(api.ProgressResponse{
Status: "using existing layer",
Digest: layer.Digest,
Total: layer.Size,
Completed: layer.Size,
})
log.Printf("Layer %s already exists", layer.Digest)
continue
}
fn(api.ProgressResponse{
Status: "starting upload",
Digest: layer.Digest,
Total: layer.Size,
})
location, chunkSize, err := startUpload(ctx, mp, layer, regOpts)
if err != nil {
log.Printf("couldn't start upload: %v", err)
return err
}
if strings.HasPrefix(filepath.Base(location.Path), "sha256:") {
layer.Digest = filepath.Base(location.Path)
fn(api.ProgressResponse{
Status: "using existing layer",
Digest: layer.Digest,
Total: layer.Size,
Completed: layer.Size,
})
continue
}
if err := uploadBlob(ctx, location, layer, chunkSize, regOpts, fn); err != nil {
if err := uploadBlob(ctx, mp, layer, regOpts, fn); err != nil {
log.Printf("error uploading blob: %v", err)
return err
}
@@ -1218,24 +1179,7 @@ func GetSHA256Digest(r io.Reader) (string, int64) {
return fmt.Sprintf("sha256:%x", h.Sum(nil)), n
}
// Function to check if a blob already exists in the Docker registry
func checkBlobExistence(ctx context.Context, mp ModelPath, digest string, regOpts *RegistryOptions) (bool, error) {
requestURL := mp.BaseURL()
requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", digest)
resp, err := makeRequest(ctx, "HEAD", requestURL, nil, nil, regOpts)
if err != nil {
log.Printf("couldn't check for blob: %v", err)
return false, err
}
defer resp.Body.Close()
// Check for success: If the blob exists, the Docker registry will respond with a 200 OK
return resp.StatusCode < http.StatusBadRequest, nil
}
func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.URL, headers http.Header, body io.ReadSeeker, regOpts *RegistryOptions) (*http.Response, error) {
var status string
for try := 0; try < maxRetries; try++ {
resp, err := makeRequest(ctx, method, requestURL, headers, body, regOpts)
if err != nil {
@@ -1243,8 +1187,6 @@ func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.UR
return nil, err
}
status = resp.Status
switch {
case resp.StatusCode == http.StatusUnauthorized:
auth := resp.Header.Get("www-authenticate")
@@ -1270,7 +1212,7 @@ func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.UR
}
}
return nil, fmt.Errorf("max retry exceeded: %v", status)
return nil, errMaxRetriesExceeded
}
func makeRequest(ctx context.Context, method string, requestURL *url.URL, headers http.Header, body io.Reader, regOpts *RegistryOptions) (*http.Response, error) {