Skip to content

Commit 850b7bc

Browse files
DanTupgrouma
authored andcommitted
Support transparent reconnects on the server (flutter#19)
@grouma this is an attempt to fix flutter#18 (may be easier to [view the diff ignoring whitespace](https://github.com/dart-lang/sse/pull/19/files?utf8=%E2%9C%93&diff=unified&w=1) since some code got indenting and makes the diff look much bigger than it is). However there is an exposed method here - `closeSink` that closes the underlying sink (in order to test - we can't close the exposed `sink` because it closes the stream controller that needs to continue to be used). I'm not sure if there's a better way (we can add `@visibleForTesting`, though `meta` isn't currently referenced here). Happy to make changes if this isn't what you had in mind (and I can test it end-to-end through dwds and GitPod to confirm it works prior to merging it).
1 parent a5e8d44 commit 850b7bc

File tree

6 files changed

+405
-242
lines changed

6 files changed

+405
-242
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ addons:
55

66
dart:
77
- dev
8-
- 2.1.0
8+
- 2.2.0
99

1010
with_content_shell: false
1111

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## 3.1.0
2+
3+
- Add optional `keepAlive` parameter to the `SseHandler`. If `keepAlive` is
4+
supplied, the connection will remain active for this period after a
5+
disconnect and can be reconnected transparently. If there is no reconnect
6+
within that period, the connection will be closed normally.
7+
18
## 3.0.0
29

310
- Add retry logic.

lib/server/sse_handler.dart

Lines changed: 1 addition & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -2,151 +2,4 @@
22
// for details. All rights reserved. Use of this source code is governed by a
33
// BSD-style license that can be found in the LICENSE file.
44

5-
import 'dart:async';
6-
import 'dart:convert';
7-
8-
import 'package:async/async.dart';
9-
import 'package:logging/logging.dart';
10-
import 'package:pedantic/pedantic.dart';
11-
import 'package:shelf/shelf.dart' as shelf;
12-
import 'package:stream_channel/stream_channel.dart';
13-
14-
// RFC 2616 requires carriage return delimiters.
15-
String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'
16-
'Content-Type: text/event-stream\r\n'
17-
'Cache-Control: no-cache\r\n'
18-
'Connection: keep-alive\r\n'
19-
'Access-Control-Allow-Credentials: true\r\n'
20-
'Access-Control-Allow-Origin: $origin\r\n'
21-
'\r\n\r\n';
22-
23-
/// A bi-directional SSE connection between server and browser.
24-
class SseConnection extends StreamChannelMixin<String> {
25-
/// Incoming messages from the Browser client.
26-
final _incomingController = StreamController<String>();
27-
28-
/// Outgoing messages to the Browser client.
29-
final _outgoingController = StreamController<String>();
30-
31-
final Sink _sink;
32-
33-
final _closedCompleter = Completer<void>();
34-
35-
SseConnection(this._sink) {
36-
_outgoingController.stream.listen((data) {
37-
if (!_closedCompleter.isCompleted) {
38-
// JSON encode the message to escape new lines.
39-
_sink.add('data: ${json.encode(data)}\n');
40-
_sink.add('\n');
41-
}
42-
});
43-
_outgoingController.onCancel = _close;
44-
_incomingController.onCancel = _close;
45-
}
46-
47-
/// The message added to the sink has to be JSON encodable.
48-
@override
49-
StreamSink<String> get sink => _outgoingController.sink;
50-
51-
// Add messages to this [StreamSink] to send them to the server.
52-
/// [Stream] of messages sent from the server to this client.
53-
///
54-
/// A message is a decoded JSON object.
55-
@override
56-
Stream<String> get stream => _incomingController.stream;
57-
58-
void _close() {
59-
if (!_closedCompleter.isCompleted) {
60-
_closedCompleter.complete();
61-
_sink.close();
62-
if (!_outgoingController.isClosed) _outgoingController.close();
63-
if (!_incomingController.isClosed) _incomingController.close();
64-
}
65-
}
66-
}
67-
68-
/// [SseHandler] handles requests on a user defined path to create
69-
/// two-way communications of JSON encodable data between server and clients.
70-
///
71-
/// A server sends messages to a client through an SSE channel, while
72-
/// a client sends message to a server through HTTP POST requests.
73-
class SseHandler {
74-
final _logger = Logger('SseHandler');
75-
final Uri _uri;
76-
final _connections = <String, SseConnection>{};
77-
final _connectionController = StreamController<SseConnection>();
78-
79-
StreamQueue<SseConnection> _connectionsStream;
80-
81-
SseHandler(this._uri);
82-
83-
StreamQueue<SseConnection> get connections =>
84-
_connectionsStream ??= StreamQueue(_connectionController.stream);
85-
86-
shelf.Handler get handler => _handle;
87-
88-
int get numberOfClients => _connections.length;
89-
90-
shelf.Response _createSseConnection(shelf.Request req, String path) {
91-
req.hijack((channel) async {
92-
var sink = utf8.encoder.startChunkedConversion(channel.sink);
93-
sink.add(_sseHeaders(req.headers['origin']));
94-
var clientId = req.url.queryParameters['sseClientId'];
95-
var connection = SseConnection(sink);
96-
_connections[clientId] = connection;
97-
unawaited(connection._closedCompleter.future.then((_) {
98-
_connections.remove(clientId);
99-
}));
100-
// Remove connection when it is remotely closed or the stream is
101-
// cancelled.
102-
channel.stream.listen((_) {
103-
// SSE is unidirectional. Responses are handled through POST requests.
104-
}, onDone: () {
105-
connection._close();
106-
});
107-
108-
_connectionController.add(connection);
109-
});
110-
return shelf.Response.notFound('');
111-
}
112-
113-
String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
114-
115-
Future<shelf.Response> _handle(shelf.Request req) async {
116-
var path = _getOriginalPath(req);
117-
if (_uri.path != path) {
118-
return shelf.Response.notFound('');
119-
}
120-
121-
if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') {
122-
return _createSseConnection(req, path);
123-
}
124-
125-
if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') {
126-
return _handleIncomingMessage(req, path);
127-
}
128-
129-
return shelf.Response.notFound('');
130-
}
131-
132-
Future<shelf.Response> _handleIncomingMessage(
133-
shelf.Request req, String path) async {
134-
try {
135-
var clientId = req.url.queryParameters['sseClientId'];
136-
var message = await req.readAsString();
137-
var jsonObject = json.decode(message) as String;
138-
_connections[clientId]?._incomingController?.add(jsonObject);
139-
} catch (e, st) {
140-
_logger.fine('Failed to handle incoming message. $e $st');
141-
}
142-
return shelf.Response.ok('', headers: {
143-
'access-control-allow-credentials': 'true',
144-
'access-control-allow-origin': _originFor(req),
145-
});
146-
}
147-
148-
String _originFor(shelf.Request req) =>
149-
// Firefox does not set header "origin".
150-
// https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
151-
req.headers['origin'] ?? req.headers['host'];
152-
}
5+
export 'package:sse/src/server/sse_handler.dart' show SseConnection, SseHandler;

