Category Archives: Reactive Programming

Reactive Extensions: Zdarzenia w RX

Opisywałem już mechanizm konwersji zdarzeń .NET do IObservable. W RX istnieje dodatkowo nowy mechanizm, mający na celu zastąpić standardowe zdarzenia  .NET – przynajmniej w części przypadków. Zastanówmy się, co jest złego tak naprawdę w obsłudze zdarzeń w .NET?

1. Dość często programiści zapominają usunąć zdarzenie co może skutkować memory leak. Czasami jest ciężko zdefiniować moment, w którym należy usunąć zdarzenie. Pewnym rozwiązaniem problemu może być zastosowanie wzorca weak event pattern ale jak wiemy z archiwalnych wpisów, nie jest to też takie proste.

2. Zdarzenia, które zawierają anonimowe metody są szczególnie trudne do usunięcia. Na przykład:

internal class Program
{
   public static event EventHandler MessageReceived;
   private static void Main(string[] args)
   {
       MessageReceived += (sender,e) => Console.WriteLine(e.ToString());
   }
}

W jaki sposób usunąć powyższe zdarzenie? Niestety trzeba stworzyć tymczasową zmienną i ją przechowywać gdzieś:

public static event EventHandler MessageReceived;

private static void Main(string[] args)
{
  EventHandler action=(sender, e) => Console.WriteLine(e.ToString());

  MessageReceived += action;
  // disposing
  MessageReceived -= action;
}

3.  Jak już wspomniałem i pokazałem w jednym z poprzednich wpisów, nie ma możliwości odczytania poprzednich wartości, jakie zostały wygenerowane przez zdarzenie (np. poprzednia pozycja myszki).

4. Zdarzenia to właściwości a nie obiekty co czasami może być niewygodne.

5. Z punktu pisania testów jednostkowych, zdarzenia również nie są szczególnie łatwe do obsługi.

6. Niestety zdarzeń nie można traktować jako kolekcji, z tego względu nie ma możliwości wykorzystania zapytań  LINQ.

Dość narzekania, czas pokazać mechanizm jaki dostarcza RX. Mam nadzieję, że wszyscy są już obeznani z wzorcem projektowym Observer. Jeśli nie, polecam poczytanie o nim( np. na tym blogu, kiedyś o tym już pisałem). Znając zasady działania wzorca obserwator, mechanizm działania zdarzeń RX nie powinien zaskakiwać. Najważniejszym interfejsem jest ISubject, który wygląda następująco:

public interface ISubject<in TSource, out TResult> 
: IObserver<TSource>, IObservable<TResult>
{ 
}
public interface ISubject<T>  : ISubject<T, T>
{ 
}

Proszę zauważyć, że implementuje on zarówno IObserver jak IObservable. W praktyce oznacza to, że klasy implementujące ISubject, mogą zarówno publikować jak i dokonywać subskrypcji (nasłuchiwać). Przykład:

internal class Program
{
   public static  Subject<string> MessageReceived=new Subject<string>();

   private static void Main(string[] args)
   {            
       MessageReceived.Subscribe(Console.WriteLine);
       MessageReceived.OnNext("Hello World");
       Console.ReadLine();
   }
}

W zwykłych zdarzeniach, operator += służy do podłączenia metod. W RX analogiczną funkcję pełni metoda Subscribe. Zasada działania jest taka sama, jak w przypadku dobrze znanego IObservable ( w końcu ISubject implementuje IObservable). W celu wywołania zdarzenia, korzystamy analogicznie z OnNext. Dzięki RX, łatwo zwolnić zdarzenie:

internal class Program
{
   public static  Subject<string> MessageReceived=new Subject<string>();

   private static void Main(string[] args)
   {            
       IDisposable disposable=MessageReceived.Subscribe(Console.WriteLine);
       MessageReceived.OnNext("Hello World");
       disposable.Dispose();
       MessageReceived.OnNext("You will not see this text");

       Console.ReadLine();
   }
}

Oczywiście, można (a nawet jest to polecane) użyć klauzuli using, dla powyższego przykładu.

internal class Program
{
   public static  Subject<string> MessageReceived=new Subject<string>();

   private static void Main(string[] args)
   {            
       using(MessageReceived.Subscribe(Console.WriteLine))
       {
           MessageReceived.OnNext("Hello World");
       }

       MessageReceived.OnNext("You will not see this text");

       Console.ReadLine();
}

Zdarzenie automatycznie zostanie zwolnione (a wszelkie metody połączonego do niej, usunięte) kiedy zmienna nie ma żadnych referencji. Przypomina to trochę weak event pattern, którego cel był analogiczny – ale wtedy należało wykonać dużo więcej roboty. Dzięki RX mamy możliwość większej kontroli nad przepływem zdarzeń za pomocą metod OnNext, OnError, OnCompleted:

internal class Program
{
   public static Subject<string> MessageReceived = new Subject<string>();

   private static void Main(string[] args)
   {
       using (MessageReceived.Subscribe(OnNext,OnCompleted,OnError))
       {
           MessageReceived.OnNext("Processing...");
           try
           {
               // do sth
           }
           catch (Exception e)
           {
               MessageReceived.OnError(e);
           }
           finally
           {
               MessageReceived.OnCompleted();
           }
       }

       Console.ReadLine();
   }
   private static void OnNext(string obj)
   {
   }
   private static void OnCompleted(Exception error)
   {
   }
   private static void OnError()
   {
   }
}

W przypadku standardowych zdarzeń, nie ma oczywiście takiej możliwości. Jeśli ktoś jest zaznajomiony z RX to Subject wygląda bardzo prosto – implementuje on interfejsy, które były bardzo dokładnie omówione w poprzednich wpisach.

Reactive Extensions: TakeUntil, Repeat, dalsza cześć przykładu obsługi zdarzeń

Kilka postów wcześniej, pokazałem jak narysować linię za pomocą RX oraz przechwytywania zdarzeń. Dla przypomnienia udało nam się narysować prostą linie z punktu (0,0) do aktualnej pozycji kursora:

public class MyCanvas : Canvas
{
   private Point _endPoint;

   public MyCanvas()
   {
       var eventsSource =
               Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove").
               Where(e => e.EventArgs.LeftButton == MouseButtonState.Pressed).
               Select(e => e.EventArgs.GetPosition(this));

       eventsSource.Subscribe(pos =>
                                  {
                                      _endPoint = pos;
                                      InvalidateVisual();
                                  });
   }
   protected override void OnRender(DrawingContext dc)
   {
       base.OnRender(dc);
       dc.DrawLine(new Pen(Brushes.Black, 1), new Point(), _endPoint);
   }
}

Następnym celem jest narysowanie linii od punktu gdzie użytkownik naciska lewy przycisk myszy do miejsca gdy zwolnienia myszkę. Dla uproszczenia, na razie nie będziemy rysować linii w czasie rzeczywistym tzn. kiedy użytkownik rusza myszką. Po prostu najpierw pobieramy punkt w zdarzeniu MouseDown, czekamy na MouseUp i po otrzymaniu sygnału rysujemy linie pomiędzy tymi dwoma punktami.

Rozwiązaniem jest użycie funkcji Zip, którą przedstawiałem w poprzednim poście:

var mouseDown =
 Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseDown").
 Select(r => r.EventArgs.GetPosition(this));

var mouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseUp").
Select(r => r.EventArgs.GetPosition(this));

mouseDown.Zip(mouseUp,(a,b)=>new{P1=a,P2=b}).
Subscribe((v) =>
               {
                   start = v.P1;
                   end = v.P2;
                   InvalidateVisual();
               })

Dlaczego nie CombineLatest? Jak wiemy, Zip czeka na drugą wartość. W praktyce oznacza to, gdy zostanie wywołane zdarzenie mouseDown, Zip będzie czekał aż do momentu gdy MouseUp jest wywołane. CombineLatest po prostu użyłoby ostatniego dostępnego zdarzenia (a nie tego po MouseDown), co oczywiście jest całkowicie błędne.

Nie jest jeszcze to idealne rozwiązanie.  Przede wszystkim, gdy użytkownik zwolni myszkę za oknem, wtedy następna narysowana linia będzie nieprawidłowa.  Powrócimy do tego na końcu wpisu ponieważ będzie potrzebne użycie funkcji TakeUntil oraz Repeat. Kolejnym krokiem jest zaimplementowanie rysowania w taki sposób, że linia jest  renderowana już w momencie naciśnięcia przycisku. Innymi słowy, najpierw chcemy czekać na zdarzenie MouseDown, następnie połączyć je z aktualną pozycją kursora, a na końcu zakończyć zapytanie gdy zostanie wywołany MouseUp:

var mouseDown =
      Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseDown").
      Select(r => r.EventArgs.GetPosition(this));
  
  var mouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseUp").
      Select(r => r.EventArgs.GetPosition(this));

  var mouseMove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove").
      Select(r => r.EventArgs.GetPosition(this));


  mouseDown.CombineLatest(mouseMove, (a, b) => new { P1 = a, P2 = b }).TakeUntil(mouseUp).Repeat().
      Subscribe((a) =>
                    {
                        start = a.P1;
                        end = a.P2;
                        InvalidateVisual();
                    });

CombineLatest łączy zdarzenie zawierające pozycje kursora w momencie naciśnięcia przycisku (MouseDown,a) oraz również gdy użytkownik rusza myszą (MouseMove,b). Następnie chcemy te wartości przetwarzać aż do momentu MouseUp. Funkcja TakeUntil publikuje wartości aż do momentu zdarzenia przekazanego jako parametr – w tym przypadku jest to MouseUp. W momencie gdy MouseUp zostanie wywołany, wtedy zostaje wykonane OnCompleted i źródło danych kończy generowanie elementów. Oczywiście jedna sekwencja MouseDown, MouseMove, MouseUp nie jest wystarczająca i dlatego na końcu wywołujemy Repeat. Powoduje to, że cała sekwencja będzie powtarzana nieskończoną liczbę razy. Jeśli TakeUntil wciąż nie jest zrozumiały, proszę przeanalizować poniższy kod:

var cancelEvent = Observable.FromEventPattern(this,”Cancel”);

var query = readDataSource.TakeUntil(cancelEvent);

Źródło readDataSource będzie generowało elementy aż do momentu przyjścia zdarzenia Cancel. TakeUnti zachowuje się po prostu jak zawór, który jest zakręcany gdy inne źródło wygeneruje element.

Na zakończenie, powróćmy do poprzedniego problemu o rysowaniu linii pomiędzy mouseDown oraz MouseUp. Mając już wiedzę o Repeat oraz TakeUntil możemy napisać:

var mouseDown =
    Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseDown").
    Select(r => r.EventArgs.GetPosition(this));

var mouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseUp").
    Select(r => r.EventArgs.GetPosition(this));

mouseDown.
    TakeUntil(mouseUp).
    CombineLatest(mouseUp,(a,b)=>new{P1=a,P2=b}).
    Take(1).
    Repeat().
    Subscribe((a) =>
                {
                    start = a.P1;
                    end = a.P2;
                    InvalidateVisual();
                });

Po co nam Take(1)? Bez tego CombineLatest nigdy nie zakończy działania – będzie czekał na nowe elementy. Take(1) bierze po prostu pierwszą parę punktów i powoduje opublikowanie danych.

Reactive Extensions: Scalanie źródeł danych

W dzisiejszym wpisie znów wracamy do tematu RX. Postaram się wyjaśnić jak można ze sobą połączyć kilka IObservable. W RX istnieje naprawdę wiele metod umożliwiających wykonanie tego i na początku może wydawać się to dość skomplikowane, ze względu na liczbę sposobów w jaki można to wykonać.

1. Observable.Amb – zawsze zwraca wyłącznie tą sekwencje, która została jako pierwsza wygenerowana. Jeśli zatem mamy MouseMove i MouseUp wtedy zostanie zwrócone te zdarzenie, które jako pierwsze miało miejsce. Przykład:

internal class Program
{
   public static void Main()
   {
       var fasterSeq = Observable.Interval(TimeSpan.FromSeconds(1));
       var slowerSeq = Observable.Interval(TimeSpan.FromSeconds(5));

       var result = slowerSeq.Amb(fasterSeq);
       result.Subscribe(Console.WriteLine);
       Console.ReadLine();
   }  
}

2. Observable.Merge – scala ze sobą elementy (pary) z dwóch źródeł. Jeśli zatem pierwsza sekwencja ma wartości A,B,C,D a druga 1,2,3,4,5,6 to na wyjściu będzie A,1,B,2,C,3,D,4,5,6. Przykład:

public static void Main()
{
  var a = new []{"A", "B", "C", "D"}.ToObservable();
  var b = new[] {"1", "2", "3", "4", "5", "6"}.ToObservable();

  var result = a.Merge(b);
  result.Subscribe(Console.WriteLine);
  Console.ReadLine();
} 

3. Observable.Concat – na pierwszy rzut oka wygląda podobnie jak Observable.Merge. Jeśli scalane są źródła A,B,C,D z 1,2,3,4,5,6 na wyjściu będzie A,B,C,D,1,2,3,4,5,6. Najpierw zatem zostanie wyświetlona pierwsza sekwencja a potem druga. Przykład:

internal class Program
{
   public static void Main()
   {
       var a = new []{"A", "B", "C", "D"}.ToObservable();
       var b = new[] {"1", "2", "3", "4", "5", "6"}.ToObservable();

       var result = a.Concat(b);
       result.Subscribe(Console.WriteLine);
       Console.ReadLine();
   }  
}

Jakie to ma znaczenie w praktyce? Zależy od typu źródła. W powyższych przykładach nie ma większej różnicy ponieważ wszystkie IObservable zawsze zwrócą taki sam zbiór danych, niezależny od aktualnego czasu. Jeśli oczywiście generowane dane zależałby od czasu (np. zdarzenia) wtedy wynik mógłby zawierać inne elementy. Ponadto, w przypadku Concat może okazać się przydatna metoda SubscribeOn, o której będę pisał w innym poście, poświęconym synchronizacji w RX. W skrócie pisząc, gdy korzystamy z Concat, najpierw RX podpina się do źródła A, generuje wszystkie elementy a dopiero potem dokonuje subskrypcji do B. Z tego wynika, że źródło B dziedziczy kontekst (wątek) po A ponieważ, to w A wywoływany jest Subscribe do B. Jeśli to jest niezrozumiałe, to w jednym z następnych postów zostanie to wyjaśnione w szczegółach…

4. Observable.Zip – ten typ scalania trochę różni się od powyższych. Wszystkie przedstawione wcześniej sposoby, polegały na zwróceniu IObservable zawierającego ten sam typ. Jeśli zatem scalano dwa źródła danych zawierające po 2 elementy, wynikiem była taka sama sekwencja ale zawierająca po prostu 4 elementy (4 powiadomienia po subskrypcji). Obserbable.Zip polega na łączeniu w pary elementów z A i B. Oczywiście nic nie stoi na przeszkodzie aby elementy były innych typów. Jeśli zatem mamy “A”,”B”,”C”,”D” oraz 1,2,4 wtedy możemy to połączyć w nową sekwencje, zawierającą również 4 elementy, ale będącą po prostu rezultatem połączenia dwóch IObservable np.:

internal class Program
{
   public static void Main()
   {
       var a = new[] {"A", "B", "C", "D"}.ToObservable();
       var b = new[] {1, 2, 3, 4}.ToObservable();

       var result = a.Zip(b, (first, second) => string.Format("{0}:{1}", first, second));
       result.Subscribe(Console.WriteLine);
       Console.ReadLine();
   }
}

Na ekranie wyświetli się A:1,B:2,C:3,D:4. Co w przypadku jednak, gdy sekwencja A ma więcej elementów niż B? Innymi słowy, jak zachowa się Zip, jeśli nie można dopasować pary ponieważ brakuje odpowiadającego elementu w drugim źródle danych:

internal class Program
{
   public static void Main()
   {
       var a = new[] {"A", "B", "C", "D","E"}.ToObservable();
       var b = new[] {1, 2, 3, 4}.ToObservable();

       var result = a.Zip(b, (first, second) => string.Format("{0}:{1}", first, second));
       result.Subscribe(Console.WriteLine,()=>Console.WriteLine("Koniec"));
       Console.ReadLine();
   }
}

Na ekranie zostanie wyświetlony napis "Koniec” ponieważ, element “E” nie ma odpowiadającego elementu. Observable.Zip po prostu łączy pary po indeksach. Zip czeka aż drugie źródło wygeneruje odpowiedni element. W powyższym przypadku b nie wygeneruje tego elementu a zakończy swoje działanie przez OnCompleted – w takiej sytuacji Zip kończy również wykonywanie. Jeśli element “E” zostałby wygenerowany w 10 sekundzie, a wartość 5 w 20 sekundzie, wtedy Zip czekałby te 10 sekund na wartość. 5. W momencie jednak gdy b wywołał OnCompleted, całość kończy działanie.

5. Observable.CombineLatest – analogiczne do Zip, z tym, że nie czeka na odpowiadający element. Zawsze bierze ostatni dostępny. W powyższym przykładzie, E po prostu zostanie połączone z 4:

internal class Program
{
   public static void Main()
   {
       var a = new[] {"A", "B", "C", "D","E"}.ToObservable();
       var b = new[] {1, 2, 3, 4}.ToObservable();

       var result = a.CombineLatest(b, (first, second) => string.Format("{0}:{1}", first, second));
       result.Subscribe(Console.WriteLine,()=>Console.WriteLine("Koniec"));
       Console.ReadLine();
   }
}

Reactive Extensions– Observable.FromAsyncPattern, dalsza część przykładu

W poprzednim poście pokazałem jak korzystać z funkcji FromAsyncPattern na przykładzie usługi sieciowej. Dzisiaj zaprezentuję kilka dodatkowych funkcji. Najpierw zdefiniujmy co chcemy uzyskać:

  1. Użytkownik może wpisać szukaną frazę w pole edycyjne.
  2. Usługa sieciowa ma za zadanie wyszukanie fraz wpisanych w pole zdefiniowane w punkcie 1.
  3. Wyłącznie frazy dłuższe niż 3 znaki mają być przetwarzane.
  4. Jeśli użytkownik wpisze dwa razy tą samą frazę to tylko pierwsza ma zostać wysłana do usługi (optymalizacja).
  5. Zdarzenie TextChanged jest wywoływane dla każdego wpisanego znaku. Z tego względu, poprzednie rozwiązanie wysyłało dużo niepotrzebnych zapytań. W celu optymalizacji wprowadzimy opóźnienie takie jakie jest np. w Intellisense – propozycje nie są wyświetlane za każdym razem gdy jest wpisywany pojedynczy znak ale dopiero gdy użytkownik chwilę odczeka.

Klasyczne rozwiązanie problemu, bez użycia RX mogłoby wyglądać następująco:

public partial class MainWindow : Window
{
   private string _previousText;
   private DateTime _lastRead;
   private const long MinimalTimeBetweenRequests = 1000*5;

