tesses.http/Tesses.Http/Streams.cs

210 lines
5.9 KiB
C#
Raw Permalink Normal View History

2022-08-22 17:30:32 +00:00
using System.Net;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System;
using System.Threading.Tasks;
namespace Tesses.Http
{
public class PrependStream : Stream
{
int offsetInSpecialInfo=0;
byte[] specialInfo;
Stream actualStream;
public PrependStream(byte[] specialInfo,Stream outerStrm)
{
this.specialInfo=specialInfo;
actualStream=outerStrm;
}
public override bool CanRead => actualStream.CanRead;
public override bool CanSeek => false;
public override bool CanWrite => actualStream.CanWrite;
public override long Length => throw new NotImplementedException();
public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public override void Flush()
{
actualStream.Flush();
}
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if(offsetInSpecialInfo < specialInfo.Length)
{
//we are in special info header
int read=Math.Min(count,specialInfo.Length-offsetInSpecialInfo);
Array.Copy(specialInfo,offsetInSpecialInfo,buffer,offset,read);
offsetInSpecialInfo += read;
return read;
}
else
{
return await actualStream.ReadAsync(buffer,offset,count,cancellationToken);
}
}
public override int Read(byte[] buffer, int offset, int count)
{
if(offsetInSpecialInfo < specialInfo.Length)
{
//we are in special info header
int read=Math.Min(count,specialInfo.Length-offsetInSpecialInfo);
Array.Copy(specialInfo,offsetInSpecialInfo,buffer,offset,read);
offsetInSpecialInfo += read;
return read;
}
else
{
return actualStream.Read(buffer,offset,count);
}
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
actualStream.Write(buffer,offset,count);
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await actualStream.WriteAsync(buffer, offset, count, cancellationToken);
}
}
public class StreamPiper : IDisposable
{
CancellationTokenSource token=new CancellationTokenSource();
Thread _t1,_t2;
Stream _s1,_s2;
bool ownStrm;
public StreamPiper(Stream s1,Stream s2,bool ownStrm=true)
{
_s1=s1;
_s2=s2;
_t1=new Thread(CopyStream1To2);
_t2=new Thread(CopyStream2To1);
this.ownStrm=ownStrm;
}
private void CopyStream1To2()
{
try{
_s1.CopyToAsync(_s2,1024,token.Token).Wait();
}catch(System.AggregateException ex)
{
_=ex;
token.Cancel();
}
}
private void CopyStream2To1()
{
try{
_s2.CopyToAsync(_s1,1024,token.Token).Wait();
}catch(System.AggregateException ex)
{
_=ex;
token.Cancel();
}
}
public void Dispose()
{
token.Cancel();
token.Dispose();
_t1.Abort();
_t2.Abort();
if(ownStrm)
{
_s1.Dispose();
_s2.Dispose();
}
}
public void Pipe(CancellationToken token=default(CancellationToken))
{
token.Register(()=>{
this.token.Cancel();
});
_t1.Start();
_t2.Start();
_t1.Join();
_t2.Join();
}
}
public class RangeStream : Stream
{
Stream _strm;
long _len;
long _read=0;
public RangeStream(Stream strm,long? pos,long? len)
{
long _pos=0;
if(pos.HasValue)
{
_pos=pos.Value;
strm.Seek(pos.Value,SeekOrigin.Begin);
}
if(len.HasValue)
{
_len=len.Value;
}else{
_len=strm.Length;
}
_len=Math.Min(_len,strm.Length - _pos);
_strm=strm;
}
public override bool CanRead => _strm.CanRead;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => _len;
public override long Position { get => _read; set => throw new NotImplementedException(); }
public override void Flush()
{
_strm.Flush();
}
public override int Read(byte[] buffer, int offset, int count)
{
int read=(int)Math.Min(_len-_read,count);
if(read == 0) return 0;
read=_strm.Read(buffer,offset,count);
_read+= read;
return read;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
}
}