lib/src/server/sse_handler.dart

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'dart:convert';
7+
8+
import 'package:async/async.dart';
9+
import 'package:logging/logging.dart';
10+
import 'package:pedantic/pedantic.dart';
11+
import 'package:shelf/shelf.dart' as shelf;
12+
import 'package:stream_channel/stream_channel.dart';
13+
14+
// RFC 2616 requires carriage return delimiters.
15+
String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'
16+
'Content-Type: text/event-stream\r\n'
17+
'Cache-Control: no-cache\r\n'
18+
'Connection: keep-alive\r\n'
19+
'Access-Control-Allow-Credentials: true\r\n'
20+
'Access-Control-Allow-Origin: $origin\r\n'
21+
'\r\n\r\n';
22+
23+
/// A bi-directional SSE connection between server and browser.
24+
class SseConnection extends StreamChannelMixin<String> {
25+
/// Incoming messages from the Browser client.
26+
final _incomingController = StreamController<String>();
27+
28+
/// Outgoing messages to the Browser client.
29+
final _outgoingController = StreamController<String>();
30+
31+
Sink _sink;
32+
33+
/// How long to wait after a connection drops before considering it closed.
34+
final Duration _keepAlive;
35+
36+
/// A timer counting down the KeepAlive period (null if hasn't disconnected).
37+
Timer _keepAliveTimer;
38+
39+
/// Whether this connection is currently in the KeepAlive timeout period.
40+
bool get isInKeepAlivePeriod => _keepAliveTimer?.isActive ?? false;
41+
42+
final _closedCompleter = Completer<void>();
43+
44+
/// Creates an [SseConnection] for the supplied [_sink].
45+
///
46+
/// If [keepAlive] is supplied, the connection will remain active for this
47+
/// period after a disconnect and can be reconnected transparently. If there
48+
/// is no reconnect within that period, the connection will be closed normally.
49+
///
50+
/// If [keepAlive] is not supplied, the connection will be closed immediately
51+
/// after a disconnect.
52+
SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive {
53+
unawaited(_setUpListener());
54+
_outgoingController.onCancel = _close;
55+
_incomingController.onCancel = _close;
56+
}
57+
58+
Future<void> _setUpListener() async {
59+
var outgoingStreamQueue = StreamQueue(_outgoingController.stream);
60+
while (await outgoingStreamQueue.hasNext) {
61+
// If we're in a KeepAlive timeout, there's nowhere to send messages so
62+
// wait a short period and check again.
63+
if (isInKeepAlivePeriod) {
64+
await Future.delayed(const Duration(milliseconds: 200));
65+
continue;
66+
}
67+
68+
// Peek the data so we don't remove it from the stream if we're unable to
69+
// send it.
70+
final data = await outgoingStreamQueue.peek;
71+
try {
72+
// JSON encode the message to escape new lines.
73+
_sink.add('data: ${json.encode(data)}\n');
74+
_sink.add('\n');
75+
await outgoingStreamQueue.next; // Consume from stream if no errors.
76+
} catch (StateError) {
77+
if (_keepAlive == null || _closedCompleter.isCompleted) {
78+
rethrow;
79+
}
80+
// If we got here then the sink may have closed but the stream.onDone
81+
// hasn't fired yet, so pause the subscription and skip calling
82+
// `next` so the message remains in the queue to try again.
83+
_handleDisconnect();
84+
}
85+
}
86+
}
87+
88+
/// The message added to the sink has to be JSON encodable.
89+
@override
90+
StreamSink<String> get sink => _outgoingController.sink;
91+
92+
// Add messages to this [StreamSink] to send them to the server.
93+
/// [Stream] of messages sent from the server to this client.
94+
///
95+
/// A message is a decoded JSON object.
96+
@override
97+
Stream<String> get stream => _incomingController.stream;
98+
99+
void _acceptReconnection(Sink sink) {
100+
_keepAliveTimer?.cancel();
101+
_sink = sink;
102+
}
103+
104+
void _handleDisconnect() {
105+
if (_keepAlive == null) {
106+
// Close immediately if we're not keeping alive.
107+
_close();
108+
} else if (!isInKeepAlivePeriod) {
109+
// Otherwise if we didn't already have an active timer, set a timer to
110+
// close after the timeout period. If the connection comes back, this will
111+
// be cancelled and all messages left in the queue tried again.
112+
_keepAliveTimer = Timer(_keepAlive, _close);
113+
}
114+
}
115+
116+
void _close() {
117+
if (!_closedCompleter.isCompleted) {
118+
_closedCompleter.complete();
119+
_sink.close();
120+
if (!_outgoingController.isClosed) _outgoingController.close();
121+
if (!_incomingController.isClosed) _incomingController.close();
122+
}
123+
}
124+
}
125+
126+
/// [SseHandler] handles requests on a user defined path to create
127+
/// two-way communications of JSON encodable data between server and clients.
128+
///
129+
/// A server sends messages to a client through an SSE channel, while
130+
/// a client sends message to a server through HTTP POST requests.
131+
class SseHandler {
132+
final _logger = Logger('SseHandler');
133+
final Uri _uri;
134+
final Duration _keepAlive;
135+
final _connections = <String, SseConnection>{};
136+
final _connectionController = StreamController<SseConnection>();
137+
138+
StreamQueue<SseConnection> _connectionsStream;
139+
140+
SseHandler(this._uri, {Duration keepAlive}) : _keepAlive = keepAlive;
141+
142+
StreamQueue<SseConnection> get connections =>
143+
_connectionsStream ??= StreamQueue(_connectionController.stream);
144+
145+
shelf.Handler get handler => _handle;
146+
147+
int get numberOfClients => _connections.length;
148+
149+
shelf.Response _createSseConnection(shelf.Request req, String path) {
150+
req.hijack((channel) async {
151+
var sink = utf8.encoder.startChunkedConversion(channel.sink);
152+
sink.add(_sseHeaders(req.headers['origin']));
153+
var clientId = req.url.queryParameters['sseClientId'];
154+
155+
// Check if we already have a connection for this ID that is in the process
156+
// of timing out (in which case we can reconnect it transparently).
157+
if (_connections[clientId] != null &&
158+
_connections[clientId].isInKeepAlivePeriod) {
159+
_connections[clientId]._acceptReconnection(sink);
160+
} else {
161+
var connection = SseConnection(sink, keepAlive: _keepAlive);
162+
_connections[clientId] = connection;
163+
unawaited(connection._closedCompleter.future.then((_) {
164+
_connections.remove(clientId);
165+
}));
166+
// Remove connection when it is remotely closed or the stream is
167+
// cancelled.
168+
channel.stream.listen((_) {
169+
// SSE is unidirectional. Responses are handled through POST requests.
170+
}, onDone: () {
171+
connection._handleDisconnect();
172+
});
173+
174+
_connectionController.add(connection);
175+
}
176+
});
177+
return shelf.Response.notFound('');
178+
}
179+
180+
String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
181+
182+
Future<shelf.Response> _handle(shelf.Request req) async {
183+
var path = _getOriginalPath(req);
184+
if (_uri.path != path) {
185+
return shelf.Response.notFound('');
186+
}
187+
188+
if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') {
189+
return _createSseConnection(req, path);
190+
}
191+
192+
if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') {
193+
return _handleIncomingMessage(req, path);
194+
}
195+
196+
return shelf.Response.notFound('');
197+
}
198+
199+
Future<shelf.Response> _handleIncomingMessage(
200+
shelf.Request req, String path) async {
201+
try {
202+
var clientId = req.url.queryParameters['sseClientId'];
203+
var message = await req.readAsString();
204+
var jsonObject = json.decode(message) as String;
205+
_connections[clientId]?._incomingController?.add(jsonObject);
206+
} catch (e, st) {
207+
_logger.fine('Failed to handle incoming message. $e $st');
208+
}
209+
return shelf.Response.ok('', headers: {
210+
'access-control-allow-credentials': 'true',
211+
'access-control-allow-origin': _originFor(req),
212+
});
213+
}
214+
215+
String _originFor(shelf.Request req) =>
216+
// Firefox does not set header "origin".
217+
// https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
218+
req.headers['origin'] ?? req.headers['host'];
219+
}
220+
221+
void closeSink(SseConnection connection) => connection._sink.close();

0 commit comments

Comments
 (0)