   public MainWindow()
   {
       InitializeComponent();
   }
   private void TextBoxTextChanged(object sender, TextChangedEventArgs e)
   {
       TimeSpan timeDiff = DateTime.Now - _lastRead;
       if (termTextBox.Text.Length >= 3 && _previousText != termTextBox.Text && timeDiff.TotalMilliseconds > MinimalTimeBetweenRequests)
       {
           DictServiceSoapClient client = new DictServiceSoapClient("DictServiceSoap");
           _lastRead = DateTime.Now;
           client.BeginMatchInDict("wn", termTextBox.Text, "prefix", SearchCallback, client);
       }
       _previousText = termTextBox.Text;
   }
   private void SearchCallback(IAsyncResult result)
   {
       var client = (DictServiceSoapClient) result.AsyncState;
       DictionaryWord[] words = client.EndMatchInDict(result);            
       Dispatcher.Invoke(new Action(() => UpdateResults(words)));
   }
   private void UpdateResults(IEnumerable<DictionaryWord> words)
   {
       StringBuilder resultsBuilder = new StringBuilder();
       foreach (var word in words)
       {
           resultsBuilder.Append(word.Word);
           resultsBuilder.AppendLine();
       }
       results.Text = resultsBuilder.ToString();
   }
}

Szczególnie nie lubię callback’ów i definiowania pól w klasie, które naprawdę wykorzystywane są tylko w jednym miejscu.  Należy dążyć do sytuacji gdzie jest jak najmniej pól – upraszcza to czytanie kodu. Jeśli to tylko możliwe lepiej korzystać z zasobów lokalnych.

RX znacząco uprości powyższy problem. Najpierw warto jednak przyjrzeć się następującym metodom:

  1. Throttle – tłumi przetwarzanie danych. Dzięki temu łatwo zrealizować wymaganie numer 5. Wystarczy wywołać tą metodę z odpowiednim parametrem (czas) a RX zajmie się resztą.
  2. DistinctUntilChanged – zwraca wyłącznie jednorazowe, unikatowe wartości. Jeśli zdarzenie TextChanged generuje “Piotr”,”Paweł’,”Paweł na wyjściu pojawi się “Piotr”,”Paweł”. Z kolei jeśli zdarzenia dostarczają “Piotr”,”Paweł”,”Piotr” na wyjściu ukażę się identyczna sekwencja. Z tego względu DistinctUntilChanged nie jest tym samym co Distinct.

Zatem rozwiązanie za pomocą RX może wyglądać następująco:

public partial class MainWindow : Window
{
   public MainWindow()
   {
       InitializeComponent();

       DictServiceSoapClient client = new DictServiceSoapClient("DictServiceSoap");

       var dictionarySource =
           Observable.FromAsyncPattern<string, string, string, DictionaryWord[]>(client.BeginMatchInDict,
                                                                                 client.EndMatchInDict);
       var inputSource =
           Observable.FromEventPattern<TextChangedEventArgs>(termTextBox, "TextChanged").Select(i =>
                                                                                                ((TextBox) i.Sender)
                                                                                                    .Text);

       inputSource.Throttle(TimeSpan.FromSeconds(1)).
                   DistinctUntilChanged().
                   Where(input => input.Length >= 3).
                   SelectMany(input => dictionarySource("wn", input, "prefix")).
                   ObserveOn(SynchronizationContext.Current).
                   Subscribe(UpdateResults, () => MessageBox.Show("Completed"));
   }
   private void UpdateResults(IEnumerable<DictionaryWord> words)
   {
       StringBuilder resultsBuilder = new StringBuilder();
       foreach (var word in words)
       {
           resultsBuilder.Append(word.Word);
           resultsBuilder.AppendLine();
       }
       results.Text = resultsBuilder.ToString();
   }
}

Dzięki RX programista może opisywać swoją intencję za pomocą jednego zdania. Taki kod jest czytelniejszy ponieważ nie trzeba skakać z jednego miejsca do drugiego – wszystko jest zawarte w tym zdaniu.

Reactive Extensions–Observable.FromAsyncPattern

W wcześniejszych wersjach .NET Framework metody asynchroniczne implementujące wzorzec Begin\End były bardzo powszechne w użyciu. RX posiada metody pomocnicze, umożliwiające konwersję asynchronicznego źródła danych do IObservable.

Rozważmy to na przykładzie. Załóżmy, że mamy web service, zawierający jakieś metody. Można oczywiście dla takiego serwisu wygenerować asynchroniczne metody. Jeśli chcecie popraktykować możecie skorzystać z tej, darmowej usługi:

http://services.aonaware.com/DictService/DictService.asmx

Oczywiście dodając w Visual Studio referencję do usługi należy zaznaczyć pole Generate Asynchronous operations:

image

 

Zostaną m.in. wygenerowane metody BeginMatchInDict oraz EndMatchInDict. Powyższa usługa pozwala na wyszukiwanie słów w słowniku. MatchInDict posiada kilka parametrów przyjmujących np. słowo kluczowe, typ wyszukiwania czy typ słownika. Chcąc z nich skorzystać bez użycia RX należy zdefiniować własny callback:

private void TextBoxTextChanged(object sender, TextChangedEventArgs e)
{  
    DictServiceSoapClient client = new DictServiceSoapClient("DictServiceSoap"); 
    client.BeginMatchInDict("wn", termTextBox.Text, "prefix", SearchCallback, client);  
}
private void SearchCallback(IAsyncResult result)
{
    var client = (DictServiceSoapClient) result.AsyncState;
    DictionaryWord[] words = client.EndMatchInDict(result);            
    UpdateResults(words);
}

Za pomocą RX można ten sam efekt uzyskać korzystając z Observable.FromAsyncPattern:

DictServiceSoapClient client = new DictServiceSoapClient("DictServiceSoap");

var dictionarySource =
Observable.FromAsyncPattern<string, string, string, DictionaryWord[]>(client.BeginMatchInDict,
                                                             client.EndMatchInDict);




var inputSource =
Observable.FromEventPattern<TextChangedEventArgs>(termTextBox, "TextChanged").Select(i =>
                                                                            ((TextBox) i.Sender)
                                                                                .Text);

inputSource.SelectMany(input => dictionarySource("wn", input, "prefix")).
ObserveOn(SynchronizationContext.Current).Subscribe(UpdateResults);

Na początku tworzymy dwa źródła danych. Jedno dla metod asynchronicznych drugie dla zdarzenia TextChanged obiektu TextBox, w którym użytkownik może wpisać szukaną frazę. Następnie za pomocą LINQ, przechodzimy przez każdy element inputSource (czyli przez każde wpisane słowo) i i wywołujemy dictionarySource. Wynikiem będzie oczywiście lista zawierająca szukane słowa. W następnym poście rozwinę trochę powyższy przykład aby pokazać, że posiadanie IObservable ułatwia znacząco przetwarzanie danych.

Reactive Extensions: Observable.Buffer

W ostatnim poście pisałem o konwersji zdarzeń .NET do RX. Dziś chciałbym zaprezentować przydatną funkcję, dostępną w rozszerzeniach Observable – Buffer. Służy ona do podzielenia danych w bufory. Załóżmy, że mamy źródło, które generuje dane co 1 sekundę. W każdej sekundzie zatem otrzymujemy jedno powiadomione OnNext. Co w przypadku jednak gdybyśmy chcieli dostawać w każdym powiadomieniu kilka wartości (tablicę elementów) ? Na przykład zamiast 10 OnNext, chcemy dwa powiadomienia, w którym każde zawiera listę 5 elementów.

Do tego właśnie służy Buffer: grupowania kilku powiadomień w jedną kolekcję.

Załóżmy, że mamy następujące źródło danych:

var source=Observable.Interval(TimeSpan.FromSeconds(1));

Co jedną sekundę tworzymy następny element. Gdybyśmy teraz dokonali subskrypcji, otrzymywalibyśmy OnNext dla każdej wartości od tzn.: 1,2,3,4…n. Załóżmy, że w jednym powiadomieniu chcemy mieć 5 liczb:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1)).
           Buffer(5);
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(IList<long> values)
   {
       Console.WriteLine("Nowy bufer");
       foreach (var value in values)
           Console.WriteLine(value);
   }
}

