fixed a bug with messages over 4096 bytes and added easier way to use this

This commit is contained in:
Mike Nolan 2022-11-23 06:55:49 -06:00
parent 61caa43268
commit 10603d336d
2 changed files with 93 additions and 26 deletions

View File

@ -13,6 +13,67 @@ using System.IO;
namespace Tesses.WebServer namespace Tesses.WebServer
{ {
public abstract class EasyWebSocketServer
{
System.Timers.Timer timer;
Func<WebSocketMessage,Task> wsm;
Func<byte[],Task> ping;
bool canEnable;
public bool Enabled {
get{
if(!canEnable) return false;
return timer.Enabled;
}
set{
if(canEnable) timer.Enabled = value;
}}
internal async Task Opened(Func<WebSocketMessage,Task> sendWsm,Func<byte[],Task> ping,CancellationToken token)
{
this.wsm=sendWsm;
this.ping =ping;
timer=new System.Timers.Timer();
timer.Elapsed += async(sender,e)=>{
try{
await Ping();
}catch(Exception ex)
{
_=ex;
}
};
timer.Interval = 10000;
canEnable=true;
await OnConnectionStarted(token);
}
public abstract Task OnConnectionStarted(CancellationToken token);
public async Task Ping()
{
await Ping(new byte[]{ (byte)'P', (byte)'i', (byte)'n', (byte)'g' });
}
public async Task Ping(byte[] data)
{
await ping(data);
}
public abstract Task OnReceiveMessage(WebSocketMessage msg);
public async Task SendMessage(WebSocketMessage msg)
{
await wsm(msg);
}
public void Close(bool clean)
{
canEnable=false;
timer.Enabled=false;
timer.Dispose();
OnConnectionEnded(clean);
}
protected virtual void OnConnectionEnded(bool clean)
{
}
}
public static class WebSocketExtensions public static class WebSocketExtensions
{ {
@ -44,6 +105,10 @@ namespace Tesses.WebServer
},c)),async(m)=>await Task.Run(()=>arrived(m)),closed); },c)),async(m)=>await Task.Run(()=>arrived(m)),closed);
Task.Run(()=>t).Wait(); Task.Run(()=>t).Wait();
} }
public static async Task StartEasyWebSocketConnectionAsync(this ServerContext ctx,EasyWebSocketServer wss)
{
await ctx.StartWebSocketConnectionAsync(wss.Opened,wss.OnReceiveMessage,wss.Close);
}
public static async Task StartWebSocketConnectionAsync(this ServerContext ctx,Func<Func<WebSocketMessage,Task>,Func<byte[],Task>,CancellationToken,Task> opened,Func<WebSocketMessage,Task> arrived,Action<bool> closed) public static async Task StartWebSocketConnectionAsync(this ServerContext ctx,Func<Func<WebSocketMessage,Task>,Func<byte[],Task>,CancellationToken,Task> opened,Func<WebSocketMessage,Task> arrived,Action<bool> closed)
{ {
WebSocketServer server=new WebSocketServer(ctx); WebSocketServer server=new WebSocketServer(ctx);
@ -122,20 +187,6 @@ namespace Tesses.WebServer.WebSocket
{ {
Text=JsonConvert.SerializeObject(data); Text=JsonConvert.SerializeObject(data);
} }
internal IEnumerable<(byte[] array,int)> GetPackets()
{
int read=0;
int offset=0;
byte[] buffer=new byte[4096];
do
{
read = Math.Min(buffer.Length,data.Length-offset);
Array.Copy(data,offset,buffer,0,read);
yield return (buffer,read);
offset+=read;
}while(read>0);
}
public string Text {get{return Encoding.UTF8.GetString(Data);} private set{data=Encoding.UTF8.GetBytes(value); Binary=false;}} public string Text {get{return Encoding.UTF8.GetString(Data);} private set{data=Encoding.UTF8.GetBytes(value); Binary=false;}}
@ -159,6 +210,7 @@ namespace Tesses.WebServer.WebSocket
public class WebSocketServer public class WebSocketServer
{ {
Mutex mtx=new Mutex();
bool hasInit=false; bool hasInit=false;
ServerContext context; ServerContext context;
public WebSocketServer(ServerContext ctx) public WebSocketServer(ServerContext ctx)
@ -194,30 +246,42 @@ namespace Tesses.WebServer.WebSocket
public async Task SendMessageAsync(WebSocketMessage msg) public async Task SendMessageAsync(WebSocketMessage msg)
{ {
while(!hasInit) ;
int opCode = msg.Binary ? 0x2 : 0x1;
(byte[] buff,int len)[] parts = msg.GetPackets().ToArray();
for(int i = 0;i<parts.Length;i++) while(!hasInit) ;
mtx.WaitOne();
int opCode = msg.Binary ? 0x2 : 0x1;
int dataBytes = msg.Data.Length;
int lengthlastByte = dataBytes % 4096;
int noPackets = (int)Math.Ceiling(dataBytes / 4096.0);
for(int i = 0;i<noPackets;i++)
{ {
bool fin = i == parts.Length-1; bool fin = i == noPackets-1;
int finField = fin ? 0b10000000 : 0; int finField = fin ? 0b10000000 : 0;
int opCode2 = i==0 ? opCode : 0; int opCode2 = i==0 ? opCode : 0;
byte firstByte= (byte)(finField | (opCode2 & 0xF)); byte firstByte= (byte)(finField | (opCode2 & 0xF));
var b=glenBytes(parts[i].len); int r=(i==noPackets-1?lengthlastByte : 4096);
byte[] message = new byte[1+b.Length + parts[i].len]; var b=glenBytes(r);
byte[] message = new byte[1+b.Length + (fin?lengthlastByte : 4096)];
message[0]=firstByte; message[0]=firstByte;
Array.Copy(b,0,message,1,b.Length); Array.Copy(b,0,message,1,b.Length);
Array.Copy(parts[i].buff,0,message,1+b.Length,parts[i].len); Array.Copy(msg.Data,i*4096,message,1+b.Length,r);
await context.NetworkStream.WriteAsync(message,0,message.Length); await context.NetworkStream.WriteAsync(message,0,message.Length);
} }
mtx.ReleaseMutex();
} }
private async Task PongSend(byte[] msg,long len) private async Task PongSend(byte[] msg,long len)
{ {
mtx.WaitOne();
int finField = 0b10000000 ; int finField = 0b10000000 ;
byte firstByte= (byte)(finField | 0xA); byte firstByte= (byte)(finField | 0xA);
@ -227,7 +291,7 @@ namespace Tesses.WebServer.WebSocket
Array.Copy(b,0,message,1,b.Length); Array.Copy(b,0,message,1,b.Length);
Array.Copy(msg,0,message,1+b.Length,len); Array.Copy(msg,0,message,1+b.Length,len);
await context.NetworkStream.WriteAsync(message,0,message.Length); await context.NetworkStream.WriteAsync(message,0,message.Length);
mtx.ReleaseMutex();
} }
private string get_Sec_WebSocketAccept(string headerVal) private string get_Sec_WebSocketAccept(string headerVal)
{ {
@ -264,6 +328,7 @@ namespace Tesses.WebServer.WebSocket
public async Task Ping(byte[] ping) public async Task Ping(byte[] ping)
{ {
mtx.WaitOne();
int finField = 0b10000000 ; int finField = 0b10000000 ;
byte firstByte= (byte)(finField | 0x9); byte firstByte= (byte)(finField | 0x9);
@ -273,6 +338,7 @@ namespace Tesses.WebServer.WebSocket
Array.Copy(b,0,message,1,b.Length); Array.Copy(b,0,message,1,b.Length);
Array.Copy(ping,0,message,1+b.Length,ping.Length); Array.Copy(ping,0,message,1+b.Length,ping.Length);
await context.NetworkStream.WriteAsync(message,0,message.Length); await context.NetworkStream.WriteAsync(message,0,message.Length);
mtx.ReleaseMutex();
} }
private async Task<short> get_short() private async Task<short> get_short()
{ {
@ -286,6 +352,7 @@ namespace Tesses.WebServer.WebSocket
} }
private async Task<(byte[] data,long len)> read_packet_async(byte len) private async Task<(byte[] data,long len)> read_packet_async(byte len)
{ {
int realLen=len & 127; int realLen=len & 127;
bool masked=(len & 0b10000000) > 0; bool masked=(len & 0b10000000) > 0;
long realLen2 = realLen >= 126 ? realLen > 126 ? await get_long() : await get_short() : realLen; long realLen2 = realLen >= 126 ? realLen > 126 ? await get_long() : await get_short() : realLen;

View File

@ -5,9 +5,9 @@
<PackageId>Tesses.WebServer.WebSocket</PackageId> <PackageId>Tesses.WebServer.WebSocket</PackageId>
<Author>Mike Nolan</Author> <Author>Mike Nolan</Author>
<Company>Tesses</Company> <Company>Tesses</Company>
<Version>1.0.0</Version> <Version>1.0.1</Version>
<AssemblyVersion>1.0.0</AssemblyVersion> <AssemblyVersion>1.0.1</AssemblyVersion>
<FileVersion>1.0.0</FileVersion> <FileVersion>1.0.1<FileVersion>
<Description>WebSockets for Tesses.WebServer</Description> <Description>WebSockets for Tesses.WebServer</Description>
<PackageLicenseExpression>MIT</PackageLicenseExpression> <PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageTags>HTTP, WebServer, Website, WebSockets</PackageTags> <PackageTags>HTTP, WebServer, Website, WebSockets</PackageTags>