Throttle message sends
This commit is contained in:
		
							parent
							
								
									15578a6bac
								
							
						
					
					
						commit
						01668873dd
					
				|  | @ -114,9 +114,13 @@ socket.addEventListener('open', function (event) { | |||
| }); | ||||
| 
 | ||||
| socket.addEventListener('message', function (event) { | ||||
|     const message = JSON.parse (event.data); | ||||
|     // console.log('Raw value from server', message.v);
 | ||||
|     message.v = fixupValue (message.v); | ||||
|     // console.log('Message from server', message);
 | ||||
|     processMessage (message); | ||||
|     const messages = JSON.parse (event.data); | ||||
|     console.log("Messages", messages); | ||||
|     if (Array.isArray (messages)) { | ||||
|         messages.forEach (function (m) { | ||||
|             // console.log('Raw value from server', m.v);
 | ||||
|             m.v = fixupValue (m.v); | ||||
|             processMessage (m); | ||||
|         }); | ||||
|     } | ||||
| }); | ||||
|  |  | |||
							
								
								
									
										179
									
								
								Ooui/UI.cs
								
								
								
								
							
							
						
						
									
										179
									
								
								Ooui/UI.cs
								
								
								
								
							|  | @ -228,8 +228,8 @@ namespace Ooui | |||
|             // Create a new session and let it handle everything from here | ||||
|             // | ||||
|             try { | ||||
|                 var session = new Session (webSocket, element); | ||||
|                 await session.RunAsync (serverToken).ConfigureAwait (false); | ||||
|                 var session = new Session (webSocket, element, serverToken); | ||||
|                 await session.RunAsync ().ConfigureAwait (false); | ||||
|             } | ||||
|             catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely) { | ||||
|                 // The remote party closed the WebSocket connection without completing the close handshake. | ||||
|  | @ -242,50 +242,6 @@ namespace Ooui | |||
|             } | ||||
|         } | ||||
| 
 | ||||