Jako parametr Buffer, podajemy liczbę elementów w buforze. W taki sposób, pierwszy OnNext zostanie wywołany dopiero w momencie gdy 5 elementów będzie gotowych. Po uruchomieniu na wyjściu zobaczymy:

image

Bez buforu oczywiście sprawa wyglądałby następująco:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1));
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(long value)
   {
       Console.WriteLine(value);
   }
}

image

Oprócz liczby elementów w jednym buforze można wskazać ile elementów ma być omijanych. Najlepiej to zrozumieć na przykładzie:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1)).
           Buffer(count:5,skip:2);
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(IList<long> values)
   {
       Console.WriteLine("Nowy bufer");
       foreach (var value in values)
           Console.WriteLine(value);
   }
}

image

W pierwszym buforze mamy zatem wartości 0,1,2,3,4 (nic tutaj nie zmienia się), z kolei drugi, zamiast od 5 zaczyna się od 2 – dwie pierwszy wartości zostały ominięte. Jak widać, domyślnie parametr skip przyjmuje wartość rozmiaru bufora.

Oprócz buforów iloościowych, można tworzyć czasowe:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1)).
           Buffer(TimeSpan.FromSeconds(5));
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(IList<long> values)
   {
       Console.WriteLine("Nowy bufer");
       foreach (var value in values)
           Console.WriteLine(value);
   }
}

image

Elementy są generowane co jedną sekundę, z kolei buforowane co 5 sekund. Wynika z tego, że każdy bufor będzie średnio zawierał po 5 elementów. Innymi słowy, po 5 sekundach dopiero zostanie wywołany OnNext, z kolekcją elementów, które zdążyły się wygenerować podczas tych 5 sekund.

Na zakończenie dzisiejszego wpisu małe zadanie. Spróbujmy napisać program, który zlicza liczbę zdarzeń MouseMove w ciągu jednej sekundy.  Na podstawie dwóch ostatnich postów, można napisać:

 var mouseMove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove").
                Buffer(TimeSpan.FromSeconds(1)).
                Select(e => e.Count).
                ObserveOn(SynchronizationContext.Current).
                Subscribe(count => Title = count.ToString());

FromEventPattern konwertuje zdarzenie MouseMove do źródła danych RX. Następnie buforujemy elementy co sekundę. Wynika z tego, że pojedynczy bufor zawiera wszystkie zdarzenia jakie przytrafiły się w przeciągu jednej sekundy. Następnie za pomocą Select wybieramy liczbę elementów w buforze czyli liczbę zdarzeń w ciągu jednej sekundy. ObserveOn służy do synchronizacji – o tym w następnych wpisach. W skrócie, musimy skorzystać z ObserveOn aby nie mieć problemów z wyjątkiem cross-thread operation (aktualizacja interfejsu użytkownika z innego wątku). Następnie w Subscribe możemy wyświetlić liczbę zdarzeń na sekundę.

Do następnego wpisu!

Reactive Extensions: Konwersja zdarzeń .NET do RX

Po przeczytaniu poprzednich wpisów, zasada działania i zastosowanie RX powinny być już jasne. Jeśli tak nie jest, koniecznie zachęcam do przeczytania tamtych postów. Jak wspomniałem, RX to ujednolicony model, umożliwiający korzystanie z kolekcji typu “push-based” w jednakowy sposób. Dzisiaj pokażę jak sprawa wygląda dla zdarzeń czyli jak skonwertować EventHandler do IObervable.

Oczywiście kluczem do rozwiązania jest klasa Observable, która zawiera mnóstwo rozszerzeń dla interfejsów IObservable\IObserver. Znajduje się tam również metoda FromEventPattern, odpowiedzialna właśnie za konwersje zdarzenia do IObservable. Istnieje kilka sygnatur i używamy ich w zależności od tego jaki sposób bardziej odpowiada nam (od strony dobrych praktyk):

// I
var inputSource =
Observable.FromEventPattern<TextChangedEventArgs>(termTextBox, "TextChanged");

// II
var movingEvents = 
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(h => this.MouseMove += h, h => this.MouseMove -= h); 

Sposób pierwszy polega na przekazaniu instancji obiektu zawierającego zdarzenie i nazwy tego zdarzenia w formie string. Druga sygnatura przyjmuje delegaty odpowiedzialne za dodanie oraz usunięcie zdarzenia.  Spróbujmy więc napisać pierwszy kod, który coś robi. Załóżmy, że mamy pole edycyjne w aplikacji WPF i chcemy wyświetlać komunikat, gdy wprowadzony tekst jest dłuższy niż 5 znaków:

var eventSource = Observable.
                  FromEventPattern<TextChangedEventArgs>(textbox1, "TextChanged").
                  Select(e => ((TextBox) e.Sender).Text).
                  Where(text => text.Length > 5);

eventSource.Subscribe(text=>MessageBox.Show(text));

Wywołanie FromEventPattern zwraca IObservable dla zdarzenia o nazwie TextChanged. Następnie za pomocą LINQ zwracamy tylko tekst – nie potrzebujemy informacji o Sender czy EventArgs. Za pomocą Where nakładamy filtr. Proszę zauważyć, że w taki sposób korzystanie z zdarzeń niczym nie różni się od zwykłej kolekcji. Nie musimy martwić się już o buforowanie poprzednich elementów ponieważ traktujemy serie zdarzeń jak najzwyklejszą kolekcję danych.

Kolejnym przykładem może być rysowanie linii. W dzisiejszym poście ograniczymy się do narysowania linii od punktu (0,0) do aktualnej pozycji kursora, pod warunkiem, że lewy przycisk myszy jest wciśnięty. W tym celu stworzono kontrolkę dziedziczącą po Canvas:

public class MyCanvas : Canvas
{
   private Point _endPoint;

   public MyCanvas()
   {
       var eventsSource =
               Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove").
               Where(e => e.EventArgs.LeftButton == MouseButtonState.Pressed).
               Select(e => e.EventArgs.GetPosition(this));

       eventsSource.Subscribe(pos =>
                                  {
                                      _endPoint = pos;
                                      InvalidateVisual();
                                  });
   }     
   protected override void OnRender(DrawingContext dc)
   {
       base.OnRender(dc);
       dc.DrawLine(new Pen(Brushes.Black, 1), new Point(), _endPoint);
   }
}

Efekt końcowy:

image

Miałem zamiar w dzisiejszym wpisie pokazać bardziej praktyczny mechanizm drag&drop w RX ale stwierdziłem, że musi najpierw powstać jeszcze jeden post o funkcjach służących do scalania dwóch różnych źródeł. W ten sposób będziemy mogli połączyć MouseDown z MouseMove i na końcu znów z MouseUp.

Oprócz łatwej możliwości dostępu do dowolnych zdarzeń (brak konieczności buforowania), bardzo lubię fakt, że Subscribe zwraca tak naprawdę IDisposable:

using(eventsSource.Subscribe(pos =>
                             {
                                 _endPoint = pos;
                                 InvalidateVisual();
                             }))
{
      // zdarzenie podlaczone
}
  // tutaj zdarzenie jest juz odlaczone poniewaz nastapilo wywolanie IDisposable.Dispose()

Jak widać, łatwo sprzątać po sobie zasoby. Zdarzenia są zdecydowanie w .NET zbyt trudne do usuwania i często powodują memory leak. Pisałem już kiedyś o tzw. weak event pattern, ale osobiście preferuje RX. To nie koniec przygody ze zdarzeniami w RX – za kilka postów przedstawię kolejny mechanizm, przeznaczony wyłącznie dla zdarzeń.

Warto pamiętać, że RX dostarcza oprócz OnNext, takie metody jak OnCompleted oraz OnEror co ułatwia obsługę błędów. Dzięki LINQ łatwo tworzyć skomplikowane zapytania czy transformacje typu scalenie dwóch zdarzeń (o tym w następnym wpisie).

Reactive Extensions–jak wygenerować proste źródła danych

W poprzednim poście pokazałem jak dokonać subskrypcji aby otrzymywać powiadomienia o nowych danych oraz jak skonwertować IEnumerable to IObservable. Dzisiaj chciałbym pokazać kilka metod klasy Observable, które są szczególnie ważne przy pisaniu testów jednostkowych oraz przy nauce RX.  Muszę przyznać, że na co dzień korzystam wyłącznie tylko z kilku z nich ale w przypadku UnitTest’ów są już bardzo praktyczne.
Observable to zbiór statycznych metod (często rozszerzających) usprawniających pracę z IObservable\IObserver (dla przypomnienia oba interfejsy należą do .NET a nie RX).

1. Observable.Return

Return zwraca IObservable, który zawiera wyłącznie jeden element. Przykład:

IObservable<int> source=Observable.Return(54);
source.Subscribe(Console.WriteLine);

Na wyjściu zostanie wyświetlone 54. Można byłoby oczywiście stworzyć IEnumerable a potem skonwertować do IObservable za pomocą ToObservable (patrz poprzedni post), ale tak jest po prostu szybciej.

2. Observable.Never

Zwraca źródło danych nie zawierające żadnych elementów. Nigdy nie zostaną wywołane OnNext, OnCompleted czy OnError. Przykład:

var source = Observable.Never<int>();
source.Subscribe(Console.WriteLine, (error) => Console.WriteLine("Błąd"), () => Console.WriteLine("Koniec"));

3. Observable.Empty

Generuje pusty zbiór danych. W przeciwieństwie do Never, OnCompleted zostanie wywołany:

 var source = Observable.Empty<int>();
source.Subscribe(Console.WriteLine, (error) => Console.WriteLine("Błąd"), () => Console.WriteLine("Koniec"));

4. Observable.Range

Generuje liczby całkowite począwszy od zadanego indeksu:

var source = Observable.Range(5, 10);
source.Subscribe(Console.WriteLine, (error) => Console.WriteLine("Błąd"), () => Console.WriteLine("Koniec"));
// output to 10 liczb zaczynajacych się od 5.

5. Observable.Error

Tworzy zbiór danych, który zakończy się błędem. Z punktu praktycznego, po prostu OnError zostanie wywołany:

var source = Observable.Throw<int>(new ArgumentException());
source.Subscribe(Console.WriteLine, (error) => Console.WriteLine("Błąd"), () => Console.WriteLine("Koniec"));

6. Observable.Generate

Observable.Generate generuje liczby na podstawie dostarczonych warunków:

var source = Observable.Generate(0, i => i < 100, i => i + 5, i => i*2);
source.Subscribe(Console.WriteLine, (error) => Console.WriteLine("Błąd"), () => Console.WriteLine("Koniec"));

public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState,
    Func<TState, bool> condition,
    Func<TState, TState> iterate,
    Func<TState, TResult> resultSelector
)

Parametry kolejno to: początkowy stan (wartość), warunek określający kiedy ma zakończyć się generowanie danych, funkcja (iterate) generująca następną wartość, selektor zwracający końcową wartość (w naszym przypadku będzie to liczba pomnożona przez dwa).

7. Observable.Interval

Funkcja generuje liczby całkowite co dany czas np.:

