Kiedyś już chciałem poprowadzić cykl wpisów o Reactive Extensions i nawet napisałem pierwszy post wprowadzający do interfejsów IObservable, IObserver. Niestety po drodze przytrafiły się tematy które chciałem najpierw opisać i na końcu zrezygnowałem z tego. W między czasie kilka osób pytało o ten cykl ale nie widziałem sensu ponieważ Maciej Zbrzezny już wykonał kawał dobrej roboty i opisał to na swoim blogu w bardzo szczegółowy sposób. Dzisiaj postanowiłem jednak napisać kilka postów o RX po swojemu, z trochę innej perspektywy – może komuś przyda to się, taką mam nadzieje. Ponadto oprócz podstaw RX chciałbym pokazać jak pisać bardziej zaawansowane rzeczy jak np. rekurencyjny scheduler, który ostatnio wykorzystałem do odpytywania bazy danych o strukturze rekurencyjnej (korzystamy z OPC Framework).
Najpierw polecam przeczytanie wpisu o IObservable oraz IObserver. Następnie proszę ściągnąć i zainstalować sam framework w wersji 1.0.
Dzisiaj postaram się odpowiedzieć na pytanie: dlaczego potrzebujemy Reactive Extensions? Jak wspomniałem RX opiera się na interfejsach IObservable, IObserver co oznacza, że RX korzysta z kolekcji push-based a nie wymagających odpytywania (pull-based collections). Najbardziej znanym źródłem danych typu “pull-based” w .NET jest po prostu IEnumerable. Aby uzyskać takie dane należy odpytywać poszczególne elementy tzn.:
string[]elements=new []{"A", "B"}; foreach(string element in elements) { Console.WriteLine(element); }
Dla powyższego przykładu IEnumerable jest doskonały, problemy pojawią się gdy mamy do czynienia z czasochłonną operacją jak np. czytanie z web service. Na przykład taki kod byłby bardzo zły:
foreach(string element in _webService.GetElements()) { Console.WriteLine(element); }
GetElements prawdopodobnie potrzebuje kilka sekund na wykonanie a to spowoduje zablokowanie wątku. W takich przykładach lepiej stosować kolekcje typu “push-based”, które opierają się na powiadomieniach (“wpychaniu” danych w strumień). W .NET są już one dobrze znane i można wymienić następujące przykłady:
- Zdarzenia, delegaty.
- ObservableCollection (oparte na zdarzeniach).
- Timer (również oparte na zdarzeniach).
- Metody asynchroniczne oparte na Begin\End (dziś wypierane przez async w .NET 4.5 ale wciąż bardzo często spotykane)
- Wątki (Task, Thread) i wszelkie callback’i
Jak do powyższych rozważań ma się RX? RX to ujednolicona architektura, która pozwala połączyć różne kolekcje push-based w jeden mechanizm. Łatwo zauważyć, że w .NET inaczej obsługuje się zdarzenia a inaczej callback’i a jeszcze inaczej wzorzec asynchroniczny Begin\End. RX traktuje wszystkie źródła danych (nieważne czy jest to kolekcja czy po prostu zdarzenie) jako kolekcje danych. Dzięki temu łatwe jest operowanie na zdarzeniach w których często potrzebujemy kilka ostatnich wartości. Na przykład aby narysować linie tak jak to Microsoft’owy Paint robi, należy pobrać aktualną i poprzednią pozycję kursora na ekranie. W przypadku zdarzeń jest to niemożliwe i należy ręcznie stworzyć pole w klasie, które służy jako bufor. W przypadku RX, traktujemy zdarzenia jak kolekcje i wtedy zwrócenie dwóch ostatnich elementów (dwóch ostatnich pozycji kursora) jest bardzo łatwe.
Ponadto RX oparty jest na języku zapytań LINQ. Z tego względu zwrócenie specyficznej pozycji kursora sprowadza się do jednego zapytania LINQ a nie buforowania serii wartości i tworzenia IF. Jak wspomniałem RX to ujednolicona architektura dla kolekcji push-based. Umożliwia to połączenie wiele źródeł danych (web services, zdarzenia, Begin\End) w jedno zapytanie LINQ. Przydatne to jest gdy należy np. pobrać identyfikatory z jednej bazy danych, następnie wykorzystać je jako klucze do przeczytania wartości z drugiej bazy. Dzisiaj należy obsłużyć to samemu poprzez callback’i, które zawsze są trudne w obsłudze a jeszcze trudniejsze w czytaniu. Innymi słowy, RX umożliwia synchronizacje wielu źródeł danych za pomocą jednego zapytania LINQ, które opisuje całe wykonywane zadanie. Zamiast skakać z jednej metody do drugiej tworzymy jedno (czasami długie) zapytanie, które wygląda jak zdanie opisujące daną akcję.
RX wspiera wiele platform:
-
.NET Framework
-
WPF\Silverlight – m.in. specjalne biblioteki ułatwiające synchronizacje z UI.
-
Windows Phone
-
JavaScript
To co warto zauważyć kolekcje push-based oraz pull-based służą do dokładnie tego samego – dostępu do danych. W Internecie można nawet znaleźć matematyczną transformację pomiędzy interfejsami IEnumerable\IEnumerator do IObservable\IObserver:
public interface IEnumerable<T> { IEnumerator<T> GetEnumerator(); } public interface IEnumerator<T>: IDisposable { T Current { get; } bool MoveNext(); void Reset(); } // równoważne do public interface IObservable<T> { IDisposable Subscribe( IObserver<T>); } public interface IObserver<T> { void OnNext(T); void OnError(Exception); void OnCompleted (); }
Nie chcę na blogu pokazywać jak kro po kroku wygląda dowód matematyczny ale zobaczmy jak to wygląda po prostu od strony użytkownika. Dla pull-based odpytywanie danych wygląda oczywiście następująco:
string[]elements=new []{"A","B"}; foreach(string element in elements) { Console.WriteLine(element); }
Dla IObservable\IObserver wygląda to:
string[] elements = new[] {"A", "B"}; IObservable<string> observableElements = elements.ToObservable(); observableElements.Subscribe(Console.WriteLine);
Oba przykłady wyświetlą na ekranie taki sam wynik. IObserver\IObservable są interfejsami należącymi do .NET Framework ale z kolei metoda ToObservable to już RX Extensions. RX to zestaw rozszerzeń, które ułatwiają pracę z IObservable\IObserver. Dzięki nim możemy np. skonwertować zdarzenia .NET do IObservable. Analogicznie Metoda Observable.ToObservable służy do skonwertowania IEnumerable do IObservable. RX to nic innego jak zestaw helper’ów i metod rozszerzających.
Jak widać korzystanie IObservable sprowadza się do kilku kroków:
-
Stworzenia źródła danych IObservable. W powyższym fragmencie wykorzystano metodę ToObservable zamieniającą IEnumerable do tego interfejsu. W RX Istnieją jeszcze inne metody do konwersji zdarzeń czy wzorca Begin\End – o tym w następnych postach.
-
Implementacji IObserver, który zawiera trzy metody OnNext, OnError, OnCompleted. Jest to etap opcjonalny bo można użyć wyrażenia lambda jak to zostało przedstawione w powyższym fragmencie kodu.
-
Dokonanie subskrypcji za pomocą metody Subscribe. Można użyć klasy stworzonej w punkcie drugim albo po prostu wyrażeń lambda.
Jeśli krok drugi nie jest jasny to proszę rozważyć następujący kod:
class CustomObserver:IObserver<string> { public void OnNext(string value) { Console.WriteLine("Nastepny element:{0}",value); } public void OnError(Exception error) { Console.WriteLine("Błąd:{0}",error.Message); } public void OnCompleted() { Console.WriteLine("Koniec danych"); } } internal class Program { private static void Main(string[] args) { string[] elements = new[] {"A", "B"}; IObservable<string> observableElements = elements.ToObservable(); // z użyciem IObserver observableElements.Subscribe(new CustomObserver()); // z użyciem lambda observableElements.Subscribe( value => Console.WriteLine("Nastepny element:{0}", value), error => Console.WriteLine("Błąd:{0}", error.Message), () => Console.WriteLine("Koniec danych")); } }
Proszę zauważyć, że jest to lepsze rozwiązanie niż zwykle zdarzenia w .NET. Oprócz czystych danych otrzymywanych przez OnNext dostajemy informację o ewentualnych błędach (OnError) oraz zakończeniu strumienia danych OnCompleted. Oczywiście nie wszystkie źródła danych mają swój koniec np. MouseMove zawsze będzie wywoływało OnNext ponieważ w każdej chwili użytkownik może poruszyć myszką. W przypadku jednak Begin\End czy powyższego przykładu z elementami “A”,”B”, OnCompleted zostanie wywołany bardzo szybko.
W następnym poście przedstawię więcej metod rozszerzających RX pozwalających na tworzenie bardziej skomplikowanych źródeł danych niż zwykła konwersja IEnumerable->IObservable. Wiem, że nie jest to na razie zbyt praktyczne ale trzeba przebrnąć przez te podstawy.