From faa5074570d2e191f12d532a529ce231c48dea39 Mon Sep 17 00:00:00 2001 From: zareck <121526696+cassiozareck@users.noreply.github.com> Date: Sun, 3 Sep 2023 19:44:07 -0300 Subject: [PATCH 1/4] Expanding documentation in queue.go A set of terminology, along with a broader description, can help more people engage with the Gitea queue system, providing insights and ensuring its correct use. --- modules/queue/queue.go | 59 +++++++++++++++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 0ab8dd4ae4773..34ca42ce33f78 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -1,27 +1,62 @@ // Copyright 2023 The Gitea Authors. All rights reserved. // SPDX-License-Identifier: MIT -// Package queue implements a specialized queue system for Gitea. +// Package queue implements a specialized concurrent queue system for Gitea. // -// There are two major kinds of concepts: +// Terminology: // -// * The "base queue": channel, level, redis: -// - They have the same abstraction, the same interface, and they are tested by the same testing code. -// - The dummy(immediate) queue is special, it's not a real queue, it's only used as a no-op queue or a testing queue. +// 1. Task: +// - A task can be a simple value, such as an integer, or a more complex structure that has multiple fields and +// methods. The aim of a task is to be a unit of work, a set of tasks will be sent to a handler to be processed. // -// * The WorkerPoolQueue: it uses the "base queue" to provide "worker pool" function. -// - It calls the "handler" to process the data in the base queue. -// - Its "Push" function doesn't block forever, -// it will return an error if the queue is full after the timeout. +// 2. Batch: +// - A collection of tasks that are grouped together for processing. Each worker receives a batch of tasks. +// +// 3. Worker: +// - Individual unit of execution designed to process tasks from the queue. 
It's a goroutine that calls the Handler +// - Workers will get new tasks through a channel (WorkerPoolQueue is responsible for the distribution) +// - As workers operate in parallel, the default value of max workers is n/2, where n is the number of logical CPUs +// +// 4. Handler (represented by HandlerFuncT type): +// - It's the function responsible to process tasks. Each active worker will call this. +// - When processing these batches, there might be instances where certain tasks remain unprocessed or "unhandled". +// In such scenarios, the Handler ensures these unhandled tasks are returned to the base queue after a brief delay. +// This mechanism is particularly beneficial in cases where the processing entity (like a document indexer) is +// temporarily unavailable. It ensures that no task is skipped or lost due to transient failures in the processing +// mechanism. +// +// 5. Base queue: +// - Represents the underlying storage mechanism for the queue. There are several implementations: +// - Channel: Uses Go's native channel constructs to manage the queue, suitable for in-memory queuing. +// - Level, Redis: Especially useful in persistent queues and clusters, where we may have multiple nodes. +// - Dummy: This is special, it's not a real queue, it's only used as a no-op queue or a testing queue. +// - They all have the same abstraction, the same interface, and they are tested by the same testing code. +// +// 6. WorkerPoolQueue: +// - It's responsible to glue all together, using the "base queue" to provide "worker pool" functionality. He creates +// new workers if needed and can flush the queue, running all the tasks synchronously till it finishes. +// - Its "Push" function doesn't block forever, it will return an error if the queue is full after the timeout. +// +// 7. Manager: +// - The purpose of it is to serve as a centralized manager for multiple WorkerPoolQueue instances. 
Whenever we want +// to create a new queue, flush, or get a specific queue, we have to use it. // // A queue can be "simple" or "unique". A unique queue will try to avoid duplicate items. // Unique queue's "Has" function can be used to check whether an item is already in the queue, // although it's not 100% reliable due to there is no proper transaction support. // Simple queue's "Has" function always returns "has=false". // -// The HandlerFuncT function is called by the WorkerPoolQueue to process the data in the base queue. -// If the handler returns "unhandled" items, they will be re-queued to the base queue after a slight delay, -// in case the item processor (eg: document indexer) is not available. +// A WorkerPoolQueue is a generic struct; this means it will work with any type but just for that type. +// If you want another kind of tasks to run, you would have to call the manager to create a new WorkerPoolQueue for you +// with a different handler that works with this new type of task. As an example of this: +// +// func Init() error { +// taskQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "task", handler) +// ... +// } +// func handler(items ...*admin_model.Task) []*admin_model.Task { ... 
} +// +// As you can see, the handler defined the admin_model.Task type for the queue package queue import "code.gitea.io/gitea/modules/util" From 99cdbaa860413bc9a730ad8051b453c6671f1d92 Mon Sep 17 00:00:00 2001 From: zareck <121526696+cassiozareck@users.noreply.github.com> Date: Wed, 6 Sep 2023 14:00:54 -0300 Subject: [PATCH 2/4] Update queue.go --- modules/queue/queue.go | 67 ++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 34ca42ce33f78..11bc257ff9c6e 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -2,61 +2,64 @@ // SPDX-License-Identifier: MIT // Package queue implements a specialized concurrent queue system for Gitea. -// + // Terminology: -// -// 1. Task: -// - A task can be a simple value, such as an integer, or a more complex structure that has multiple fields and -// methods. The aim of a task is to be a unit of work, a set of tasks will be sent to a handler to be processed. + +// 1. Item: +// - An item can be a simple value, such as an integer, or a more complex structure that has multiple fields and +// methods. Usually a item serves as a unit of work, sets of items will be sent to a handler to be processed. +// - It's represented as a binary slice on the queue +// - When an item serve as a unit of work generally it will be referred as a task // // 2. Batch: -// - A collection of tasks that are grouped together for processing. Each worker receives a batch of tasks. +// - A collection of items that are grouped together for processing. Each worker receives a batch of items. // // 3. Worker: -// - Individual unit of execution designed to process tasks from the queue. 
It's a goroutine that calls the Handler -// - Workers will get new tasks through a channel (WorkerPoolQueue is responsible for the distribution) -// - As workers operate in parallel, the default value of max workers is n/2, where n is the number of logical CPUs +// - Individual unit of execution designed to process items from the queue. It's a goroutine that calls the Handler. +// - Workers will get new items through a channel (WorkerPoolQueue is responsible for the distribution). +// - Workers operate in parallel. The default value of max workers is determined by the setting system. // // 4. Handler (represented by HandlerFuncT type): -// - It's the function responsible to process tasks. Each active worker will call this. -// - When processing these batches, there might be instances where certain tasks remain unprocessed or "unhandled". -// In such scenarios, the Handler ensures these unhandled tasks are returned to the base queue after a brief delay. -// This mechanism is particularly beneficial in cases where the processing entity (like a document indexer) is -// temporarily unavailable. It ensures that no task is skipped or lost due to transient failures in the processing -// mechanism. +// - It's the function responsible for processing items. Each active worker will call it. +// - When processing batches, there might be instances where certain items remain unprocessed or "unhandled". +// In such scenarios, the Handler ensures these unhandled items are returned to the base queue after a brief delay. +// This mechanism is particularly beneficial in cases where the processing entity (like a document indexer) is +// temporarily unavailable. It ensures that no item is skipped or lost due to transient failures in the processing +// mechanism. // // 5. Base queue: -// - Represents the underlying storage mechanism for the queue. There are several implementations: -// - Channel: Uses Go's native channel constructs to manage the queue, suitable for in-memory queuing. 
-// - Level, Redis: Especially useful in persistent queues and clusters, where we may have multiple nodes. -// - Dummy: This is special, it's not a real queue, it's only used as a no-op queue or a testing queue. -// - They all have the same abstraction, the same interface, and they are tested by the same testing code. +// - Represents the underlying storage mechanism for the queue. There are several implementations: +// - Channel: Uses Go's native channel constructs to manage the queue, suitable for in-memory queuing. +// - LevelDB: Especially useful in persistent queues for single instances. +// - Redis: Suitable for clusters, where we may have multiple nodes. +// - Dummy: This is special, it's not a real queue, it's a immediate no-op queue, which is useful for tests. +// - They all have the same abstraction, the same interface, and they are tested by the same testing code. // // 6. WorkerPoolQueue: -// - It's responsible to glue all together, using the "base queue" to provide "worker pool" functionality. He creates -// new workers if needed and can flush the queue, running all the tasks synchronously till it finishes. +// - It's responsible to glue all together, using the "base queue" to provide "worker pool" functionality. It creates +// new workers if needed and can flush the queue, running all the items synchronously till it finishes. // - Its "Push" function doesn't block forever, it will return an error if the queue is full after the timeout. // // 7. Manager: // - The purpose of it is to serve as a centralized manager for multiple WorkerPoolQueue instances. Whenever we want -// to create a new queue, flush, or get a specific queue, we have to use it. +// to create a new queue, flush, or get a specific queue, we have to use it. // // A queue can be "simple" or "unique". A unique queue will try to avoid duplicate items. 
// Unique queue's "Has" function can be used to check whether an item is already in the queue, -// although it's not 100% reliable due to there is no proper transaction support. +// although it's not 100% reliable due to the lack of proper transaction support. // Simple queue's "Has" function always returns "has=false". // // A WorkerPoolQueue is a generic struct; this means it will work with any type but just for that type. -// If you want another kind of tasks to run, you would have to call the manager to create a new WorkerPoolQueue for you -// with a different handler that works with this new type of task. As an example of this: +// If you want another kind of items to run, you would have to call the manager to create a new WorkerPoolQueue for you +// with a different handler that works with this new type of item. As an example of this: // -// func Init() error { -// taskQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "task", handler) -// ... -// } -// func handler(items ...*admin_model.Task) []*admin_model.Task { ... } +// func Init() error { +// itemQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "item", handler) +// ... +// } +// func handler(items ...*admin_model.Item) []*admin_model.Item { ... 
} // -// As you can see, the handler defined the admin_model.Task type for the queue +// As you can see, the handler defined the admin_model.Item type for the queue package queue import "code.gitea.io/gitea/modules/util" From 3fdcd8e8a7cd8b0d424228a3ba276e9f100835dd Mon Sep 17 00:00:00 2001 From: zareck Date: Thu, 7 Sep 2023 14:21:42 -0300 Subject: [PATCH 3/4] Update doc --- modules/queue/queue.go | 58 +++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 11bc257ff9c6e..a1256b405a0ea 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -2,47 +2,47 @@ // SPDX-License-Identifier: MIT // Package queue implements a specialized concurrent queue system for Gitea. - +// // Terminology: - +// // 1. Item: -// - An item can be a simple value, such as an integer, or a more complex structure that has multiple fields and -// methods. Usually a item serves as a unit of work, sets of items will be sent to a handler to be processed. -// - It's represented as a binary slice on the queue -// - When an item serve as a unit of work generally it will be referred as a task +// - An item can be a simple value, such as an integer, or a more complex structure that has multiple fields and +// methods. Usually a item serves as a unit of work, sets of items will be sent to a handler to be processed. +// - It's represented as a binary slice on the queue +// - When an item serve as a unit of work generally it will be referred as a task // // 2. Batch: -// - A collection of items that are grouped together for processing. Each worker receives a batch of items. +// - A collection of items that are grouped together for processing. Each worker receives a batch of items. // // 3. Worker: -// - Individual unit of execution designed to process items from the queue. It's a goroutine that calls the Handler. 
-// - Workers will get new items through a channel (WorkerPoolQueue is responsible for the distribution). -// - Workers operate in parallel. The default value of max workers is determined by the setting system. +// - Individual unit of execution designed to process items from the queue. It's a goroutine that calls the Handler. +// - Workers will get new items through a channel (WorkerPoolQueue is responsible for the distribution). +// - Workers operate in parallel. The default value of max workers is determined by the setting system. // // 4. Handler (represented by HandlerFuncT type): -// - It's the function responsible for processing items. Each active worker will call it. -// - When processing batches, there might be instances where certain items remain unprocessed or "unhandled". -// In such scenarios, the Handler ensures these unhandled items are returned to the base queue after a brief delay. -// This mechanism is particularly beneficial in cases where the processing entity (like a document indexer) is -// temporarily unavailable. It ensures that no item is skipped or lost due to transient failures in the processing -// mechanism. +// - It's the function responsible for processing items. Each active worker will call it. +// - When processing batches, there might be instances where certain items remain unprocessed or "unhandled". +// In such scenarios, the Handler ensures these unhandled items are returned to the base queue after a brief delay. +// This mechanism is particularly beneficial in cases where the processing entity (like a document indexer) is +// temporarily unavailable. It ensures that no item is skipped or lost due to transient failures in the processing +// mechanism. // // 5. Base queue: -// - Represents the underlying storage mechanism for the queue. There are several implementations: -// - Channel: Uses Go's native channel constructs to manage the queue, suitable for in-memory queuing. 
-// - LevelDB: Especially useful in persistent queues for single instances. -// - Redis: Suitable for clusters, where we may have multiple nodes. -// - Dummy: This is special, it's not a real queue, it's a immediate no-op queue, which is useful for tests. -// - They all have the same abstraction, the same interface, and they are tested by the same testing code. +// - Represents the underlying storage mechanism for the queue. There are several implementations: +// - Channel: Uses Go's native channel constructs to manage the queue, suitable for in-memory queuing. +// - LevelDB: Especially useful in persistent queues for single instances. +// - Redis: Suitable for clusters, where we may have multiple nodes. +// - Dummy: This is special, it's not a real queue, it's an immediate no-op queue, which is useful for tests. +// - They all have the same abstraction, the same interface, and they are tested by the same testing code. // // 6. WorkerPoolQueue: // - It's responsible to glue all together, using the "base queue" to provide "worker pool" functionality. It creates -// new workers if needed and can flush the queue, running all the items synchronously till it finishes. +// new workers if needed and can flush the queue, running all the items synchronously till it finishes. // - Its "Push" function doesn't block forever, it will return an error if the queue is full after the timeout. // // 7. Manager: // - The purpose of it is to serve as a centralized manager for multiple WorkerPoolQueue instances. Whenever we want -// to create a new queue, flush, or get a specific queue, we have to use it. +// to create a new queue, flush, or get a specific queue, we have to use it. // // A queue can be "simple" or "unique". A unique queue will try to avoid duplicate items. 
// Unique queue's "Has" function can be used to check whether an item is already in the queue, @@ -53,11 +53,11 @@ // If you want another kind of items to run, you would have to call the manager to create a new WorkerPoolQueue for you // with a different handler that works with this new type of item. As an example of this: // -// func Init() error { -// itemQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "item", handler) -// ... -// } -// func handler(items ...*admin_model.Item) []*admin_model.Item { ... } +// func Init() error { +// itemQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "item", handler) +// ... +// } +// func handler(items ...*admin_model.Item) []*admin_model.Item { ... } // // As you can see, the handler defined the admin_model.Item type for the queue package queue From 79935b0ba1f93c89e3345ab4abd3e8edc81bf552 Mon Sep 17 00:00:00 2001 From: wxiaoguang Date: Fri, 8 Sep 2023 02:34:38 +0800 Subject: [PATCH 4/4] Update queue.go --- modules/queue/queue.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/modules/queue/queue.go b/modules/queue/queue.go index a1256b405a0ea..1ad7320e31f01 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -6,10 +6,9 @@ // Terminology: // // 1. Item: -// - An item can be a simple value, such as an integer, or a more complex structure that has multiple fields and -// methods. Usually a item serves as a unit of work, sets of items will be sent to a handler to be processed. -// - It's represented as a binary slice on the queue -// - When an item serve as a unit of work generally it will be referred as a task +// - An item can be a simple value, such as an integer, or a more complex structure that has multiple fields. +// Usually an item serves as a task or a message. Sets of items will be sent to a queue handler to be processed. +// - It's represented as a JSON-marshaled binary slice in the queue // // 2. 
Batch: // - A collection of items that are grouped together for processing. Each worker receives a batch of items. @@ -21,8 +20,8 @@ // // 4. Handler (represented by HandlerFuncT type): // - It's the function responsible for processing items. Each active worker will call it. -// - When processing batches, there might be instances where certain items remain unprocessed or "unhandled". -// In such scenarios, the Handler ensures these unhandled items are returned to the base queue after a brief delay. +// - If an item or some items are not successfully processed, the handler could return them as "unhandled items". +// In such scenarios, the queue system ensures these unhandled items are returned to the base queue after a brief delay. // This mechanism is particularly beneficial in cases where the processing entity (like a document indexer) is // temporarily unavailable. It ensures that no item is skipped or lost due to transient failures in the processing // mechanism. @@ -42,7 +41,7 @@ // // 7. Manager: // - The purpose of it is to serve as a centralized manager for multiple WorkerPoolQueue instances. Whenever we want -// to create a new queue, flush, or get a specific queue, we have to use it. +// to create a new queue, flush, or get a specific queue, we could use it. // // A queue can be "simple" or "unique". A unique queue will try to avoid duplicate items. // Unique queue's "Has" function can be used to check whether an item is already in the queue, @@ -54,12 +53,10 @@ // with a different handler that works with this new type of item. As an example of this: // // func Init() error { -// itemQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "item", handler) +// itemQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "queue-name", handler) // ... // } -// func handler(items ...*admin_model.Item) []*admin_model.Item { ... 
} -// -// As you can see, the handler defined the admin_model.Item type for the queue +// func handler(items ...*mypkg.QueueItem) []*mypkg.QueueItem { ... } package queue import "code.gitea.io/gitea/modules/util"