var source = Observable.Interval(TimeSpan.FromSeconds(1));
source.Subscribe(Console.WriteLine, (error) => Console.WriteLine("Błąd"), () => Console.WriteLine("Koniec"));

Na wyjściu otrzymamy wartości 0,1,2,3…n z częstotliwością równą jednej sekundzie.

W następnym wpisie pokażę już coś dużo bardziej praktyczniejszego – jak korzystać ze zdarzeń w RX. Zaprezentuję również kilka praktycznych i bardziej “zaawansowanych” zapytań.

Reactive Extensions–wprowadzenie

Kiedyś już chciałem poprowadzić cykl wpisów o Reactive Extensions i nawet napisałem pierwszy post wprowadzający do interfejsów IObservable, IObserver. Niestety po drodze przytrafiły się tematy które chciałem najpierw opisać i na końcu zrezygnowałem z tego. W między czasie kilka osób pytało o ten cykl ale nie widziałem sensu ponieważ Maciej Zbrzezny już wykonał kawał dobrej roboty i  opisał to na swoim blogu w bardzo szczegółowy sposób. Dzisiaj postanowiłem jednak napisać kilka postów o RX po swojemu, z trochę innej perspektywy – może komuś przyda to się, taką mam nadzieje. Ponadto oprócz podstaw RX chciałbym pokazać jak pisać bardziej zaawansowane rzeczy jak np. rekurencyjny scheduler, który ostatnio wykorzystałem do odpytywania bazy danych o strukturze rekurencyjnej (korzystamy z OPC Framework).

Najpierw polecam przeczytanie wpisu o IObservable oraz IObserver. Następnie proszę ściągnąć i zainstalować sam framework w wersji 1.0.

Dzisiaj postaram się odpowiedzieć na pytanie: dlaczego potrzebujemy Reactive Extensions? Jak wspomniałem RX opiera się na interfejsach IObservable, IObserver co oznacza, że RX korzysta z kolekcji push-based a nie wymagających odpytywania (pull-based collections). Najbardziej znanym źródłem danych  typu “pull-based” w .NET jest po prostu IEnumerable. Aby uzyskać takie dane należy odpytywać poszczególne elementy tzn.:

string[]elements=new []{"A", "B"};
foreach(string element in elements)
{
      Console.WriteLine(element);
}

Dla powyższego przykładu IEnumerable jest doskonały, problemy pojawią się gdy mamy do czynienia z czasochłonną operacją jak np. czytanie z web service. Na przykład taki kod byłby bardzo zły:

foreach(string element in _webService.GetElements())
{
      Console.WriteLine(element);
}

GetElements prawdopodobnie potrzebuje kilka sekund na wykonanie a to spowoduje zablokowanie wątku. W takich przykładach lepiej stosować kolekcje typu “push-based”, które opierają się na powiadomieniach (“wpychaniu” danych w strumień). W .NET są już one dobrze znane i można wymienić następujące przykłady:

  1. Zdarzenia, delegaty.
  2. ObservableCollection (oparte na zdarzeniach).
  3. Timer (również oparte na zdarzeniach).
  4. Metody asynchroniczne oparte na Begin\End (dziś wypierane przez async w .NET 4.5 ale wciąż bardzo często spotykane)
  5. Wątki (Task, Thread) i wszelkie callback’i

Jak do powyższych rozważań ma się RX? RX to ujednolicona architektura, która pozwala połączyć różne kolekcje push-based w jeden mechanizm. Łatwo zauważyć, że w .NET inaczej obsługuje się zdarzenia a inaczej callback’i a jeszcze inaczej wzorzec asynchroniczny Begin\End. RX traktuje wszystkie źródła danych (nieważne czy jest to kolekcja czy po prostu zdarzenie) jako kolekcje danych. Dzięki temu łatwe jest operowanie na zdarzeniach w których często potrzebujemy kilka ostatnich wartości. Na przykład aby narysować linie tak jak to Microsoft’owy Paint robi, należy pobrać aktualną i poprzednią pozycję kursora na ekranie. W przypadku zdarzeń jest to niemożliwe i należy ręcznie stworzyć pole w klasie, które służy jako bufor. W przypadku RX, traktujemy zdarzenia jak kolekcje i wtedy zwrócenie dwóch ostatnich elementów (dwóch ostatnich pozycji kursora) jest bardzo łatwe.

Ponadto RX oparty jest na języku zapytań LINQ. Z tego względu zwrócenie specyficznej pozycji kursora sprowadza się do jednego zapytania LINQ a nie buforowania serii wartości i tworzenia IF. Jak wspomniałem RX to ujednolicona architektura dla kolekcji push-based. Umożliwia to połączenie wiele źródeł danych (web services, zdarzenia, Begin\End) w jedno zapytanie LINQ. Przydatne to jest gdy należy np. pobrać identyfikatory z jednej bazy danych, następnie wykorzystać je jako klucze do przeczytania wartości z drugiej bazy. Dzisiaj należy obsłużyć to samemu poprzez callback’i, które zawsze są trudne w obsłudze a jeszcze trudniejsze w czytaniu. Innymi słowy, RX umożliwia synchronizacje wielu źródeł danych za pomocą jednego zapytania LINQ, które opisuje całe wykonywane zadanie. Zamiast skakać z jednej metody do drugiej tworzymy jedno (czasami długie) zapytanie, które wygląda jak zdanie opisujące daną akcję.

RX wspiera wiele platform:

  1. .NET Framework
  2. WPF\Silverlight – m.in. specjalne biblioteki ułatwiające synchronizacje z UI.
  3. Windows Phone
  4. JavaScript

To co warto zauważyć kolekcje push-based oraz pull-based służą do dokładnie tego samego – dostępu do danych. W Internecie można nawet znaleźć matematyczną transformację pomiędzy interfejsami IEnumerable\IEnumerator do IObservable\IObserver:

public interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
}

public interface IEnumerator<T>: IDisposable
{
    T Current { get; }
    bool MoveNext();
    void Reset();
}

// równoważne do

public interface IObservable<T>
{
    IDisposable Subscribe( IObserver<T>);
}
public interface IObserver<T>
{
    void OnNext(T);
    void OnError(Exception);
    void OnCompleted ();
}

Nie chcę na blogu pokazywać jak kro po kroku wygląda dowód matematyczny ale zobaczmy jak to wygląda po prostu od strony użytkownika. Dla pull-based odpytywanie danych wygląda oczywiście następująco:

