aboutsummaryrefslogtreecommitdiff
path: root/Utils/src/IO/WriteOnlyBufferedStream.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Utils/src/IO/WriteOnlyBufferedStream.cs')
-rw-r--r--Utils/src/IO/WriteOnlyBufferedStream.cs255
1 files changed, 0 insertions, 255 deletions
diff --git a/Utils/src/IO/WriteOnlyBufferedStream.cs b/Utils/src/IO/WriteOnlyBufferedStream.cs
deleted file mode 100644
index 5e7faa1..0000000
--- a/Utils/src/IO/WriteOnlyBufferedStream.cs
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Utils
-* File: WriteOnlyBufferedStream.cs
-*
-* WriteOnlyBufferedStream.cs is part of VNLib.Utils which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Utils is free software: you can redistribute it and/or modify
-* it under the terms of the GNU General Public License as published
-* by the Free Software Foundation, either version 2 of the License,
-* or (at your option) any later version.
-*
-* VNLib.Utils is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* General Public License for more details.
-*
-* You should have received a copy of the GNU General Public License
-* along with VNLib.Utils. If not, see http://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils.Memory;
-
-namespace VNLib.Utils.IO
-{
- /// <summary>
- /// A basic accumulator style write buffered stream
- /// </summary>
- public class WriteOnlyBufferedStream : Stream
- {
- private readonly ISlindingWindowBuffer<byte> _buffer;
- private readonly bool LeaveOpen;
-
- /// <summary>
- /// Gets the underlying stream that interfaces with the backing store
- /// </summary>
- public Stream BaseStream { get; init; }
-
- /// <summary>
- /// Initalizes a new <see cref="WriteOnlyBufferedStream"/> using the
- /// specified backing stream, using the specified buffer size, and
- /// optionally leaves the stream open
- /// </summary>
- /// <param name="baseStream">The backing stream to write buffered data to</param>
- /// <param name="bufferSize">The size of the internal buffer</param>
- /// <param name="leaveOpen">A value indicating of the stream should be left open when the buffered stream is closed</param>
- public WriteOnlyBufferedStream(Stream baseStream, int bufferSize, bool leaveOpen = false)
- {
- BaseStream = baseStream;
- //Create buffer
- _buffer = InitializeBuffer(bufferSize);
- LeaveOpen = leaveOpen;
- }
- /// <summary>
- /// Invoked by the constuctor method to allocte the internal buffer with the specified buffer size.
- /// </summary>
- /// <param name="bufferSize">The requested size of the buffer to alloc</param>
- /// <remarks>By default requests the buffer from the <see cref="ArrayPool{T}.Shared"/> instance</remarks>
- protected virtual ISlindingWindowBuffer<byte> InitializeBuffer(int bufferSize)
- {
- return new ArrayPoolStreamBuffer<byte>(ArrayPool<byte>.Shared, bufferSize);
- }
-
- ///<inheritdoc/>
- public override void Close()
- {
- try
- {
- //Make sure the buffer is empty
- WriteBuffer();
-
- if (!LeaveOpen)
- {
- //Dispose stream
- BaseStream.Dispose();
- }
- }
- finally
- {
- _buffer.Close();
- }
- }
- ///<inheritdoc/>
- public override async ValueTask DisposeAsync()
- {
- try
- {
- if (_buffer.AccumulatedSize > 0)
- {
- await WriteBufferAsync(CancellationToken.None);
- }
-
- if (!LeaveOpen)
- {
- //Dispose stream
- await BaseStream.DisposeAsync();
- }
-
- GC.SuppressFinalize(this);
- }
- finally
- {
- _buffer.Close();
- }
- }
-
- ///<inheritdoc/>
- public override void Flush() => WriteBuffer();
- ///<inheritdoc/>
- public override Task FlushAsync(CancellationToken cancellationToken) => WriteBufferAsync(cancellationToken).AsTask();
-
- private void WriteBuffer()
- {
- //Only if data is available to write
- if (_buffer.AccumulatedSize > 0)
- {
- //Write data to stream
- BaseStream.Write(_buffer.Accumulated);
- //Reset position
- _buffer.Reset();
- }
- }
-
- private async ValueTask WriteBufferAsync(CancellationToken token = default)
- {
- if(_buffer.AccumulatedSize > 0)
- {
- await BaseStream.WriteAsync(_buffer.AccumulatedBuffer, token);
- _buffer.Reset();
- }
- }
- ///<inheritdoc/>
- public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count));
-
- public override void Write(ReadOnlySpan<byte> buffer)
- {
- ForwardOnlyReader<byte> reader = new(buffer);
- //Attempt to buffer/flush data until all data is sent
- do
- {
- //Try to buffer as much as possible
- ERRNO buffered = _buffer.TryAccumulate(reader.Window);
-
- if(buffered < reader.WindowSize)
- {
- //Buffer is full and needs to be flushed
- WriteBuffer();
- //Advance reader and continue to buffer
- reader.Advance(buffered);
- continue;
- }
-
- break;
- }
- while (true);
- }
-
- public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
- }
-
- public async override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
- {
- ForwardOnlyMemoryReader<byte> reader = new(buffer);
- //Attempt to buffer/flush data until all data is sent
- do
- {
- //Try to buffer as much as possible
- ERRNO buffered = _buffer.TryAccumulate(reader.Window.Span);
-
- if (buffered < reader.WindowSize)
- {
- //Buffer is full and needs to be flushed
- await WriteBufferAsync(cancellationToken);
- //Advance reader and continue to buffer
- reader.Advance(buffered);
- continue;
- }
-
- break;
- }
- while (true);
- }
-
-
- /// <summary>
- /// Always false
- /// </summary>
- public override bool CanRead => false;
- /// <summary>
- /// Always returns false
- /// </summary>
- public override bool CanSeek => false;
- /// <summary>
- /// Always true
- /// </summary>
- public override bool CanWrite => true;
- /// <summary>
- /// Returns the size of the underlying buffer
- /// </summary>
- public override long Length => _buffer.AccumulatedSize;
- /// <summary>
- /// Always throws <see cref="NotSupportedException"/>
- /// </summary>
- /// <exception cref="NotSupportedException"></exception>
- public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
- /// <summary>
- /// Always throws <see cref="NotSupportedException"/>
- /// </summary>
- /// <exception cref="NotSupportedException"></exception>
- /// <returns></returns>
- public override int Read(byte[] buffer, int offset, int count)
- {
- throw new NotSupportedException("This stream is not readable");
- }
-
- /// <summary>
- /// Always throws <see cref="NotSupportedException"/>
- /// </summary>
- /// <exception cref="NotSupportedException"></exception>
- public override long Seek(long offset, SeekOrigin origin)
- {
- throw new NotSupportedException();
- }
-
- /// <summary>
- /// Always throws <see cref="NotSupportedException"/>
- /// </summary>
- /// <exception cref="NotSupportedException"></exception>
- public override void SetLength(long value)
- {
- throw new NotSupportedException();
- }
-
- public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- throw new NotImplementedException();
- }
-
- public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
- {
- throw new NotImplementedException();
- }
- }
-}