@@ -296,96 +296,73 @@ final class SWIMShellClusteredTests: ClusteredActorSystemsXCTestCase {
296
296
try await self . awaitStatus ( . suspect( incarnation: 0 , suspectedBy: [ first. node] ) , for: targetPeer, on: first, within: . seconds( 1 ) )
297
297
}
298
298
299
- /*
300
- func test_swim_shouldNotMarkUnreachable_whenSuspectedByNotEnoughNodes_whenMinTimeoutReached() async throws {
301
- let first = await self.setUpFirst()
302
- let firstNode = first.cluster.uniqueNode
303
- let second = await self.setUpSecond()
304
-
305
- first.cluster.join(node: second.cluster.uniqueNode.node)
306
- try assertAssociated(first, withExactly: second.cluster.uniqueNode)
307
- try assertAssociated(second, withExactly: first.cluster.uniqueNode)
308
-
309
- let probeOnSecond = self.testKit(second).makeTestProbe(expecting: SWIM.Message.self)
310
- let remoteMemberRef = first._resolveKnownRemote(probeOnSecond.ref, onRemoteSystem: second)
311
- let maxIndependentSuspicions = 10
312
- let suspicionTimeoutPeriodsMax = 1000
313
- let suspicionTimeoutPeriodsMin = 1
314
- let timeSource = TestTimeSource()
315
-
316
- let ref = try first._spawn(
317
- "SWIM",
318
- SWIMActorShell.swimTestBehavior(members: [remoteMemberRef], clusterRef: self.firstClusterProbe.ref) { settings in
319
- settings.timeSourceNow = timeSource.now
320
- settings.lifeguard.suspicionTimeoutMin = .nanoseconds(suspicionTimeoutPeriodsMin)
321
- settings.lifeguard.suspicionTimeoutMax = .nanoseconds(suspicionTimeoutPeriodsMax)
322
- settings.lifeguard.maxIndependentSuspicions = maxIndependentSuspicions
323
- }
324
- )
325
- ref.tell(.local(.protocolPeriodTick))
326
- try self.expectPing(on: probeOnSecond, reply: false)
327
- timeSource.tick()
328
- let ackProbe = self.testKit(first).makeTestProbe(expecting: SWIM.Message.self)
329
- let suspectStatus: SWIM.Status = .suspect(incarnation: 0, suspectedBy: [firstNode.asSWIMNode])
330
- ref.tell(.remote(.ping(pingOrigin: ackProbe.ref, payload: .membership([SWIM.Member(peer: remoteMemberRef, status: suspectStatus, protocolPeriod: 0)]), sequenceNumber: 1)))
331
-
332
- try self.awaitStatus(suspectStatus, for: remoteMemberRef, on: ref, within: .seconds(1))
333
- timeSource.tick()
334
-
335
- for _ in 0 ..< suspicionTimeoutPeriodsMin {
336
- ref.tell(.local(.protocolPeriodTick))
337
- try self.expectPing(on: probeOnSecond, reply: false)
338
- timeSource.tick()
339
- }
340
-
341
- // We need to trigger an additional ping to advance the protocol period
342
- // and have the SWIM actor mark the remote node as dead
343
- ref.tell(.local(.protocolPeriodTick))
344
- try self.firstClusterProbe.expectNoMessage(for: .seconds(1))
345
- }
346
-
347
- /// Passed in `eventStreamProbe` is expected to have been subscribed to the event stream as early as possible,
348
- /// as we want to expect the specific reachability event to be sent
349
- private func expectReachabilityEvent(
350
- _ testKit: ActorTestKit, _ eventStreamProbe: ActorTestProbe<Cluster.Event>,
351
- node uniqueNode: UniqueNode, expect expected: Cluster.MemberReachability
352
- ) throws {
353
- let messages = try eventStreamProbe.fishFor(Cluster.ReachabilityChange.self, within: .seconds(10)) { event in
354
- switch event {
355
- case .reachabilityChange(let change):
356
- return .catchComplete(change)
357
- default:
358
- return .ignore
359
- }
360
- }
361
- messages.count.shouldEqual(1)
362
- guard let change: Cluster.ReachabilityChange = messages.first else {
363
- throw testKit.fail("Expected a reachability change, but did not get one on \(testKit.system.cluster.uniqueNode)")
364
- }
365
- change.member.uniqueNode.shouldEqual(uniqueNode)
366
- change.member.reachability.shouldEqual(expected)
367
- }
368
-
369
- private func expectReachabilityInSnapshot(_ testKit: ActorTestKit, node: UniqueNode, expect expected: Cluster.MemberReachability) throws {
370
- try testKit.eventually(within: .seconds(3)) {
371
- let p11 = testKit.spawnEventStreamTestProbe(subscribedTo: testKit.system.cluster.events)
372
- guard case .some(Cluster.Event.snapshot(let snapshot)) = try p11.maybeExpectMessage() else {
373
- throw testKit.error("Expected snapshot, was: \(String(reflecting: p11.lastMessage))")
374
- }
375
-
376
- if let secondMember = snapshot.uniqueMember(node) {
377
- if secondMember.reachability == expected {
378
- return
379
- } else {
380
- throw testKit.error("Expected \(node) on \(testKit.system.cluster.uniqueNode) to be [\(expected)] but was: \(secondMember)")
381
- }
382
- } else {
383
- pinfo("Unable to assert reachability of \(node) on \(testKit.system.cluster.uniqueNode) since membership did not contain it. Was: \(snapshot)")
384
- () // it may have technically been removed already, so this is "fine"
385
- }
386
- }
387
- }
388
- */
299
+ func test_swim_shouldNotMarkUnreachable_whenSuspectedByNotEnoughNodes_whenMinTimeoutReached( ) async throws {
300
+ let maxIndependentSuspicions = 10
301
+ let suspicionTimeoutPeriodsMax = 1000
302
+ let suspicionTimeoutPeriodsMin = 1
303
+ let timeSource = TestTimeSource ( )
304
+
305
+ let firstNode = await self . setUpFirst ( ) { settings in
306
+ settings. swim. timeSourceNow = timeSource. now
307
+ settings. swim. lifeguard. suspicionTimeoutMin = . nanoseconds( suspicionTimeoutPeriodsMin)
308
+ settings. swim. lifeguard. suspicionTimeoutMax = . nanoseconds( suspicionTimeoutPeriodsMax)
309
+ settings. swim. lifeguard. maxIndependentSuspicions = maxIndependentSuspicions
310
+ }
311
+ let secondNode = await self . setUpSecond ( )
312
+ let thirdNode = await self . setUpThird ( )
313
+
314
+ firstNode. cluster. join ( node: secondNode. cluster. uniqueNode. node)
315
+ thirdNode. cluster. join ( node: secondNode. cluster. uniqueNode. node)
316
+ try assertAssociated ( firstNode, withExactly: [ secondNode. cluster. uniqueNode, thirdNode. cluster. uniqueNode] )
317
+ try assertAssociated ( secondNode, withExactly: [ firstNode. cluster. uniqueNode, thirdNode. cluster. uniqueNode] )
318
+
319
+ guard let first = firstNode. _cluster? . _swimShell else {
320
+ throw testKit ( firstNode) . fail ( " SWIM shell of [ \( firstNode) ] should not be nil " )
321
+ }
322
+ guard let second = secondNode. _cluster? . _swimShell else {
323
+ throw testKit ( secondNode) . fail ( " SWIM shell of [ \( secondNode) ] should not be nil " )
324
+ }
325
+ guard let third = thirdNode. _cluster? . _swimShell else {
326
+ throw testKit ( thirdNode) . fail ( " SWIM shell of [ \( thirdNode) ] should not be nil " )
327
+ }
328
+
329
+ try await self . configureSWIM ( for: first, members: [ second, third] )
330
+
331
+ let originPeer = try SWIMActorShell . resolve ( id: third. id. _asRemote, using: secondNode)
332
+ let targetPeer = try SWIMActorShell . resolve ( id: second. id. _asRemote, using: firstNode)
333
+
334
+ _ = await first. whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): rename once https://github.com/apple/swift/pull/42098 is implemented
335
+ __secretlyKnownToBeLocal. handlePeriodicProtocolPeriodTick ( )
336
+ }
337
+ // FIXME: use a non-responsive test probe
338
+ // try self.expectPing(on: probeOnSecond, reply: false)
339
+ timeSource. tick ( )
340
+
341
+ let suspectStatus : SWIM . Status = . suspect( incarnation: 0 , suspectedBy: [ first. node] )
342
+
343
+ _ = try await first. ping ( origin: originPeer, payload: . membership( [ SWIM . Member ( peer: targetPeer, status: suspectStatus, protocolPeriod: 0 ) ] ) , sequenceNumber: 1 )
344
+
345
+ try await self . awaitStatus ( suspectStatus, for: targetPeer, on: first, within: . seconds( 1 ) )
346
+ timeSource. tick ( )
347
+
348
+ // for _ in 0 ..< suspicionTimeoutPeriodsMin {
349
+ // _ = await first.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): rename once https://github.com/apple/swift/pull/42098 is implemented
350
+ // __secretlyKnownToBeLocal.handlePeriodicProtocolPeriodTick()
351
+ // }
352
+ // // FIXME: use a non-responsive test probe
353
+ // try self.expectPing(on: probeOnSecond, reply: false)
354
+ // timeSource.tick()
355
+ // }
356
+ //
357
+ // // We need to trigger an additional ping to advance the protocol period
358
+ // // and have the SWIM actor mark the remote node as dead
359
+ // _ = await first.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): rename once https://github.com/apple/swift/pull/42098 is implemented
360
+ // __secretlyKnownToBeLocal.handlePeriodicProtocolPeriodTick()
361
+ // }
362
+ //
363
+ // // FIXME: would second end up with .dead status?
364
+ // try await self.awaitStatus(.dead, for: targetPeer, on: first, within: .seconds(1))
365
+ }
389
366
390
367
// ==== ----------------------------------------------------------------------------------------------------------------
391
368
// MARK: Gossiping
@@ -439,78 +416,6 @@ final class SWIMShellClusteredTests: ClusteredActorSystemsXCTestCase {
439
416
// ==== ------------------------------------------------------------------------------------------------------------
440
417
// MARK: utility functions
441
418
442
- /*
443
- struct ForwardedSWIMMessage: Codable {
444
- let message: SWIM.Message
445
- let recipient: _ActorRef<SWIM.Message>
446
- }
447
-
448
- func forwardingSWIMBehavior(forwardTo ref: _ActorRef<ForwardedSWIMMessage>) -> _Behavior<SWIM.Message> {
449
- .receive { context, message in
450
- ref.tell(.init(message: message, recipient: context.myself))
451
- return .same
452
- }
453
- }
454
-
455
- func expectPing(
456
- on probe: ActorTestProbe<SWIM.Message>, reply: Bool, incarnation: SWIM.Incarnation = 0,
457
- file: StaticString = #file, line: UInt = #line, column: UInt = #column,
458
- assertPayload: (SWIM.GossipPayload) throws -> Void = { _ in
459
- }
460
- ) throws {
461
- switch try probe.expectMessage(file: file, line: line, column: column) {
462
- case .remote(.ping(let replyTo, let payload, let sequenceNumber)):
463
- try assertPayload(payload)
464
- if reply {
465
- replyTo.tell(.remote(.pingResponse(.ack(target: probe.ref, incarnation: incarnation, payload: .none, sequenceNumber: sequenceNumber))))
466
- }
467
- case let message:
468
- throw probe.error("Expected to receive `.ping`, received \(message) instead")
469
- }
470
- }
471
-
472
- func awaitStatus(
473
- _ status: SWIM.Status, for peer: _ActorRef<SWIM.Message>,
474
- on swimShell: _ActorRef<SWIM.Message>, within timeout: Duration,
475
- file: StaticString = #file, line: UInt = #line, column: UInt = #column
476
- ) throws {
477
- let testKit = self._testKits.first!
478
- let stateProbe = testKit.makeTestProbe(expecting: [SWIM.Member].self)
479
-
480
- try testKit.eventually(within: timeout, file: file, line: line, column: column) {
481
- swimShell.tell(._testing(._getMembershipState(replyTo: stateProbe.ref)))
482
- let membership = try stateProbe.expectMessage()
483
-
484
- let otherStatus = membership
485
- .first(where: { $0.peer as! SWIM.Ref == peer })
486
- .map(\.status)
487
- guard otherStatus == status else {
488
- throw testKit.error("Expected status [\(status)] for [\(peer)], but found \(otherStatus.debugDescription); Membership: \(membership)", file: file, line: line)
489
- }
490
- }
491
- }
492
-
493
- func holdStatus(
494
- _ status: SWIM.Status, for peer: _ActorRef<SWIM.Message>,
495
- on swimShell: _ActorRef<SWIM.Message>, within timeout: Duration,
496
- file: StaticString = #file, line: UInt = #line, column: UInt = #column
497
- ) throws {
498
- let testKit = self._testKits.first!
499
- let stateProbe = testKit.makeTestProbe(expecting: [SWIM.Member].self)
500
-
501
- try testKit.assertHolds(for: timeout, file: file, line: line, column: column) {
502
- swimShell.tell(._testing(._getMembershipState(replyTo: stateProbe.ref)))
503
- let membership = try stateProbe.expectMessage()
504
- let otherStatus = membership
505
- .first(where: { $0.peer as! SWIM.Ref == peer })
506
- .map(\.status)
507
- guard otherStatus == status else {
508
- throw testKit.error("Expected status [\(status)] for [\(peer)], but found \(otherStatus.debugDescription)")
509
- }
510
- }
511
- }
512
- */
513
-
514
419
private func configureSWIM( for swimShell: SWIMActorShell , members: [ SWIMActorShell ] ) async throws {
515
420
var memberStatus : [ SWIMActorShell : SWIM . Status ] = [ : ]
516
421
for member in members {
@@ -563,6 +468,7 @@ final class SWIMShellClusteredTests: ClusteredActorSystemsXCTestCase {
563
468
let otherStatus = membership
564
469
. first ( where: { $0. peer as! SWIMActorShell == peer } )
565
470
. map ( \. status)
471
+
566
472
guard otherStatus == status else {
567
473
throw testKit. error ( " Expected status [ \( status) ] for [ \( peer) ], but found \( otherStatus. debugDescription) ; Membership: \( membership) " , file: file, line: line)
568
474
}
0 commit comments