string[]elements=new []{"A","B"};
foreach(string element in elements)
{
    Console.WriteLine(element);
}

Dla IObservable\IObserver wygląda to:

string[] elements = new[] {"A", "B"};
IObservable<string> observableElements = elements.ToObservable();
observableElements.Subscribe(Console.WriteLine);

Oba przykłady wyświetlą na ekranie taki sam wynik. IObserver\IObservable są interfejsami należącymi do .NET Framework ale z kolei metoda ToObservable to już RX Extensions. RX to zestaw rozszerzeń, które ułatwiają pracę z IObservable\IObserver. Dzięki nim możemy np. skonwertować zdarzenia .NET do IObservable. Analogicznie Metoda Observable.ToObservable służy do skonwertowania IEnumerable do IObservable. RX to nic innego jak zestaw helper’ów i metod rozszerzających.

Jak widać korzystanie IObservable sprowadza się do kilku kroków:

  1. Stworzenia źródła danych IObservable. W powyższym fragmencie wykorzystano metodę ToObservable zamieniającą IEnumerable do tego interfejsu. W RX Istnieją  jeszcze inne metody do konwersji zdarzeń czy wzorca Begin\End – o tym w następnych postach.
  2. Implementacji IObserver, który zawiera trzy metody OnNext, OnError, OnCompleted. Jest to etap opcjonalny bo można użyć wyrażenia lambda jak to zostało przedstawione w powyższym fragmencie kodu.
  3. Dokonanie subskrypcji za pomocą metody Subscribe.  Można użyć klasy stworzonej w punkcie drugim albo po prostu wyrażeń lambda.

Jeśli krok drugi nie jest jasny to proszę rozważyć następujący kod:

class CustomObserver:IObserver<string>
{
    public void OnNext(string value)
    {
        Console.WriteLine("Nastepny element:{0}",value);
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Błąd:{0}",error.Message);
    }

    public void OnCompleted()
    {
        Console.WriteLine("Koniec danych");
    }
}
internal class Program
{
    private static void Main(string[] args)
    {
        string[] elements = new[] {"A", "B"};
        IObservable<string> observableElements = elements.ToObservable();

        // z użyciem IObserver
        observableElements.Subscribe(new CustomObserver());
     
        // z użyciem lambda
        observableElements.Subscribe(
            value => Console.WriteLine("Nastepny element:{0}", value),
            error => Console.WriteLine("Błąd:{0}", error.Message),
            () => Console.WriteLine("Koniec danych"));
    }
}

Proszę zauważyć, że jest to lepsze rozwiązanie niż zwykle zdarzenia w .NET. Oprócz czystych danych otrzymywanych przez OnNext dostajemy informację o ewentualnych błędach (OnError) oraz zakończeniu strumienia danych OnCompleted. Oczywiście nie wszystkie źródła danych mają swój koniec np. MouseMove zawsze będzie wywoływało OnNext ponieważ w każdej chwili użytkownik może poruszyć myszką. W przypadku jednak Begin\End czy powyższego przykładu z elementami “A”,”B”, OnCompleted zostanie wywołany bardzo szybko.

W następnym poście przedstawię więcej metod rozszerzających RX pozwalających na tworzenie bardziej skomplikowanych źródeł danych niż zwykła konwersja IEnumerable->IObservable. Wiem, że nie jest to na razie zbyt praktyczne ale trzeba przebrnąć przez te podstawy.

Interfejsy IObservable oraz IObserver

W przyszłych postach chcę zająć się Reactive Extensions, jednak zanim zacznę cykl postów o tym, najpierw przedstawię dwa interfejsy wprowadzone w .NET 4.0. Interfejsy umożliwiają implementację wzorca obserwator. IObserver powinien zostać zaimplementowany dla klasy, która chcę być powiadamiana o zmianach dokonywanych na klasie implementującej IObservable. Przyjrzyjmy się najpierw metodom IObserver:

  1. OnCompleted – obserwacja wszelkich zmian zakończona.
  2. OnError – wystąpił błąd.
  3. OnNext – Nowa zmiana np. dodano element do kolekcji.

Z kolei IObservable wymaga wyłącznie jednej metody, Subscribe odpowiedzialnej za subskrypcję obserwatora. Napiszmy więc klasę dokonującą zmian na bazie danych:

class PersonRepository:IObservable<string>
{        
        #region IObservable<string> Members

        private List<IObserver<string>> _observers = new List<IObserver<string>>();

        public IDisposable Subscribe(IObserver<string> observer)
        {
            _observers.Add(observer);
            return null;
        }       

        #endregion

        public void AddPerson(string person)
        {
            // jakas akcja
            if(person==null)
                _observers.ForEach(o => o.OnError(new NullReferenceException()));
            else
                _observers.ForEach(o=>o.OnNext(person));
        }
        public void Close()
        {
            _observers.ForEach(o => o.OnCompleted());
        }
}

Jak widać IObservable jest generycznym typem. W poście używam po prostu typu string. Metoda Subscribe odpowiedzialna jest za subskrypcję obserwatorów. Warto uwagę zwrócić na IDisposable. Dzięki temu można zaimplementować zwalnianie zasobów gdy obserwator jest już nie dostępny (pomięto to w poście). Następnie dodano dwie metody AddPerson oraz Close – tylko po to aby pokazać co można zrobić na podstawie IObservable. Gdy użytkownik wykona metodę AddPerson, wszyscy obserwatorzy zostaną o tym powiadomieni. Przejdźmy więc do implementacji IObserver:

class SampleObserver:IObserver<string>
{
        #region IObserver<string> Members

        public void OnCompleted()
        {
            MessageBox.Show("Completed");
        }

        public void OnError(Exception error)
        {
            MessageBox.Show(string.Format("Error: {0}", error.Message));
        }

        public void OnNext(string value)
        {
            MessageBox.Show(string.Format("Next: {0}",value));
        }

        #endregion
}

A na zakończenie przykład wykorzystania powyższych klas

PersonRepository repository=new PersonRepository();
repository.Subscribe(new SampleObserver());            
repository.AddPerson("test");
repository.AddPerson(null);
repository.Close();

Wykonanie AddPerson lub Close spowoduje wyświetlenie MessageBox, zgodnie z tym co zostało zaimplementowane w przykładowym obserwatorze. .NET dostarcza wyłącznie interfejsy, co może wydawać się nieco niepotrzebne. Zdecydowałem się jednak na takie wprowadzanie, aby następny post (o RX) był łatwiejszy w przyswojeniu.