Cancel aborted sessions
This commit is contained in:
parent
096c42adbc
commit
a8b227ec1c
190
Ooui/UI.cs
190
Ooui/UI.cs
|
@ -179,7 +179,7 @@ namespace Ooui
|
||||||
response.Close ();
|
response.Close ();
|
||||||
}
|
}
|
||||||
|
|
||||||
static async void ProcessWebSocketRequest (HttpListenerContext listenerContext, CancellationToken token)
|
static async void ProcessWebSocketRequest (HttpListenerContext listenerContext, CancellationToken serverToken)
|
||||||
{
|
{
|
||||||
//
|
//
|
||||||
// Find the element
|
// Find the element
|
||||||
|
@ -225,82 +225,11 @@ namespace Ooui
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Keep a list of all the elements for which we've transmitted the initial state
|
// Create a new session and let it handle everything from here
|
||||||
//
|
|
||||||
var createdIds = new HashSet<string> {
|
|
||||||
"window",
|
|
||||||
"document",
|
|
||||||
"document.body",
|
|
||||||
};
|
|
||||||
|
|
||||||
//
|
|
||||||
// Preparse handlers for the element
|
|
||||||
//
|
|
||||||
Action<Message> onElementMessage = async m => {
|
|
||||||
if (webSocket == null) return;
|
|
||||||
try {
|
|
||||||
await SendMessageAsync (webSocket, m, element, createdIds, token).ConfigureAwait (false);
|
|
||||||
}
|
|
||||||
catch (Exception ex) {
|
|
||||||
Error ("Failed to handled element message", ex);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
//
|
|
||||||
// Communicate!
|
|
||||||
//
|
//
|
||||||
try {
|
try {
|
||||||
//
|
var session = new Session (webSocket, element);
|
||||||
// Start watching for changes in the element
|
await session.RunAsync (serverToken).ConfigureAwait (false);
|
||||||
//
|
|
||||||
element.MessageSent += onElementMessage;
|
|
||||||
|
|
||||||
//
|
|
||||||
// Add it to the document body
|
|
||||||
//
|
|
||||||
await SendMessageAsync (webSocket, new Message {
|
|
||||||
TargetId = "document.body",
|
|
||||||
MessageType = MessageType.Call,
|
|
||||||
Key = "appendChild",
|
|
||||||
Value = new[] { element },
|
|
||||||
}, element, createdIds, token).ConfigureAwait (false);
|
|
||||||
|
|
||||||
//
|
|
||||||
// Listen for events
|
|
||||||
//
|
|
||||||
var receiveBuffer = new byte[1024];
|
|
||||||
|
|
||||||
while (webSocket.State == WebSocketState.Open && !token.IsCancellationRequested) {
|
|
||||||
var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), token).ConfigureAwait (false);
|
|
||||||
|
|
||||||
if (receiveResult.MessageType == WebSocketMessageType.Close) {
|
|
||||||
await webSocket.CloseAsync (WebSocketCloseStatus.NormalClosure, "", token).ConfigureAwait (false);
|
|
||||||
}
|
|
||||||
else if (receiveResult.MessageType == WebSocketMessageType.Binary) {
|
|
||||||
await webSocket.CloseAsync (WebSocketCloseStatus.InvalidMessageType, "Cannot accept binary frame", token).ConfigureAwait (false);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
var size = receiveResult.Count;
|
|
||||||
while (!receiveResult.EndOfMessage) {
|
|
||||||
if (size >= receiveBuffer.Length) {
|
|
||||||
await webSocket.CloseAsync (WebSocketCloseStatus.MessageTooBig, "Message too big", token).ConfigureAwait (false);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
receiveResult = await webSocket.ReceiveAsync (new ArraySegment<byte>(receiveBuffer, size, receiveBuffer.Length - size), token).ConfigureAwait (false);
|
|
||||||
size += receiveResult.Count;
|
|
||||||
}
|
|
||||||
var receivedString = Encoding.UTF8.GetString (receiveBuffer, 0, size);
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Console.WriteLine ("RECEIVED: {0}", receivedString);
|
|
||||||
var message = Newtonsoft.Json.JsonConvert.DeserializeObject<Message> (receivedString);
|
|
||||||
element.Receive (message);
|
|
||||||
}
|
|
||||||
catch (Exception ex) {
|
|
||||||
Error ("Failed to process received message", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely) {
|
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely) {
|
||||||
// The remote party closed the WebSocket connection without completing the close handshake.
|
// The remote party closed the WebSocket connection without completing the close handshake.
|
||||||
|
@ -309,7 +238,6 @@ namespace Ooui
|
||||||
Error ("Web socket failed", ex);
|
Error ("Web socket failed", ex);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
element.MessageSent -= onElementMessage;
|
|
||||||
webSocket?.Dispose ();
|
webSocket?.Dispose ();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -364,5 +292,115 @@ namespace Ooui
|
||||||
Console.WriteLine ("{0}: {1}", message, ex);
|
Console.WriteLine ("{0}: {1}", message, ex);
|
||||||
Console.ResetColor ();
|
Console.ResetColor ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class Session
|
||||||
|
{
|
||||||
|
readonly WebSocket webSocket;
|
||||||
|
readonly Element element;
|
||||||
|
|
||||||
|
public Session (WebSocket webSocket, Element element)
|
||||||
|
{
|
||||||
|
this.webSocket = webSocket;
|
||||||
|
this.element = element;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task RunAsync (CancellationToken serverToken)
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// Create a new session cancellation token that will trigger
|
||||||
|
// automatically if the server shutsdown or the session shutsdown.
|
||||||
|
//
|
||||||
|
var sessionCts = new CancellationTokenSource ();
|
||||||
|
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource (serverToken, sessionCts.Token);
|
||||||
|
var token = linkedCts.Token;
|
||||||
|
|
||||||
|
//
|
||||||
|
// Keep a list of all the elements for which we've transmitted the initial state
|
||||||
|
//
|
||||||
|
var createdIds = new HashSet<string> {
|
||||||
|
"window",
|
||||||
|
"document",
|
||||||
|
"document.body",
|
||||||
|
};
|
||||||
|
|
||||||
|
//
|
||||||
|
// Preparse handlers for the element
|
||||||
|
//
|
||||||
|
Action<Message> onElementMessage = null;
|
||||||
|
onElementMessage = async m => {
|
||||||
|
if (webSocket == null) return;
|
||||||
|
try {
|
||||||
|
await SendMessageAsync (webSocket, m, element, createdIds, token).ConfigureAwait (false);
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
if (webSocket.State == WebSocketState.Aborted) {
|
||||||
|
Error ("WebSocket is aborted, cancelling session", ex);
|
||||||
|
element.MessageSent -= onElementMessage;
|
||||||
|
sessionCts.Cancel ();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Error ("Failed to handle element message", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
//
|
||||||
|
// Start watching for changes in the element
|
||||||
|
//
|
||||||
|
element.MessageSent += onElementMessage;
|
||||||
|
|
||||||
|
try {
|
||||||
|
//
|
||||||
|
// Add it to the document body
|
||||||
|
//
|
||||||
|
await SendMessageAsync (webSocket, new Message {
|
||||||
|
TargetId = "document.body",
|
||||||
|
MessageType = MessageType.Call,
|
||||||
|
Key = "appendChild",
|
||||||
|
Value = new[] { element },
|
||||||
|
}, element, createdIds, token).ConfigureAwait (false);
|
||||||
|
|
||||||
|
//
|
||||||
|
// Listen for events
|
||||||
|
//
|
||||||
|
var receiveBuffer = new byte[1024];
|
||||||
|
|
||||||
|
while (webSocket.State == WebSocketState.Open && !token.IsCancellationRequested) {
|
||||||
|
var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), token).ConfigureAwait (false);
|
||||||
|
|
||||||
|
if (receiveResult.MessageType == WebSocketMessageType.Close) {
|
||||||
|
await webSocket.CloseAsync (WebSocketCloseStatus.NormalClosure, "", token).ConfigureAwait (false);
|
||||||
|
}
|
||||||
|
else if (receiveResult.MessageType == WebSocketMessageType.Binary) {
|
||||||
|
await webSocket.CloseAsync (WebSocketCloseStatus.InvalidMessageType, "Cannot accept binary frame", token).ConfigureAwait (false);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
var size = receiveResult.Count;
|
||||||
|
while (!receiveResult.EndOfMessage) {
|
||||||
|
if (size >= receiveBuffer.Length) {
|
||||||
|
await webSocket.CloseAsync (WebSocketCloseStatus.MessageTooBig, "Message too big", token).ConfigureAwait (false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
receiveResult = await webSocket.ReceiveAsync (new ArraySegment<byte>(receiveBuffer, size, receiveBuffer.Length - size), token).ConfigureAwait (false);
|
||||||
|
size += receiveResult.Count;
|
||||||
|
}
|
||||||
|
var receivedString = Encoding.UTF8.GetString (receiveBuffer, 0, size);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Console.WriteLine ("RECEIVED: {0}", receivedString);
|
||||||
|
var message = Newtonsoft.Json.JsonConvert.DeserializeObject<Message> (receivedString);
|
||||||
|
element.Receive (message);
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
Error ("Failed to process received message", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
element.MessageSent -= onElementMessage;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue