F# async + event-based asynchronous pattern
Сегодня предлагаю поговорить о модели асинхронного программирования в .NET, основанной на событиях (event-based asynchronous pattern). Если вкратце, то всё очень просто:
static void AsyncDownloadGoogle() {
var uri = new Uri("http://google.com");
var client = new WebClient();
DownloadDataCompletedEventHandler completed = null;
completed = (_, e) => {
// обрабатываем результат операции
if (e.Cancelled) {
Console.WriteLine("Операция отменена!");
} else if (e.Error != null) {
Console.WriteLine("Ошибка: {0}", e.Error);
} else {
byte[] page = e.Result;
Console.WriteLine("Загружено {0} байт", page.Length);
}
// отписка от события
client.DownloadDataCompleted -= completed;
};
// подписываемся на результат операции
client.DownloadDataCompleted += completed;
// запускаем асинхронную операцию
client.DownloadDataAsync(uri);
// даём возможность отменить операцию
Console.WriteLine("Нажмите [esc] для отмены");
var key = Console.ReadKey(true);
if (key.Key == ConsoleKey.Escape) {
client.CancelAsync();
}
}
То есть основа паттерна – получения результатов асинхронной операции через аргументы события (наследник класса System.ComponentModel .AsyncCompletedEventArgs
), при этом существует возможность определить, что операция была отменена или завершена с ошибкой. Нюансы возникают в двух случаях:
- Если экземпляр класса, предоставляющего асинхронную операцию, планируется переиспользовать и результаты обрабатывать другим обработчиком, то следует отписывать обработчики после завершения операции (что может приводить к лишним телодвижениям, если вы подписываетесь анонимным методом или лямбда-выражением, как в примере выше).
- Экземпляр класса, предоставляющий асинхронную операцию, может допускать одновременное исполнение некоторой операции (класс
System.Net.WebClient
, к сожалению, не из таких). В этом случае все методы (запуск, отмена асинхронной операции) должны обладать перегрузками, принимающими дополнительный параметр видаobject userState
, который позволяет отличать независимые операции друг от друга (обработчики результатов должны проверять значение свойстваUserState
в аргументе события).
Возникает вопрос, как всё это дело использовать в F# и обязательно ли асинхронные операции должны выглядеть так же ущербно? На помощь приходят F# async workflows, позволяющие записывать асинхронные операции так же лаконично, как синхронные.
Однако, чтобы использовать асинхронные операции внутри async workflow, требуются специальные метод, запускающие асинхронные операции и возвращающие объекты типа Async<’a>
, представляющие собой некое асинхронное вычисление. В стандартную библиотеку F# входит несколько методов-расширений подобного рода, предназначенных для некоторых стандартных классов .NET, а так же метод Async.FromBeginEnd()
позволяющие получить Async<’a>
из асинхронных операций, заданных в виде пары методов Begin и End (старый добрый APM-шаблон).
Я это всё к тому, что event-based asynchronous pattern конечно же забыли, поэтому предлагаю вашему вниманию пару методов-расширений, предназначенных для преобразования асинхронных операций, выполненных в рамках данного паттерна, в родной для F# тип Async<’a>
(получился неплохой пример применения метода Async.FromContinuations
, надеюсь, комментариев будет достаточно):
module AsyncExtensions
open System
open System.ComponentModel
#nowarn "40"
type Async with
/// Преобразует асинхронную операцию, заданную в виде метода
/// запуска и события завершения (event-based asynchronous
/// pattern) в асинхронное вычисление F# async
static member FromEventPattern
(completedEvent : IObservable<_>, // событие завершения
executeAction : unit -> unit, // запуск операции
?cancelAction : unit -> unit) = // отмена операции
// функция запуска асинхронной операции
let comp (onValue, onError, onCancel) =
let onCancel () =
onCancel (OperationCanceledException())
// подписываемся на событие завершения операции
let rec subscription : IDisposable =
completedEvent.Subscribe {
new IObserver<#AsyncCompletedEventArgs> with
// если событие было возбуждено, проверяем статус
member x.OnNext(args) =
use __ = subscription // отписываемся при выходе
if args.Cancelled then onCancel ()
elif args.Error = null then onValue args
else onError args.Error
// для обычных событий никогда не будет вызываться,
// но для любых IObservable<_> лучше предусмотреть
member x.OnError(exc) =
use __ = subscription in onError exc
member x.OnCompleted() =
use __ = subscription in onCancel ()
}
try executeAction () // и запускаем асинхронную операцию
with _ ->
use __ = subscription // если запуск упадёт,
reraise () // то сразу отписываемся
// формируем асинхронное вычисление
let operation = Async.FromContinuations comp
match cancelAction with // если указали метод отмены,
| Some action -> // то оборачиваем в Async.OnCancel
async { use! __ = Async.OnCancel action
return! operation }
| None -> operation
/// Преобразует асинхронную операцию, заданную в виде метода
/// запуска и события завершения (event-based asynchronous
/// pattern) и поддерживающую несколько одновременных вызовов
/// в асинхронное вычисление F# async
static member FromEventPattern
(completedEvent : IObservable<_>, // событие завершения
executeAction : obj -> unit, // ф-ция запуск операции
?cancelAction : obj -> unit, // ф-ция отмены операции
?userToken : obj) = // идентификатор операции
// если идентификатор операции не задан, то создаём новый
let token = match userToken with Some token -> token
| None -> new obj()
// если задан метод отмены операции
let cancel = Option.map (fun f () -> f token) cancelAction
Async.FromEventPattern<#AsyncCompletedEventArgs>(
completedEvent = // фильтруем события
Observable.filter // по идентификатору
(fun e -> e.UserState = token) completedEvent,
?cancelAction = cancel,
executeAction = fun() -> executeAction token)
Теперь можно очень легко определить тип-расширение для операции DownloadData
класса WebClient
(обратите внимание на соглашение об именовании подобных методов – префикс Async
):
type WebClient with
member this.AsyncDownloadData(uri: Uri) =
Async.FromEventPattern(
this.DownloadDataCompleted,
(fun()-> this.DownloadDataAsync uri),
(fun()-> this.CancelAsync()))
Можно дополнительно преобразовывать результат операции, доставая из аргументов события результат операции:
type WebClient with
member this.AsyncDownloadData(uri: Uri) =
async {
let! e = Async.FromEventPattern(
this.DownloadDataCompleted,
(fun()-> this.DownloadDataAsync uri),
(fun()-> this.CancelAsync()))
return e.Result
}
Теперь исходный пример можно выразить на F# следующим образом:
open System
open System.Net
open System.Threading
let asyncDownloadGoogle() =
let uri = Uri("http://google.com")
let client = new WebClient()
use token = new CancellationTokenSource()
let work = async {
// обработчик отмены async workflow
use! cancel = Async.OnCancel (fun() ->
printfn "Операция отменена!")
// вызов асинхронной операции и работа с результатом
try let! page = client.AsyncDownloadData(uri)
printfn "Загружено %d байт" page.Length
// обработка асинхронных ошибок
with e -> printfn "Ошибка: %s" e.Message
}
Async.RunSynchronously(work, cancellationToken = token.Token)
let key = Console.ReadKey true
if (key.Key = ConsoleKey.Escape) then token.Cancel()
Использование CancellationTokenSource
выглядит не очень симпатично (определение обработчика внутри workflow), однако это мощный и обобщённый механизм отмены асинхронных операций, при этом от пользователя скрывается передача токена по всему workflow, что существенно упрощает код.