From 28d8af3569ea8553de19695259eb0abe9167ca73 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Fri, 16 Aug 2024 14:24:18 -0700 Subject: [PATCH 1/5] Fix upload maven pacakge parallelly --- routers/api/packages/maven/maven.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/routers/api/packages/maven/maven.go b/routers/api/packages/maven/maven.go index 1486e83c57db5..bf0e2b4e71a4a 100644 --- a/routers/api/packages/maven/maven.go +++ b/routers/api/packages/maven/maven.go @@ -24,6 +24,7 @@ import ( "code.gitea.io/gitea/modules/log" packages_module "code.gitea.io/gitea/modules/packages" maven_module "code.gitea.io/gitea/modules/packages/maven" + "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/routers/api/packages/helper" "code.gitea.io/gitea/services/context" packages_service "code.gitea.io/gitea/services/packages" @@ -223,6 +224,9 @@ func servePackageFile(ctx *context.Context, params parameters, serveContent bool helper.ServePackageFile(ctx, s, u, pf, opts) } +// Prevent duplicate upload for the same file. +var uploadLock = sync.NewStatusTable() + // UploadPackageFile adds a file to the package. If the package does not exist, it gets created. func UploadPackageFile(ctx *context.Context) { params, err := extractPathParameters(ctx) @@ -241,6 +245,10 @@ func UploadPackageFile(ctx *context.Context) { packageName := params.GroupID + "-" + params.ArtifactID + // for the same package, only one upload at a time + uploadLock.Start(packageName) + defer uploadLock.Stop(packageName) + buf, err := packages_module.CreateHashedBufferFromReader(ctx.Req.Body) if err != nil { apiError(ctx, http.StatusInternalServerError, err) From dddf5592a860b2affe1ee16366cf1a1c227469b3 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 19 Aug 2024 08:41:16 -0700 Subject: [PATCH 2/5] Fix bug and add tests for maven uploads --- routers/api/packages/maven/maven.go | 6 ++-- tests/integration/api_packages_maven_test.go | 33 ++++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/routers/api/packages/maven/maven.go b/routers/api/packages/maven/maven.go index bf0e2b4e71a4a..b72eec6a39085 100644 --- a/routers/api/packages/maven/maven.go +++ b/routers/api/packages/maven/maven.go @@ -225,7 +225,7 @@ func servePackageFile(ctx *context.Context, params parameters, serveContent bool } // Prevent duplicate upload for the same file. -var uploadLock = sync.NewStatusTable() +var uploadLock = sync.NewExclusivePool() // UploadPackageFile adds a file to the package. If the package does not exist, it gets created. func UploadPackageFile(ctx *context.Context) { @@ -246,8 +246,8 @@ func UploadPackageFile(ctx *context.Context) { packageName := params.GroupID + "-" + params.ArtifactID // for the same package, only one upload at a time - uploadLock.Start(packageName) - defer uploadLock.Stop(packageName) + uploadLock.CheckIn(packageName) + defer uploadLock.CheckOut(packageName) buf, err := packages_module.CreateHashedBufferFromReader(ctx.Req.Body) if err != nil { diff --git a/tests/integration/api_packages_maven_test.go b/tests/integration/api_packages_maven_test.go index 0466a727b25f0..e54238858c2ea 100644 --- a/tests/integration/api_packages_maven_test.go +++ b/tests/integration/api_packages_maven_test.go @@ -8,6 +8,7 @@ import ( "net/http" "strconv" "strings" + "sync" "testing" "code.gitea.io/gitea/models/db" @@ -252,3 +253,35 @@ func TestPackageMaven(t *testing.T) { assert.True(t, test.IsNormalPageCompleted(resp.Body.String())) }) } + +func TestPackageMavenConcurrent(t *testing.T) { + defer tests.PrepareTestEnv(t)() + + user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + + groupID := "com.gitea" + artifactID := "test-project" + packageVersion := "1.0.1" + + root := fmt.Sprintf("/api/packages/%s/maven/%s/%s", user.Name, strings.ReplaceAll(groupID, ".", "/"), artifactID) + + putFile := func(t *testing.T, path, content string, expectedStatus int) { + req := NewRequestWithBody(t, "PUT", root+path, strings.NewReader(content)). + AddBasicAuth(user.Name) + MakeRequest(t, req, expectedStatus) + } + + t.Run("Concurrent Upload", func(t *testing.T) { + defer tests.PrintCurrentTest(t)() + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + putFile(t, fmt.Sprintf("/%s/%s.jar", packageVersion, strconv.Itoa(i)), "test", http.StatusCreated) + wg.Done() + }(i) + } + wg.Wait() + }) +} From 8b6df3c15d903e4aa3449a855d51515d8ce6f598 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 26 Aug 2024 10:44:30 -0700 Subject: [PATCH 3/5] use globallock lock --- routers/api/packages/maven/maven.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/routers/api/packages/maven/maven.go b/routers/api/packages/maven/maven.go index b72eec6a39085..435666f21fcff 100644 --- a/routers/api/packages/maven/maven.go +++ b/routers/api/packages/maven/maven.go @@ -20,11 +20,11 @@ import ( "strings" packages_model "code.gitea.io/gitea/models/packages" + "code.gitea.io/gitea/modules/globallock" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" packages_module "code.gitea.io/gitea/modules/packages" maven_module "code.gitea.io/gitea/modules/packages/maven" - "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/routers/api/packages/helper" "code.gitea.io/gitea/services/context" packages_service "code.gitea.io/gitea/services/packages" @@ -224,8 +224,9 @@ func servePackageFile(ctx *context.Context, params parameters, serveContent bool helper.ServePackageFile(ctx, s, u, pf, opts) } -// Prevent duplicate upload for the same file. -var uploadLock = sync.NewExclusivePool() +func mavenPkgNameKey(packageName string) string { + return "pkg_maven_" + packageName +} // UploadPackageFile adds a file to the package. If the package does not exist, it gets created. func UploadPackageFile(ctx *context.Context) { @@ -246,8 +247,12 @@ func UploadPackageFile(ctx *context.Context) { packageName := params.GroupID + "-" + params.ArtifactID // for the same package, only one upload at a time - uploadLock.CheckIn(packageName) - defer uploadLock.CheckOut(packageName) + ctx, releaser, err := globallock.Lock(ctx, mavenPkgNameKey(packageName)) + if err != nil { + apiError(ctx, http.StatusInternalServerError, err) + return + } + defer releaser() buf, err := packages_module.CreateHashedBufferFromReader(ctx.Req.Body) if err != nil { From b08a9165186b5923f2f271fb7f3abaa09ee28ac9 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 26 Aug 2024 11:04:31 -0700 Subject: [PATCH 4/5] use globallock lock --- routers/api/packages/maven/maven.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/routers/api/packages/maven/maven.go b/routers/api/packages/maven/maven.go index 435666f21fcff..6b3b31997be37 100644 --- a/routers/api/packages/maven/maven.go +++ b/routers/api/packages/maven/maven.go @@ -247,7 +247,7 @@ func UploadPackageFile(ctx *context.Context) { packageName := params.GroupID + "-" + params.ArtifactID // for the same package, only one upload at a time - ctx, releaser, err := globallock.Lock(ctx, mavenPkgNameKey(packageName)) + stdCtx, releaser, err := globallock.Lock(ctx, mavenPkgNameKey(packageName)) if err != nil { apiError(ctx, http.StatusInternalServerError, err) return @@ -276,7 +276,7 @@ func UploadPackageFile(ctx *context.Context) { // Do not upload checksum files but compare the hashes. if isChecksumExtension(ext) { - pv, err := packages_model.GetVersionByNameAndVersion(ctx, pvci.Owner.ID, pvci.PackageType, pvci.Name, pvci.Version) + pv, err := packages_model.GetVersionByNameAndVersion(stdCtx, pvci.Owner.ID, pvci.PackageType, pvci.Name, pvci.Version) if err != nil { if err == packages_model.ErrPackageNotExist { apiError(ctx, http.StatusNotFound, err) @@ -285,7 +285,7 @@ func UploadPackageFile(ctx *context.Context) { apiError(ctx, http.StatusInternalServerError, err) return } - pf, err := packages_model.GetFileForVersionByName(ctx, pv.ID, params.Filename[:len(params.Filename)-len(ext)], packages_model.EmptyFileKey) + pf, err := packages_model.GetFileForVersionByName(stdCtx, pv.ID, params.Filename[:len(params.Filename)-len(ext)], packages_model.EmptyFileKey) if err != nil { if err == packages_model.ErrPackageFileNotExist { apiError(ctx, http.StatusNotFound, err) @@ -294,7 +294,7 @@ func UploadPackageFile(ctx *context.Context) { apiError(ctx, http.StatusInternalServerError, err) return } - pb, err := packages_model.GetBlobByID(ctx, pf.BlobID) + pb, err := packages_model.GetBlobByID(stdCtx, pf.BlobID) if err != nil { apiError(ctx, http.StatusInternalServerError, err) return @@ -340,7 +340,7 @@ func UploadPackageFile(ctx *context.Context) { } if pvci.Metadata != nil { - pv, err := packages_model.GetVersionByNameAndVersion(ctx, pvci.Owner.ID, pvci.PackageType, pvci.Name, pvci.Version) + pv, err := packages_model.GetVersionByNameAndVersion(stdCtx, pvci.Owner.ID, pvci.PackageType, pvci.Name, pvci.Version) if err != nil && err != packages_model.ErrPackageNotExist { apiError(ctx, http.StatusInternalServerError, err) return @@ -352,7 +352,7 @@ func UploadPackageFile(ctx *context.Context) { return } pv.MetadataJSON = string(raw) - if err := packages_model.UpdateVersion(ctx, pv); err != nil { + if err := packages_model.UpdateVersion(stdCtx, pv); err != nil { apiError(ctx, http.StatusInternalServerError, err) return } @@ -366,7 +366,7 @@ func UploadPackageFile(ctx *context.Context) { } _, _, err = packages_service.CreatePackageOrAddFileToExisting( - ctx, + stdCtx, pvci, pfci, ) From 8b91adc35c83e24666056a84e02318e293154990 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Thu, 29 Aug 2024 13:23:17 -0700 Subject: [PATCH 5/5] Update because globallock updated --- routers/api/packages/maven/maven.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/routers/api/packages/maven/maven.go b/routers/api/packages/maven/maven.go index 6b3b31997be37..07e34da8e2df5 100644 --- a/routers/api/packages/maven/maven.go +++ b/routers/api/packages/maven/maven.go @@ -247,7 +247,7 @@ func UploadPackageFile(ctx *context.Context) { packageName := params.GroupID + "-" + params.ArtifactID // for the same package, only one upload at a time - stdCtx, releaser, err := globallock.Lock(ctx, mavenPkgNameKey(packageName)) + releaser, err := globallock.Lock(ctx, mavenPkgNameKey(packageName)) if err != nil { apiError(ctx, http.StatusInternalServerError, err) return @@ -276,7 +276,7 @@ func UploadPackageFile(ctx *context.Context) { // Do not upload checksum files but compare the hashes. if isChecksumExtension(ext) { - pv, err := packages_model.GetVersionByNameAndVersion(stdCtx, pvci.Owner.ID, pvci.PackageType, pvci.Name, pvci.Version) + pv, err := packages_model.GetVersionByNameAndVersion(ctx, pvci.Owner.ID, pvci.PackageType, pvci.Name, pvci.Version) if err != nil { if err == packages_model.ErrPackageNotExist { apiError(ctx, http.StatusNotFound, err) @@ -285,7 +285,7 @@ func UploadPackageFile(ctx *context.Context) { apiError(ctx, http.StatusInternalServerError, err) return } - pf, err := packages_model.GetFileForVersionByName(stdCtx, pv.ID, params.Filename[:len(params.Filename)-len(ext)], packages_model.EmptyFileKey) + pf, err := packages_model.GetFileForVersionByName(ctx, pv.ID, params.Filename[:len(params.Filename)-len(ext)], packages_model.EmptyFileKey) if err != nil { if err == packages_model.ErrPackageFileNotExist { apiError(ctx, http.StatusNotFound, err) @@ -294,7 +294,7 @@ func UploadPackageFile(ctx *context.Context) { apiError(ctx, http.StatusInternalServerError, err) return } - pb, err := packages_model.GetBlobByID(stdCtx, pf.BlobID) + pb, err := packages_model.GetBlobByID(ctx, pf.BlobID) if err != nil { apiError(ctx, http.StatusInternalServerError, err) return @@ -340,7 +340,7 @@ func UploadPackageFile(ctx *context.Context) { } if pvci.Metadata != nil { - pv, err := packages_model.GetVersionByNameAndVersion(stdCtx, pvci.Owner.ID, pvci.PackageType, pvci.Name, pvci.Version) + pv, err := packages_model.GetVersionByNameAndVersion(ctx, pvci.Owner.ID, pvci.PackageType, pvci.Name, pvci.Version) if err != nil && err != packages_model.ErrPackageNotExist { apiError(ctx, http.StatusInternalServerError, err) return @@ -352,7 +352,7 @@ func UploadPackageFile(ctx *context.Context) { return } pv.MetadataJSON = string(raw) - if err := packages_model.UpdateVersion(stdCtx, pv); err != nil { + if err := packages_model.UpdateVersion(ctx, pv); err != nil { apiError(ctx, http.StatusInternalServerError, err) return } @@ -366,7 +366,7 @@ func UploadPackageFile(ctx *context.Context) { } _, _, err = packages_service.CreatePackageOrAddFileToExisting( - stdCtx, + ctx, pvci, pfci, )