Tarmil/FSharp.Control.TaskSeq

 
 


TaskSeq

An implementation of IAsyncEnumerable<'T> as a taskSeq CE for F#, with an accompanying TaskSeq module.

The IAsyncEnumerable interface was added to .NET in .NET Core 3.0 and is part of .NET Standard 2.1. Its main use case is iterative asynchronous enumeration over some resource, for instance an event stream or a paginated REST API, where each page is fetched by a MoveNextAsync call on the IAsyncEnumerator<'T> returned by GetAsyncEnumerator(). Working with this type directly has been relatively challenging: each step is asynchronous, and the enumerator also implements IAsyncDisposable, which requires careful handling.
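To illustrate why direct consumption needs care, here is a minimal sketch that drives an enumerator by hand using only BCL types and the F# 6 task CE. The function name sumManually is made up for this example and is not part of the library.

```fsharp
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper: sums an IAsyncEnumerable<int> by calling MoveNextAsync directly.
let sumManually (source: IAsyncEnumerable<int>) : Task<int> =
    task {
        // `use` inside `task` awaits DisposeAsync on the enumerator when the block exits
        use enumerator = source.GetAsyncEnumerator(CancellationToken.None)
        let mutable total = 0
        let mutable hasNext = true

        while hasNext do
            // every step is asynchronous and must be awaited before reading Current
            let! moved = enumerator.MoveNextAsync()

            if moved then
                total <- total + enumerator.Current

            hasNext <- moved

        return total
    }
```

Every consumer would have to repeat this loop and the disposal handling, which is the kind of boilerplate the TaskSeq module functions are meant to hide.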




Short-term feature planning

Not necessarily in order of importance:

  • Stabilize and battle-test taskSeq resumable code. DONE
  • A growing set of TaskSeq module functions, see below for progress. DONE & IN PROGRESS
  • Packaging and publishing on NuGet, PLANNED: 13 November 2022.
  • Add Async variants for functions taking HOF arguments. DONE
  • Add generated docs to https://fsprojects.github.io
  • Expand surface area based on AsyncSeq.
  • User requests?

Implementation progress

taskSeq CE

The resumable state machine backing the taskSeq CE is now finished. Focus is now on adding functionality there, like adding more useful overloads for yield and let!. Suggestions are welcome!
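As a rough sketch of what the CE looks like in use, the following combines let! on a task with yield. It assumes the package is referenced and that opening FSharp.Control brings taskSeq and TaskSeq into scope (the namespace may differ between versions). fetchPage is a hypothetical stand-in for a paginated call.

```fsharp
open System.Threading.Tasks
open FSharp.Control // assumption: exposes the taskSeq builder and the TaskSeq module

// Hypothetical stand-in for a paginated REST call; returns three items per page
let fetchPage (page: int) : Task<int list> =
    task {
        do! Task.Delay 10
        return [ page * 10; page * 10 + 1; page * 10 + 2 ]
    }

// Nothing is fetched until the sequence is enumerated
let allItems : taskSeq<int> =
    taskSeq {
        for page in 1..3 do
            let! items = fetchPage page // await a Task inside the sequence

            for item in items do
                yield item
    }
```

Enumerating allItems, for instance with TaskSeq.toListAsync, fetches the pages one at a time, in order.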

TaskSeq module functions

We are working hard on getting a full set of module functions on TaskSeq that can be used with IAsyncEnumerable sequences. Our guide is the set of F# Seq functions in F# Core and, where applicable, the functions provided by AsyncSeq. Each implemented function is documented through XML doc comments to provide the necessary context-sensitive help.

The following is the progress report:

Implemented Seq function TaskSeq function Extra async variant Remarks
allPairs allPairs
append append
average averageBy averageByAsync
cache cache
cast cast
box
unbox
choose choose chooseAsync
chunkBySize chunkBySize
collect collect collectAsync
compareWith compareWith compareWithAsync
concat concat
contains contains
delay delay
distinct distinct
distinctBy distinctBy distinctByAsync
empty empty
exactlyOne exactlyOne
except except
exists exists
exists2 exists2
filter filter filterAsync
find find findAsync
not planned findBack iteration from back not possible
findIndex findIndex findIndexAsync
not planned findIndexBack n/a n/a iteration from back not possible
fold fold foldAsync
fold2 fold2 fold2Async
not planned foldBack iteration from back not possible
not planned foldBack2 iteration from back not possible
forall forall forallAsync
forall2 forall2 forall2Async
maybe groupBy groupBy groupByAsync
head head
indexed indexed
init init initAsync
initInfinite initInfinite initInfiniteAsync
insertAt insertAt
insertManyAt insertManyAt
isEmpty isEmpty
item item
iter iter iterAsync
iter2 iter2 iter2Async
iteri iteri iteriAsync
iteri2 iteri2 iteri2Async
last last
length length
lengthBy lengthByAsync
map map mapAsync
map2 map2 map2Async
map3 map3 map3Async
mapFold mapFold mapFoldAsync
not planned mapFoldBack iteration from back not possible
mapi mapi mapiAsync
mapi2 mapi2 mapi2Async
max max
maxBy maxBy maxByAsync
min min
minBy minBy minByAsync
ofArray ofArray
ofAsyncArray
ofAsyncList
ofAsyncSeq
ofList ofList
ofResizeArray
ofSeq
ofTaskArray
ofTaskList
ofTaskSeq
pairwise pairwise
permute permute permuteAsync
pick pick pickAsync
not planned readOnly all TaskSeq sequences are readonly
reduce reduce reduceAsync
not planned reduceBack iteration from back not possible
removeAt removeAt
removeManyAt removeManyAt
replicate replicate
maybe rev
scan scan scanAsync
not planned scanBack iteration from back not possible
singleton singleton
skip skip
skipWhile skipWhile skipWhileAsync
maybe sort
maybe sortBy
maybe sortByAscending
maybe sortByDescending
maybe sortWith
splitInto splitInto
sum sum
sumBy sumBy sumByAsync
tail tail
take take
takeWhile takeWhile takeWhileAsync
toArray toArray toArrayAsync
toIList toIListAsync
toList toList toListAsync
toResizeArray toResizeArrayAsync
toSeq toSeqAsync
[…] more convenience conversions considered
maybe transpose
truncate truncate
tryExactlyOne tryExactlyOne tryExactlyOneAsync
tryFind tryFind tryFindAsync
not planned tryFindBack iteration from back not possible
tryFindIndex tryFindIndex tryFindIndexAsync
not planned tryFindIndexBack iteration from back not possible
tryHead tryHead
tryItem tryItem
tryLast tryLast
tryPick tryPick tryPickAsync
unfold unfold unfoldAsync
updateAt updateAt
where where whereAsync
windowed windowed
zip zip
zip3 zip3
zip4
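The module functions are meant to compose like their Seq counterparts. A minimal sketch, using only functions whose signatures are listed further down in this README and assuming opening FSharp.Control exposes the TaskSeq module. The name squaresOfEvens is made up for this example.

```fsharp
open System.Threading.Tasks
open FSharp.Control // assumption: exposes the TaskSeq module

// Hypothetical example: keep the even numbers, square them asynchronously, collect into a list
let squaresOfEvens () : Task<int list> =
    TaskSeq.ofList [ 1..10 ]
    |> TaskSeq.filter (fun x -> x % 2 = 0)
    |> TaskSeq.mapAsync (fun x -> task { return x * x }) // the Async variant takes a task-returning function
    |> TaskSeq.toListAsync
```

The plain variants (filter, map) take synchronous functions, while the Async variants take functions that return a task.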

Further reading on IAsyncEnumerable

  • A good C#-based introduction can be found in this blog.
  • An MSDN article written shortly after it was introduced.
  • Converting a seq to an IAsyncEnumerable demo gist as an example, though TaskSeq contains many more utility functions and uses a slightly different approach.
  • If you want to use IAsyncEnumerable with async rather than task, use the excellent AsyncSeq library. While TaskSeq is intended to consume async just like task does, it won't create an AsyncSeq type (at least not yet). If you want classic Async and parallelism, you should get that library instead.
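As a small illustration of consuming Async values from TaskSeq, the sketch below uses ofAsyncList from the signature list further down. It assumes opening FSharp.Control exposes the TaskSeq module. delayedValue and collected are hypothetical names.

```fsharp
open System.Threading.Tasks
open FSharp.Control // assumption: exposes the TaskSeq module

// Hypothetical Async-returning helper
let delayedValue (x: int) : Async<int> =
    async {
        do! Async.Sleep 10
        return x
    }

// Turn a list of Async computations into a task sequence, then gather the results
let collected () : Task<int list> =
    [ delayedValue 1; delayedValue 2; delayedValue 3 ]
    |> TaskSeq.ofAsyncList
    |> TaskSeq.toListAsync
```

The result is still an IAsyncEnumerable-based sequence, not an AsyncSeq.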

Further reading on resumable state machines

Further reading on computation expressions

Building & testing

TLDR: just run build. Or load the sln file in Visual Studio or VS Code and compile.

Prerequisites

  • .NET 6 or .NET 7 Preview
  • F# 6.0 or 7.0 compiler
  • To use build.cmd, the dotnet command must be accessible from your path.

Just check-out this repo locally. Then, from the root of the repo, you can do:

Build the solution

build [build] [release|debug]

With no arguments, defaults to release.

Run the tests

build test [release|debug]

With no arguments, defaults to release. By default, all tests are output to the console. If you don't want that, you can use --logger console;verbosity=summary. Furthermore, no TRX file is generated and the --blame-xxx flags aren't set.

Run the CI command

build ci [release|debug]

With no arguments, defaults to release. This will run dotnet test with the --blame-xxx settings enabled to prevent hanging tests caused by an xUnit runner bug.

There are no special CI environment variables that need to be set for running this locally.

Advanced

You can pass any additional options that are valid for dotnet test and dotnet build respectively. However, these cannot be the very first argument, so you should either use build build --myadditionalOptions fizz buzz, or just specify the build-kind, i.e. this is fine:

build debug --verbosity detailed
build test --logger console;verbosity=summary

At this moment, additional options cannot have quotes in them.

Command modifiers, like release and debug, can be specified with - or / if you so prefer: dotnet build /release.

Get help (duh!)

build help

For more info, see this PR: fsprojects#29.

In progress!!!

It's based on Don Syme's taskSeq.fs but expanded with useful utility functions and a few extra binding overloads.

Current set of TaskSeq utility functions

The following is the current surface area of the TaskSeq utility functions. This is just a dump of the signatures with doc comments to be used as a quick ref.

module TaskSeq =
    open System.Collections.Generic
    open System.Threading.Tasks
    open FSharp.Control.TaskSeqBuilders

    /// Initialize an empty taskSeq.
    val empty<'T> : taskSeq<'T>

    /// <summary>
    /// Returns <see cref="true" /> if the task sequence contains no elements, <see cref="false" /> otherwise.
    /// </summary>
    val isEmpty: taskSeq: taskSeq<'T> -> Task<bool>

    /// Returns taskSeq as a list. This function is blocking until the sequence is exhausted and will properly dispose of the resources.
    val toList: t: taskSeq<'T> -> 'T list

    /// Returns taskSeq as an array. This function is blocking until the sequence is exhausted and will properly dispose of the resources.
    val toArray: taskSeq: taskSeq<'T> -> 'T[]

    /// Returns taskSeq as a seq, similar to Seq.cache. This function is blocking until the sequence is exhausted and will properly dispose of the resources.
    val toSeqCached: taskSeq: taskSeq<'T> -> seq<'T>

    /// Unwraps the taskSeq as a Task<array<_>>. This function is non-blocking.
    val toArrayAsync: taskSeq: taskSeq<'T> -> Task<'T[]>

    /// Unwraps the taskSeq as a Task<list<_>>. This function is non-blocking.
    val toListAsync: taskSeq: taskSeq<'T> -> Task<'T list>

    /// Unwraps the taskSeq as a Task<ResizeArray<_>>. This function is non-blocking.
    val toResizeArrayAsync: taskSeq: taskSeq<'T> -> Task<ResizeArray<'T>>

    /// Unwraps the taskSeq as a Task<IList<_>>. This function is non-blocking.
    val toIListAsync: taskSeq: taskSeq<'T> -> Task<IList<'T>>

    /// Unwraps the taskSeq as a Task<seq<_>>. This function is non-blocking,
    /// exhausts the sequence and caches the results of the tasks in the sequence.
    val toSeqCachedAsync: taskSeq: taskSeq<'T> -> Task<seq<'T>>

    /// Create a taskSeq of an array.
    val ofArray: array: 'T[] -> taskSeq<'T>

    /// Create a taskSeq of a list.
    val ofList: list: 'T list -> taskSeq<'T>

    /// Create a taskSeq of a seq.
    val ofSeq: sequence: seq<'T> -> taskSeq<'T>

    /// Create a taskSeq of a ResizeArray, aka List.
    val ofResizeArray: data: ResizeArray<'T> -> taskSeq<'T>

    /// Create a taskSeq of a sequence of tasks, that may already have hot-started.
    val ofTaskSeq: sequence: seq<#Task<'T>> -> taskSeq<'T>

    /// Create a taskSeq of a list of tasks, that may already have hot-started.
    val ofTaskList: list: #Task<'T> list -> taskSeq<'T>

    /// Create a taskSeq of an array of tasks, that may already have hot-started.
    val ofTaskArray: array: #Task<'T> array -> taskSeq<'T>

    /// Create a taskSeq of a seq of async.
    val ofAsyncSeq: sequence: seq<Async<'T>> -> taskSeq<'T>

    /// Create a taskSeq of a list of async.
    val ofAsyncList: list: Async<'T> list -> taskSeq<'T>

    /// Create a taskSeq of an array of async.
    val ofAsyncArray: array: Async<'T> array -> taskSeq<'T>

    /// Iterates over the taskSeq applying the action function to each item. This function is non-blocking,
    /// exhausts the sequence as soon as the task is evaluated.
    val iter: action: ('T -> unit) -> taskSeq: taskSeq<'T> -> Task<unit>

    /// Iterates over the taskSeq applying the action function to each item. This function is non-blocking,
    /// exhausts the sequence as soon as the task is evaluated.
    val iteri: action: (int -> 'T -> unit) -> taskSeq: taskSeq<'T> -> Task<unit>

    /// Iterates over the taskSeq applying the async action to each item. This function is non-blocking,
    /// exhausts the sequence as soon as the task is evaluated.
    val iterAsync: action: ('T -> #Task<unit>) -> taskSeq: taskSeq<'T> -> Task<unit>

    /// Iterates over the taskSeq, applying the async action to each item. This function is non-blocking,
    /// exhausts the sequence as soon as the task is evaluated.
    val iteriAsync: action: (int -> 'T -> #Task<unit>) -> taskSeq: taskSeq<'T> -> Task<unit>

    /// Maps over the taskSeq, applying the mapper function to each item. This function is non-blocking.
    val map: mapper: ('T -> 'U) -> taskSeq: taskSeq<'T> -> taskSeq<'U>

    /// Maps over the taskSeq with an index, applying the mapper function to each item. This function is non-blocking.
    val mapi: mapper: (int -> 'T -> 'U) -> taskSeq: taskSeq<'T> -> taskSeq<'U>

    /// Maps over the taskSeq, applying the async mapper function to each item. This function is non-blocking.
    val mapAsync: mapper: ('T -> #Task<'U>) -> taskSeq: taskSeq<'T> -> taskSeq<'U>

    /// Maps over the taskSeq with an index, applying the async mapper function to each item. This function is non-blocking.
    val mapiAsync: mapper: (int -> 'T -> #Task<'U>) -> taskSeq: taskSeq<'T> -> taskSeq<'U>

    /// Applies the given function to the items in the taskSeq and concatenates all the results in order.
    val collect: binder: ('T -> #taskSeq<'U>) -> taskSeq: taskSeq<'T> -> taskSeq<'U>

    /// Applies the given function to the items in the taskSeq and concatenates all the results in order.
    val collectSeq: binder: ('T -> #seq<'U>) -> taskSeq: taskSeq<'T> -> taskSeq<'U>

    /// Applies the given async function to the items in the taskSeq and concatenates all the results in order.
    val collectAsync: binder: ('T -> #Task<'TSeqU>) -> taskSeq: taskSeq<'T> -> taskSeq<'U> when 'TSeqU :> taskSeq<'U>

    /// Applies the given async function to the items in the taskSeq and concatenates all the results in order.
    val collectSeqAsync: binder: ('T -> #Task<'SeqU>) -> taskSeq: taskSeq<'T> -> taskSeq<'U> when 'SeqU :> seq<'U>

    /// <summary>
    /// Returns the first element of the <see cref="IAsyncEnumerable" />, or <see cref="None" /> if the sequence is empty.
    /// </summary>
    val tryHead: taskSeq: taskSeq<'T> -> Task<'T option>

    /// <summary>
    /// Returns the first element of the <see cref="IAsyncEnumerable" />.
    /// </summary>
    /// <exception cref="ArgumentException">Thrown when the sequence is empty.</exception>
    val head: taskSeq: taskSeq<'T> -> Task<'T>

    /// <summary>
    /// Returns the last element of the <see cref="IAsyncEnumerable" />, or <see cref="None" /> if the sequence is empty.
    /// </summary>
    val tryLast: taskSeq: taskSeq<'T> -> Task<'T option>

    /// <summary>
    /// Returns the last element of the <see cref="IAsyncEnumerable" />.
    /// </summary>
    /// <exception cref="ArgumentException">Thrown when the sequence is empty.</exception>
    val last: taskSeq: taskSeq<'T> -> Task<'T>

    /// <summary>
    /// Returns the nth element of the <see cref="IAsyncEnumerable" />, or <see cref="None" /> if the sequence
    /// does not contain enough elements, or if <paramref name="index" /> is negative.
    /// Parameter <paramref name="index" /> is zero-based, that is, the value 0 returns the first element.
    /// </summary>
    val tryItem: index: int -> taskSeq: taskSeq<'T> -> Task<'T option>

    /// <summary>
    /// Returns the nth element of the <see cref="IAsyncEnumerable" />.
    /// Parameter <paramref name="index" /> is zero-based, that is, the value 0 returns the first element.
    /// </summary>
    /// <exception cref="ArgumentException">Thrown when the sequence has insufficient length or
    /// <paramref name="index" /> is negative.</exception>
    val item: index: int -> taskSeq: taskSeq<'T> -> Task<'T>

    /// <summary>
    /// Returns the only element of the task sequence, or <see cref="None" /> if the sequence is empty or
    /// contains more than one element.
    /// </summary>
    val tryExactlyOne: source: taskSeq<'T> -> Task<'T option>

    /// <summary>
    /// Returns the only element of the task sequence.
    /// </summary>
    /// <exception cref="ArgumentException">Thrown when the input sequence does not contain precisely one element.</exception>
    val exactlyOne: source: taskSeq<'T> -> Task<'T>

    /// <summary>
    /// Applies the given function <paramref name="chooser" /> to each element of the task sequence. Returns
    /// a sequence comprised of the results "x" for each element where
    /// the function returns <c>Some(x)</c>.
    /// If <paramref name="chooser" /> is asynchronous, consider using <see cref="TaskSeq.chooseAsync" />.
    /// </summary>
    val choose: chooser: ('T -> 'U option) -> source: taskSeq<'T> -> taskSeq<'U>

    /// <summary>
    /// Applies the given asynchronous function <paramref name="chooser" /> to each element of the task sequence. Returns
    /// a sequence comprised of the results "x" for each element where
    /// the function returns <see cref="Some(x)" />.
    /// If <paramref name="chooser" /> does not need to be asynchronous, consider using <see cref="TaskSeq.choose" />.
    /// </summary>
    val chooseAsync: chooser: ('T -> #Task<'U option>) -> source: taskSeq<'T> -> taskSeq<'U>

    /// <summary>
    /// Returns a new collection containing only the elements of the collection
    /// for which the given <paramref name="predicate" /> function returns <see cref="true" />.
    /// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.filterAsync" />.
    /// </summary>
    val filter: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>

    /// <summary>
    /// Returns a new collection containing only the elements of the collection
    /// for which the given asynchronous function <paramref name="predicate" /> returns <see cref="true" />.
    /// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.filter" />.
    /// </summary>
    val filterAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> taskSeq<'T>

    /// <summary>
    /// Applies the given function <paramref name="chooser" /> to successive elements of the task sequence
    /// in <paramref name="source" />, returning the first result where the function returns <see cref="Some(x)" />.
    /// If <paramref name="chooser" /> is asynchronous, consider using <see cref="TaskSeq.tryPickAsync" />.
    /// </summary>
    val tryPick: chooser: ('T -> 'U option) -> source: taskSeq<'T> -> Task<'U option>

    /// <summary>
    /// Applies the given asynchronous function <paramref name="chooser" /> to successive elements of the task sequence
    /// in <paramref name="source" />, returning the first result where the function returns <see cref="Some(x)" />.
    /// If <paramref name="chooser" /> does not need to be asynchronous, consider using <see cref="TaskSeq.tryPick" />.
    /// </summary>
    val tryPickAsync: chooser: ('T -> #Task<'U option>) -> source: taskSeq<'T> -> Task<'U option>

    /// <summary>
    /// Returns the first element of the task sequence in <paramref name="source" /> for which the given function
    /// <paramref name="predicate" /> returns <see cref="true" />. Returns <see cref="None" /> if no such element exists.
    /// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.tryFindAsync" />.
    /// </summary>
    val tryFind: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<'T option>

    /// <summary>
    /// Returns the first element of the task sequence in <paramref name="source" /> for which the given asynchronous function
    /// <paramref name="predicate" /> returns <see cref="true" />. Returns <see cref="None" /> if no such element exists.
    /// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.tryFind" />.
    /// </summary>
    val tryFindAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<'T option>


    /// <summary>
    /// Applies the given function <paramref name="chooser" /> to successive elements of the task sequence
    /// in <paramref name="source" />, returning the first result where the function returns <see cref="Some(x)" />.
    /// If <paramref name="chooser" /> is asynchronous, consider using <see cref="TaskSeq.pickAsync" />.
    /// <exception cref="KeyNotFoundException">Thrown when every item of the sequence
    /// evaluates to <see cref="None" /> when the given function is applied.</exception>
    /// </summary>
    val pick: chooser: ('T -> 'U option) -> source: taskSeq<'T> -> Task<'U>

    /// <summary>
    /// Applies the given asynchronous function <paramref name="chooser" /> to successive elements of the task sequence
    /// in <paramref name="source" />, returning the first result where the function returns <see cref="Some(x)" />.
    /// If <paramref name="chooser" /> does not need to be asynchronous, consider using <see cref="TaskSeq.pick" />.
    /// <exception cref="KeyNotFoundException">Thrown when every item of the sequence
    /// evaluates to <see cref="None" /> when the given function is applied.</exception>
    /// </summary>
    val pickAsync: chooser: ('T -> #Task<'U option>) -> source: taskSeq<'T> -> Task<'U>

    /// <summary>
    /// Returns the first element of the task sequence in <paramref name="source" /> for which the given function
    /// <paramref name="predicate" /> returns <see cref="true" />.
    /// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.findAsync" />.
    /// </summary>
    /// <exception cref="KeyNotFoundException">Thrown if no element returns <see cref="true" /> when
    /// evaluated by the <paramref name="predicate" /> function.</exception>
    val find: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<'T>

    /// <summary>
    /// Returns the first element of the task sequence in <paramref name="source" /> for which the given
    /// asynchronous function <paramref name="predicate" /> returns <see cref="true" />.
    /// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.find" />.
    /// </summary>
    /// <exception cref="KeyNotFoundException">Thrown if no element returns <see cref="true" /> when
    /// evaluated by the <paramref name="predicate" /> function.</exception>
    val findAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<'T>

    /// <summary>
    /// Zips two task sequences, returning a taskSeq of the tuples of each sequence, in order. May raise ArgumentException
    /// if the sequences are of unequal length.
    /// </summary>
    /// <exception cref="ArgumentException">The sequences have different lengths.</exception>
    val zip: taskSeq1: taskSeq<'T> -> taskSeq2: taskSeq<'U> -> IAsyncEnumerable<'T * 'U>

    /// <summary>
    /// Applies the function <paramref name="folder" /> to each element in the task sequence,
    /// threading an accumulator argument of type <paramref name="'State" /> through the computation.
    /// If the accumulator function <paramref name="folder" /> is asynchronous, consider using <see cref="TaskSeq.foldAsync" />.
    /// </summary>
    val fold: folder: ('State -> 'T -> 'State) -> state: 'State -> taskSeq: taskSeq<'T> -> Task<'State>

    /// <summary>
    /// Applies the asynchronous function <paramref name="folder" /> to each element in the task sequence,
    /// threading an accumulator argument of type <paramref name="'State" /> through the computation.
    /// If the accumulator function <paramref name="folder" /> does not need to be asynchronous, consider using <see cref="TaskSeq.fold" />.
    /// </summary>
    val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> taskSeq: taskSeq<'T> -> Task<'State>
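To show how a few of the signatures above fit together, here is a minimal sketch. It assumes opening FSharp.Control exposes the TaskSeq module. The name demo is made up for this example.

```fsharp
open System.Threading.Tasks
open FSharp.Control // assumption: exposes the TaskSeq module

// Hypothetical demo combining fold, tryFind and tryPick inside a task
let demo () : Task<int * int option * string option> =
    task {
        let numbers = TaskSeq.ofArray [| 3; 8; 15 |]

        // fold threads an accumulator through the whole sequence
        let! sum = numbers |> TaskSeq.fold (fun acc x -> acc + x) 0

        // the try* variants return an option instead of raising when nothing matches
        let! firstBig = numbers |> TaskSeq.tryFind (fun x -> x > 5)
        let! missing = numbers |> TaskSeq.tryPick (fun x -> if x > 100 then Some(string x) else None)

        return sum, firstBig, missing
    }
```

The non-try counterparts (find, pick, head, and so on) raise an exception in the same situations, as described in their doc comments.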

About

A computation expression and module for seamless working with IAsyncEnumerable<'T> as if it is just another sequence
