From 01668873dd2a790e98f596a2d97267157ed45bda Mon Sep 17 00:00:00 2001 From: "Frank A. Krueger" Date: Sun, 18 Jun 2017 01:13:15 -0700 Subject: [PATCH] Throttle message sends --- Ooui/Client.js | 14 ++-- Ooui/UI.cs | 179 +++++++++++++++++++++++++++++-------------------- 2 files changed, 114 insertions(+), 79 deletions(-) diff --git a/Ooui/Client.js b/Ooui/Client.js index 197bc1b..d779bdf 100644 --- a/Ooui/Client.js +++ b/Ooui/Client.js @@ -114,9 +114,13 @@ socket.addEventListener('open', function (event) { }); socket.addEventListener('message', function (event) { - const message = JSON.parse (event.data); - // console.log('Raw value from server', message.v); - message.v = fixupValue (message.v); - // console.log('Message from server', message); - processMessage (message); + const messages = JSON.parse (event.data); + console.log("Messages", messages); + if (Array.isArray (messages)) { + messages.forEach (function (m) { + // console.log('Raw value from server', m.v); + m.v = fixupValue (m.v); + processMessage (m); + }); + } }); diff --git a/Ooui/UI.cs b/Ooui/UI.cs index 32e9766..aea02ca 100644 --- a/Ooui/UI.cs +++ b/Ooui/UI.cs @@ -228,8 +228,8 @@ namespace Ooui // Create a new session and let it handle everything from here // try { - var session = new Session (webSocket, element); - await session.RunAsync (serverToken).ConfigureAwait (false); + var session = new Session (webSocket, element, serverToken); + await session.RunAsync ().ConfigureAwait (false); } catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely) { // The remote party closed the WebSocket connection without completing the close handshake. @@ -242,50 +242,6 @@ namespace Ooui } } - static async Task SendMessageAsync (WebSocket webSocket, Message message, EventTarget target, HashSet createdIds, CancellationToken token) - { - // - // Make sure all the referenced objects have been created - // - if (message.MessageType == MessageType.Create) { - createdIds.Add (message.TargetId); - } - else { - if (!createdIds.Contains (message.TargetId)) { - createdIds.Add (message.TargetId); - await SendStateMessagesAsync (webSocket, target.GetElementById (message.TargetId), createdIds, token).ConfigureAwait (false); - } - if (message.Value is Array a) { - for (var i = 0; i < a.Length; i++) { - // Console.WriteLine ($"A{i} = {a.GetValue(i)}"); - if (a.GetValue (i) is EventTarget e && !createdIds.Contains (e.Id)) { - createdIds.Add (e.Id); - await SendStateMessagesAsync (webSocket, e, createdIds, token).ConfigureAwait (false); - } - } - } - } - - // - // Now actually send this message - // - if (token.IsCancellationRequested) - return; - var json = Newtonsoft.Json.JsonConvert.SerializeObject (message); - var outputBuffer = new ArraySegment (Encoding.UTF8.GetBytes (json)); - await webSocket.SendAsync (outputBuffer, WebSocketMessageType.Text, true, token).ConfigureAwait (false); - } - - static async Task SendStateMessagesAsync (WebSocket webSocket, EventTarget target, HashSet createdIds, CancellationToken token) - { - if (target == null) return; - - foreach (var m in target.StateMessages) { - if (token.IsCancellationRequested) return; - await SendMessageAsync (webSocket, m, target, createdIds, token).ConfigureAwait (false); - } - } - static void Error (string message, Exception ex) { Console.ForegroundColor = ConsoleColor.Red; @@ -297,27 +253,35 @@ namespace Ooui { readonly WebSocket webSocket; readonly Element element; + readonly Action handleElementMessageSent; - public Session (WebSocket webSocket, Element element) + readonly CancellationTokenSource sessionCts = new CancellationTokenSource (); + readonly CancellationTokenSource linkedCts; + readonly CancellationToken token; + + readonly HashSet createdIds; + readonly List queuedMessages = new List (); + + readonly System.Timers.Timer sendThrottle; + DateTime lastTransmitTime = DateTime.MinValue; + readonly TimeSpan throttleInterval = TimeSpan.FromSeconds (1.0 / 30); // 30 FPS max + + public Session (WebSocket webSocket, Element element, CancellationToken serverToken) { this.webSocket = webSocket; this.element = element; - } - public async Task RunAsync (CancellationToken serverToken) - { // // Create a new session cancellation token that will trigger // automatically if the server shutsdown or the session shutsdown. // - var sessionCts = new CancellationTokenSource (); - var linkedCts = CancellationTokenSource.CreateLinkedTokenSource (serverToken, sessionCts.Token); - var token = linkedCts.Token; + linkedCts = CancellationTokenSource.CreateLinkedTokenSource (serverToken, sessionCts.Token); + token = linkedCts.Token; // // Keep a list of all the elements for which we've transmitted the initial state // - var createdIds = new HashSet { + createdIds = new HashSet { "window", "document", "document.body", @@ -326,38 +290,38 @@ namespace Ooui // // Preparse handlers for the element // - Action onElementMessage = null; - onElementMessage = async m => { - if (webSocket == null) return; - try { - await SendMessageAsync (webSocket, m, element, createdIds, token).ConfigureAwait (false); - } - catch (Exception ex) { - if (webSocket.State == WebSocketState.Aborted) { - Error ("WebSocket is aborted, cancelling session", ex); - element.MessageSent -= onElementMessage; - sessionCts.Cancel (); - } - else { - Error ("Failed to handle element message", ex); - } + handleElementMessageSent = QueueMessage; + + // + // Create a timer to use as a throttle when sending messages + // + sendThrottle = new System.Timers.Timer (throttleInterval.TotalMilliseconds); + sendThrottle.Elapsed += (s, e) => { + // e.SignalTime + System.Console.WriteLine ("TICK SEND THROTTLE FOR {0}", element); + if ((e.SignalTime - lastTransmitTime) >= throttleInterval) { + sendThrottle.Enabled = false; + lastTransmitTime = e.SignalTime; + TransmitQueuedMessages (); } }; + } + public async Task RunAsync () + { // // Start watching for changes in the element // - element.MessageSent += onElementMessage; + element.MessageSent += handleElementMessageSent; try { // // Add it to the document body // - await SendMessageAsync (webSocket, Message.Call ("document.body", "appendChild", element), - element, createdIds, token).ConfigureAwait (false); + QueueMessage (Message.Call ("document.body", "appendChild", element)); // - // Listen for events + // Start the Read Loop // var receiveBuffer = new byte[1024]; @@ -396,7 +360,74 @@ namespace Ooui } } finally { - element.MessageSent -= onElementMessage; + element.MessageSent -= handleElementMessageSent; + } + } + + void QueueStateMessages (EventTarget target) + { + if (target == null) return; + foreach (var m in target.StateMessages) { + QueueMessage (m); + } + } + + void QueueMessage (Message message) + { + // + // Make sure all the referenced objects have been created + // + if (message.MessageType == MessageType.Create) { + createdIds.Add (message.TargetId); + } + else { + if (!createdIds.Contains (message.TargetId)) { + createdIds.Add (message.TargetId); + QueueStateMessages (element.GetElementById (message.TargetId)); + } + if (message.Value is Array a) { + for (var i = 0; i < a.Length; i++) { + // Console.WriteLine ($"A{i} = {a.GetValue(i)}"); + if (a.GetValue (i) is EventTarget e && !createdIds.Contains (e.Id)) { + createdIds.Add (e.Id); + QueueStateMessages (e); + } + } + } + } + + // + // Add it to the queue + // + lock (queuedMessages) queuedMessages.Add (message); + sendThrottle.Enabled = true; + } + + async void TransmitQueuedMessages () + { + try { + // + // Dequeue as many messages as we can + // + var messagesToSend = new List (); + lock (queuedMessages) { + messagesToSend.AddRange (queuedMessages); + queuedMessages.Clear (); + } + if (messagesToSend.Count == 0) + return; + + // + // Now actually send this message + // + var json = Newtonsoft.Json.JsonConvert.SerializeObject (messagesToSend); + var outputBuffer = new ArraySegment (Encoding.UTF8.GetBytes (json)); + await webSocket.SendAsync (outputBuffer, WebSocketMessageType.Text, true, token).ConfigureAwait (false); + } + catch (Exception ex) { + Error ("Failed to send queued messages, aborting session", ex); + element.MessageSent -= handleElementMessageSent; + sessionCts.Cancel (); } } }