Skip to content

Add optional write completion handler for emit's #1097

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions Source/SocketIO/Client/SocketIOClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,26 @@ open class SocketIOClient : NSObject, SocketIOClientSpec {
/// - parameter items: The items to send with this event. May be left out.
open func emit(_ event: String, _ items: SocketData...) {
do {
try emit(event, with: items.map({ try $0.socketRepresentation() }))
try emit(event, with: items.map({ try $0.socketRepresentation() }), completion: {})
} catch {
DefaultSocketLogger.Logger.error("Error creating socketRepresentation for emit: \(event), \(items)",
type: logType)

handleClientEvent(.error, data: [event, items, error])
}
}

/// Send an event to the server, with optional data items and write completion handler.
///
/// If an error occurs trying to transform `items` into their socket representation, a `SocketClientEvent.error`
/// will be emitted. The structure of the error data is `[eventName, items, theError]`
///
/// - parameter event: The event to send.
/// - parameter items: The items to send with this event. May be left out.
/// - parameter completion: Callback called on transport write completion.
open func emit(_ event: String, _ items: SocketData..., completion: @escaping () -> ()) {
do {
try emit(event, with: items.map({ try $0.socketRepresentation() }), completion: completion)
} catch {
DefaultSocketLogger.Logger.error("Error creating socketRepresentation for emit: \(event), \(items)",
type: logType)
Expand All @@ -228,7 +247,17 @@ open class SocketIOClient : NSObject, SocketIOClientSpec {
/// - parameter items: The items to send with this event. Send an empty array to send no data.
@objc
open func emit(_ event: String, with items: [Any]) {
emit([event] + items)
emit([event] + items, completion: {})
}

/// Same as emit, but meant for Objective-C
///
/// - parameter event: The event to send.
/// - parameter items: The items to send with this event. Send an empty array to send no data.
/// - parameter completion: Callback called on transport write completion.
@objc
open func emit(_ event: String, with items: [Any], completion: @escaping () -> ()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One last thing, does it make sense to also add a nicer Swift version of this that uses variadics? I think it should be possible to define

open func emit(_ event: String, _ items: SocketData..., completion: () -> ())  { /* call to base version like the other methods */ }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a method for Swift. I also added the method signature to the SocketIOClientSpec, but not sure if that's needed?

emit([event] + items, completion: completion)
}

/// Sends a message to the server, requesting an ack.
Expand Down Expand Up @@ -284,8 +313,15 @@ open class SocketIOClient : NSObject, SocketIOClientSpec {
return createOnAck([event] + items)
}

func emit(_ data: [Any], ack: Int? = nil, binary: Bool = true, isAck: Bool = false) {
func emit(_ data: [Any], ack: Int? = nil, binary: Bool = true, isAck: Bool = false, completion: (() -> ())? = nil) {
// wrap the completion handler so it always runs async via handlerQueue
let wrappedCompletion = {[weak self] in
guard let this = self else { return }
this.manager?.handleQueue.async { completion?() }
}

guard status == .connected else {
wrappedCompletion();
handleClientEvent(.error, data: ["Tried emitting when not connected"])
return
}
Expand All @@ -295,7 +331,7 @@ open class SocketIOClient : NSObject, SocketIOClientSpec {

DefaultSocketLogger.Logger.log("Emitting: \(str), Ack: \(isAck)", type: logType)

manager?.engine?.send(str, withData: packet.binary)
manager?.engine?.send(str, withData: packet.binary, completion: completion != nil ? wrappedCompletion : nil)
}

/// Call when you wish to tell the server that you've received the event for `ack`.
Expand Down
10 changes: 10 additions & 0 deletions Source/SocketIO/Client/SocketIOClientSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ public protocol SocketIOClientSpec : AnyObject {
/// - parameter items: The items to send with this event. May be left out.
func emit(_ event: String, _ items: SocketData...)

/// Send an event to the server, with optional data items and write completion handler.
///
/// If an error occurs trying to transform `items` into their socket representation, a `SocketClientEvent.error`
/// will be emitted. The structure of the error data is `[eventName, items, theError]`
///
/// - parameter event: The event to send.
/// - parameter items: The items to send with this event. May be left out.
/// - parameter completion: Callback called on transport write completion.
func emit(_ event: String, _ items: SocketData..., completion: @escaping () -> ())

/// Call when you wish to tell the server that you've received the event for `ack`.
///
/// - parameter ack: The ack number.
Expand Down
30 changes: 17 additions & 13 deletions Source/SocketIO/Engine/SocketEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
/// A queue of engine.io messages waiting for POSTing
///
/// **You should not touch this directly**
public var postWait = [String]()
public var postWait = [Post]()

/// `true` if there is an outstanding poll. Trying to poll before the first is done will cause socket.io to
/// disconnect us.
Expand Down Expand Up @@ -340,15 +340,15 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
if polling {
disconnectPolling(reason: reason)
} else {
sendWebSocketMessage("", withType: .close, withData: [])
sendWebSocketMessage("", withType: .close, withData: [], completion: {})
closeOutEngine(reason: reason)
}
}

// We need to take special care when we're polling that we send it ASAP
// Also make sure we're on the emitQueue since we're touching postWait
private func disconnectPolling(reason: String) {
postWait.append(String(SocketEnginePacketType.close.rawValue))
postWait.append((String(SocketEnginePacketType.close.rawValue), {}))

doRequest(for: createRequestForPostWithPostWait()) {_, _, _ in }
closeOutEngine(reason: reason)
Expand All @@ -366,7 +366,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So

DefaultSocketLogger.Logger.log("Switching to WebSockets", type: SocketEngine.logType)

sendWebSocketMessage("", withType: .upgrade, withData: [])
sendWebSocketMessage("", withType: .upgrade, withData: [], completion: {})
polling = false
fastUpgrade = false
probing = false
Expand All @@ -384,7 +384,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
DefaultSocketLogger.Logger.log("Flushing probe wait", type: SocketEngine.logType)

for waiter in probeWait {
write(waiter.msg, withType: waiter.type, withData: waiter.data)
write(waiter.msg, withType: waiter.type, withData: waiter.data, completion:waiter.completion)
}

probeWait.removeAll(keepingCapacity: false)
Expand All @@ -398,7 +398,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
guard let ws = self.ws else { return }

for msg in postWait {
ws.write(string: msg)
ws.write(string: msg.msg, completion: msg.completion)
}

postWait.removeAll(keepingCapacity: false)
Expand Down Expand Up @@ -544,7 +544,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
}

pongsMissed += 1
write("", withType: .ping, withData: [])
write("", withType: .ping, withData: [], completion: {})

engineQueue.asyncAfter(deadline: DispatchTime.now() + .milliseconds(pingInterval)) {[weak self, id = self.sid] in
// Make sure not to ping old connections
Expand Down Expand Up @@ -600,7 +600,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
DefaultSocketLogger.Logger.log("Upgrading transport to WebSockets", type: SocketEngine.logType)

fastUpgrade = true
sendPollMessage("", withType: .noop, withData: [])
sendPollMessage("", withType: .noop, withData: [], completion: {})
// After this point, we should not send anymore polling messages
}
}
Expand All @@ -610,23 +610,27 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
/// - parameter msg: The message to send.
/// - parameter type: The type of this message.
/// - parameter data: Any data that this message has.
open func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data]) {
/// - parameter completion: Callback called on transport write completion.
open func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data], completion: @escaping () -> ()) {
engineQueue.async {
guard self.connected else { return }
guard self.connected else {
completion()
return
}
guard !self.probing else {
self.probeWait.append((msg, type, data))
self.probeWait.append((msg, type, data, completion))

return
}

if self.polling {
DefaultSocketLogger.Logger.log("Writing poll: \(msg) has data: \(data.count != 0)",
type: SocketEngine.logType)
self.sendPollMessage(msg, withType: type, withData: data)
self.sendPollMessage(msg, withType: type, withData: data, completion: completion)
} else {
DefaultSocketLogger.Logger.log("Writing ws: \(msg) has data: \(data.count != 0)",
type: SocketEngine.logType)
self.sendWebSocketMessage(msg, withType: type, withData: data)
self.sendWebSocketMessage(msg, withType: type, withData: data, completion: completion)
}
}
}
Expand Down
20 changes: 13 additions & 7 deletions Source/SocketIO/Engine/SocketEnginePollable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public protocol SocketEnginePollable : SocketEngineSpec {
/// A queue of engine.io messages waiting for POSTing
///
/// **You should not touch this directly**
var postWait: [String] { get set }
var postWait: [Post] { get set }

/// The URLSession that will be used for polling.
var session: URLSession? { get }
Expand Down Expand Up @@ -65,7 +65,7 @@ public protocol SocketEnginePollable : SocketEngineSpec {
/// - parameter message: The message to send.
/// - parameter withType: The type of message to send.
/// - parameter withData: The data associated with this message.
func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data])
func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data], completion: @escaping () -> ())

/// Call to stop polling and invalidate the URLSession.
func stopPolling()
Expand All @@ -74,12 +74,15 @@ public protocol SocketEnginePollable : SocketEngineSpec {
// Default polling methods
extension SocketEnginePollable {
func createRequestForPostWithPostWait() -> URLRequest {
defer { postWait.removeAll(keepingCapacity: true) }
defer {
for packet in postWait { packet.completion() }
postWait.removeAll(keepingCapacity: true)
}

var postStr = ""

for packet in postWait {
postStr += "\(packet.utf16.count):\(packet)"
postStr += "\(packet.msg.utf16.count):\(packet.msg)"
}

DefaultSocketLogger.Logger.log("Created POST string: \(postStr)", type: "SocketEnginePolling")
Expand Down Expand Up @@ -209,14 +212,17 @@ extension SocketEnginePollable {
/// - parameter message: The message to send.
/// - parameter withType: The type of message to send.
/// - parameter withData: The data associated with this message.
public func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data]) {
/// - parameter completion: Callback called on transport write completion.
public func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data], completion: @escaping () -> ()) {
DefaultSocketLogger.Logger.log("Sending poll: \(message) as type: \(type.rawValue)", type: "SocketEnginePolling")

postWait.append(String(type.rawValue) + message)
postWait.append((String(type.rawValue) + message, completion))

for data in datas {
if case let .right(bin) = createBinaryDataForSend(using: data) {
postWait.append(bin)
// completion handler will be called on initial message write
// TODO: call completion after last message in batch
postWait.append((bin, {}))
}
}

Expand Down
9 changes: 5 additions & 4 deletions Source/SocketIO/Engine/SocketEngineSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ import Starscream
/// - parameter msg: The message to send.
/// - parameter type: The type of this message.
/// - parameter data: Any data that this message has.
func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data])
/// - parameter completion: Callback called on transport write completion.
func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data], completion: @escaping () -> ())
}

