Reactive Extensions–wprowadzenie

Kiedyś już chciałem poprowadzić cykl wpisów o Reactive Extensions i nawet napisałem pierwszy post wprowadzający do interfejsów IObservable, IObserver. Niestety po drodze przytrafiły się tematy które chciałem najpierw opisać i na końcu zrezygnowałem z tego. W między czasie kilka osób pytało o ten cykl ale nie widziałem sensu ponieważ Maciej Zbrzezny już wykonał kawał dobrej roboty i  opisał to na swoim blogu w bardzo szczegółowy sposób. Dzisiaj postanowiłem jednak napisać kilka postów o RX po swojemu, z trochę innej perspektywy – może komuś przyda to się, taką mam nadzieje. Ponadto oprócz podstaw RX chciałbym pokazać jak pisać bardziej zaawansowane rzeczy jak np. rekurencyjny scheduler, który ostatnio wykorzystałem do odpytywania bazy danych o strukturze rekurencyjnej (korzystamy z OPC Framework).

Najpierw polecam przeczytanie wpisu o IObservable oraz IObserver. Następnie proszę ściągnąć i zainstalować sam framework w wersji 1.0.

Dzisiaj postaram się odpowiedzieć na pytanie: dlaczego potrzebujemy Reactive Extensions? Jak wspomniałem RX opiera się na interfejsach IObservable, IObserver co oznacza, że RX korzysta z kolekcji push-based a nie wymagających odpytywania (pull-based collections). Najbardziej znanym źródłem danych  typu “pull-based” w .NET jest po prostu IEnumerable. Aby uzyskać takie dane należy odpytywać poszczególne elementy tzn.:

string[]elements=new []{"A", "B"};
foreach(string element in elements)
{
      Console.WriteLine(element);
}

Dla powyższego przykładu IEnumerable jest doskonały, problemy pojawią się gdy mamy do czynienia z czasochłonną operacją jak np. czytanie z web service. Na przykład taki kod byłby bardzo zły:

foreach(string element in _webService.GetElements())
{
      Console.WriteLine(element);
}

GetElements prawdopodobnie potrzebuje kilka sekund na wykonanie a to spowoduje zablokowanie wątku. W takich przykładach lepiej stosować kolekcje typu “push-based”, które opierają się na powiadomieniach (“wpychaniu” danych w strumień). W .NET są już one dobrze znane i można wymienić następujące przykłady:

  1. Zdarzenia, delegaty.
  2. ObservableCollection (oparte na zdarzeniach).
  3. Timer (również oparte na zdarzeniach).
  4. Metody asynchroniczne oparte na Begin\End (dziś wypierane przez async w .NET 4.5 ale wciąż bardzo często spotykane)
  5. Wątki (Task, Thread) i wszelkie callback’i

Jak do powyższych rozważań ma się RX? RX to ujednolicona architektura, która pozwala połączyć różne kolekcje push-based w jeden mechanizm. Łatwo zauważyć, że w .NET inaczej obsługuje się zdarzenia a inaczej callback’i a jeszcze inaczej wzorzec asynchroniczny Begin\End. RX traktuje wszystkie źródła danych (nieważne czy jest to kolekcja czy po prostu zdarzenie) jako kolekcje danych. Dzięki temu łatwe jest operowanie na zdarzeniach w których często potrzebujemy kilka ostatnich wartości. Na przykład aby narysować linie tak jak to Microsoft’owy Paint robi, należy pobrać aktualną i poprzednią pozycję kursora na ekranie. W przypadku zdarzeń jest to niemożliwe i należy ręcznie stworzyć pole w klasie, które służy jako bufor. W przypadku RX, traktujemy zdarzenia jak kolekcje i wtedy zwrócenie dwóch ostatnich elementów (dwóch ostatnich pozycji kursora) jest bardzo łatwe.

Ponadto RX oparty jest na języku zapytań LINQ. Z tego względu zwrócenie specyficznej pozycji kursora sprowadza się do jednego zapytania LINQ a nie buforowania serii wartości i tworzenia IF. Jak wspomniałem RX to ujednolicona architektura dla kolekcji push-based. Umożliwia to połączenie wiele źródeł danych (web services, zdarzenia, Begin\End) w jedno zapytanie LINQ. Przydatne to jest gdy należy np. pobrać identyfikatory z jednej bazy danych, następnie wykorzystać je jako klucze do przeczytania wartości z drugiej bazy. Dzisiaj należy obsłużyć to samemu poprzez callback’i, które zawsze są trudne w obsłudze a jeszcze trudniejsze w czytaniu. Innymi słowy, RX umożliwia synchronizacje wielu źródeł danych za pomocą jednego zapytania LINQ, które opisuje całe wykonywane zadanie. Zamiast skakać z jednej metody do drugiej tworzymy jedno (czasami długie) zapytanie, które wygląda jak zdanie opisujące daną akcję.

