Commit c00e4d8

deploy: 4210e65
0 parents  commit c00e4d8

43 files changed

+18158
-0
lines changed

.nojekyll

Whitespace-only changes.

AsyncSeq.fsx

Lines changed: 147 additions & 0 deletions
@@ -0,0 +1,147 @@
#r "nuget: FSharp.Control.AsyncSeq,{{package-version}}"
(**
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeq.ipynb)

# F# Async: FSharp.Control.AsyncSeq

> NOTE: There is also the option to use [FSharp.Control.TaskSeq](https://github.com/fsprojects/FSharp.Control.TaskSeq) which has a very similar usage model.

An AsyncSeq is a sequence in which individual elements are retrieved using an `Async` computation.
It is similar to `seq<'a>` in that subsequent elements are pulled on-demand.
`AsyncSeq` also bears similarity to `IObservable<'a>` with the former being based on an "asynchronous pull" and the
latter based on a "synchronous push". Analogs for most operations defined for `Seq`, `List` and `IObservable` are also defined for
`AsyncSeq`. The power of `AsyncSeq` lies in that many of these operations also have analogs based on `Async`
allowing composition of complex asynchronous workflows.

The `AsyncSeq` type is located in the `FSharp.Control.AsyncSeq.dll` assembly which can be loaded in F# Interactive as follows:

*)
#r "../../../bin/FSharp.Control.AsyncSeq.dll"
open FSharp.Control
(**
### Generating asynchronous sequences

An `AsyncSeq<'T>` can be generated using computation expression syntax much like `seq<'T>`:

*)
let async12 = asyncSeq {
    yield 1
    yield 2
}
(**
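To check what a sequence such as `async12` produces, its elements can be collected into a list. The following is a minimal sketch, assuming the NuGet package is referenced without a pinned version; the name `numbers` is introduced here only for illustration.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Same shape as `async12`, redefined so the snippet stands alone.
let numbers = asyncSeq {
    yield 1
    yield 2
}

// Collect every element into a list, blocking the calling thread until done.
let collected = numbers |> AsyncSeq.toListSynchronously

printfn "%A" collected // prints [1; 2]
```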
Another way to generate an asynchronous sequence is using the `AsyncSeq.unfoldAsync` function. This
function accepts as an argument a function which can generate individual elements based on a state and
signal completion of the sequence.
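
As a small illustration of the state-based generator described above, the sketch below counts from 1 to 5. The helper `countTo` and the value `oneToFive` are hypothetical names chosen for this example; the generator returns `Some (element, nextState)` to emit an element and `None` to end the sequence.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Generator: given the current state, either emit an element together with
// the next state, or return None to signal that the sequence is complete.
let countTo (limit: int) (state: int) : Async<(int * int) option> =
    async {
        if state > limit then return None
        else return Some (state, state + 1)
    }

// Start from state 1 and keep unfolding until the generator returns None.
let oneToFive : AsyncSeq<int> = AsyncSeq.unfoldAsync (countTo 5) 1

let result = oneToFive |> AsyncSeq.toListSynchronously
printfn "%A" result // prints [1; 2; 3; 4; 5]
```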

For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
which satisfy some criteria into a database. There are several asynchronous request-reply interactions at play -
one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some
criteria and finally an operation to write the desired tweet to a database.

Given the type `Tweet` to represent an individual tweet, the operation to retrieve a batch of tweets can
be modeled with type `int -> Async<(Tweet[] * int) option>` where the incoming `int` represents the
offset into the tweet stream. The asynchronous result is an `Option` which when `None` indicates the
end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.
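
To make the contract of this type concrete, here is a sketch of a function with the same signature backed by an in-memory array instead of a remote API. The array `allTweets` and the value `batchSize` are assumptions made purely for illustration; a real implementation would issue an HTTP request instead.

```fsharp
type Tweet = { user : string; message : string }

// Hypothetical in-memory stand-in for the remote tweet stream.
let allTweets : Tweet[] =
    [| for i in 1 .. 5 -> { user = sprintf "user%d" i; message = sprintf "message %d" i } |]

// Hypothetical batch size; a real API would impose its own limit.
let batchSize = 2

let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> =
    async {
        if offset >= allTweets.Length then
            return None // end of the stream
        else
            let count = min batchSize (allTweets.Length - offset)
            let batch = Array.sub allTweets offset count
            return Some (batch, offset + count) // batch plus the next offset
    }

let first = getTweetBatch 0 |> Async.RunSynchronously    // two tweets, next offset 2
let last = getTweetBatch 4 |> Async.RunSynchronously     // one tweet, next offset 5
let finished = getTweetBatch 5 |> Async.RunSynchronously // None
```

Each call returns the offset to pass to the next call, so the caller never has to compute it.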

The above function to retrieve a batch of tweets can be used to generate an asynchronous sequence
of tweet batches as follows:

*)
type Tweet = {
    user : string
    message : string
}

let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> =
    failwith "TODO: call Twitter API"

let tweetBatches : AsyncSeq<Tweet[]> =
    AsyncSeq.unfoldAsync getTweetBatch 0
(**
The asynchronous sequence `tweetBatches` will, when iterated, incrementally consume the entire tweet stream.

Next, suppose that the tweet filtering function makes a call to a web service which determines
whether a particular tweet is of interest and should be stored in the database. This function can be modeled with
type `Tweet -> Async<bool>`. We can flatten the `tweetBatches` sequence and then filter it as follows:

*)
let filterTweet (t: Tweet) : Async<bool> =
    failwith "TODO: call web service"

let filteredTweets : AsyncSeq<Tweet> =
    tweetBatches
    |> AsyncSeq.concatSeq // flatten
    |> AsyncSeq.filterAsync filterTweet // filter
(**
When the resulting sequence `filteredTweets` is consumed, it will lazily consume the underlying
sequence `tweetBatches`, select individual tweets and filter them using the function `filterTweet`.
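
The flatten-then-filter step can be tried without any web service. The sketch below uses batches of integers in place of tweets; `batches` and `isEven` are hypothetical stand-ins for `tweetBatches` and `filterTweet`.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical stand-in for `tweetBatches`: three batches of integers.
let batches : AsyncSeq<int[]> =
    AsyncSeq.ofSeq [ [| 1; 2 |]; [| 3; 4 |]; [| 5 |] ]

// Hypothetical asynchronous predicate standing in for `filterTweet`.
let isEven (n: int) : Async<bool> =
    async { return n % 2 = 0 }

let evens =
    batches
    |> AsyncSeq.concatSeq            // flatten the batches into single items
    |> AsyncSeq.filterAsync isEven   // keep items the predicate accepts
    |> AsyncSeq.toListSynchronously  // pull everything through the pipeline

printfn "%A" evens // prints [2; 4]
```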

Finally, the function which stores a tweet in the database can be modeled by type `Tweet -> Async<unit>`.
We can store all filtered tweets as follows:

*)
let storeTweet (t: Tweet) : Async<unit> =
    failwith "TODO: call database"

let storeFilteredTweets : Async<unit> =
    filteredTweets
    |> AsyncSeq.iterAsync storeTweet
(**
Note that the value `storeFilteredTweets` is an asynchronous computation of type `Async<unit>`. At this point,
it is a *representation* of the workflow which consists of reading batches of tweets, filtering them and storing them
in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
succinctly declared and executed as follows:

*)
AsyncSeq.unfoldAsync getTweetBatch 0
|> AsyncSeq.concatSeq
|> AsyncSeq.filterAsync filterTweet
|> AsyncSeq.iterAsync storeTweet
|> Async.RunSynchronously
(**
The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit
composed using familiar operations on sequences. Furthermore, it will be executed efficiently in a non-blocking manner.
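
The point that a value such as `storeFilteredTweets` is only a representation until it is run can be observed directly. In this sketch, the list `stored` and the function `store` are hypothetical stand-ins for the database and `storeTweet`; nothing is stored until `Async.RunSynchronously` is called.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Records which items have been stored; stands in for a database.
let stored = ResizeArray<int>()

let store (n: int) : Async<unit> = async { stored.Add n }

// Building the workflow performs no work yet.
let workflow : Async<unit> =
    AsyncSeq.ofSeq [ 1; 2; 3 ]
    |> AsyncSeq.iterAsync store

let countBeforeRun = stored.Count

// Only now is the sequence iterated and each item stored.
Async.RunSynchronously workflow

let countAfterRun = stored.Count
printfn "before: %d, after: %d" countBeforeRun countAfterRun // prints before: 0, after: 3
```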

### Comparison with seq<'T>

The central difference between `seq<'T>` and `AsyncSeq<'T>` can be illustrated by introducing the notion of time.
Suppose that generating subsequent elements of a sequence requires an IO-bound operation. Invoking long
running IO-bound operations from within a `seq<'T>` will *block* the thread which calls `MoveNext` on the
corresponding `IEnumerator`. An `AsyncSeq` on the other hand can use facilities provided by the F# `Async` type to make
more efficient use of system resources.

*)
open System.Threading

let withTime = seq {
    Thread.Sleep(1000) // calling thread will block
    yield 1
    Thread.Sleep(1000) // calling thread will block
    yield 2
}

let withTime' = asyncSeq {
    do! Async.Sleep 1000 // non-blocking sleep
    yield 1
    do! Async.Sleep 1000 // non-blocking sleep
    yield 2
}
(**
When the asynchronous sequence `withTime'` is iterated, the calls to `Async.Sleep` won't block threads. Instead,
the *continuation* of the sequence will be scheduled by `Async` while the calling thread will be free to perform other work.
Overall, a `seq<'a>` can be viewed as a special case of an `AsyncSeq<'a>` where subsequent elements are retrieved
in a blocking manner.
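
One practical consequence of this relationship is that an existing `seq` can be lifted into an `AsyncSeq` with `AsyncSeq.ofSeq` and then combined with `Async`-based operations. A minimal sketch, where `squares` and the 10 ms delay are arbitrary choices made for illustration:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// An ordinary synchronous sequence.
let squares : seq<int> = seq { for i in 1 .. 4 -> i * i }

// Lift the synchronous sequence, then apply an Async-based transformation.
let shifted =
    squares
    |> AsyncSeq.ofSeq
    |> AsyncSeq.mapAsync (fun x -> async {
        do! Async.Sleep 10 // non-blocking delay standing in for IO
        return x + 1 })
    |> AsyncSeq.toListSynchronously

printfn "%A" shifted // prints [2; 5; 10; 17]
```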

### Performance Considerations

While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
that this will improve the overall performance of an application. Note, however, that an async computation does not *require* a
non-blocking operation; it simply allows for it. Also of note is that, unlike calling `IEnumerator.MoveNext()`, consuming
an item from an asynchronous sequence requires several allocations. Usually this cost is greatly outweighed by the
benefits, but it can make a difference in some scenarios.

## Related Articles

* [Programming with F# asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx/)


*)