From ab07d9d36e3e61f48584920d882d95dead6e7600 Mon Sep 17 00:00:00 2001 From: vnugent Date: Sun, 9 Jul 2023 14:22:43 -0400 Subject: Http compression & response overhaul. Removed intenral compression support. --- .../Core/Buffering/ContextLockedBufferManager.cs | 36 +-- .../src/Core/Buffering/IHttpBufferManager.cs | 6 - .../src/Core/Compression/CompressionMethod.cs | 52 ++++ .../src/Core/Compression/IHttpCompressorManager.cs | 90 +++++++ .../src/Core/Compression/IResponseCompressor.cs | 63 +++++ .../src/Core/Compression/ManagedHttpCompressor.cs | 85 ++++++ lib/Net.Http/src/Core/HttpContext.cs | 57 ++-- lib/Net.Http/src/Core/HttpEncodedSegment.cs | 47 ++++ lib/Net.Http/src/Core/HttpServerBase.cs | 80 +++++- lib/Net.Http/src/Core/HttpServerProcessing.cs | 99 +++---- lib/Net.Http/src/Core/IConnectionContext.cs | 12 +- lib/Net.Http/src/Core/IHttpContextInformation.cs | 4 +- lib/Net.Http/src/Core/IHttpLifeCycle.cs | 7 +- lib/Net.Http/src/Core/IHttpResponseBody.cs | 37 ++- lib/Net.Http/src/Core/Request/HttpInputStream.cs | 17 +- lib/Net.Http/src/Core/Request/HttpRequest.cs | 14 +- .../src/Core/Request/HttpRequestExtensions.cs | 43 +-- .../src/Core/Response/ChunkDataAccumulator.cs | 31 +-- lib/Net.Http/src/Core/Response/ChunkedStream.cs | 292 +++++++++------------ lib/Net.Http/src/Core/Response/DirectStream.cs | 78 ++---- .../src/Core/Response/HeaderDataAccumulator.cs | 143 +++++----- .../Core/Response/HttpContextResponseWriting.cs | 245 +++++++---------- lib/Net.Http/src/Core/Response/HttpResponse.cs | 162 +++++++----- lib/Net.Http/src/Core/Response/ResponseWriter.cs | 175 +++++++++--- .../src/Core/Response/ReusableResponseStream.cs | 95 +++++++ lib/Net.Http/src/Core/ServerPreEncodedSegments.cs | 46 ++++ lib/Net.Http/src/Helpers/HelperTypes.cs | 16 +- lib/Net.Http/src/HttpBufferConfig.cs | 5 - lib/Net.Http/src/HttpConfig.cs | 13 +- 29 files changed, 1296 insertions(+), 754 deletions(-) create mode 100644 lib/Net.Http/src/Core/Compression/CompressionMethod.cs create mode 100644 lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs create mode 100644 lib/Net.Http/src/Core/Compression/IResponseCompressor.cs create mode 100644 lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs create mode 100644 lib/Net.Http/src/Core/HttpEncodedSegment.cs create mode 100644 lib/Net.Http/src/Core/Response/ReusableResponseStream.cs create mode 100644 lib/Net.Http/src/Core/ServerPreEncodedSegments.cs (limited to 'lib/Net.Http/src') diff --git a/lib/Net.Http/src/Core/Buffering/ContextLockedBufferManager.cs b/lib/Net.Http/src/Core/Buffering/ContextLockedBufferManager.cs index 7da105b..ac2da25 100644 --- a/lib/Net.Http/src/Core/Buffering/ContextLockedBufferManager.cs +++ b/lib/Net.Http/src/Core/Buffering/ContextLockedBufferManager.cs @@ -84,8 +84,8 @@ namespace VNLib.Net.Http.Core.Buffering //Header parse buffer is a special case as it will be double the size due to the char buffer int headerParseBufferSize = GetMaxHeaderBufferSize(in Config); - //Discard/form data buffer - int discardAndFormDataSize = ComputeDiscardFormataBufferSize(in Config); + //Response/form data buffer + int responseAndFormDataSize = ComputeResponseAndFormDataBuffer(in Config); //Slice and store the buffer segments _segments = new() @@ -93,13 +93,11 @@ namespace VNLib.Net.Http.Core.Buffering //Shared header buffer HeaderAccumulator = GetNextSegment(ref full, headerParseBufferSize), - //Shared discard buffer and form data buffer - DiscardAndFormData = GetNextSegment(ref full, discardAndFormDataSize), + //Shared response and form data buffer + ResponseAndFormData = GetNextSegment(ref full, responseAndFormDataSize), //Buffers cannot be shared - ChunkedResponseAccumulator = GetNextSegment(ref full, Config.ChunkedResponseAccumulatorSize), - - ResponseBuffer = GetNextSegment(ref full, Config.ResponseBufferSize), + ChunkedResponseAccumulator = GetNextSegment(ref full, Config.ChunkedResponseAccumulatorSize) }; /* @@ -172,20 +170,15 @@ namespace VNLib.Net.Http.Core.Buffering /* - * Discard buffer may be used for form-data parsing as they will never be used at - * the same time during normal operation + * Response buffer and form data buffer are shared because they are never + * used at the same time. */ /// - public Memory GetFormDataBuffer() => _segments.DiscardAndFormData; - - /// - public Memory GetDiscardBuffer() => _segments.DiscardAndFormData; + public Memory GetFormDataBuffer() => _segments.ResponseAndFormData; /// - public Memory GetResponseDataBuffer() => _segments.ResponseBuffer; - - + public Memory GetResponseDataBuffer() => _segments.ResponseAndFormData; static Memory GetNextSegment(ref Memory buffer, int size) { @@ -216,14 +209,14 @@ namespace VNLib.Net.Http.Core.Buffering { return config.ResponseBufferSize + config.ChunkedResponseAccumulatorSize - + ComputeDiscardFormataBufferSize(in config) + + ComputeResponseAndFormDataBuffer(in config) + GetMaxHeaderBufferSize(in config); //Header buffers are shared } - static int ComputeDiscardFormataBufferSize(in HttpBufferConfig config) + static int ComputeResponseAndFormDataBuffer(in HttpBufferConfig config) { //Get the larger of the two buffers, so it can be shared between the two - return Math.Max(config.DiscardBufferSize, config.FormDataBufferSize); + return Math.Max(config.ResponseBufferSize, config.FormDataBufferSize); } @@ -231,9 +224,8 @@ namespace VNLib.Net.Http.Core.Buffering { public readonly Memory HeaderAccumulator { get; init; } public readonly Memory ChunkedResponseAccumulator { get; init; } - public readonly Memory DiscardAndFormData { get; init; } - public readonly Memory ResponseBuffer { get; init; } - } + public readonly Memory ResponseAndFormData { get; init; } + } private sealed class HeaderAccumulatorBuffer: SplitHttpBufferElement, IResponseHeaderAccBuffer, IHttpHeaderParseBuffer diff --git a/lib/Net.Http/src/Core/Buffering/IHttpBufferManager.cs b/lib/Net.Http/src/Core/Buffering/IHttpBufferManager.cs index 445aa6f..c7020ff 100644 --- a/lib/Net.Http/src/Core/Buffering/IHttpBufferManager.cs +++ b/lib/Net.Http/src/Core/Buffering/IHttpBufferManager.cs @@ -51,12 +51,6 @@ namespace VNLib.Net.Http.Core.Buffering /// The memory block used for buffering application response data Memory GetResponseDataBuffer(); - /// - /// Gets the independent buffer used to discard data request data - /// - /// The memory block used for discarding request data - Memory GetDiscardBuffer(); - /// /// Gets a buffer used for buffering form-data /// diff --git a/lib/Net.Http/src/Core/Compression/CompressionMethod.cs b/lib/Net.Http/src/Core/Compression/CompressionMethod.cs new file mode 100644 index 0000000..165a546 --- /dev/null +++ b/lib/Net.Http/src/Core/Compression/CompressionMethod.cs @@ -0,0 +1,52 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: CompressionMethod.cs +* +* CompressionMethod.cs is part of VNLib.Net.Http which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Net.Http +{ + /// + /// Represents a supported compression type + /// + [Flags] + public enum CompressionMethod + { + /// + /// No compression + /// + None = 0x00, + /// + /// GZip compression is required + /// + Gzip = 0x01, + /// + /// Deflate compression is required + /// + Deflate = 0x02, + /// + /// Brotli compression is required + /// + Brotli = 0x04 + } +} \ No newline at end of file diff --git a/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs new file mode 100644 index 0000000..982fa28 --- /dev/null +++ b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs @@ -0,0 +1,90 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: IHttpCompressorManager.cs +* +* IHttpCompressorManager.cs is part of VNLib.Net.Http which is part +* of the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Net.Http +{ + /// + /// Represents an http compressor manager that creates compressor state instances and processes + /// compression operations. + /// + /// + /// All method calls must be thread-safe. Method calls on a given compressor state are guarunteed + /// to be thread-safe, but method calls on different compressor states are not. + /// + public interface IHttpCompressorManager + { + /// + /// Gets the supported compression methods for this compressor manager + /// + /// The supported compression methods + /// + /// Called when the server starts to cache the value. All supported methods must be returned + /// before constructing the server. + /// + CompressionMethod GetSupportedMethods(); + + /// + /// Allocates a new compressor state object that will be used for compression operations. + /// + /// The compressor state + object AllocCompressor(); + + /// + /// Compresses a block of data using the compressor state. The input block size is + /// guarunteed to the the block size returned by + /// or smaller. + /// + /// This method may be called with an empty input block to flush the compressor state. + /// + /// + /// The compressor state instance + /// The input buffer to compress + /// A value that indicates if this block is the final block + /// The compressed block to send to the client + /// + /// The returned memory block should belong to the individual compressor state, and be valid until the + /// call to deinit. The result of the block should remain valid + /// until the next call to compress or deinit. + /// + ReadOnlyMemory CompressBlock(object compressorState, ReadOnlyMemory input, bool finalBlock); + + /// + /// Initializes the compressor state for a compression operation + /// + /// The user-defined compression state + /// The compression method + /// The block size of the compressor, or if block size is irrelavant + int InitCompressor(object compressorState, CompressionMethod compMethod); + + /// + /// Deinitializes the compressor state. This method is guarnteed to be called after + /// a call to regardless of + /// the success of the operation involoving the compressor state + /// + /// The initialized compressor state + void DeinitCompressor(object compressorState); + } +} diff --git a/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs new file mode 100644 index 0000000..0beea28 --- /dev/null +++ b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs @@ -0,0 +1,63 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: IResponseCompressor.cs +* +* IResponseCompressor.cs is part of VNLib.Net.Http which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Threading.Tasks; + +namespace VNLib.Net.Http.Core.Compression +{ + /// + /// Represents a per-context compressor + /// + internal interface IResponseCompressor + { + /// + /// The desired block size for the compressor. This is an optimization feature. + /// If the block size is unlimited, this should return 0. This value is only read + /// after initialization + /// + int BlockSize { get; } + + /// + /// Frees the resources used by the compressor on a compression operation + /// + void Free(); + + /// + /// Initializes the compressor for a compression operation + /// + /// The compression mode to use + /// The stream to write compressed data to + void Init(Stream output, CompressionMethod compMethod); + + /// + /// Compresses a block of data and writes it to the output stream + /// + /// The block of memory to write to compress + /// A value that indicates if this block is the final block + /// A task that represents the compression operation + ValueTask CompressBlockAsync(ReadOnlyMemory buffer, bool finalBlock); + } +} \ No newline at end of file diff --git a/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs new file mode 100644 index 0000000..e580b2d --- /dev/null +++ b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs @@ -0,0 +1,85 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: ManagedHttpCompressor.cs +* +* ManagedHttpCompressor.cs is part of VNLib.Net.Http which is part of +* the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Threading.Tasks; + + +namespace VNLib.Net.Http.Core.Compression +{ + + internal sealed class ManagedHttpCompressor : IResponseCompressor + { + //Store the compressor + private readonly IHttpCompressorManager _provider; + + public ManagedHttpCompressor(IHttpCompressorManager provider) + { + _provider = provider; + } + + /* + * The compressor alloc is deferd until the first call to Init() + * This is because user-code should not be called during the constructor + * or internal calls. This is to avoid user-code errors causing the app + * to crash during critical sections that do not have exception handling. + */ + + private object? _compressor; + private Stream? _stream; + + /// + public int BlockSize { get; private set; } + + /// + public ValueTask CompressBlockAsync(ReadOnlyMemory buffer, bool finalBlock) + { + //Compress the block + ReadOnlyMemory result = _provider.CompressBlock(_compressor!, buffer, finalBlock); + + //Write the compressed block to the stream + return _stream!.WriteAsync(result); + } + + /// + public void Free() + { + //Remove stream ref and de-init the compressor + _stream = null; + _provider.DeinitCompressor(_compressor!); + } + + /// + public void Init(Stream output, CompressionMethod compMethod) + { + //Defer alloc the compressor + _compressor ??= _provider.AllocCompressor(); + + //Store the stream and init the compressor + _stream = output; + BlockSize = _provider.InitCompressor(_compressor, compMethod); + } + } +} \ No newline at end of file diff --git a/lib/Net.Http/src/Core/HttpContext.cs b/lib/Net.Http/src/Core/HttpContext.cs index cdc3554..b17498d 100644 --- a/lib/Net.Http/src/Core/HttpContext.cs +++ b/lib/Net.Http/src/Core/HttpContext.cs @@ -25,10 +25,12 @@ using System; using System.IO; using System.Text; +using System.Threading.Tasks; using VNLib.Utils; using VNLib.Utils.Memory.Caching; using VNLib.Net.Http.Core.Buffering; +using VNLib.Net.Http.Core.Compression; namespace VNLib.Net.Http.Core { @@ -70,7 +72,7 @@ namespace VNLib.Net.Http.Core public readonly ContextLockedBufferManager Buffers; /// - /// Gets or sets the alternate application protocol to swtich to + /// Gets or sets the alternate application protocol to switch to /// /// /// This property is only cleared when the context is released for reuse @@ -78,30 +80,35 @@ namespace VNLib.Net.Http.Core /// or this property must be exlicitly cleared /// public IAlternateProtocol? AlternateProtocol { get; set; } - + private readonly IResponseCompressor? _compressor; private readonly ResponseWriter responseWriter; - private ITransportContext? _ctx; + private ITransportContext? _ctx; public HttpContext(HttpServer server) { ParentServer = server; - //Store CRLF bytes - CrlfBytes = server.Config.HttpEncoding.GetBytes(HttpHelpers.CRLF); - //Init buffer manager Buffers = new(server.Config.BufferConfig); //Create new request - Request = new HttpRequest(this); + Request = new (this); //create a new response object - Response = new HttpResponse(Buffers, this); + Response = new (Buffers, this); //Init response writer ResponseBody = responseWriter = new ResponseWriter(); + /* + * We can alloc a new compressor if the server supports compression. + * If no compression is supported, the compressor will never be accessed + */ + _compressor = server.SupportedCompressionMethods == CompressionMethod.None ? + null : + new ManagedHttpCompressor(server.Config.CompressorManager!); + ContextFlags = new(0); } @@ -109,9 +116,6 @@ namespace VNLib.Net.Http.Core #region Context information - /// - public ReadOnlyMemory CrlfBytes { get; } - /// Encoding IHttpContextInformation.Encoding => ParentServer.Config.HttpEncoding; @@ -122,15 +126,26 @@ namespace VNLib.Net.Http.Core HttpConfig IHttpContextInformation.Config => ParentServer.Config; /// - Stream IHttpContextInformation.GetTransport() => _ctx!.ConnectionStream; + public ServerPreEncodedSegments EncodedSegments => ParentServer.PreEncodedSegments; + + /// + public Stream GetTransport() => _ctx!.ConnectionStream; #endregion #region LifeCycle Hooks /// - public void InitializeContext(ITransportContext ctx) => _ctx = ctx; - + public void InitializeContext(ITransportContext ctx) + { + _ctx = ctx; + + //Alloc buffers + Buffers.AllocateBuffer(ParentServer.Config.MemoryPool); + + //Init new connection + Response.OnNewConnection(); + } /// public void BeginRequest() @@ -146,21 +161,26 @@ namespace VNLib.Net.Http.Core Request.Initialize(_ctx!, ParentServer.Config.DefaultHttpVersion); } + /// + public Task FlushTransportAsnc() + { + return _ctx!.ConnectionStream.FlushAsync(); + } + /// public void EndRequest() { Request.OnComplete(); Response.OnComplete(); responseWriter.OnComplete(); + + //Free compressor when a message flow is complete + _compressor?.Free(); } void IReusable.Prepare() { - Request.OnPrepare(); Response.OnPrepare(); - - //Alloc buffers - Buffers.AllocateBuffer(ParentServer.Config.MemoryPool); } bool IReusable.Release() @@ -170,7 +190,6 @@ namespace VNLib.Net.Http.Core AlternateProtocol = null; //Release response/requqests - Request.OnRelease(); Response.OnRelease(); //Zero before returning to pool diff --git a/lib/Net.Http/src/Core/HttpEncodedSegment.cs b/lib/Net.Http/src/Core/HttpEncodedSegment.cs new file mode 100644 index 0000000..c036a1b --- /dev/null +++ b/lib/Net.Http/src/Core/HttpEncodedSegment.cs @@ -0,0 +1,47 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: HttpEncodedSegment.cs +* +* HttpEncodedSegment.cs is part of VNLib.Net.Http which is part of +* the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Net.Http.Core +{ + /// + /// Holds a pre-encoded segment of data + /// + /// The buffer containing the segment data + /// The offset in the buffer to begin the segment at + /// The length of the segment + internal readonly record struct HttpEncodedSegment(byte[] Buffer, int Offset, int Length) + { + /// + /// Span representation of the pre-encoded segment + /// + public Span Span => Buffer.AsSpan(Offset, Length); + + /// + /// Memory representation of the pre-encoded segment + /// + public Memory Memory => Buffer.AsMemory(Offset, Length); + } +} \ No newline at end of file diff --git a/lib/Net.Http/src/Core/HttpServerBase.cs b/lib/Net.Http/src/Core/HttpServerBase.cs index 9283998..9fb1169 100644 --- a/lib/Net.Http/src/Core/HttpServerBase.cs +++ b/lib/Net.Http/src/Core/HttpServerBase.cs @@ -36,6 +36,7 @@ */ using System; +using System.Text; using System.Linq; using System.Threading; using System.Net.Sockets; @@ -64,9 +65,20 @@ namespace VNLib.Net.Http /// not specific route /// public const string WILDCARD_KEY = "*"; - + + /// + /// Gets the servers supported http versions + /// + public const HttpVersion SupportedVersions = HttpVersion.Http09 | HttpVersion.Http1 | HttpVersion.Http11; + + /// + /// Static discard buffer for destroying data. This buffer must never be read from + /// + internal static readonly Memory WriteOnlyScratchBuffer = new byte[64 * 1024]; + private readonly ITransportProvider Transport; private readonly IReadOnlyDictionary ServerRoots; + private readonly IWebRoot? _wildcardRoot; #region caches /// @@ -93,6 +105,15 @@ namespace VNLib.Net.Http /// public bool Running { get; private set; } + /// + /// The for the current server + /// + internal readonly ServerPreEncodedSegments PreEncodedSegments; + /// + /// Cached supported compression methods + /// + internal readonly CompressionMethod SupportedCompressionMethods; + private CancellationTokenSource? StopToken; /// @@ -113,12 +134,21 @@ namespace VNLib.Net.Http ServerRoots = sites.ToDictionary(static r => r.Hostname, static tv => tv, StringComparer.OrdinalIgnoreCase); //Compile and store the timeout keepalive header KeepAliveTimeoutHeaderValue = $"timeout={(int)Config.ConnectionKeepAlive.TotalSeconds}"; - //Store termination for the current instance - HeaderLineTermination = config.HttpEncoding.GetBytes(HttpHelpers.CRLF); //Create a new context store ContextStore = ObjectRental.CreateReusable(() => new HttpContext(this)); //Setup config copy with the internal http pool Transport = transport; + //Create the pre-encoded segments + PreEncodedSegments = GetSegments(config.HttpEncoding); + //Store a ref to the crlf memory segment + HeaderLineTermination = PreEncodedSegments.CrlfBytes.Memory; + //Cache supported compression methods, or none if compressor is null + SupportedCompressionMethods = config.CompressorManager == null ? + CompressionMethod.None : + config.CompressorManager.GetSupportedMethods(); + + //Cache wildcard root + _wildcardRoot = ServerRoots.GetValueOrDefault(WILDCARD_KEY); } private static void ValidateConfig(in HttpConfig conf) @@ -153,11 +183,6 @@ namespace VNLib.Net.Http throw new ArgumentException("DefaultHttpVersion cannot be NotSupported", nameof(conf)); } - if (conf.BufferConfig.DiscardBufferSize < 64) - { - throw new ArgumentException("DiscardBufferSize cannot be less than 64 bytes", nameof(conf)); - } - if (conf.BufferConfig.FormDataBufferSize < 64) { throw new ArgumentException("FormDataBufferSize cannot be less than 64 bytes", nameof(conf)); @@ -204,6 +229,40 @@ namespace VNLib.Net.Http } } + private static ServerPreEncodedSegments GetSegments(Encoding encoding) + { + int offset = 0, length; + + //Alloc buffer to store segments in + byte[] buffer = new byte[16]; + + Span span = buffer; + + //Get crlf bytes + length = encoding.GetBytes(HttpHelpers.CRLF, span); + + //Build crlf segment + HttpEncodedSegment crlf = new(buffer, offset, length); + + offset += length; + span = span[offset..]; + + //Get final chunk bytes + length = encoding.GetBytes("0\r\n\r\n", span); + + //Build final chunk segment + HttpEncodedSegment finalChunk = new(buffer, offset, length); + + offset += length; + + //Build the segments + return new ServerPreEncodedSegments(buffer) + { + FinalChunkTermination = finalChunk, + CrlfBytes = crlf + }; + } + /// /// Begins listening for connections on configured interfaces for configured hostnames. /// @@ -272,14 +331,14 @@ namespace VNLib.Net.Http } } - //Clear all caches + //Clear all caches before leaving to aid gc CacheHardClear(); //Clear running flag Running = false; Config.ServerLog.Information("HTTP server {hc} exiting", GetHashCode()); } - + /// /// @@ -297,7 +356,6 @@ namespace VNLib.Net.Http { //When clause guards nulls switch (se.SocketErrorCode) - { //Ignore aborted messages case SocketError.ConnectionAborted: diff --git a/lib/Net.Http/src/Core/HttpServerProcessing.cs b/lib/Net.Http/src/Core/HttpServerProcessing.cs index ad3ba80..d3c5981 100644 --- a/lib/Net.Http/src/Core/HttpServerProcessing.cs +++ b/lib/Net.Http/src/Core/HttpServerProcessing.cs @@ -27,7 +27,9 @@ using System.IO; using System.Net; using System.Threading; using System.Net.Sockets; +using System.Diagnostics; using System.Threading.Tasks; +using System.Collections.Generic; using System.Runtime.CompilerServices; using VNLib.Utils.Memory; @@ -66,9 +68,9 @@ namespace VNLib.Net.Http transportContext.ConnectionStream.ReadTimeout = Config.ActiveConnectionRecvTimeout; //Process the request - bool keepAlive = await ProcessHttpEventAsync(transportContext, context); + bool keepAlive = await ProcessHttpEventAsync(context); - //If not keepalive, exit the listening loop + //If not keepalive, exit the listening loop and clean up connection if (!keepAlive) { break; @@ -112,11 +114,11 @@ namespace VNLib.Net.Http //Catch wrapped OC exceptions catch (IOException ioe) when (ioe.InnerException is OperationCanceledException oce) { - Config.ServerLog.Debug("Failed to receive transport data within a timeout period {m}, connection closed", oce.Message); + Config.ServerLog.Debug("Failed to receive transport data within a timeout period {m} connection closed", oce.Message); } catch (OperationCanceledException oce) { - Config.ServerLog.Debug("Failed to receive transport data within a timeout period {m}, connection closed", oce.Message); + Config.ServerLog.Debug("Failed to receive transport data within a timeout period {m} connection closed", oce.Message); } catch(Exception ex) { @@ -148,10 +150,9 @@ namespace VNLib.Net.Http /// /// Main event handler for all incoming connections /// - /// The describing the incoming connection /// Reusable context object [MethodImpl(MethodImplOptions.AggressiveOptimization)] - private async Task ProcessHttpEventAsync(ITransportContext transportContext, HttpContext context) + private async Task ProcessHttpEventAsync(HttpContext context) { //Prepare http context to process a new message context.BeginRequest(); @@ -159,7 +160,7 @@ namespace VNLib.Net.Http try { //Try to parse the http request (may throw exceptions, let them propagate to the transport layer) - int status = (int)ParseRequest(transportContext, context); + int status = (int)ParseRequest(context); //Check status code for socket error, if so, return false to close the connection if (status >= 1000) @@ -198,7 +199,10 @@ namespace VNLib.Net.Http #endif //Close the response - await context.WriteResponseAsync(StopToken!.Token); + await context.WriteResponseAsync(); + + //Flush the stream before retuning + await context.FlushTransportAsnc(); /* * If an alternate protocol was specified, we need to break the keepalive loop @@ -217,7 +221,6 @@ namespace VNLib.Net.Http /// Reads data synchronously from the transport and attempts to parse an HTTP message and /// built a request. /// - /// /// /// 0 if the request was successfully parsed, the /// to return to the client because the entity could not be processed @@ -231,13 +234,13 @@ namespace VNLib.Net.Http /// /// [MethodImpl(MethodImplOptions.AggressiveOptimization)] - private HttpStatusCode ParseRequest(ITransportContext transport, HttpContext ctx) + private HttpStatusCode ParseRequest(HttpContext ctx) { //Get the parse buffer IHttpHeaderParseBuffer parseBuffer = ctx.Buffers.RequestHeaderParseBuffer; //Init parser - TransportReader reader = new (transport.ConnectionStream, parseBuffer, Config.HttpEncoding, HeaderLineTermination); + TransportReader reader = new (ctx.GetTransport(), parseBuffer, Config.HttpEncoding, HeaderLineTermination); try { @@ -283,7 +286,7 @@ namespace VNLib.Net.Http } [MethodImpl(MethodImplOptions.AggressiveOptimization)] - private async ValueTask ProcessRequestAsync(HttpContext context, HttpStatusCode status) + private async Task ProcessRequestAsync(HttpContext context, HttpStatusCode status) { //Check status if (status != 0) @@ -302,8 +305,8 @@ namespace VNLib.Net.Http return false; } - //We only support version 1 and 1/1 - if ((context.Request.HttpVersion & (HttpVersion.Http11 | HttpVersion.Http1)) == 0) + //Make sure the server supports the http version + if ((context.Request.HttpVersion & SupportedVersions) == 0) { //Close the connection when we exit context.Response.Headers[HttpResponseHeader.Connection] = "closed"; @@ -323,9 +326,16 @@ namespace VNLib.Net.Http //Store keepalive value from request, and check if keepalives are enabled by the configuration bool keepalive = context.Request.KeepAlive & Config.ConnectionKeepAlive > TimeSpan.Zero; - //Set connection header (only for http1 and 1.1) + //Set connection header (only for http1.1) + if (keepalive) { + /* + * Request parsing only sets the keepalive flag if the connection is http1.1 + * so we can verify this in an assert + */ + Debug.Assert(context.Request.HttpVersion == HttpVersion.Http11, "Request parsing allowed keepalive for a non http1.1 connection, this is a bug"); + context.Response.Headers[HttpResponseHeader.Connection] = "keep-alive"; context.Response.Headers[HttpResponseHeader.KeepAlive] = KeepAliveTimeoutHeaderValue; } @@ -336,7 +346,9 @@ namespace VNLib.Net.Http } //Get the server root for the specified location - if (!ServerRoots.TryGetValue(context.Request.Location.DnsSafeHost, out IWebRoot? root) && !ServerRoots.TryGetValue(WILDCARD_KEY, out root)) + IWebRoot? root = ServerRoots!.GetValueOrDefault(context.Request.Location.DnsSafeHost, _wildcardRoot); + + if (root == null) { context.Respond(HttpStatusCode.NotFound); //make sure control leaves @@ -374,16 +386,23 @@ namespace VNLib.Net.Http await context.InitRequestBodyAsync(); + /* + * The event object should be cleared when it is no longer in use, IE before + * this procedure returns. + */ + HttpEvent ev = new(context); + try { - await ProcessAsync(root, context); - return keepalive; + //Enter user-code + await root.ClientConnectedAsync(ev); } //The user-code requested termination of the connection catch (TerminateConnectionException tce) { - //Log the event as a debug so user can see the result + //Log the event as a debug so user can see it was handled Config.ServerLog.Debug(tce, "User-code requested a connection termination"); + //See if the exception requested an error code response if (tce.Code > 0) { @@ -395,47 +414,33 @@ namespace VNLib.Net.Http //Clear any currently set headers since no response is requested context.Response.Headers.Clear(); } - } - return false; - } - /// - /// Processes a client connection after pre-processing has completed - /// - /// The to process the event on - /// The to process - /// A task that resolves when the user-code has completed processing the entity - /// - /// - private static async ValueTask ProcessAsync(IWebRoot root, HttpContext ctx) - { - /* - * The event object should be cleared when it is no longer in use, IE before - * this procedure returns. - */ - HttpEvent ev = new (ctx); - try - { - await root.ClientConnectedAsync(ev); - } - //User code requested exit, elevate the exception - catch (TerminateConnectionException) - { - throw; + //Close connection + return false; } //Transport exception - catch(IOException ioe) when (ioe.InnerException is SocketException) + catch (IOException ioe) when (ioe.InnerException is SocketException) { throw; } catch (Exception ex) { - ctx.ParentServer.Config.ServerLog.Warn(ex, "Unhandled exception during application code execution."); + Config.ServerLog.Warn(ex, "Unhandled exception during application code execution."); } finally { ev.Clear(); } + + /* + * The http state should still be salvagable event with a user-code failure, + * so we shouldnt need to terminate requests here. This may need to be changed + * if a bug is found and users expect the framework to handle the error. + * The safest option would be terminate the connection, well see. + * + * For now I will allow it. + */ + return keepalive; } } diff --git a/lib/Net.Http/src/Core/IConnectionContext.cs b/lib/Net.Http/src/Core/IConnectionContext.cs index 2e3ca46..fd12d74 100644 --- a/lib/Net.Http/src/Core/IConnectionContext.cs +++ b/lib/Net.Http/src/Core/IConnectionContext.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Http @@ -22,7 +22,6 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using System.Threading; using System.Threading.Tasks; namespace VNLib.Net.Http.Core @@ -49,9 +48,14 @@ namespace VNLib.Net.Http.Core /// Sends any pending data associated with the request to the /// connection that begun the request /// - /// A token to cancel the operation /// A Task that completes when the response has completed - Task WriteResponseAsync(CancellationToken cancellationToken); + Task WriteResponseAsync(); + + /// + /// Flushes and pending data associated with the request to the transport + /// + /// A task that represents the flush operation + Task FlushTransportAsnc(); /// /// Signals to the context that it will release any request specific diff --git a/lib/Net.Http/src/Core/IHttpContextInformation.cs b/lib/Net.Http/src/Core/IHttpContextInformation.cs index 2a6e10c..ba1566b 100644 --- a/lib/Net.Http/src/Core/IHttpContextInformation.cs +++ b/lib/Net.Http/src/Core/IHttpContextInformation.cs @@ -31,9 +31,9 @@ namespace VNLib.Net.Http.Core internal interface IHttpContextInformation { /// - /// Local crlf characters + /// Gets pre-encoded binary segments for the current server's encoding /// - ReadOnlyMemory CrlfBytes { get; } + ServerPreEncodedSegments EncodedSegments { get; } /// /// The current connection's encoding diff --git a/lib/Net.Http/src/Core/IHttpLifeCycle.cs b/lib/Net.Http/src/Core/IHttpLifeCycle.cs index 135219d..9ba5ff1 100644 --- a/lib/Net.Http/src/Core/IHttpLifeCycle.cs +++ b/lib/Net.Http/src/Core/IHttpLifeCycle.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Http @@ -58,5 +58,10 @@ namespace VNLib.Net.Http.Core /// method should not throw exceptions /// void OnComplete(); + + /// + /// Raised when a new connection is established on the current context + /// + void OnNewConnection(); } } \ No newline at end of file diff --git a/lib/Net.Http/src/Core/IHttpResponseBody.cs b/lib/Net.Http/src/Core/IHttpResponseBody.cs index aa2dd34..f0f88d2 100644 --- a/lib/Net.Http/src/Core/IHttpResponseBody.cs +++ b/lib/Net.Http/src/Core/IHttpResponseBody.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Http @@ -24,12 +24,17 @@ using System; using System.IO; -using System.Threading; using System.Threading.Tasks; +using VNLib.Net.Http.Core.Compression; namespace VNLib.Net.Http.Core { + /* + * Optimization notes. The buffer parameters are undefined unless + * the BufferRequired property is true. + */ + /// /// Represents a rseponse entity body /// @@ -46,28 +51,40 @@ namespace VNLib.Net.Http.Core /// bool BufferRequired { get; } + /// + /// The length of the content. Length is a required property + /// + long Length { get; } + /// /// Writes internal response entity data to the destination stream /// /// The response stream to write data to /// An optional buffer used to buffer responses /// The maximum length of the response data to write - /// A token to cancel the operation /// A task that resolves when the response is completed - Task WriteEntityAsync(Stream dest, long count, Memory? buffer, CancellationToken token); + Task WriteEntityAsync(Stream dest, long count, Memory buffer); /// /// Writes internal response entity data to the destination stream /// - /// The response stream to write data to + /// The response compressor /// An optional buffer used to buffer responses - /// A token to cancel the operation /// A task that resolves when the response is completed - Task WriteEntityAsync(Stream dest, Memory? buffer, CancellationToken token); - + Task WriteEntityAsync(IResponseCompressor dest, Memory buffer); + + /* + * Added to the response writing hot-paths optimize calls when compression + * is disabled and an explicit length is not required. + */ + /// - /// The length of the content + /// Writes internal response entity data to the destination stream + /// without compression /// - long Length { get; } + /// The response stream to write data to + /// Optional buffer if required, used to buffer response data + /// A task that resolves when the response is completed + Task WriteEntityAsync(Stream dest, Memory buffer); } } \ No newline at end of file diff --git a/lib/Net.Http/src/Core/Request/HttpInputStream.cs b/lib/Net.Http/src/Core/Request/HttpInputStream.cs index 3d34445..dc903d2 100644 --- a/lib/Net.Http/src/Core/Request/HttpInputStream.cs +++ b/lib/Net.Http/src/Core/Request/HttpInputStream.cs @@ -30,7 +30,6 @@ using System.Threading.Tasks; using VNLib.Utils; using VNLib.Utils.Memory; using VNLib.Utils.Extensions; -using VNLib.Net.Http.Core.Buffering; namespace VNLib.Net.Http.Core { @@ -39,6 +38,7 @@ namespace VNLib.Net.Http.Core /// internal sealed class HttpInputStream : Stream { + private readonly IHttpContextInformation ContextInfo; private long ContentLength; @@ -183,7 +183,7 @@ namespace VNLib.Net.Http.Core if (writer.RemainingSize > 0) { //Read from transport - ERRNO read = await InputStream!.ReadAsync(writer.Remaining, cancellationToken).ConfigureAwait(true); + int read = await InputStream!.ReadAsync(writer.Remaining, cancellationToken).ConfigureAwait(true); //Update writer position writer.Advance(read); @@ -198,9 +198,8 @@ namespace VNLib.Net.Http.Core /// /// Asynchronously discards all remaining data in the stream /// - /// The buffer manager to request the discard buffer from /// A task that represents the discard operations - public ValueTask DiscardRemainingAsync(IHttpBufferManager bufMan) + public ValueTask DiscardRemainingAsync() { long remaining = Remaining; @@ -219,20 +218,18 @@ namespace VNLib.Net.Http.Core //We must actaully disacrd data from the stream else { - return DiscardStreamDataAsync(bufMan); + return DiscardStreamDataAsync(); } } - private async ValueTask DiscardStreamDataAsync(IHttpBufferManager bufMan) + private async ValueTask DiscardStreamDataAsync() { - //Reqest discard buffer - Memory discardBuffer = bufMan.GetDiscardBuffer(); - int read; do { //Read data to the discard buffer until reading is completed (read == 0) - read = await ReadAsync(discardBuffer, CancellationToken.None).ConfigureAwait(true); + read = await ReadAsync(HttpServer.WriteOnlyScratchBuffer, CancellationToken.None) + .ConfigureAwait(true); } while (read != 0); } diff --git a/lib/Net.Http/src/Core/Request/HttpRequest.cs b/lib/Net.Http/src/Core/Request/HttpRequest.cs index b036680..0668197 100644 --- a/lib/Net.Http/src/Core/Request/HttpRequest.cs +++ b/lib/Net.Http/src/Core/Request/HttpRequest.cs @@ -86,14 +86,16 @@ namespace VNLib.Net.Http.Core //New reusable input stream InputStream = new(contextInfo); RequestBody = new(); - } - + } + + void IHttpLifeCycle.OnPrepare() + { } - public void OnPrepare() - {} + void IHttpLifeCycle.OnRelease() + { } - public void OnRelease() - {} + void IHttpLifeCycle.OnNewConnection() + { } public void OnNewRequest() { diff --git a/lib/Net.Http/src/Core/Request/HttpRequestExtensions.cs b/lib/Net.Http/src/Core/Request/HttpRequestExtensions.cs index 1f52d17..f92a685 100644 --- a/lib/Net.Http/src/Core/Request/HttpRequestExtensions.cs +++ b/lib/Net.Http/src/Core/Request/HttpRequestExtensions.cs @@ -35,46 +35,49 @@ using VNLib.Utils.Extensions; namespace VNLib.Net.Http.Core { + internal static class HttpRequestExtensions { - public enum CompressionType - { - None, - Gzip, - Deflate, - Brotli - } - /// - /// Gets the that the connection accepts - /// in a default order, or none if not enabled + /// Gets the that the connection accepts + /// in a default order, or none if not enabled or the server does not support it /// /// - /// A with a value the connection support + /// The server supported methods + /// A with a value the connection support [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static CompressionType GetCompressionSupport(this HttpRequest request) + public static CompressionMethod GetCompressionSupport(this HttpRequest request, CompressionMethod serverSupported) { string? acceptEncoding = request.Headers[HttpRequestHeader.AcceptEncoding]; + /* + * Priority order is gzip, deflate, br. Br is last for dynamic compression + * because of performace. We also need to make sure the server supports + * the desired compression method also. + */ + if (acceptEncoding == null) { - return CompressionType.None; + return CompressionMethod.None; } - else if (acceptEncoding.Contains("gzip", StringComparison.OrdinalIgnoreCase)) + else if (serverSupported.HasFlag(CompressionMethod.Gzip) + && acceptEncoding.Contains("gzip", StringComparison.OrdinalIgnoreCase)) { - return CompressionType.Gzip; + return CompressionMethod.Gzip; } - else if (acceptEncoding.Contains("deflate", StringComparison.OrdinalIgnoreCase)) + else if (serverSupported.HasFlag(CompressionMethod.Deflate) + && acceptEncoding.Contains("deflate", StringComparison.OrdinalIgnoreCase)) { - return CompressionType.Deflate; + return CompressionMethod.Deflate; } - else if (acceptEncoding.Contains("br", StringComparison.OrdinalIgnoreCase)) + else if (serverSupported.HasFlag(CompressionMethod.Brotli) + && acceptEncoding.Contains("br", StringComparison.OrdinalIgnoreCase)) { - return CompressionType.Brotli; + return CompressionMethod.Brotli; } else { - return CompressionType.None; + return CompressionMethod.None; } } diff --git a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs index 9ea06f3..4a88361 100644 --- a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs +++ b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs @@ -28,21 +28,20 @@ using VNLib.Utils; using VNLib.Utils.IO; using VNLib.Net.Http.Core.Buffering; -namespace VNLib.Net.Http.Core +namespace VNLib.Net.Http.Core.Response { /// /// A specialized for buffering data /// in Http/1.1 chunks /// - internal class ChunkDataAccumulator : IDataAccumulator, IHttpLifeCycle + internal class ChunkDataAccumulator : IDataAccumulator { - private const string LAST_CHUNK_STRING = "0\r\n\r\n"; public const int RESERVED_CHUNK_SUGGESTION = 32; private readonly int ReservedSize; private readonly IHttpContextInformation Context; private readonly IChunkAccumulatorBuffer Buffer; - private readonly ReadOnlyMemory LastChunk; + public ChunkDataAccumulator(IChunkAccumulatorBuffer buffer, IHttpContextInformation context) { @@ -50,9 +49,6 @@ namespace VNLib.Net.Http.Core Context = context; Buffer = buffer; - - //Convert and store cached versions of the last chunk bytes - LastChunk = context.Encoding.GetBytes(LAST_CHUNK_STRING); } /* @@ -92,7 +88,7 @@ namespace VNLib.Net.Http.Core public ERRNO TryBufferChunk(ReadOnlySpan data) { //Calc data size and reserve space for final crlf - int dataToCopy = Math.Min(data.Length, RemainingSize - Context.CrlfBytes.Length); + int dataToCopy = Math.Min(data.Length, RemainingSize - Context.EncodedSegments.CrlfBytes.Length); //Write as much data as possible data[..dataToCopy].CopyTo(Remaining); @@ -134,7 +130,7 @@ namespace VNLib.Net.Http.Core UpdateChunkSize(); //Write trailing chunk delimiter - this.Append(Context.CrlfBytes.Span); + this.Append(Context.EncodedSegments.CrlfBytes.Span); return GetCompleteChunk(); } @@ -150,10 +146,10 @@ namespace VNLib.Net.Http.Core UpdateChunkSize(); //Write trailing chunk delimiter - this.Append(Context.CrlfBytes.Span); + this.Append(Context.EncodedSegments.CrlfBytes.Span); //Write final chunk to the end of the accumulator - this.Append(LastChunk.Span); + this.Append(Context.EncodedSegments.FinalChunkTermination.Span); return GetCompleteChunk(); } @@ -174,7 +170,7 @@ namespace VNLib.Net.Http.Core * the start of the chunk data. * * [reserved segment] [chunk data] [eoc] - * [...0a \r\n] [10 bytes of data] [eoc] + * [...0a\r\n] [10 bytes of data] [eoc] */ private void UpdateChunkSize() @@ -209,7 +205,7 @@ namespace VNLib.Net.Http.Core * the exact size required to store the encoded chunk size */ - _reservedOffset = (ReservedSize - (initOffset + Context.CrlfBytes.Length)); + _reservedOffset = (ReservedSize - (initOffset + Context.EncodedSegments.CrlfBytes.Length)); Span upshifted = Buffer.GetBinSpan()[_reservedOffset..ReservedSize]; @@ -220,7 +216,7 @@ namespace VNLib.Net.Http.Core upshifted = upshifted[initOffset..]; //Copy crlf - Context.CrlfBytes.Span.CopyTo(upshifted); + Context.EncodedSegments.CrlfBytes.Span.CopyTo(upshifted); } @@ -235,12 +231,5 @@ namespace VNLib.Net.Http.Core _reservedOffset = 0; AccumulatedSize = 0; } - - public void OnPrepare() - { } - - public void OnRelease() - { } - } } \ No newline at end of file diff --git a/lib/Net.Http/src/Core/Response/ChunkedStream.cs b/lib/Net.Http/src/Core/Response/ChunkedStream.cs index 3aa4330..639e18f 100644 --- a/lib/Net.Http/src/Core/Response/ChunkedStream.cs +++ b/lib/Net.Http/src/Core/Response/ChunkedStream.cs @@ -35,7 +35,6 @@ */ using System; -using System.IO; using System.Threading; using System.Threading.Tasks; @@ -43,223 +42,178 @@ using VNLib.Utils; using VNLib.Utils.Memory; using VNLib.Net.Http.Core.Buffering; -#pragma warning disable CA2215 // Dispose methods should call base class dispose - -namespace VNLib.Net.Http.Core +namespace VNLib.Net.Http.Core.Response { - internal sealed partial class HttpResponse - { - /// - /// Writes chunked HTTP message bodies to an underlying streamwriter - /// - private sealed class ChunkedStream : Stream, IHttpLifeCycle - { - - private readonly ChunkDataAccumulator ChunckAccumulator; - private readonly IHttpContextInformation ContextInfo; +#pragma warning disable CA2215 // Dispose methods should call base class dispose +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task - private Stream? TransportStream; - private bool HadError; + /// + /// Writes chunked HTTP message bodies to an underlying streamwriter + /// + internal sealed class ChunkedStream : ReusableResponseStream + { + private readonly ChunkDataAccumulator ChunckAccumulator; + + private bool HadError; - internal ChunkedStream(IChunkAccumulatorBuffer buffer, IHttpContextInformation context) + internal ChunkedStream(IChunkAccumulatorBuffer buffer, IHttpContextInformation context) + { + //Init accumulator + ChunckAccumulator = new(buffer, context); + } + + public override void Write(ReadOnlySpan chunk) + { + //Only write non-zero chunks + if (chunk.Length <= 0) { - ContextInfo = context; - - //Init accumulator - ChunckAccumulator = new(buffer, context); + return; } - - - public override bool CanRead => false; - public override bool CanSeek => false; - public override bool CanWrite => true; - public override long Length => throw new NotSupportedException(); - public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } - public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream cannot be read from"); - public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("This stream does not support seeking"); - public override void SetLength(long value) => throw new NotSupportedException("This stream does not support seeking"); - public override void Flush() { } - public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; - - - public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count)); - public override void Write(ReadOnlySpan chunk) + + //Init reader + ForwardOnlyReader reader = new(chunk); + try { - //Only write non-zero chunks - if (chunk.Length <= 0) + do { - return; - } + //try to accumulate the chunk data + ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window); - //Init reader - ForwardOnlyReader reader = new(chunk); - try - { - do + //Not all data was buffered + if (written < reader.WindowSize) { - //try to accumulate the chunk data - ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window); - - //Not all data was buffered - if (written < reader.WindowSize) - { - //Advance reader - reader.Advance(written); + //Advance reader + reader.Advance(written); - //Flush accumulator - Memory accChunk = ChunckAccumulator.GetChunkData(); + //Flush accumulator + Memory accChunk = ChunckAccumulator.GetChunkData(); - //Reset the chunk accumulator - ChunckAccumulator.Reset(); + //Reset the chunk accumulator + ChunckAccumulator.Reset(); - //Write chunk data - TransportStream!.Write(accChunk.Span); + //Write chunk data + transport!.Write(accChunk.Span); - //Continue to buffer / flush as needed - continue; - } - break; + //Continue to buffer / flush as needed + continue; } - while (true); - } - catch - { - HadError = true; - throw; + break; } + while (true); + } + catch + { + HadError = true; + throw; } - + } - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + public override async ValueTask WriteAsync(ReadOnlyMemory chunk, CancellationToken cancellationToken = default) + { + //Only write non-zero chunks + if (chunk.Length <= 0) { - return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + return; } - public override async ValueTask WriteAsync(ReadOnlyMemory chunk, CancellationToken cancellationToken = default) + try { - //Only write non-zero chunks - if (chunk.Length <= 0) - { - return; - } + //Init reader + ForwardOnlyMemoryReader reader = new(chunk); - try + do { - //Init reader - ForwardOnlyMemoryReader reader = new(chunk); + //try to accumulate the chunk data + ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window.Span); - do + //Not all data was buffered + if (written < reader.WindowSize) { - //try to accumulate the chunk data - ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window.Span); + //Advance reader + reader.Advance(written); - //Not all data was buffered - if (written < reader.WindowSize) - { - //Advance reader - reader.Advance(written); + //Flush accumulator + Memory accChunk = ChunckAccumulator.GetChunkData(); - //Flush accumulator - Memory accChunk = ChunckAccumulator.GetChunkData(); + //Reset the chunk accumulator + ChunckAccumulator.Reset(); - //Reset the chunk accumulator - ChunckAccumulator.Reset(); + //Flush accumulator async + await transport!.WriteAsync(accChunk, cancellationToken); - //Flush accumulator async - await TransportStream!.WriteAsync(accChunk, cancellationToken); - - //Continue to buffer / flush as needed - continue; - } - - break; + //Continue to buffer / flush as needed + continue; } - while (true); - } - catch - { - HadError = true; - throw; + + break; } + while (true); } + catch + { + HadError = true; + throw; + } + } - - public override async ValueTask DisposeAsync() + public override async ValueTask DisposeAsync() + { + //If write error occured, then do not write the last chunk + if (HadError) { - //If write error occured, then do not write the last chunk - if (HadError) - { - return; - } + return; + } - //Complete the last chunk - Memory chunkData = ChunckAccumulator.GetFinalChunkData(); + //Complete the last chunk + Memory chunkData = ChunckAccumulator.GetFinalChunkData(); - //Reset the accumulator - ChunckAccumulator.Reset(); - - //Write remaining data to stream - await TransportStream!.WriteAsync(chunkData, CancellationToken.None); + //Reset the accumulator + ChunckAccumulator.Reset(); - //Flush base stream - await TransportStream!.FlushAsync(CancellationToken.None); - } + //Write remaining data to stream + await transport!.WriteAsync(chunkData, CancellationToken.None); - protected override void Dispose(bool disposing) => Close(); + //Flush base stream + await transport!.FlushAsync(CancellationToken.None); + } - public override void Close() + public override void Close() + { + //If write error occured, then do not write the last chunk + if (HadError) { - //If write error occured, then do not write the last chunk - if (HadError) - { - return; - } - - //Complete the last chunk - Memory chunkData = ChunckAccumulator.GetFinalChunkData(); + return; + } - //Reset the accumulator - ChunckAccumulator.Reset(); + //Complete the last chunk + Memory chunkData = ChunckAccumulator.GetFinalChunkData(); - //Write chunk data - TransportStream!.Write(chunkData.Span); + //Reset the accumulator + ChunckAccumulator.Reset(); - //Flush base stream - TransportStream!.Flush(); - } - - - #region Hooks + //Write chunk data + transport!.Write(chunkData.Span); - public void OnPrepare() - { - ChunckAccumulator.OnPrepare(); - } + //Flush base stream + transport!.Flush(); + } - public void OnRelease() - { - ChunckAccumulator.OnRelease(); - } + #region Hooks - public void OnNewRequest() - { - ChunckAccumulator.OnNewRequest(); - - //Get transport stream even if not used - TransportStream = ContextInfo.GetTransport(); - } + public void OnNewRequest() + { + ChunckAccumulator.OnNewRequest(); + } - public void OnComplete() - { - ChunckAccumulator.OnComplete(); - TransportStream = null; - - //Clear error flag - HadError = false; - } + public void OnComplete() + { + ChunckAccumulator.OnComplete(); - #endregion + //Clear error flag + HadError = false; } + + #endregion } } \ No newline at end of file diff --git a/lib/Net.Http/src/Core/Response/DirectStream.cs b/lib/Net.Http/src/Core/Response/DirectStream.cs index 3c984ef..7d0e568 100644 --- a/lib/Net.Http/src/Core/Response/DirectStream.cs +++ b/lib/Net.Http/src/Core/Response/DirectStream.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Http @@ -23,74 +23,28 @@ */ using System; -using System.IO; using System.Threading; using System.Threading.Tasks; -namespace VNLib.Net.Http.Core +namespace VNLib.Net.Http.Core.Response { - internal sealed partial class HttpResponse + internal sealed class DirectStream : ReusableResponseStream { - private sealed class DirectStream : Stream - { - private Stream? BaseStream; - - public void Prepare(Stream transport) - { - BaseStream = transport; - } - - public override void Write(byte[] buffer, int offset, int count) - { - BaseStream!.Write(buffer, offset, count); - } - - public override void Write(ReadOnlySpan buffer) - { - BaseStream!.Write(buffer); - } - - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - return BaseStream!.WriteAsync(buffer, offset, count, cancellationToken); - } - - public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) - { - return BaseStream!.WriteAsync(buffer, cancellationToken); - } - - - public override bool CanRead => false; - public override bool CanSeek => false; - public override bool CanWrite => true; - public override long Length => throw new InvalidOperationException("Stream does not have a length property"); - public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } - public override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException("Stream does not support reading"); - public override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException("Stream does not support seeking"); - public override void SetLength(long value) => throw new InvalidOperationException("Stream does not support seeking"); - public override void Flush() => BaseStream!.Flush(); - public override Task FlushAsync(CancellationToken cancellationToken) => BaseStream!.FlushAsync(cancellationToken); - - - public override void Close() - { - BaseStream = null; - } + public override void Write(ReadOnlySpan buffer) + { + transport!.Write(buffer); + } + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + return transport!.WriteAsync(buffer, cancellationToken); + } + + //Pass through flush methods + public override void Flush() => transport!.Flush(); - protected override void Dispose(bool disposing) - { - //Do not call base dispose - Close(); - } + public override Task FlushAsync(CancellationToken cancellationToken) => transport!.FlushAsync(cancellationToken); - public override ValueTask DisposeAsync() - { - Close(); - return ValueTask.CompletedTask; - } - } } -} \ No newline at end of file +} diff --git a/lib/Net.Http/src/Core/Response/HeaderDataAccumulator.cs b/lib/Net.Http/src/Core/Response/HeaderDataAccumulator.cs index 8c2461c..7f026e2 100644 --- a/lib/Net.Http/src/Core/Response/HeaderDataAccumulator.cs +++ b/lib/Net.Http/src/Core/Response/HeaderDataAccumulator.cs @@ -28,97 +28,94 @@ using VNLib.Utils.Memory; using VNLib.Utils.Extensions; using VNLib.Net.Http.Core.Buffering; -namespace VNLib.Net.Http.Core +namespace VNLib.Net.Http.Core.Response { - internal partial class HttpResponse + /// + /// Specialized data accumulator for compiling response headers + /// + internal sealed class HeaderDataAccumulator { + private readonly IResponseHeaderAccBuffer _buffer; + private readonly IHttpContextInformation _contextInfo; + private int AccumulatedSize; + + public HeaderDataAccumulator(IResponseHeaderAccBuffer accBuffer, IHttpContextInformation ctx) + { + _buffer = accBuffer; + _contextInfo = ctx; + } + /// - /// Specialized data accumulator for compiling response headers + /// Initializes a new for buffering character header data /// - private sealed class HeaderDataAccumulator + /// A for buffering character header data + public ForwardOnlyWriter GetWriter() { - private readonly IResponseHeaderAccBuffer _buffer; - private readonly IHttpContextInformation _contextInfo; - private int AccumulatedSize; + Span chars = _buffer.GetCharSpan(); + return new ForwardOnlyWriter(chars); + } - public HeaderDataAccumulator(IResponseHeaderAccBuffer accBuffer, IHttpContextInformation ctx) + /// + /// Encodes and writes the contents of the to the internal accumulator + /// + /// The character buffer writer to commit data from + public void CommitChars(ref ForwardOnlyWriter writer) + { + if (writer.Written == 0) { - _buffer = accBuffer; - _contextInfo = ctx; + return; } - /// - /// Initializes a new for buffering character header data - /// - /// A for buffering character header data - public ForwardOnlyWriter GetWriter() - { - Span chars = _buffer.GetCharSpan(); - return new ForwardOnlyWriter(chars); - } + //Write the entire token to the buffer + WriteToken(writer.AsSpan()); + } - /// - /// Encodes and writes the contents of the to the internal accumulator - /// - /// The character buffer writer to commit data from - public void CommitChars(ref ForwardOnlyWriter writer) - { - if (writer.Written == 0) - { - return; - } + /// + /// Encodes a single token and writes it directly to the internal accumulator + /// + /// The character sequence to accumulate + public void WriteToken(ReadOnlySpan chars) + { + //Get remaining buffer + Span remaining = _buffer.GetBinSpan()[AccumulatedSize..]; - //Write the entire token to the buffer - WriteToken(writer.AsSpan()); - } + //Commit all chars to the buffer + AccumulatedSize += _contextInfo.Encoding.GetBytes(chars, remaining); + } - /// - /// Encodes a single token and writes it directly to the internal accumulator - /// - /// The character sequence to accumulate - public void WriteToken(ReadOnlySpan chars) - { - //Get remaining buffer - Span remaining = _buffer.GetBinSpan()[AccumulatedSize..]; + /// + /// Writes the http termination sequence to the internal accumulator + /// + public void WriteTermination() + { + //Write the http termination sequence + Span remaining = _buffer.GetBinSpan()[AccumulatedSize..]; - //Commit all chars to the buffer - AccumulatedSize += _contextInfo.Encoding.GetBytes(chars, remaining); - } - - /// - /// Writes the http termination sequence to the internal accumulator - /// - public void WriteTermination() - { - //Write the http termination sequence - Span remaining = _buffer.GetBinSpan()[AccumulatedSize..]; - - _contextInfo.CrlfBytes.Span.CopyTo(remaining); - - //Advance the accumulated window - AccumulatedSize += _contextInfo.CrlfBytes.Length; - } + _contextInfo.EncodedSegments.CrlfBytes.Span.CopyTo(remaining); + + //Advance the accumulated window + AccumulatedSize += _contextInfo.EncodedSegments.CrlfBytes.Length; + } - /// - /// Resets the internal accumulator - /// - public void Reset() => AccumulatedSize = 0; + /// + /// Resets the internal accumulator + /// + public void Reset() => AccumulatedSize = 0; - /// - /// Gets the accumulated response data as its memory buffer, and resets the internal accumulator - /// - /// The buffer segment containing the accumulated response data - public Memory GetResponseData() - { - //get the current buffer as memory and return the accumulated segment - Memory accumulated = _buffer.GetMemory()[..AccumulatedSize]; + /// + /// Gets the accumulated response data as its memory buffer, and resets the internal accumulator + /// + /// The buffer segment containing the accumulated response data + public Memory GetResponseData() + { + //get the current buffer as memory and return the accumulated segment + Memory accumulated = _buffer.GetMemory()[..AccumulatedSize]; - //Reset the buffer - Reset(); + //Reset the buffer + Reset(); - return accumulated; - } + return accumulated; } } } \ No newline at end of file diff --git a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs index 2aab2d3..0b29e43 100644 --- a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs +++ b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs @@ -24,31 +24,31 @@ using System; using System.IO; -using System.IO.Compression; using System.Net; -using System.Threading; +using System.Diagnostics; using System.Threading.Tasks; namespace VNLib.Net.Http.Core { +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task internal partial class HttpContext { /// - public async Task WriteResponseAsync(CancellationToken cancellation) + public async Task WriteResponseAsync() { /* * If exceptions are raised, the transport is unusable, the connection is terminated, * and the release method will be called so the context can be reused */ - ValueTask discardTask = Request.InputStream.DiscardRemainingAsync(Buffers); + ValueTask discardTask = Request.InputStream.DiscardRemainingAsync(); //See if discard is needed if (ResponseBody.HasData) { //Parallel the write and discard - Task response = WriteResponseInternalAsync(cancellation); + Task response = WriteResponseInternalAsync(); if (discardTask.IsCompletedSuccessfully) { @@ -57,7 +57,7 @@ namespace VNLib.Net.Http.Core } else { - //If discard is not complete, await both + //If discard is not complete, await both, avoid wait-all method because it will allocate await Task.WhenAll(discardTask.AsTask(), response); } } @@ -69,77 +69,46 @@ namespace VNLib.Net.Http.Core //Close response once send and discard are complete await Response.CloseAsync(); } - + /// /// If implementing application set a response entity body, it is written to the output stream /// - /// A token to cancel the operation - private async Task WriteResponseInternalAsync(CancellationToken token) + private Task WriteResponseInternalAsync() { //Adjust/append vary header Response.Headers.Add(HttpResponseHeader.Vary, "Accept-Encoding"); - //For head methods - if (Request.Method == HttpMethod.HEAD) - { - if (Request.Range != null) - { - //Get local range - Tuple range = Request.Range; - - //Calc constrained content length - long length = ResponseBody.GetResponseLengthWithRange(range); - - //End range is inclusive so substract 1 - long endRange = (range.Item1 + length) - 1; + bool hasRange = Request.Range != null; + long length = ResponseBody.Length; + CompressionMethod compMethod = CompressionMethod.None; - //Set content-range header - Response.SetContentRange(range.Item1, endRange, length); + /* + * Process range header, data will not be compressed because that would + * require buffering, not a feature yet, and since the range will tell + * us the content length, we can get a direct stream to write to + */ + if (hasRange) + { + //Get local range + Tuple range = Request.Range!; - //Specify what the content length would be - Response.Headers[HttpResponseHeader.ContentLength] = length.ToString(); + //Calc constrained content length + length = ResponseBody.GetResponseLengthWithRange(range); - } - else - { - //If the request method is head, do everything but send the body - Response.Headers[HttpResponseHeader.ContentLength] = ResponseBody.Length.ToString(); - } + //End range is inclusive so substract 1 + long endRange = (range.Item1 + length) - 1; - //We must send headers here so content length doesnt get overwritten, close will be called after this to flush to transport - Response.FlushHeaders(); + //Set content-range header + Response.SetContentRange(range.Item1, endRange, length); } - else + /* + * It will be known at startup whether compression is supported, if not this is + * essentially a constant. + */ + else if (ParentServer.SupportedCompressionMethods != CompressionMethod.None) { - Stream outputStream; - /* - * Process range header, data will not be compressed because that would - * require buffering, not a feature yet, and since the range will tell - * us the content length, we can get a direct stream to write to - */ - if (Request.Range != null) - { - //Get local range - Tuple range = Request.Range; - - //Calc constrained content length - long length = ResponseBody.GetResponseLengthWithRange(range); - - //End range is inclusive so substract 1 - long endRange = (range.Item1 + length) - 1; - - //Set content-range header - Response.SetContentRange(range.Item1, endRange, length); - - //Get the raw output stream and set the length to the number of bytes - outputStream = await Response.GetStreamAsync(length); - - await WriteEntityDataAsync(outputStream, length, token); - } - else - { - //Determine if compression should be used - bool compressionDisabled = + //Determine if compression should be used + bool compressionDisabled = //disabled because app code disabled it ContextFlags.IsSet(COMPRESSION_DISABLED_MSK) //Disabled because too large or too small @@ -148,109 +117,95 @@ namespace VNLib.Net.Http.Core //Disabled because lower than http11 does not support chunked encoding || Request.HttpVersion < HttpVersion.Http11; + if (!compressionDisabled) + { //Get first compression method or none if disabled - HttpRequestExtensions.CompressionType ct = compressionDisabled ? HttpRequestExtensions.CompressionType.None : Request.GetCompressionSupport(); + compMethod = Request.GetCompressionSupport(ParentServer.SupportedCompressionMethods); - switch (ct) + //Set response headers + switch (compMethod) { - case HttpRequestExtensions.CompressionType.Gzip: - { - //Specify gzip encoding (using chunked encoding) - Response.Headers[HttpResponseHeader.ContentEncoding] = "gzip"; - - //get the chunked output stream - Stream chunked = await Response.GetStreamAsync(); - - //Use chunked encoding and send data as its written - outputStream = new GZipStream(chunked, ParentServer.Config.CompressionLevel, false); - } - break; - case HttpRequestExtensions.CompressionType.Deflate: - { - //Specify gzip encoding (using chunked encoding) - Response.Headers[HttpResponseHeader.ContentEncoding] = "deflate"; - //get the chunked output stream - Stream chunked = await Response.GetStreamAsync(); - //Use chunked encoding and send data as its written - outputStream = new DeflateStream(chunked, ParentServer.Config.CompressionLevel, false); - } + case CompressionMethod.Gzip: + //Specify gzip encoding (using chunked encoding) + Response.Headers[HttpResponseHeader.ContentEncoding] = "gzip"; break; - case HttpRequestExtensions.CompressionType.Brotli: - { - //Specify Brotli encoding (using chunked encoding) - Response.Headers[HttpResponseHeader.ContentEncoding] = "br"; - //get the chunked output stream - Stream chunked = await Response.GetStreamAsync(); - //Use chunked encoding and send data as its written - outputStream = new BrotliStream(chunked, ParentServer.Config.CompressionLevel, false); - } + case CompressionMethod.Deflate: + //Specify delfate encoding (using chunked encoding) + Response.Headers[HttpResponseHeader.ContentEncoding] = "deflate"; break; - //Default is no compression - case HttpRequestExtensions.CompressionType.None: - default: - //Since we know how long the response will be, we can submit it now (see note above for same issues) - outputStream = await Response.GetStreamAsync(ResponseBody.Length); + case CompressionMethod.Brotli: + //Specify Brotli encoding (using chunked encoding) + Response.Headers[HttpResponseHeader.ContentEncoding] = "br"; break; } - - //Write entity to output - await WriteEntityDataAsync(outputStream, token); } } - } - private async Task WriteEntityDataAsync(Stream outputStream, CancellationToken token) - { - try + //Check on head methods + if (Request.Method == HttpMethod.HEAD) { - //Determine if buffer is required - if (ResponseBody.BufferRequired) - { - //Get response data buffer, may be smaller than suggested size - Memory buffer = Buffers.GetResponseDataBuffer(); + //Specify what the content length would be + Response.Headers[HttpResponseHeader.ContentLength] = length.ToString(); - //Write response - await ResponseBody.WriteEntityAsync(outputStream, buffer, token); - } - //No buffer is required, write response directly - else - { - //Write without buffer - await ResponseBody.WriteEntityAsync(outputStream, null, token); - } + //We must send headers here so content length doesnt get overwritten, close will be called after this to flush to transport + Response.FlushHeaders(); + + return Task.CompletedTask; } - finally + else { - //always dispose output stream - await outputStream.DisposeAsync(); + //Set the explicit length if a range was set + return WriteEntityDataAsync(length, compMethod, hasRange); } } - - private async Task WriteEntityDataAsync(Stream outputStream, long length, CancellationToken token) + + private async Task WriteEntityDataAsync(long length, CompressionMethod compMethod, bool hasExplicitLength) { - try + //Get output stream, and always dispose it + await using Stream outputStream = await GetOutputStreamAsync(length, compMethod); + + //Determine if buffer is required + Memory buffer = ResponseBody.BufferRequired ? Buffers.GetResponseDataBuffer() : Memory.Empty; + + /* + * Using compression, we must initialize a compressor, and write the response + * with the locked compressor + */ + if (compMethod != CompressionMethod.None) { - //Determine if buffer is required - if (ResponseBody.BufferRequired) - { - //Get response data buffer, may be smaller than suggested size - Memory buffer = Buffers.GetResponseDataBuffer(); + //Compressor must never be null at this point + Debug.Assert(_compressor != null, "Compression was allowed but the compressor was not initialized"); + + //Init compressor (Deinint is deferred to the end of the request) + _compressor.Init(outputStream, compMethod); + + //Write response + await ResponseBody.WriteEntityAsync(_compressor, buffer); - //Write response - await ResponseBody.WriteEntityAsync(outputStream, length, buffer, token); - } - //No buffer is required, write response directly - else - { - //Write without buffer - await ResponseBody.WriteEntityAsync(outputStream, length, null, token); - } } - finally + /* + * Explicit length may be set when the response may have more data than requested + * by the client. IE: when a range is set, we need to make sure we sent exactly the + * correct data, otherwise the client will drop the connection. + */ + else if(hasExplicitLength) { - //always dispose output stream - await outputStream.DisposeAsync(); + //Write response with explicit length + await ResponseBody.WriteEntityAsync(outputStream, length, buffer); + + } + else + { + await ResponseBody.WriteEntityAsync(outputStream, buffer); } } + + private ValueTask GetOutputStreamAsync(long length, CompressionMethod compMethod) + { + return compMethod == CompressionMethod.None ? Response.GetStreamAsync(length) : Response.GetStreamAsync(); + } + +#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task + } } diff --git a/lib/Net.Http/src/Core/Response/HttpResponse.cs b/lib/Net.Http/src/Core/Response/HttpResponse.cs index 6d974ed..c09c3f7 100644 --- a/lib/Net.Http/src/Core/Response/HttpResponse.cs +++ b/lib/Net.Http/src/Core/Response/HttpResponse.cs @@ -25,6 +25,7 @@ using System; using System.IO; using System.Net; +using System.Diagnostics; using System.Threading.Tasks; using System.Collections.Generic; using System.Runtime.CompilerServices; @@ -33,11 +34,13 @@ using VNLib.Utils; using VNLib.Utils.IO; using VNLib.Utils.Memory; using VNLib.Utils.Extensions; + using VNLib.Net.Http.Core.Buffering; +using VNLib.Net.Http.Core.Response; namespace VNLib.Net.Http.Core { - internal partial class HttpResponse : IHttpLifeCycle + internal sealed class HttpResponse : IHttpLifeCycle #if DEBUG ,IStringSerializeable #endif @@ -79,6 +82,7 @@ namespace VNLib.Net.Http.Core /// Sets the status code of the response /// /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void SetStatusCode(HttpStatusCode code) { if (HeadersBegun) @@ -97,36 +101,7 @@ namespace VNLib.Net.Http.Core internal void AddCookie(HttpCookie cookie) => Cookies.Add(cookie); /// - /// Allows sending an early 100-Continue status message to the client - /// - /// - internal async Task SendEarly100ContinueAsync() - { - Check(); - - //Send a status message with the continue response status - Writer.WriteToken(HttpHelpers.GetResponseString(ContextInfo.CurrentVersion, HttpStatusCode.Continue)); - - //Trailing crlf - Writer.WriteTermination(); - - //Get the response data header block - Memory responseBlock = Writer.GetResponseData(); - - //get base stream - Stream bs = ContextInfo.GetTransport(); - - //Write the response data to the base stream - await bs.WriteAsync(responseBlock); - - //Flush the base stream - await bs.FlushAsync(); - } - - - /// - /// Sends the status message and all available headers to the client. - /// Headers set after method returns will be sent when output stream is requested or scope exits + /// Compiles and flushes all headers to the header accumulator ready for sending /// /// /// @@ -187,7 +162,7 @@ namespace VNLib.Net.Http.Core Writer.CommitChars(ref writer); } - private ValueTask EndFlushHeadersAsync(Stream transport) + private ValueTask EndFlushHeadersAsync() { //Sent all available headers FlushHeaders(); @@ -201,6 +176,15 @@ namespace VNLib.Net.Http.Core //Update sent headers HeadersSent = true; + //Get the transport stream to write the response data to + Stream transport = ContextInfo.GetTransport(); + + /* + * ASYNC NOTICE: It is safe to get the memory block then return the task + * because the response writer is not cleaned up until the OnComplete() + * method, so the memory block is valid until then. + */ + //Write the response data to the base stream return responseBlock.IsEmpty ? ValueTask.CompletedTask : transport.WriteAsync(responseBlock); } @@ -212,23 +196,20 @@ namespace VNLib.Net.Http.Core /// A configured for writing data to client /// /// - public async ValueTask GetStreamAsync(long ContentLength) + public ValueTask GetStreamAsync(long ContentLength) { Check(); //Add content length header Headers[HttpResponseHeader.ContentLength] = ContentLength.ToString(); - //End sending headers so the user can write to the ouput stream - Stream transport = ContextInfo.GetTransport(); - - await EndFlushHeadersAsync(transport); - - //Init direct stream - ReusableDirectStream.Prepare(transport); + //Flush headers + ValueTask flush = EndFlushHeadersAsync(); - //Return the direct stream - return ReusableDirectStream; + //Return the reusable stream + return flush.IsCompletedSuccessfully ? + ValueTask.FromResult(ReusableDirectStream) + : GetStreamAsyncCore(flush, ReusableDirectStream); } /// @@ -237,26 +218,30 @@ namespace VNLib.Net.Http.Core /// supporting chunked encoding /// /// - public async ValueTask GetStreamAsync() + public ValueTask GetStreamAsync() { -#if DEBUG - if (ContextInfo.CurrentVersion != HttpVersion.Http11) - { - throw new InvalidOperationException("Chunked transfer encoding is not acceptable for this http version"); - } -#endif + //Chunking is only an http 1.1 feature (should never get called otherwise) + Debug.Assert(ContextInfo.CurrentVersion == HttpVersion.Http11); + Check(); //Set encoding type to chunked with user-defined compression Headers[HttpResponseHeader.TransferEncoding] = "chunked"; - //End sending headers so the user can write to the ouput stream - Stream transport = ContextInfo.GetTransport(); - - await EndFlushHeadersAsync(transport); + //Flush headers + ValueTask flush = EndFlushHeadersAsync(); //Return the reusable stream - return ReusableChunkedStream; + return flush.IsCompletedSuccessfully ? + ValueTask.FromResult(ReusableChunkedStream) + : GetStreamAsyncCore(flush, ReusableChunkedStream); + } + + private static async ValueTask GetStreamAsyncCore(ValueTask flush, Stream stream) + { + //Await the flush and get the stream + await flush.ConfigureAwait(false); + return stream; } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -267,16 +252,45 @@ namespace VNLib.Net.Http.Core throw new InvalidOperationException("Headers have already been sent!"); } } - + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task + + /// + /// Allows sending an early 100-Continue status message to the client + /// + /// + internal async Task SendEarly100ContinueAsync() + { + Check(); + + //Send a status message with the continue response status + Writer.WriteToken(HttpHelpers.GetResponseString(ContextInfo.CurrentVersion, HttpStatusCode.Continue)); + + //Trailing crlf + Writer.WriteTermination(); + + //Get the response data header block + Memory responseBlock = Writer.GetResponseData(); + + //get base stream + Stream bs = ContextInfo.GetTransport(); + + //Write the response data to the base stream + await bs.WriteAsync(responseBlock); + + //Flush the base stream to send the data to the client + await bs.FlushAsync(); + } + /// /// Finalzies the response to a client by sending all available headers if /// they have not been sent yet /// /// - internal async ValueTask CloseAsync() + internal ValueTask CloseAsync() { - //If headers havent been sent yet, send them and there must be no content - if (!HeadersBegun || !HeadersSent) + //If headers haven't been sent yet, send them and there must be no content + if (!HeadersSent) { //RFC 7230, length only set on 200 + but not 204 if ((int)_code >= 200 && (int)_code != 204) @@ -285,29 +299,36 @@ namespace VNLib.Net.Http.Core Headers[HttpResponseHeader.ContentLength] = "0"; } - //Flush transport - Stream transport = ContextInfo.GetTransport(); - //Finalize headers - await EndFlushHeadersAsync(transport); - - //Flush transport - await transport.FlushAsync(); + return EndFlushHeadersAsync(); } + + return ValueTask.CompletedTask; } - +#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task + + /// public void OnPrepare() - { - //Propagate all child lifecycle hooks - ReusableChunkedStream.OnPrepare(); - } + { } + /// public void OnRelease() { ReusableChunkedStream.OnRelease(); + ReusableDirectStream.OnRelease(); + } + + /// + public void OnNewConnection() + { + //Get the transport stream and init streams + Stream transport = ContextInfo.GetTransport(); + ReusableChunkedStream.OnNewConnection(transport); + ReusableDirectStream.OnNewConnection(transport); } + /// public void OnNewRequest() { //Default to okay status code @@ -316,6 +337,7 @@ namespace VNLib.Net.Http.Core ReusableChunkedStream.OnNewRequest(); } + /// public void OnComplete() { //Clear headers and cookies diff --git a/lib/Net.Http/src/Core/Response/ResponseWriter.cs b/lib/Net.Http/src/Core/Response/ResponseWriter.cs index c9f20b5..7a448a1 100644 --- a/lib/Net.Http/src/Core/Response/ResponseWriter.cs +++ b/lib/Net.Http/src/Core/Response/ResponseWriter.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Http @@ -28,12 +28,12 @@ using System.Threading; using System.Threading.Tasks; using VNLib.Utils.Extensions; - +using VNLib.Net.Http.Core.Compression; namespace VNLib.Net.Http.Core { - [System.Diagnostics.CodeAnalysis.SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "")] - internal sealed class ResponseWriter : IHttpResponseBody, IHttpLifeCycle + + internal sealed class ResponseWriter : IHttpResponseBody { private Stream? _streamResponse; private IMemoryResponseReader? _memoryResponse; @@ -89,82 +89,185 @@ namespace VNLib.Net.Http.Core return true; } +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task + /// - async Task IHttpResponseBody.WriteEntityAsync(Stream dest, long count, Memory? buffer, CancellationToken token) + async Task IHttpResponseBody.WriteEntityAsync(Stream dest, long count, Memory buffer) { + int remaining; + ReadOnlyMemory segment; + //Write a sliding window response if (_memoryResponse != null) { //Get min value from count/range length - int remaining = (int)Math.Min(count, _memoryResponse.Remaining); + remaining = (int)Math.Min(count, _memoryResponse.Remaining); //Write response body from memory while (remaining > 0) { //Get remaining segment - ReadOnlyMemory segment = _memoryResponse.GetRemainingConstrained(remaining); - + segment = _memoryResponse.GetRemainingConstrained(remaining); + //Write segment to output stream - await dest.WriteAsync(segment, token); - - int written = segment.Length; - + await dest.WriteAsync(segment); + //Advance by the written ammount - _memoryResponse.Advance(written); + _memoryResponse.Advance(segment.Length); //Update remaining - remaining -= written; + remaining -= segment.Length; } } else { //Buffer is required, and count must be supplied - await _streamResponse!.CopyToAsync(dest, buffer!.Value, count, token); + await _streamResponse!.CopyToAsync(dest, buffer, count); + + //Try to dispose the response stream asyncrhonously since we are done with it + await _streamResponse!.DisposeAsync(); + + //remove ref so its not disposed again + _streamResponse = null; } } - /// - async Task IHttpResponseBody.WriteEntityAsync(Stream dest, Memory? buffer, CancellationToken token) + /// + async Task IHttpResponseBody.WriteEntityAsync(Stream dest, Memory buffer) { + ReadOnlyMemory segment; + //Write a sliding window response if (_memoryResponse != null) { //Write response body from memory while (_memoryResponse.Remaining > 0) { - //Get segment - ReadOnlyMemory segment = _memoryResponse.GetMemory(); + //Get remaining segment + segment = _memoryResponse.GetMemory(); - await dest.WriteAsync(segment, token); + //Write segment to output stream + await dest.WriteAsync(segment); - //Advance by + //Advance by the written ammount _memoryResponse.Advance(segment.Length); } } else { - //Buffer is required - await _streamResponse!.CopyToAsync(dest, buffer!.Value, token); + //Buffer is required, and count must be supplied + await _streamResponse!.CopyToAsync(dest, buffer); - //Try to dispose the response stream + //Try to dispose the response stream asyncrhonously since we are done with it await _streamResponse!.DisposeAsync(); - - //remove ref + + //remove ref so its not disposed again _streamResponse = null; } } - /// - void IHttpLifeCycle.OnPrepare() - {} - - /// - void IHttpLifeCycle.OnRelease() - {} + /// + async Task IHttpResponseBody.WriteEntityAsync(IResponseCompressor dest, Memory buffer) + { + //Locals + bool remaining; + int read; + ReadOnlyMemory segment; - /// - void IHttpLifeCycle.OnNewRequest() - {} + //Write a sliding window response + if (_memoryResponse != null) + { + /* + * It is safe to assume if a response body was set, that it contains data. + * So the cost or running a loop without data is not a concern. + * + * Since any failed writes to the output will raise exceptions, it is safe + * to advance the reader before writing the data, so we can determine if the + * block is final. + * + * Since we are using a byte-stream reader for memory responses, we can optimize the + * compression loop, if we know its operating block size, so we only compress blocks + * of the block size, then continue the loop without branching or causing nested + * loops + */ + + //Optimize for block size + if (dest.BlockSize > 0) + { + //Write response body from memory + do + { + segment = _memoryResponse.GetRemainingConstrained(dest.BlockSize); + + //Advance by the trimmed segment length + _memoryResponse.Advance(segment.Length); + + //Check if data is remaining after an advance + remaining = _memoryResponse.Remaining > 0; + + //Compress the trimmed block + await dest.CompressBlockAsync(segment, !remaining); + + } while (remaining); + } + else + { + do + { + segment = _memoryResponse.GetMemory(); + + //Advance by the segment length, this should be safe even if its zero + _memoryResponse.Advance(segment.Length); + + //Check if data is remaining after an advance + remaining = _memoryResponse.Remaining > 0; + + //Write to output + await dest.CompressBlockAsync(segment, !remaining); + + } while (remaining); + } + + //Disposing of memory response can be deferred until the end of the request since its always syncrhonous + } + else + { + //Trim buffer to block size if it is set by the compressor + if (dest.BlockSize > 0) + { + buffer = buffer[..dest.BlockSize]; + } + + //Read in loop + do + { + //read + read = await _streamResponse!.ReadAsync(buffer, CancellationToken.None); + + //Guard + if (read == 0) + { + break; + } + + //write only the data that was read, as a segment instead of a block + await dest.CompressBlockAsync(buffer[..read], read < buffer.Length); + + } while (true); + + /* + * Try to dispose the response stream asyncrhonously since we can safley here + * otherwise it will be deferred until the end of the request cleanup + */ + await _streamResponse!.DisposeAsync(); + + //remove ref so its not disposed again + _streamResponse = null; + } + } + +#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task + public void OnComplete() { diff --git a/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs new file mode 100644 index 0000000..0be4821 --- /dev/null +++ b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs @@ -0,0 +1,95 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: ReusableResponseStream.cs +* +* ReusableResponseStream.cs is part of VNLib.Net.Http which is part +* of the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Net.Http.Core.Response +{ + +#pragma warning disable CA2215 // Dispose methods should call base class dispose +#pragma warning disable CA1844 // Provide memory-based overrides of async methods when subclassing 'Stream' + + internal abstract class ReusableResponseStream : Stream + { + protected Stream? transport; + + /// + /// Called when a new connection is established + /// + /// + public virtual void OnNewConnection(Stream transport) => this.transport = transport; + + /// + /// Called when the connection is released + /// + public virtual void OnRelease() => this.transport = null; + + + //Block base dispose + protected override void Dispose(bool disposing) + { } + + //Block base close + public override void Close() + { } + + //block base dispose async + public override ValueTask DisposeAsync() + { + return ValueTask.CompletedTask; + } + + //Block flush + public override void Flush() + { } + + //Block flush async + public override Task FlushAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + //Block stream basics + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override long Length => throw new NotSupportedException(); + public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } + + //Reading is not enabled + public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream cannot be read from"); + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("This stream does not support seeking"); + public override void SetLength(long value) => throw new NotSupportedException("This stream does not support seeking"); + + public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count)); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + } + } +} \ No newline at end of file diff --git a/lib/Net.Http/src/Core/ServerPreEncodedSegments.cs b/lib/Net.Http/src/Core/ServerPreEncodedSegments.cs new file mode 100644 index 0000000..4649813 --- /dev/null +++ b/lib/Net.Http/src/Core/ServerPreEncodedSegments.cs @@ -0,0 +1,46 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: ServerPreEncodedSegments.cs +* +* ServerPreEncodedSegments.cs is part of VNLib.Net.Http which +* is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Net.Http.Core +{ + /// + /// Holds pre-encoded buffer segments for http request/responses + /// + /// + /// Holds ref to internal buffer + /// + internal readonly record struct ServerPreEncodedSegments(byte[] Buffer) + { + /// + /// Holds a pre-encoded segment for all crlf (line termination) bytes + /// + public readonly HttpEncodedSegment CrlfBytes { get; init; } = default; + + /// + /// Holds a pre-encoded segment for the final chunk termination + /// in http chuncked encoding + /// + public readonly HttpEncodedSegment FinalChunkTermination { get; init; } = default; + } +} \ No newline at end of file diff --git a/lib/Net.Http/src/Helpers/HelperTypes.cs b/lib/Net.Http/src/Helpers/HelperTypes.cs index d9db604..ecdb28c 100644 --- a/lib/Net.Http/src/Helpers/HelperTypes.cs +++ b/lib/Net.Http/src/Helpers/HelperTypes.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Http @@ -101,21 +101,21 @@ namespace VNLib.Net.Http /// None, /// + /// Http Version 0.9 + /// + Http09 = 0x01, + /// /// Http Version 1 /// - Http1 = 0x01, + Http1 = 0x02, /// /// Http Version 1.1 /// - Http11 = 0x02, + Http11 = 0x04, /// /// Http Version 2.0 /// - Http2 = 0x04, - /// - /// Http Version 0.9 - /// - Http09 = 0x08, + Http2 = 0x08, /// /// Http Version 3.0 /// diff --git a/lib/Net.Http/src/HttpBufferConfig.cs b/lib/Net.Http/src/HttpBufferConfig.cs index d4dc0f4..0191188 100644 --- a/lib/Net.Http/src/HttpBufferConfig.cs +++ b/lib/Net.Http/src/HttpBufferConfig.cs @@ -60,11 +60,6 @@ namespace VNLib.Net.Http /// public readonly int ResponseHeaderBufferSize { get; init; } = 16 * 1024; - /// - /// The size (in bytes) of the buffer to use to discard unread request entity bodies - /// - public readonly int DiscardBufferSize { get; init; } = 64 * 1024; - /// /// The size of the buffer to use when writing response data to the transport /// diff --git a/lib/Net.Http/src/HttpConfig.cs b/lib/Net.Http/src/HttpConfig.cs index 6e22fea..e1bc103 100644 --- a/lib/Net.Http/src/HttpConfig.cs +++ b/lib/Net.Http/src/HttpConfig.cs @@ -24,7 +24,6 @@ using System; using System.Text; -using System.IO.Compression; using VNLib.Utils.Logging; @@ -75,12 +74,6 @@ namespace VNLib.Net.Http /// public readonly Encoding HttpEncoding { get; init; } = Encoding.UTF8; - /// - /// Sets the compression level for response entity streams of all supported types when - /// compression is used. - /// - public readonly CompressionLevel CompressionLevel { get; init; } = CompressionLevel.Optimal; - /// /// Sets the default Http version for responses when the client version cannot be parsed from the request /// @@ -120,5 +113,11 @@ namespace VNLib.Net.Http /// The buffer configuration for the server /// public readonly HttpBufferConfig BufferConfig { get; init; } = new(); + + /// + /// Gets the used to manage response compression for + /// the server. + /// + public readonly IHttpCompressorManager? CompressorManager { get; init; } = null; } } \ No newline at end of file -- cgit