RX wspiera wiele platform:

  1. .NET Framework
  2. WPF\Silverlight – m.in. specjalne biblioteki ułatwiające synchronizacje z UI.
  3. Windows Phone
  4. JavaScript

To co warto zauważyć kolekcje push-based oraz pull-based służą do dokładnie tego samego – dostępu do danych. W Internecie można nawet znaleźć matematyczną transformację pomiędzy interfejsami IEnumerable\IEnumerator do IObservable\IObserver:

public interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
}

public interface IEnumerator<T>: IDisposable
{
    T Current { get; }
    bool MoveNext();
    void Reset();
}

// równoważne do

public interface IObservable<T>
{
    IDisposable Subscribe( IObserver<T>);
}
public interface IObserver<T>
{
    void OnNext(T);
    void OnError(Exception);
    void OnCompleted ();
}

Nie chcę na blogu pokazywać jak kro po kroku wygląda dowód matematyczny ale zobaczmy jak to wygląda po prostu od strony użytkownika. Dla pull-based odpytywanie danych wygląda oczywiście następująco:

string[]elements=new []{"A","B"};
foreach(string element in elements)
{
    Console.WriteLine(element);
}

Dla IObservable\IObserver wygląda to:

string[] elements = new[] {"A", "B"};
IObservable<string> observableElements = elements.ToObservable();
observableElements.Subscribe(Console.WriteLine);

Oba przykłady wyświetlą na ekranie taki sam wynik. IObserver\IObservable są interfejsami należącymi do .NET Framework ale z kolei metoda ToObservable to już RX Extensions. RX to zestaw rozszerzeń, które ułatwiają pracę z IObservable\IObserver. Dzięki nim możemy np. skonwertować zdarzenia .NET do IObservable. Analogicznie Metoda Observable.ToObservable służy do skonwertowania IEnumerable do IObservable. RX to nic innego jak zestaw helper’ów i metod rozszerzających.

Jak widać korzystanie IObservable sprowadza się do kilku kroków:

  1. Stworzenia źródła danych IObservable. W powyższym fragmencie wykorzystano metodę ToObservable zamieniającą IEnumerable do tego interfejsu. W RX Istnieją  jeszcze inne metody do konwersji zdarzeń czy wzorca Begin\End – o tym w następnych postach.
  2. Implementacji IObserver, który zawiera trzy metody OnNext, OnError, OnCompleted. Jest to etap opcjonalny bo można użyć wyrażenia lambda jak to zostało przedstawione w powyższym fragmencie kodu.
  3. Dokonanie subskrypcji za pomocą metody Subscribe.  Można użyć klasy stworzonej w punkcie drugim albo po prostu wyrażeń lambda.

Jeśli krok drugi nie jest jasny to proszę rozważyć następujący kod:

class CustomObserver:IObserver<string>
{
    public void OnNext(string value)
    {
        Console.WriteLine("Nastepny element:{0}",value);
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Błąd:{0}",error.Message);
    }

    public void OnCompleted()
    {
        Console.WriteLine("Koniec danych");
    }
}
internal class Program
{
    private static void Main(string[] args)
    {
        string[] elements = new[] {"A", "B"};
        IObservable<string> observableElements = elements.ToObservable();

        // z użyciem IObserver
        observableElements.Subscribe(new CustomObserver());
     
        // z użyciem lambda
        observableElements.Subscribe(
            value => Console.WriteLine("Nastepny element:{0}", value),
            error => Console.WriteLine("Błąd:{0}", error.Message),
            () => Console.WriteLine("Koniec danych"));
    }
}

Proszę zauważyć, że jest to lepsze rozwiązanie niż zwykle zdarzenia w .NET. Oprócz czystych danych otrzymywanych przez OnNext dostajemy informację o ewentualnych błędach (OnError) oraz zakończeniu strumienia danych OnCompleted. Oczywiście nie wszystkie źródła danych mają swój koniec np. MouseMove zawsze będzie wywoływało OnNext ponieważ w każdej chwili użytkownik może poruszyć myszką. W przypadku jednak Begin\End czy powyższego przykładu z elementami “A”,”B”, OnCompleted zostanie wywołany bardzo szybko.

W następnym poście przedstawię więcej metod rozszerzających RX pozwalających na tworzenie bardziej skomplikowanych źródeł danych niż zwykła konwersja IEnumerable->IObservable. Wiem, że nie jest to na razie zbyt praktyczne ale trzeba przebrnąć przez te podstawy.

3 thoughts on “Reactive Extensions–wprowadzenie”

Leave a Reply

Your email address will not be published.