Iteratee in F# – Part 1

I’ve been playing with Iteratees lately in my work with
Dave on fracture-io.

The Iteratee module used in this post is part of the FSharpx library and provides a set of types and functions for building compositional, input processing components.

A fold, by any other name

A common scenario for I/O processing is parsing an HTTP request message. Granted, most will rely on ASP.NET, HttpListener, or WCF to do this for them. However, HTTP request parsing has a lot of interesting elements that make it useful for demonstrating the inefficient resource usage of other processing techniques. For our running sample, we’ll focus on parsing the headers of the following HTTP request message:

let httpRequest : byte [] = @"GET /some/uri HTTP/1.1
Accept:text/html,application/xhtml+xml,application/xml
Accept-Charset:ISO-8859-1,utf-8;q=0.7,*;q=0.3
Accept-Encoding:gzip,deflate,sdch
Accept-Language:en-US,en;q=0.8
Cache-Control:max-age=0
Connection:keep-alive
Host:stackoverflow.com
If-Modified-Since:Sun, 25 Sep 2011 20:55:29 GMT
Referer:http://www.bing.com/search?setmkt=en-US&q=don't+use+IEnumerable%3Cbyte%3E
User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.4 (KHTML, like Gecko) Chrome/16.0.889.0 Safari/535.4

<!DOCTYPE HTML PUBLIC ""-//W3C//DTD HTML 4.01//EN"" ""http://www.w3.org/TR/html4/strict.dtd"">
<html>
<head>
...
</head>
<body>
...
</body>
</html>"B

When parsing an HTTP request message, we most likely want to return not just a list of strings but something more descriptive, such as an abstract syntax tree represented as an F# discriminated union. A perfect candidate for taking in our request message above and producing the desired result is Seq.fold.

val fold : ('State -> 'a -> 'State) -> 'State -> seq<'a> -> 'State

The left fold is a very useful function. It is equivalent to the Enumerable.Aggregate extension method in LINQ. This function takes in a state-transformation function, an initial seed value, and a sequence of data, and incrementally produces a final state value.
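As a quick sketch of the fold's shape (the names here are only for illustration), we can accumulate a state value, such as the total character count, over a couple of header lines:

```fsharp
// A minimal Seq.fold illustration: fold a state value (here, a running
// character count) over a sequence of header lines.
let totalLength =
    [ "GET /some/uri HTTP/1.1"; "Host:stackoverflow.com" ]
    |> Seq.fold (fun acc (line: string) -> acc + line.Length) 0
// totalLength = 44
```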

Looks like we’re done here. However, there are still problems with stopping here:

Composition

You can certainly use function composition to generate an appropriate state-incrementing function, but you would still have no way to pause processing in order to delay selecting the appropriate message-body processor.

Early termination

Suppose you really only ever want just the message headers. How would you stop processing to free your resources as quickly as possible? Forget it. Fold is going to keep going until it runs out of chunks. Even if you have your fold function stop updating the state after the headers are complete, you won’t get a result until the entire data stream has been processed.
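To make this concrete, here is a sketch of the problem; `chunks` and `parseHeaderChunk` are hypothetical stand-ins, not part of any library. Even when the accumulator stops changing, the fold still traverses every remaining chunk:

```fsharp
// Hypothetical sketch: 'chunks' and 'parseHeaderChunk' are placeholders.
// Once 'complete' is true, the state never changes again, but Seq.fold
// still pulls and visits every remaining chunk before returning.
let parseHeaders (chunks: byte [] seq) parseHeaderChunk =
    chunks
    |> Seq.fold (fun (complete, headers) chunk ->
        if complete then (complete, headers)   // a no-op, yet fold keeps going
        else parseHeaderChunk headers chunk) (false, [])
```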

Iteratees

The iteratee itself is a data consumer. It consumes data in chunks until it has either consumed enough data to produce a result or receives an EOF.

An iteratee is based on the fold operator with two differences:

  1. The iteratee may complete its processing early by returning a Done state. The iteratee may return the unused portion of any chunk it was passed in its Done state. This should not be confused with the rest of the stream not yet provided by the enumerator.

  2. The iteratee does not pull data but receives chunks of data from an “enumerator”. It returns a continuation to receive and process the next chunk. This means that an iteratee may be paused at any time, either by neglecting to pass it any additional data or by passing an Empty chunk.

Inspiration

For those interested in reading more, there are any number of articles and examples. Here are a few I’ve referenced quite frequently:

The Scalaz version arguably comes closest to the F# implementation, but I have generally based my implementations on the Haskell source. I’ve got two implementations currently, a version based on the enumerator package, and a continuation-passing style version based on the iteratee package.

The iteratee is composed of one of two states, either a done state with a result and the remainder of the data stream, or a continue state containing a continuation to continue processing.

type Iteratee<'el, 'a> =
    | Done of 'a * Stream<'el>
    | Continue of (Stream<'el> -> Iteratee<'el, 'a>)

The Stream type used here is not System.IO.Stream but yet another discriminated union to represent different stream chunk states:

type Stream<'el> =
    | Chunk of 'el  // A chunk of data
    | Empty         // An empty chunk represents a pause and can be used as a transition in compositions
    | EOF           // Denotes that no chunks remain
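Note that 'el is the type of a whole chunk, not of a single element; the list-based iteratees below instantiate it with 'a list. A quick sketch of the three cases:

```fsharp
// The chunk type parameter is the whole chunk, e.g. an int list.
let aChunk : Stream<int list> = Chunk [1; 2; 3]  // some data
let pause  : Stream<int list> = Empty            // a pause/transition
let theEnd : Stream<int list> = EOF              // end of input
```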

Some Iteratees

The following are some sample iteratees using the List<'a> type in F#. While not the most efficient, they are easy to express concisely and are thus better for illustrating the use of iteratees.

let length<'a> : Iteratee<'a list, int> =
    let rec step n = function
        | Empty | Chunk [] -> Continue (step n)
        | Chunk x -> Continue (step (n + x.Length))
        | EOF -> Done(n, EOF)
    in Continue (step 0)

let peek<'a> : Iteratee<'a list, 'a option> =
    let rec inner =
        let rec step = function
            | Empty | Chunk ([]:'a list) -> inner
            | Chunk(x::xs) as s -> Done(Some x, s)
            | EOF -> Done(None, EOF)
        Continue step
    in inner

let head<'a> : Iteratee<'a list, 'a option> =
    let rec inner =
        let rec step = function
            | Empty | Chunk ([]:'a list) -> inner
            | Chunk(x::xs) -> Done(Some x, Chunk xs)
            | EOF -> Done(None, EOF)
        Continue step
    in inner

let drop n =
    let rec step n = function
        | Empty | Chunk [] -> Continue <| step n
        | Chunk str ->
            if str.Length < n then
                Continue <| step (n - str.Length)
            else let extra = List.skip n str in Done((), Chunk extra)
        | EOF -> Done((), EOF)
    in if n <= 0 then empty<_> else Continue (step n)

let private dropWithPredicate pred listOp =
    let rec step = function
        | Empty | Chunk [] -> Continue step
        | Chunk x ->
            match listOp pred x with
            | [] -> Continue step
            | x' -> Done((), Chunk x')
        | EOF as s -> Done((), s)
    in Continue step

let dropWhile pred = dropWithPredicate pred List.skipWhile
let dropUntil pred = dropWithPredicate pred List.skipUntil

let take n =
    let rec step before n = function
        | Empty | Chunk [] -> Continue <| step before n
        | Chunk str ->
            if str.Length < n then
                Continue <| step (before @ str) (n - str.Length)
            else let str', extra = List.splitAt n str in Done(before @ str', Chunk extra)
        | EOF -> Done(before, EOF)
    in if n <= 0 then Done([], Empty) else Continue (step [] n)

let private takeWithPredicate (pred:'a -> bool) listOp =
    let rec step before = function
        | Empty | Chunk [] -> Continue (step before)
        | Chunk str ->
            match listOp pred str with
            | str', [] -> Continue (step (before @ str'))
            | str', extra -> Done(before @ str', Chunk extra)
        | EOF -> Done(before, EOF)
    in Continue (step [])

let takeWhile pred = takeWithPredicate pred List.span
let takeUntil pred = takeWithPredicate pred List.split

let heads str =
    let rec loop count str =
        match count, str with
        | (count, []) -> Done(count, EOF)
        | (count, str) -> Continue (step count str)
    and step count str s =
        match str, s with
        | str, Empty -> loop count str
        | str, (Chunk []) -> loop count str
        | c::t, (Chunk (c'::t')) ->
            if c = c' then step (count + 1) t (Chunk t')
            else Done(count, Chunk (c'::t'))
        | _, s -> Done(count, s)
    loop 0 str

// readLines clearly shows the composition allowed by iteratees.
// Note the use of the heads and takeUntil combinators.
let readLines =
    let toString chars = String(Array.ofList chars)
    let newlines = ['\r';'\n']
    let newline = ['\n']
    let isNewline c = c = '\r' || c = '\n'
    let terminators = heads newlines >>= fun n -> if n = 0 then heads newline else Done(n, Empty)
    let rec lines acc = takeUntil isNewline >>= fun l -> terminators >>= check acc l
    and check acc l count =
        match l, count with
        | _, 0 -> Done(Choice1Of2 (List.rev acc |> List.map toString), Chunk l)
        | [], _ -> Done(Choice2Of2 (List.rev acc |> List.map toString), EOF)
        | l, _ -> lines (l::acc)
    lines []

Enumerators

Enumerators feed data into iteratees, advancing their state until they complete.

// Feed the EOF chunk to the iteratee in order to force the result.
let rec enumEOF = function
    | Done(x,_) -> Done(x, EOF)
    | Continue k ->
        match k EOF with
        | Continue _ -> failwith "enumEOF: divergent iteratee"
        | i -> enumEOF i

// Run the iteratee and either return the result or throw an exception for a divergent iteratee.
let run i =
    match enumEOF i with
    | Done(x,_) -> x
    | Continue _ -> failwith "run: divergent iteratee"

// Feed the data stream to the iteratee as a single chunk.
// This is useful for testing but not conserving memory.
// val enumeratePure1Chunk : 'a list -> Enumerator<'a list,'b>
let enumeratePure1Chunk str i =
    match str, i with
    | [], _ -> i
    | _, Done(_,_) -> i
    | _::_, Continue k -> k (Chunk str)

// Feed data to the iteratee in chunks of size n.
// val enumeratePureNChunk : int -> 'a list -> Enumerator<'a list,'b>
let rec enumeratePureNChunk n str i =
    match str, i with
    | [], _ -> i
    | _, Done(_,_) -> i
    | _::_, Continue k ->
        let x, xs = List.splitAt n str in enumeratePureNChunk n xs (k (Chunk x))

// Feed data to the iteratee one value at a time.
// val enumerate : 'a list -> Enumerator<'a list,'b>
let rec enumerate str i =
    match str, i with
    | [], _ -> i
    | _, Done(_,_) -> i
    | x::xs, Continue k -> enumerate xs (k (Chunk [x]))
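Putting the two halves together, driving an iteratee is just a matter of applying an enumerator and then forcing the result with run. A small sketch, assuming the definitions above are in scope:

```fsharp
// Drive the iteratees defined earlier with the pure enumerators.
let three = enumeratePure1Chunk [1;2;3] length |> run      // 3
let first = enumerate [1;2;3] head |> run                  // Some 1
let four  = enumeratePureNChunk 2 [1;2;3;4] length |> run  // 4
```

Note that length only produces its count once the enumerator feeds it EOF via run, whereas head completes as soon as it sees its first element.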

Reading an HTTP Request by Lines

Back to our earlier example, let’s see how we might read the HTTP request by lines. A few caveats: this example doesn’t encapsulate the Stream particularly well, since we’re using a MemoryStream directly. I could wrap this up in an enumerateMemoryStream function that takes all the bytes to read (which will likely happen soon). Note that the example below uses the Iteratee.Binary module, not the Iteratee.List module.

let runIteratee() =
    let sw = Stopwatch.StartNew()
    let result =
        [ for _ in 1..10000 do
            use stream = new MemoryStream(httpRequest)
            yield! match enumStream 128 stream readLines |> run with
                   | Choice1Of2 x -> x
                   | Choice2Of2 y -> y ]
    sw.Stop()
    printfn "Iteratee read %d lines in %d ms" result.Length sw.ElapsedMilliseconds

So what?

System.IO.Stream-based processing

The System.IO.Stream type should be familiar to anyone who has ever worked with I/O in .NET. Streams are the primary abstraction available for working with streams of data, whether over the file system (FileStream) or network protocols (NetworkStream). Streams also have a nice support structure in the form of TextReaders and TextWriters, among other, similar types.

Using the standard Stream processing APIs, you might write something like the following:

let rec readConsecutiveLines (reader:System.IO.StreamReader) cont =
    if reader.EndOfStream then cont []
    else let line = reader.ReadLine()
         if System.String.IsNullOrEmpty(line) then cont []
         else readConsecutiveLines reader (fun tail -> cont (line::tail))

let readFromStream() =
    let sw = System.Diagnostics.Stopwatch.StartNew()
    let result =
        [ for _ in 1..10000 do
            use stream = new System.IO.MemoryStream(httpRequest)
            use reader = new System.IO.StreamReader(stream)
            yield readConsecutiveLines reader id ]
    sw.Stop()
    printfn "Stream read %d in %d ms" result.Length sw.ElapsedMilliseconds

readFromStream()

What might be the problems with this approach?

Blocking the current thread

This is a synchronous use of reading from a Stream. In fact, StreamReader can only be used in a synchronous fashion. There are no methods that even offer the Asynchronous Programming Model (APM). For that, you’ll need to read from the Stream in chunks and find the line breaks yourself. We’ll get to more on this in a few minutes.

Poor composition

First off, the sample above performs side effects throughout, and side effects don’t compose. You might suggest using standard function composition to create a complete message parser, but what if you don’t know ahead of time what type of request body you’ll receive? Can you pause processing to select the appropriate processor? You could certainly address this with some conditional branching, but where exactly do you tie that into your current logic? Also, the current parser breaks on lines using the StreamReader’s built-in logic. If you want to parse individual lines yourself, where would you tie in that logic?

Memory consumption

In this example, we are not taking any care about our memory use. To that end, you might argue that we should just use StreamReader.ReadToEnd() and then break up the string using a compositional parser like FParsec. If we don’t care about careful use of memory, that’s actually quite a good idea. However, in most network I/O situations – such as writing high-performance servers – you really want to control your memory use very carefully. StreamReader does allow you to specify the chunk size, so it’s not a complete case against using StreamReader, which is why this is a last-place argument. And of course, you can certainly go after Stream.Read() directly.

IEnumerable-based processing

How can we add better compositionality and refrain from blocking the thread? One way others have solved this problem is through the use of iterators. A common example can be found on Stack Overflow. Iterators allow you to publish a lazy stream of data, either one element at a time or in chunks, and through LINQ or F#’s Seq module, chain together processors for the data.

// Read the stream byte by byte
let rec readStreamByByte (stream: System.IO.Stream) =
    seq { let b = stream.ReadByte()
          if b >= 0 then
              yield byte b
              yield! readStreamByByte stream }

// Read the stream by chunks
let rec readStreamByChunk chunkSize (stream: System.IO.Stream) =
    seq { let buffer = Array.zeroCreate<byte> chunkSize
          let bytesRead = stream.Read(buffer, 0, chunkSize)
          if bytesRead > 0 then
              yield buffer.[0..bytesRead-1]
              yield! readStreamByChunk chunkSize stream }

// When you are sure you want text by lines
let readStreamByLines (reader: System.IO.StreamReader) =
    seq { while not reader.EndOfStream do
            yield reader.ReadLine() }

Three options are presented. In each, I’m using a Stream or StreamReader, but you could just as easily replace those with a byte[], ArraySegment<byte>, or SocketAsyncEventArgs. What could be wrong with these options?

Lazy, therefore resource contentious

Iterators are pull-based, so you’ll only retrieve a chunk when requested. This sounds pretty good for your processing code, but it’s not a very good situation for your sockets. If you have data coming in, you want to get it moved out as quickly as possible to free up your allocated threads and pinned memory buffers for more incoming or outgoing traffic.

Lazy, therefore non-deterministic

Each of these items is lazy; therefore, it’s impossible to know when you can free up resources. Who owns the Stream or StreamReader passed into each method? When is it safe to free the resource? If used immediately within a function, you may be fine, but you’d also be just as well off if you used a list or array comprehension and blocked until all bytes were read. (The one possible exception might be when using a co-routine style async pattern.)

Summary

Iteratees are able to resolve a number of issues related to general stream processing, as well as the non-determinism inherent in lazy (e.g. seq<_>- or Async<_>-based) approaches. I’m sure you are wondering why I didn’t dive deeper into explaining the iteratees above. In the next part, we’ll explore a more idiomatic .NET solution for creating iteratees. We’ll then decompose the iteratees above and cover each in more detail. In the meantime, please check out FSharpx for additional information.
