Skip to content

Added hook to lifecycle events #1150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions Sources/DistributedCluster/ClusterSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,9 @@ extension ClusterSystem {
if let wellKnownName = actor.id.metadata.wellKnown {
self._managedWellKnownDistributedActors[wellKnownName] = actor
}
for hook in self.settings.plugins.actorLifecycleHooks {
hook.onActorReady(actor)
}
}

/// Advertise to the cluster system that a "well known" distributed actor has become ready.
Expand Down Expand Up @@ -1125,6 +1128,9 @@ extension ClusterSystem {

// Well-known actors are held strongly and should be released using `releaseWellKnownActorID`
}
for hook in self.settings.plugins.actorLifecycleHooks {
hook.onResignID(id)
}
}

public func releaseWellKnownActorID(_ id: ActorID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public actor ClusterSingletonPlugin {
private var actorSystem: ClusterSystem!

public init() {
self.actorSystem = nil // 'actorSystem' is filled in later on in _Plugin.start()
self.actorSystem = nil // 'actorSystem' is filled in later on in Plugin.start()
}

public func proxy<Act>(
Expand Down Expand Up @@ -101,7 +101,7 @@ public actor ClusterSingletonPlugin {
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Plugin protocol conformance

extension ClusterSingletonPlugin: _Plugin {
extension ClusterSingletonPlugin: Plugin {
static let pluginKey: Key = "$clusterSingleton"

public nonisolated var key: Key {
Expand Down
31 changes: 21 additions & 10 deletions Sources/DistributedCluster/Plugins/ClusterSystem+Plugins.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Plugin protocol

public protocol _AnyPlugin {
public protocol AnyPlugin {
/// Starts the plugin.
func start(_ system: ClusterSystem) async throws

Expand All @@ -24,24 +24,24 @@ public protocol _AnyPlugin {
}

/// A plugin provides specific features and capabilities (e.g., singleton) to a `ClusterSystem`.
public protocol _Plugin: _AnyPlugin {
typealias Key = _PluginKey<Self>
public protocol Plugin: AnyPlugin {
typealias Key = PluginKey<Self>

/// The plugin's unique identifier
var key: Key { get }
}

internal struct BoxedPlugin: _AnyPlugin {
private let underlying: _AnyPlugin
internal struct BoxedPlugin: AnyPlugin {
private let underlying: AnyPlugin

let key: AnyPluginKey

init<P: _Plugin>(_ plugin: P) {
init<P: Plugin>(_ plugin: P) {
self.underlying = plugin
self.key = AnyPluginKey(plugin.key)
}

func unsafeUnwrapAs<P: _Plugin>(_: P.Type) -> P {
func unsafeUnwrapAs<P: Plugin>(_: P.Type) -> P {
guard let unwrapped = self.underlying as? P else {
fatalError("Type mismatch, expected: [\(String(reflecting: P.self))] got [\(self.underlying)]")
}
Expand All @@ -60,7 +60,7 @@ internal struct BoxedPlugin: _AnyPlugin {
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Plugin key

public struct _PluginKey<P: _Plugin>: CustomStringConvertible, ExpressibleByStringLiteral {
public struct PluginKey<P: Plugin>: CustomStringConvertible, ExpressibleByStringLiteral {
public let plugin: String
public let sub: String?

Expand All @@ -81,7 +81,7 @@ public struct _PluginKey<P: _Plugin>: CustomStringConvertible, ExpressibleByStri
self.sub = sub
}

public func makeSub(_ sub: String) -> _PluginKey {
public func makeSub(_ sub: String) -> PluginKey {
precondition(self.sub == nil, "Cannot make a sub plugin key from \(self) (sub MUST be nil)")
return .init(plugin: self.plugin, sub: sub)
}
Expand All @@ -100,7 +100,7 @@ internal struct AnyPluginKey: Hashable, CustomStringConvertible {
let plugin: String
let sub: String?

init<P: _Plugin>(_ key: _PluginKey<P>) {
init<P: Plugin>(_ key: PluginKey<P>) {
self.pluginTypeId = ObjectIdentifier(P.self)
self.plugin = key.plugin
self.sub = key.sub
Expand All @@ -114,3 +114,14 @@ internal struct AnyPluginKey: Hashable, CustomStringConvertible {
}
}
}

/// Kind of `ClusterSystem` plugin which will be invoked during an actor's `actorReady`
/// and `resignID` lifecycle hooks.
///
/// The ready hook is allowed to modify the ID, e.g. by adding additional metadata to it.
/// The plugin should carefully manage retaining actors and document if it does have strong references to them,
/// and how end-users should go about releasing them.
public protocol ActorLifecyclePlugin: Plugin {
func onActorReady<Act: DistributedActor>(_ actor: Act) where Act: DistributedActor, Act.ID == ClusterSystem.ActorID
func onResignID(_ id: ClusterSystem.ActorID)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@ public struct PluginsSettings {
}

internal var plugins: [BoxedPlugin] = []
internal var actorLifecycleHooks: [any ActorLifecyclePlugin] = []

public init() {}

/// Adds a `_Plugin`.
/// Adds a `Plugin`.
///
/// - Note: A plugin that depends on others should be added *after* its dependencies.
/// - Faults, when plugin of the exact same `PluginKey` is already included in the settings.
@available(*, deprecated, message: "use settings.install(plugin:) instead")
public mutating func add<P: _Plugin>(_ plugin: P) {
public mutating func add<P: Plugin>(_ plugin: P) {
self.install(plugin: plugin)
}

/// Returns `Plugin` identified by `key`.
public subscript<P: _Plugin>(_ key: _PluginKey<P>) -> P? {
public subscript<P: Plugin>(_ key: PluginKey<P>) -> P? {
self.plugins.first { $0.key == key.asAny }?.unsafeUnwrapAs(P.self)
}

Expand Down Expand Up @@ -69,30 +70,32 @@ extension PluginsSettings {
/// and stopped as the system is shut down.
///
/// - Parameter plugin: plugin to install in the actor system
public mutating func install<P: _Plugin>(plugin: P) {
public mutating func install<P: Plugin>(plugin: P) {
precondition(
!self.isInstalled(plugin: plugin),
"Attempted to add plugin \(plugin.key) but key already used! Plugin [\(plugin)], installed plugins: \(self.plugins)."
)

if let plugin = plugin as? (any ActorLifecyclePlugin) {
self.actorLifecycleHooks.append(plugin)
}
return self.plugins.append(BoxedPlugin(plugin))
}

/// Returns `true` if the given plugin is installed in the settings.
///
/// - Parameter plugin: plugin to check if it is installed
public func isInstalled<P: _Plugin>(plugin: P) -> Bool {
public func isInstalled<P: Plugin>(plugin: P) -> Bool {
self.plugins.contains(where: { $0.key == plugin.key.asAny })
}

@available(*, deprecated, message: "use settings.install(plugin:) instead")
public static func += <P: _Plugin>(plugins: inout PluginsSettings, plugin: P) {
public static func += <P: Plugin>(plugins: inout PluginsSettings, plugin: P) {
plugins.add(plugin)
}
}

extension ClusterSystemSettings {
public static func += <P: _Plugin>(settings: inout ClusterSystemSettings, plugin: P) {
public static func += <P: Plugin>(settings: inout ClusterSystemSettings, plugin: P) {
settings.plugins.install(plugin: plugin)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,50 @@ final class ClusterSingletonPluginTests: SingleClusterSystemXCTestCase {
singletonID.shouldNotEqual(greeterID)
}

func test_plugin_hooks() async throws {
let actorID = "actorHookID"
let hookFulfillment = self.expectation(description: "actor-hook")
let plugin = TestActorLifecyclePlugin { actor in
/// There are multiple internal actors fired, we only checking for `ActorWithId`
guard let actor = actor as? ActorWithID else { return }
Task {
let id = try? await actor.getID()
XCTAssertEqual(id, actorID, "Expected \(actorID) as an ID")
hookFulfillment.fulfill()
}
}
let testNode = await setUpNode("test-hook") { settings in
settings.enabled = false
settings += plugin
}

let id = ActorWithID(actorSystem: testNode, customID: actorID)
await fulfillment(of: [hookFulfillment], timeout: 3.0)
}

final class TestActorLifecyclePlugin: ActorLifecyclePlugin {
var key: Key { "$testClusterHook" }

let onActorReady: (any DistributedActor) -> Void
let _lock: _Mutex = .init()

init(
onActorReady: @escaping (any DistributedActor) -> Void
) {
self.onActorReady = onActorReady
}

func onActorReady<Act: DistributedActor>(_ actor: Act) where Act.ID == ClusterSystem.ActorID {
self._lock.lock()
self.onActorReady(actor)
self._lock.unlock()
}

func onResignID(_ id: ClusterSystem.ActorID) {}
func start(_ system: ClusterSystem) async throws {}
func stop(_ system: ClusterSystem) async {}
}

distributed actor SingletonWhichCreatesDistributedActorDuringInit: ClusterSingleton {
typealias ActorSystem = ClusterSystem

Expand Down Expand Up @@ -90,4 +134,20 @@ final class ClusterSingletonPluginTests: SingleClusterSystemXCTestCase {
print("Hello!")
}
}

distributed actor ActorWithID {
let customID: String

init(
actorSystem: ActorSystem,
customID: String
) {
self.actorSystem = actorSystem
self.customID = customID
}

distributed func getID() -> String {
self.customID
}
}
}
Loading