extension SocketEngineSpec {
Expand All @@ -162,7 +163,7 @@ extension SocketEngineSpec {
if !cookiesToAdd.isEmpty {
req.allHTTPHeaderFields = HTTPCookie.requestHeaderFields(with: cookiesToAdd)
}

if let extraHeaders = extraHeaders {
for (headerName, value) in extraHeaders {
req.setValue(value, forHTTPHeaderField: headerName)
Expand All @@ -179,7 +180,7 @@ extension SocketEngineSpec {
}

/// Send an engine message (4)
func send(_ msg: String, withData datas: [Data]) {
write(msg, withType: .message, withData: datas)
func send(_ msg: String, withData datas: [Data], completion: (() -> ())? = nil) {
write(msg, withType: .message, withData: datas, completion: completion ?? {})
}
}
10 changes: 6 additions & 4 deletions Source/SocketIO/Engine/SocketEngineWebsocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ public protocol SocketEngineWebsocket : SocketEngineSpec {
/// - parameter message: The message to send.
/// - parameter withType: The type of message to send.
/// - parameter withData: The data associated with this message.
func sendWebSocketMessage(_ str: String, withType type: SocketEnginePacketType, withData datas: [Data])
/// - parameter completion: Callback called on transport write completion.
func sendWebSocketMessage(_ str: String, withType type: SocketEnginePacketType, withData datas: [Data], completion: @escaping () -> ())
}

// WebSocket methods
extension SocketEngineWebsocket {
func probeWebSocket() {
if ws?.isConnected ?? false {
sendWebSocketMessage("probe", withType: .ping, withData: [])
sendWebSocketMessage("probe", withType: .ping, withData: [], completion: {})
}
}

Expand All @@ -55,14 +56,15 @@ extension SocketEngineWebsocket {
/// - parameter message: The message to send.
/// - parameter withType: The type of message to send.
/// - parameter withData: The data associated with this message.
public func sendWebSocketMessage(_ str: String, withType type: SocketEnginePacketType, withData datas: [Data]) {
/// - parameter completion: Callback called on transport write completion.
public func sendWebSocketMessage(_ str: String, withType type: SocketEnginePacketType, withData datas: [Data], completion: @escaping () -> ()) {
DefaultSocketLogger.Logger.log("Sending ws: \(str) as type: \(type.rawValue)", type: "SocketEngineWebSocket")

ws?.write(string: "\(type.rawValue)\(str)")

for data in datas {
if case let .left(bin) = createBinaryDataForSend(using: data) {
ws?.write(data: bin)
ws?.write(data: bin, completion: completion)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Source/SocketIO/Manager/SocketManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ open class SocketManager : NSObject, SocketManagerSpec, SocketParsable, SocketDa
/// - parameter items: The data to send with this event.
open func emitAll(_ event: String, withItems items: [Any]) {
forAll {socket in
socket.emit(event, with: items)
socket.emit(event, with: items, completion: {})
}
}

Expand Down
5 changes: 4 additions & 1 deletion Source/SocketIO/Util/SocketTypes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ public typealias AckCallback = ([Any]) -> ()
/// A typealias for a normal callback.
public typealias NormalCallback = ([Any], SocketAckEmitter) -> ()

/// A typealias for a queued POST
public typealias Post = (msg: String, completion: (() -> ()))

typealias JSON = [String: Any]
typealias Probe = (msg: String, type: SocketEnginePacketType, data: [Data])
typealias Probe = (msg: String, type: SocketEnginePacketType, data: [Data], completion: (() -> ()))
typealias ProbeWaitQueue = [Probe]

enum Either<E, V> {
Expand Down
2 changes: 1 addition & 1 deletion Tests/TestSocketIO/SocketMangerTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public class TestSocket : SocketIOClient {
super.didDisconnect(reason: reason)
}

public override func emit(_ event: String, with items: [Any]) {
public override func emit(_ event: String, with items: [Any], completion: @escaping () -> ()) {
expectations[ManagerExpectation.emitAllEventCalled]?.fulfill()
expectations[ManagerExpectation.emitAllEventCalled] = nil

Expand Down
7 changes: 6 additions & 1 deletion Tests/TestSocketIO/SocketSideEffectTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ class SocketSideEffectTest: XCTestCase {
XCTAssertEqual(socket.currentAck, 1)
}

func testEmitCompletionSyntax() {
socket.emit("test", completion: {})
socket.emit("test", "thing", completion: {})
}

func testHandleAck() {
let expect = expectation(description: "handled ack")
socket.emitWithAck("test").timingOut(after: 0) {data in
Expand Down Expand Up @@ -506,5 +511,5 @@ class TestEngine : SocketEngineSpec {
func flushWaitingForPostToWebSocket() { }
func parseEngineData(_ data: Data) { }
func parseEngineMessage(_ message: String) { }
func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data]) { }
func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data], completion: @escaping () -> ()) { }
}
14 changes: 14 additions & 0 deletions Tests/TestSocketIOObjc/SocketObjectiveCTest.m
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ - (void)testEmitSyntax {
[self.socket emit:@"testEmit" with:@[@YES]];
}

- (void)testEmitWriteCompletionSyntax {
[self.socket emit:@"testEmit" with:@[@YES] completion:^{}];
}

- (void)testEmitWriteCompletion {
XCTestExpectation* expect = [self expectationWithDescription:@"Write completion should be called"];

[self.socket emit:@"testEmit" with:@[@YES] completion:^{
[expect fulfill];
}];

[self waitForExpectationsWithTimeout:0.3 handler:nil];
}

- (void)testRawEmitSyntax {
[[self.socket rawEmitView] emit:@"myEvent" with:@[@1]];
}
Expand Down