Skip to content

Commit c1954b3

Browse files
committed
wip
1 parent ff5329f commit c1954b3

2 files changed

Lines changed: 118 additions & 7 deletions

File tree

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package net.typedrest.http
2+
3+
namespace TypedRest.Http;
4+
5+
/// <summary>
6+
/// Exposes an HTTP body as a stream of deserialized entities.
7+
/// </summary>
8+
/// <typeparam name="TEntity">The type of entity this stream provides.</typeparam>
9+
public class HttpEntityStream<TEntity>
10+
{
11+
private readonly HttpContent _content;
12+
private readonly MediaTypeFormatter _serializer;
13+
private readonly byte[] _separatorPattern;
14+
15+
private Stream? _stream;
16+
private byte[] _buffer;
17+
private int _startIndex, _endIndex;
18+
19+
/// <summary>
20+
/// Creates a new HTTP entity stream.
21+
/// </summary>
22+
/// <param name="content">The HTTP body.</param>
23+
/// <param name="serializer">Used to deserialize entities in the body.</param>
24+
/// <param name="separator">The character sequence used to detect that a new element starts in an HTTP stream.</param>
25+
/// <param name="bufferSize">The size of the buffer used to collect data for deserialization in bytes.</param>
26+
public HttpEntityStream(HttpContent content, MediaTypeFormatter serializer, string separator, int bufferSize)
27+
{
28+
_content = content ?? throw new ArgumentNullException(nameof(content));
29+
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
30+
_separatorPattern = Encoding.GetEncoding(content.Headers.ContentType?.CharSet ?? "UTF-8").GetBytes(separator ?? throw new ArgumentNullException(nameof(separator)));
31+
_buffer = new byte[bufferSize];
32+
}
33+
34+
/// <summary>
35+
/// Retrieves the next entity from the stream.
36+
/// </summary>
37+
/// <exception cref="EndOfStreamException">The stream has ended and there are no further entities.</exception>
38+
public TEntity GetNext()
39+
{
40+
_stream ??= _content.ReadAsStream();
41+
42+
if (_startIndex >= _endIndex)
43+
{
44+
int count = FillBuffer();
45+
if (count == 0) throw new EndOfStreamException();
46+
}
47+
48+
while (true)
49+
{
50+
int separatorIndex = FindSeparator();
51+
if (separatorIndex != -1)
52+
{
53+
// Complete entity
54+
return Parse(separatorIndex);
55+
}
56+
else if (separatorIndex < _endIndex)
57+
{
58+
// Potentially incomplete entity
59+
try
60+
{
61+
return Parse(_endIndex);
62+
}
63+
catch
64+
{
65+
int count = FillBuffer();
66+
if (count == 0) throw;
67+
}
68+
}
69+
}
70+
}
71+
72+
private Task<int> FillBuffer()
73+
{
74+
if (_startIndex != 0) TrimBuffer();
75+
76+
int count = _stream!.Read(_buffer, _endIndex, _buffer.Length - _endIndex);
77+
_endIndex += count;
78+
return count;
79+
}
80+
81+
private void TrimBuffer()
82+
{
83+
if (_startIndex >= _endIndex)
84+
{
85+
_startIndex = 0;
86+
_endIndex = 0;
87+
}
88+
else
89+
{
90+
int newEndIndex = _endIndex - _startIndex;
91+
92+
var newBuffer = new byte[_buffer.Length];
93+
Array.Copy(_buffer, _startIndex, newBuffer, 0, newEndIndex);
94+
95+
_buffer = newBuffer;
96+
_startIndex = 0;
97+
_endIndex = newEndIndex;
98+
}
99+
}
100+
101+
private int FindSeparator()
102+
=> _buffer.IndexOfPattern(_separatorPattern, _startIndex, count: _endIndex - _startIndex);
103+
104+
private Task<TEntity> Parse(int separatorIndex)
105+
{
106+
using var subStream = new MemoryStream(_buffer, _startIndex, count: separatorIndex - _startIndex);
107+
var result = (TEntity)_serializer.ReadFromStream(typeof(TEntity), subStream, _content, null);
108+
_startIndex = separatorIndex + _separatorPattern.Length;
109+
return result;
110+
}
111+
}

typedrest-reactive/src/main/kotlin/net/typedrest/http/ResponseExtensions.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,27 +46,27 @@ fun <TEntity : Any> Response.entitySequence(
4646
val source = body!!.source()
4747
val buffer = Buffer()
4848

49+
while(!finishedReading) {
50+
51+
}
52+
4953
while (source.read(buffer, bufferSize.toLong()) != -1L || !buffer.exhausted()) {
5054
var separatorIndex = buffer.indexOf(separatorBytes)
5155
do {
5256
val bytes = if (separatorIndex == -1L) buffer.readByteArray() else buffer.readByteArray(separatorIndex)
5357
buffer.skip(separatorBytes.size.toLong())
5458

55-
deserialize(serializer, bytes, entityType)?.let { yield(it) }
59+
parseEntity(serializer, bytes, entityType)?.let { yield(it) }
5660
separatorIndex = buffer.indexOf(separatorBytes)
5761
} while (separatorIndex != -1L)
5862
}
5963

6064
if (!buffer.exhausted()) {
61-
deserialize(serializer, buffer.readByteArray(), entityType)?.let { yield(it) }
65+
parseEntity(serializer, buffer.readByteArray(), entityType)?.let { yield(it) }
6266
}
6367
}
6468

65-
private fun <T : Any> deserialize(
66-
serializer: Serializer,
67-
bytes: ByteArray,
68-
type: Class<T>
69-
): T? {
69+
private fun <T : Any> parseEntity(bytes: ByteArray, serializer: Serializer, type: Class<T>): T? {
7070
if (bytes.isEmpty()) return null
7171
val mediaType: MediaType? = serializer.supportedMediaTypes.firstOrNull()
7272
return serializer.deserialize(bytes.toResponseBody(mediaType), type)

0 commit comments

Comments
 (0)