Problems in coming up with good streaming primitives

Abstractions around streaming are difficult to get right.

Streaming is even more difficult in an async environment. Because the same thread can do something else while it waits for a read to complete, control flow and timing become harder to get right.

You might think that async and greenlets aren't worth it because of the problems and challenges they expose. If you did, you would forget that the problems do not go away when you get rid of greenlets or async.

Interactive programs are always concurrent. You either solve the problems that arise from concurrency, or you pretend they don't exist and end up with more complicated callback- or event-queue-based programs.

Disclaimer: Not all of these concepts are present in Lever, and I am still making up my mind about them.

Primitives

The smallest unit in streaming has turned out to be a buffer. The second smallest is a list of buffers.

Buffers must be represented as arrays of unsigned bytes rather than strings, for two reasons. First, strings are usually immutable for usability reasons, whereas buffers are more useful when they are mutable. Second, we got rid of ASCII as the universal encoding a long time ago: strings are a separate concept that stands apart from their binary representation.
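Here is a quick illustration of the difference. It uses Python's `str` and `bytearray` as stand-ins for strings and buffers, which is an assumption for illustration since Lever's own types are not shown. The same text has a different length as characters and as bytes, and only the buffer can be changed in place.

```python
text = "héllo"                           # str: immutable sequence of characters
buf = bytearray(text.encode("utf-8"))    # buffer: mutable array of unsigned bytes

# The same text has a different length as characters and as bytes ("é" takes 2).
print(len(text), len(buf))               # 5 6

# A buffer can be modified in place...
buf[0] = ord("H")
print(buf.decode("utf-8"))               # Héllo

# ...a string cannot.
try:
    text[0] = "H"
except TypeError as err:
    print("str is immutable:", err)

# The next unit up: a list of buffers, e.g. for vectored writes.
buffers = [buf, bytearray(b", world")]
print(b"".join(buffers).decode("utf-8"))  # Héllo, world
```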

UTF-8 encoding

UTF-8 is a byte-oriented encoding. Multi-byte sequences store the most significant bits first, as in network byte order. It provides the following four formats for Unicode code points:

0000 - 007F
0xxx xxxx

0080 - 07FF
110x xxxx  10xx xxxx

0800 - FFFF
1110 xxxx  10xx xxxx  10xx xxxx

10000 - 10FFFF
1111 0xxx  10xx xxxx  10xx xxxx  10xx xxxx

Encoding text as UTF-8 isn't usually a problem: the encoder needs no state and can run as a single function.
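Every code point is encoded independently, so the encoder can be written as one pure function over the four formats listed above. What follows is a minimal Python sketch. The function names are hypothetical, and Python's built-in `str.encode("utf-8")` already does the same job.

```python
def encode_code_point(cp):
    """Encode one Unicode code point using the four UTF-8 formats."""
    if not 0 <= cp <= 0x10FFFF or 0xD800 <= cp <= 0xDFFF:
        # Out of range, or a UTF-16 surrogate, which UTF-8 must not encode.
        raise ValueError(f"cannot encode U+{cp:04X} as UTF-8")
    if cp <= 0x7F:                                   # 0xxx xxxx
        return bytes([cp])
    if cp <= 0x7FF:                                  # 110x xxxx  10xx xxxx
        return bytes([0xC0 | (cp >> 6),
                      0x80 | (cp & 0x3F)])
    if cp <= 0xFFFF:                                 # 1110 xxxx + 2 continuation bytes
        return bytes([0xE0 | (cp >> 12),
                      0x80 | ((cp >> 6) & 0x3F),
                      0x80 | (cp & 0x3F)])
    return bytes([0xF0 | (cp >> 18),                 # 1111 0xxx + 3 continuation bytes
                  0x80 | ((cp >> 12) & 0x3F),
                  0x80 | ((cp >> 6) & 0x3F),
                  0x80 | (cp & 0x3F)])


def encode_utf8(text):
    # Nothing is carried from one character to the next: no encoder state.
    return b"".join(encode_code_point(ord(ch)) for ch in text)


print(encode_utf8("a\u00e9\u20ac\U0001F600").hex(" "))  # 61 c3 a9 e2 82 ac f0 9f 98 80
```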

Decoding requires some kind of state, because a single encoded code point may take 1 to 4 bytes. Whatever method you use to decode this format, you may receive truncated code points in addition to invalid ones: a chunk boundary can fall in the middle of a multi-byte sequence.

In a continuous stream the UTF-8 decoder should handle truncation, or at least something in front of the decoder should. This is exactly what Utf8Decoder in Lever does:

dec = Utf8Decoder()
str0 = dec(data0)
str1 = dec(data1)
str2 = dec(data2)
dec.finish()

Utf8Decoder.finish raises an error if the input ended in the middle of a code point. You can omit the call if truncation does not matter in your use case.
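For comparison, here is a minimal Python sketch of an incremental decoder with the same call/finish shape. The class only mirrors the usage shown above and is not taken from Lever's source. Python's standard `codecs.getincrementaldecoder("utf-8")` provides the same service. The only state kept between calls is the incomplete sequence at the end of the previous chunk.

```python
class Utf8Decoder:
    """Incremental UTF-8 decoder sketch; mirrors the usage above, not Lever's code."""

    def __init__(self):
        self.pending = b""   # start of a sequence split across chunks

    @staticmethod
    def sequence_length(lead):
        # Bytes expected in a sequence that starts with this lead byte.
        if lead < 0x80:
            return 1
        if (lead & 0xE0) == 0xC0:
            return 2
        if (lead & 0xF0) == 0xE0:
            return 3
        if (lead & 0xF8) == 0xF0:
            return 4
        return 1             # invalid lead byte: let bytes.decode reject it

    def __call__(self, data):
        data = self.pending + bytes(data)
        cut = len(data)
        # An incomplete final sequence starts at most 3 bytes from the end.
        for i in range(1, min(3, len(data)) + 1):
            byte = data[-i]
            if (byte & 0xC0) == 0x80:
                continue     # continuation byte: keep looking for the lead byte
            if self.sequence_length(byte) > i:
                cut = len(data) - i   # hold back the truncated sequence
            break
        self.pending = data[cut:]
        return data[:cut].decode("utf-8")   # strict: invalid input raises

    def finish(self):
        if self.pending:
            raise ValueError("stream ended inside a UTF-8 sequence")


dec = Utf8Decoder()
chunks = [b"a\xe2\x82", b"\xac!", b"\xf0\x9f\x98", b"\x80"]
text = "".join(dec(chunk) for chunk in chunks)
dec.finish()
print(text)   # a€!😀
```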

Events, queues and edges

In an async model you have to be able to wait for an occurrence and react to it when it happens. The common framework for this has been events and queues.

Events are missed if nobody is waiting for them. They happen only once and are not stored for later. Several tasks can wait on an event, and several objects can listen to it, at the same time. When the event fires, every waiting task and listener is triggered together.
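This is a minimal sketch of these event semantics on top of Python's asyncio. The `Event` class, `fire` and `listeners` are hypothetical names. Note that asyncio's own `asyncio.Event` behaves differently: once set, it stays set. Here, a firing is seen only by whoever is waiting at that moment.

```python
import asyncio


class Event:
    """Hypothetical event: a firing reaches only those waiting at that moment."""

    def __init__(self):
        self.listeners = []   # callbacks invoked on every firing
        self._waiters = []    # futures of tasks currently waiting

    async def wait(self):
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        return await fut

    def fire(self, value=None):
        # Wake every current waiter and listener; nothing is kept for later.
        waiters, self._waiters = self._waiters, []
        for fut in waiters:
            if not fut.done():
                fut.set_result(value)
        for listener in list(self.listeners):
            listener(value)


async def main():
    ev = Event()
    seen = []
    ev.listeners.append(lambda v: seen.append(v))
    tasks = [asyncio.create_task(ev.wait()) for _ in range(2)]
    await asyncio.sleep(0)        # let both tasks start waiting
    ev.fire("ping")               # both tasks and the listener see it
    results = await asyncio.gather(*tasks)
    ev.fire("lost")               # no task waits now: only the listener sees it
    return results, seen


results, seen = asyncio.run(main())
print(results, seen)   # ['ping', 'ping'] ['ping', 'lost']
```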

Queues collect what they receive. Depending on the situation, they hold either waiting tasks or accumulated values. There are several variations of queues. A queue may be attached to an event, but it doesn't need to be fed by one.

The .length attribute of a queue tells how many items are queued. A zero length doesn't imply that the queue is closed. On the other hand, a nonzero length only means that the queue can supply at least that many items immediately.

Queues can be waited upon, but appending to a queue never blocks. Some queues also allow lookahead and putback. Anything that looks and behaves like a queue is a queue. It follows that generators and iterators are queues as well.
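Here is a sketch of a queue that holds either values or waiting tasks. It is again built on asyncio with hypothetical names (`Queue`, `append`, `putback`, `wait`, `length`). Appending never blocks: the value goes straight to a waiting task if there is one, and is stored otherwise.

```python
import asyncio
from collections import deque


class Queue:
    """Hypothetical queue holding either pending values or waiting tasks."""

    def __init__(self):
        self._values = deque()
        self._waiters = deque()   # futures of tasks blocked in wait()

    @property
    def length(self):
        # Items available right now; 0 does not mean the queue is closed.
        return len(self._values)

    def append(self, value):
        # Never blocks: hand the value to a waiting task, or store it.
        while self._waiters:
            fut = self._waiters.popleft()
            if not fut.done():    # skip tasks that were cancelled meanwhile
                fut.set_result(value)
                return
        self._values.append(value)

    def putback(self, value):
        # Lookahead support: return an item to the front of the queue.
        if self._waiters:
            self.append(value)    # no stored values while tasks are waiting
        else:
            self._values.appendleft(value)

    async def wait(self):
        if self._values:
            return self._values.popleft()
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        return await fut


async def main():
    q = Queue()
    q.append("a")
    q.append("b")                 # no waiters: the values accumulate
    head = await q.wait()         # "a" is available immediately
    q.putback(head)               # push it back after looking at it
    got = [await q.wait(), await q.wait()]
    task = asyncio.create_task(q.wait())
    await asyncio.sleep(0)        # the task now waits on an empty queue
    q.append("c")                 # handed straight to the waiting task
    got.append(await task)
    return got, q.length


got, remaining = asyncio.run(main())
print(got, remaining)   # ['a', 'b', 'c'] 0
```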

Edges are a special case of queues, in the sense that only the latest value and whether the event has changed matter. An edge stores whether the event has changed since the last wait, along with the event's latest value. There are a lot of situations where only the change since the last check matters.
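Here is a sketch of an edge under the same assumptions: hypothetical `Edge`, `set` and `wait`, with only one waiting task supported to keep it short. Several changes between two waits collapse into one, and the waiter sees only the latest value.

```python
import asyncio


class Edge:
    """Hypothetical edge: remembers only the latest value and a 'changed' flag."""

    def __init__(self, value=None):
        self.value = value
        self.changed = False
        self._waiter = None       # at most one waiting task in this sketch

    def set(self, value):
        self.value = value
        self.changed = True
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result(None)

    async def wait(self):
        # Returns at once if something changed since the last wait.
        while not self.changed:
            self._waiter = asyncio.get_running_loop().create_future()
            await self._waiter
        self.changed = False
        return self.value


async def main():
    edge = Edge(0)
    for i in range(1, 4):
        edge.set(i)               # three changes before anyone checks
    first = await edge.wait()     # collapsed into one: the latest value, 3
    task = asyncio.create_task(edge.wait())
    await asyncio.sleep(0)        # nothing changed since: the task blocks
    edge.set(10)
    return first, await task


first, second = asyncio.run(main())
print(first, second)   # 3 10
```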

All of these constructs can be closed. If something is listening to or waiting on them when they are closed, the listeners and waiting tasks are resumed with an exception whose type depends on the context. Normally it is a discard exception. It is not reported if it reaches the event loop, because its purpose is to let the waiting task run its finalizers.
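This sketch shows closing with a discard exception, assuming asyncio and a hypothetical `Discard` exception class. Closing resumes the waiting task with the exception, so its `finally` block still runs. The caller catches `Discard` silently, which loosely stands in for the event loop not reporting it.

```python
import asyncio


class Discard(Exception):
    """Hypothetical exception used to unwind tasks waiting on a closed object."""


class Closeable:
    def __init__(self):
        self._waiters = []
        self.closed = False

    async def wait(self):
        if self.closed:
            raise Discard()
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        return await fut

    def close(self):
        # Resume every waiting task with Discard instead of a value.
        self.closed = True
        waiters, self._waiters = self._waiters, []
        for fut in waiters:
            if not fut.done():
                fut.set_exception(Discard())


async def worker(source, log):
    try:
        await source.wait()
    finally:
        log.append("finalizer ran")   # cleanup still happens after close()


async def main():
    source, log = Closeable(), []
    task = asyncio.create_task(worker(source, log))
    await asyncio.sleep(0)            # the worker is now waiting
    source.close()
    try:
        await task
    except Discard:
        pass                          # swallowed, not reported
    return log


log = asyncio.run(main())
print(log)   # ['finalizer ran']
```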

Files and streams

Lever implements its basic I/O through libuv, and libuv doesn't provide streaming for files because file I/O needs special treatment on every platform. Therefore file reads and writes are requests, and they behave differently from streams.

This contradicts the UNIX principle that everything is a file. Instead of trying to abstract the difference away in my runtime, I decided that this is fine. Let's embrace the difference.

Over time it has turned out to be a very rational choice. Streams indeed aren't files, and files aren't streams.

The difference is that a file can be accessed at arbitrary offsets, whereas a stream cannot. This also means that the semantics of file reads and writes differ from those of stream reads and writes.

POSIX, and therefore Linux, calls these operations pread and pwrite, so I gave them the same names. They have the following format:

count = File.pread(data, offset)
count = File.pwrite(data, offset)
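Python exposes the POSIX calls directly as `os.pread` and `os.pwrite` (Unix only), which makes the random-access semantics easy to try. There is one difference from the Lever signature above: Python's `os.pread` returns a new bytes object instead of filling a buffer that is passed in. The offset is explicit, and the file position is neither used nor moved.

```python
import os
import tempfile

with tempfile.TemporaryFile() as f:
    fd = f.fileno()
    count = os.pwrite(fd, b"hello world", 0)  # write at offset 0, returns bytes written
    os.pwrite(fd, b"WORLD", 6)                # overwrite in place at offset 6
    data = os.pread(fd, 5, 6)                 # read 5 bytes starting at offset 6
    pos = os.lseek(fd, 0, os.SEEK_CUR)        # the current file position

print(count, data, pos)   # 11 b'WORLD' 0
```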

And streams have the following format:

Stream.on_read : Queue
Stream.read_start(maxbuf=default_maxbuf)
Stream.read_stop()
data  = Stream.read()
count = Stream.write(data)

The data is, of course, an array of unsigned bytes. write, pwrite and pread support vectored I/O, so they also accept a list of buffers.
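You can try vectored writes from Python with `os.writev` (Unix only). It takes a sequence of bytes-like objects and writes them with a single system call. The pipe here is just a convenient stand-in for a stream.

```python
import os

read_end, write_end = os.pipe()
buffers = [b"hello, ", bytearray(b"vectored "), memoryview(b"world")]
count = os.writev(write_end, buffers)   # one call writes all three buffers in order
os.close(write_end)
received = os.read(read_end, 1024)
os.close(read_end)
print(count, received)   # 21 b'hello, vectored world'
```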

Avoiding excess buffering

Stream.read() either takes data from the queue or waits for it. If nothing else is reading from the stream, read() stops it again afterwards, so that the system doesn't buffer more data behind the scenes than the application can handle. This is an important feature for efficient networking.

Alternatively, you can start a stream with read_start() and react to incoming data either with .read() or with a listener on .on_read. If a task is waiting in .read(), it takes priority over .on_read.

If the queue length reaches maxbuf, reading stops automatically until there is room in the queue again.
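The stopping rule can be sketched without any real I/O. In this hypothetical Python model, `deliver` stands in for the OS (or libuv) handing over one chunk. Reading is enabled only while the user has started it and the queue is below `maxbuf`. The model leaves out the async waiting in `read()` and the `on_read` listeners.

```python
from collections import deque


class Stream:
    """Hypothetical model of maxbuf backpressure; no real I/O involved."""

    def __init__(self, source, maxbuf=3):
        self.source = iter(source)   # stands in for data arriving from the OS
        self.maxbuf = maxbuf
        self.queue = deque()
        self.started = False         # the user asked for reading with read_start()
        self.reading = False         # whether the OS is currently asked for data

    def _update(self):
        # Ask for more data only if reading was started and there is room.
        self.reading = self.started and len(self.queue) < self.maxbuf

    def read_start(self):
        self.started = True
        self._update()

    def read_stop(self):
        self.started = False
        self._update()

    def deliver(self):
        # Simulated event-loop step: accept one chunk if reading is enabled.
        if not self.reading:
            return False
        chunk = next(self.source, None)
        if chunk is None:
            return False             # source exhausted
        self.queue.append(chunk)
        self._update()               # stops automatically at maxbuf
        return True

    def read(self):
        chunk = self.queue.popleft()
        self._update()               # room again: reading resumes
        return chunk


stream = Stream([b"a", b"b", b"c", b"d", b"e"], maxbuf=3)
stream.read_start()
while stream.deliver():
    pass
print(len(stream.queue), stream.reading)   # 3 False
first = stream.read()
print(first, stream.reading)               # b'a' True
while stream.deliver():
    pass
print(list(stream.queue))                  # [b'b', b'c', b'd']
```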

Mmap is not async, and doesn't need to be

There is a story that mmap support was dropped from node.js because it can't be made asynchronous. I think this was a misguided decision.

Memory-mapped I/O is a common way to interface with external hardware today. It should always be available as an option to the user.
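Memory-mapped access is ordinary memory indexing, so there is nothing to await. Here is a small Python illustration using the standard `mmap` module. A temporary file stands in for a device, because mapping real hardware registers (for example through a device file) is platform-specific and is not shown here.

```python
import mmap
import tempfile

with tempfile.TemporaryFile() as f:
    f.write(b"\x00" * 16)
    f.flush()                                  # the file must be 16 bytes long before mapping
    with mmap.mmap(f.fileno(), 16) as m:
        m[0:4] = b"\x01\x02\x03\x04"           # a plain memory store, no write() call
        value = int.from_bytes(m[0:4], "little")
        m.flush()                              # push the changes back to the file
    f.seek(0)
    head = f.read(4)

print(hex(value), head)   # 0x4030201 b'\x01\x02\x03\x04'
```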