|         static async Task SendMessageAsync (WebSocket webSocket, Message message, EventTarget target, HashSet<string> createdIds, CancellationToken token) | ||||
|         { | ||||
|             // | ||||
|             // Make sure all the referenced objects have been created | ||||
|             // | ||||
|             if (message.MessageType == MessageType.Create) { | ||||
|                 createdIds.Add (message.TargetId); | ||||
|             } | ||||
|             else { | ||||
|                 if (!createdIds.Contains (message.TargetId)) { | ||||
|                     createdIds.Add (message.TargetId); | ||||
|                     await SendStateMessagesAsync (webSocket, target.GetElementById (message.TargetId), createdIds, token).ConfigureAwait (false); | ||||
|                 } | ||||
|                 if (message.Value is Array a) { | ||||
|                     for (var i = 0; i < a.Length; i++) { | ||||
|                         // Console.WriteLine ($"A{i} = {a.GetValue(i)}"); | ||||
|                         if (a.GetValue (i) is EventTarget e && !createdIds.Contains (e.Id)) { | ||||
|                             createdIds.Add (e.Id); | ||||
|                             await SendStateMessagesAsync (webSocket, e, createdIds, token).ConfigureAwait (false); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             // | ||||
|             // Now actually send this message | ||||
|             // | ||||
|             if (token.IsCancellationRequested) | ||||
|                 return; | ||||
|             var json = Newtonsoft.Json.JsonConvert.SerializeObject (message); | ||||
|             var outputBuffer = new ArraySegment<byte> (Encoding.UTF8.GetBytes (json)); | ||||
|             await webSocket.SendAsync (outputBuffer, WebSocketMessageType.Text, true, token).ConfigureAwait (false); | ||||
|         } | ||||
| 
 | ||||
|         static async Task SendStateMessagesAsync (WebSocket webSocket, EventTarget target, HashSet<string> createdIds, CancellationToken token) | ||||
|         { | ||||
|             if (target == null) return; | ||||
| 
 | ||||
|             foreach (var m in target.StateMessages) { | ||||
|                 if (token.IsCancellationRequested) return; | ||||
|                 await SendMessageAsync (webSocket, m, target, createdIds, token).ConfigureAwait (false); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         static void Error (string message, Exception ex) | ||||
|         { | ||||
|             Console.ForegroundColor = ConsoleColor.Red; | ||||
|  | @ -297,27 +253,35 @@ namespace Ooui | |||
|         { | ||||
|             readonly WebSocket webSocket; | ||||
|             readonly Element element; | ||||
|             readonly Action<Message> handleElementMessageSent; | ||||
| 
 | ||||
|             public Session (WebSocket webSocket, Element element) | ||||
|             readonly CancellationTokenSource sessionCts = new CancellationTokenSource (); | ||||
|             readonly CancellationTokenSource linkedCts; | ||||
|             readonly CancellationToken token; | ||||
| 
 | ||||
|             readonly HashSet<string> createdIds; | ||||
|             readonly List<Message> queuedMessages = new List<Message> (); | ||||
| 
 | ||||
|             readonly System.Timers.Timer sendThrottle; | ||||
|             DateTime lastTransmitTime = DateTime.MinValue; | ||||
|             readonly TimeSpan throttleInterval = TimeSpan.FromSeconds (1.0 / 30); // 30 FPS max | ||||
| 
 | ||||
|             public Session (WebSocket webSocket, Element element, CancellationToken serverToken) | ||||
|             { | ||||
|                 this.webSocket = webSocket; | ||||
|                 this.element = element; | ||||
|             } | ||||
| 
 | ||||
|             public async Task RunAsync (CancellationToken serverToken) | ||||
|             { | ||||
|                 // | ||||
|                 // Create a new session cancellation token that will trigger | ||||
|                 // automatically if the server shutsdown or the session shutsdown. | ||||
|                 // | ||||
|                 var sessionCts = new CancellationTokenSource (); | ||||
|                 var linkedCts = CancellationTokenSource.CreateLinkedTokenSource (serverToken, sessionCts.Token); | ||||
|                 var token = linkedCts.Token; | ||||
|                 linkedCts = CancellationTokenSource.CreateLinkedTokenSource (serverToken, sessionCts.Token); | ||||
|                 token = linkedCts.Token; | ||||
| 
 | ||||
|                 // | ||||
|                 // Keep a list of all the elements for which we've transmitted the initial state | ||||
|                 // | ||||
|                 var createdIds = new HashSet<string> { | ||||
|                 createdIds = new HashSet<string> { | ||||
|                     "window", | ||||
|                     "document", | ||||
|                     "document.body", | ||||
|  | @ -326,38 +290,38 @@ namespace Ooui | |||
|                 // | ||||
|                 // Preparse handlers for the element | ||||
|                 // | ||||
|                 Action<Message> onElementMessage = null; | ||||
|                 onElementMessage = async m => { | ||||
|                     if (webSocket == null) return; | ||||
|                     try { | ||||
|                         await SendMessageAsync (webSocket, m, element, createdIds, token).ConfigureAwait (false); | ||||
|                     } | ||||
|                     catch (Exception ex) {                         | ||||
|                         if (webSocket.State == WebSocketState.Aborted) { | ||||
|                             Error ("WebSocket is aborted, cancelling session", ex); | ||||
|                             element.MessageSent -= onElementMessage; | ||||
|                             sessionCts.Cancel (); | ||||
|                         } | ||||
|                         else { | ||||
|                             Error ("Failed to handle element message", ex); | ||||
|                         } | ||||
|                 handleElementMessageSent = QueueMessage; | ||||
| 
 | ||||
|                 // | ||||
|                 // Create a timer to use as a throttle when sending messages | ||||
|                 // | ||||
|                 sendThrottle = new System.Timers.Timer (throttleInterval.TotalMilliseconds); | ||||
|                 sendThrottle.Elapsed += (s, e) => { | ||||
|                     // e.SignalTime | ||||
|                     System.Console.WriteLine ("TICK SEND THROTTLE FOR {0}", element); | ||||
|                     if ((e.SignalTime - lastTransmitTime) >= throttleInterval) { | ||||
|                         sendThrottle.Enabled = false; | ||||
|                         lastTransmitTime = e.SignalTime; | ||||
|                         TransmitQueuedMessages (); | ||||
|                     } | ||||
|                 }; | ||||
|             } | ||||
| 
 | ||||
|             public async Task RunAsync () | ||||
|             { | ||||
|                 // | ||||
|                 // Start watching for changes in the element | ||||
|                 // | ||||
|                 element.MessageSent += onElementMessage; | ||||
|                 element.MessageSent += handleElementMessageSent; | ||||
| 
 | ||||
|                 try { | ||||
|                     // | ||||
|                     // Add it to the document body | ||||
|                     // | ||||
|                     await SendMessageAsync (webSocket, Message.Call ("document.body", "appendChild", element), | ||||
|                         element, createdIds, token).ConfigureAwait (false); | ||||
|                     QueueMessage (Message.Call ("document.body", "appendChild", element)); | ||||
| 
 | ||||
|                     // | ||||
|                     // Listen for events | ||||
|                     // Start the Read Loop | ||||
|                     // | ||||
|                     var receiveBuffer = new byte[1024]; | ||||
| 
 | ||||
|  | @ -396,7 +360,74 @@ namespace Ooui | |||
|                     } | ||||
|                 } | ||||
|                 finally { | ||||
|                     element.MessageSent -= onElementMessage; | ||||
|                     element.MessageSent -= handleElementMessageSent; | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             void QueueStateMessages (EventTarget target) | ||||
|             { | ||||
|                 if (target == null) return; | ||||
|                 foreach (var m in target.StateMessages) { | ||||
|                     QueueMessage (m); | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             void QueueMessage (Message message) | ||||
|             { | ||||
|                 // | ||||
|                 // Make sure all the referenced objects have been created | ||||
|                 // | ||||
|                 if (message.MessageType == MessageType.Create) { | ||||
|                     createdIds.Add (message.TargetId); | ||||
|                 } | ||||
|                 else { | ||||
|                     if (!createdIds.Contains (message.TargetId)) { | ||||
|                         createdIds.Add (message.TargetId); | ||||
|                         QueueStateMessages (element.GetElementById (message.TargetId)); | ||||
|                     } | ||||
|                     if (message.Value is Array a) { | ||||
|                         for (var i = 0; i < a.Length; i++) { | ||||
|                             // Console.WriteLine ($"A{i} = {a.GetValue(i)}"); | ||||
|                             if (a.GetValue (i) is EventTarget e && !createdIds.Contains (e.Id)) { | ||||
|                                 createdIds.Add (e.Id); | ||||
|                                 QueueStateMessages (e); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 // | ||||
|                 // Add it to the queue | ||||
|                 // | ||||
|                 lock (queuedMessages) queuedMessages.Add (message); | ||||
|                 sendThrottle.Enabled = true; | ||||
|             } | ||||
| 
 | ||||
|             async void TransmitQueuedMessages () | ||||
|             { | ||||
|                 try { | ||||
|                     // | ||||
|                     // Dequeue as many messages as we can | ||||
|                     // | ||||
|                     var messagesToSend = new List<Message> (); | ||||
|                     lock (queuedMessages) { | ||||
|                         messagesToSend.AddRange (queuedMessages); | ||||
|                         queuedMessages.Clear (); | ||||
|                     } | ||||
|                     if (messagesToSend.Count == 0) | ||||
|                         return; | ||||
| 
 | ||||
|                     // | ||||
|                     // Now actually send this message | ||||
|                     // | ||||
|                     var json = Newtonsoft.Json.JsonConvert.SerializeObject (messagesToSend); | ||||
|                     var outputBuffer = new ArraySegment<byte> (Encoding.UTF8.GetBytes (json)); | ||||
|                     await webSocket.SendAsync (outputBuffer, WebSocketMessageType.Text, true, token).ConfigureAwait (false); | ||||
|                 } | ||||
|                 catch (Exception ex) {                         | ||||
|                     Error ("Failed to send queued messages, aborting session", ex); | ||||
|                     element.MessageSent -= handleElementMessageSent; | ||||
|                     sessionCts.Cancel (); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue