Reactive Extensions: Scalanie źródeł danych

W dzisiejszym wpisie znów wracamy do tematu RX. Postaram się wyjaśnić jak można ze sobą połączyć kilka IObservable. W RX istnieje naprawdę wiele metod umożliwiających wykonanie tego i na początku może wydawać się to dość skomplikowane, ze względu na liczbę sposobów w jaki można to wykonać.

1. Observable.Amb – zawsze zwraca wyłącznie tą sekwencje, która została jako pierwsza wygenerowana. Jeśli zatem mamy MouseMove i MouseUp wtedy zostanie zwrócone te zdarzenie, które jako pierwsze miało miejsce. Przykład:

internal class Program
{
   public static void Main()
   {
       var fasterSeq = Observable.Interval(TimeSpan.FromSeconds(1));
       var slowerSeq = Observable.Interval(TimeSpan.FromSeconds(5));

       var result = slowerSeq.Amb(fasterSeq);
       result.Subscribe(Console.WriteLine);
       Console.ReadLine();
   }  
}

2. Observable.Merge – scala ze sobą elementy (pary) z dwóch źródeł. Jeśli zatem pierwsza sekwencja ma wartości A,B,C,D a druga 1,2,3,4,5,6 to na wyjściu będzie A,1,B,2,C,3,D,4,5,6. Przykład:

public static void Main()
{
  var a = new []{"A", "B", "C", "D"}.ToObservable();
  var b = new[] {"1", "2", "3", "4", "5", "6"}.ToObservable();

  var result = a.Merge(b);
  result.Subscribe(Console.WriteLine);
  Console.ReadLine();
} 

3. Observable.Concat – na pierwszy rzut oka wygląda podobnie jak Observable.Merge. Jeśli scalane są źródła A,B,C,D z 1,2,3,4,5,6 na wyjściu będzie A,B,C,D,1,2,3,4,5,6. Najpierw zatem zostanie wyświetlona pierwsza sekwencja a potem druga. Przykład:

internal class Program
{
   public static void Main()
   {
       var a = new []{"A", "B", "C", "D"}.ToObservable();
       var b = new[] {"1", "2", "3", "4", "5", "6"}.ToObservable();

       var result = a.Concat(b);
       result.Subscribe(Console.WriteLine);
       Console.ReadLine();
   }  
}

Jakie to ma znaczenie w praktyce? Zależy od typu źródła. W powyższych przykładach nie ma większej różnicy ponieważ wszystkie IObservable zawsze zwrócą taki sam zbiór danych, niezależny od aktualnego czasu. Jeśli oczywiście generowane dane zależałby od czasu (np. zdarzenia) wtedy wynik mógłby zawierać inne elementy. Ponadto, w przypadku Concat może okazać się przydatna metoda SubscribeOn, o której będę pisał w innym poście, poświęconym synchronizacji w RX. W skrócie pisząc, gdy korzystamy z Concat, najpierw RX podpina się do źródła A, generuje wszystkie elementy a dopiero potem dokonuje subskrypcji do B. Z tego wynika, że źródło B dziedziczy kontekst (wątek) po A ponieważ, to w A wywoływany jest Subscribe do B. Jeśli to jest niezrozumiałe, to w jednym z następnych postów zostanie to wyjaśnione w szczegółach…

4. Observable.Zip – ten typ scalania trochę różni się od powyższych. Wszystkie przedstawione wcześniej sposoby, polegały na zwróceniu IObservable zawierającego ten sam typ. Jeśli zatem scalano dwa źródła danych zawierające po 2 elementy, wynikiem była taka sama sekwencja ale zawierająca po prostu 4 elementy (4 powiadomienia po subskrypcji). Obserbable.Zip polega na łączeniu w pary elementów z A i B. Oczywiście nic nie stoi na przeszkodzie aby elementy były innych typów. Jeśli zatem mamy “A”,”B”,”C”,”D” oraz 1,2,4 wtedy możemy to połączyć w nową sekwencje, zawierającą również 4 elementy, ale będącą po prostu rezultatem połączenia dwóch IObservable np.:

internal class Program
{
   public static void Main()
   {
       var a = new[] {"A", "B", "C", "D"}.ToObservable();
       var b = new[] {1, 2, 3, 4}.ToObservable();

       var result = a.Zip(b, (first, second) => string.Format("{0}:{1}", first, second));
       result.Subscribe(Console.WriteLine);
       Console.ReadLine();
   }
}

Na ekranie wyświetli się A:1,B:2,C:3,D:4. Co w przypadku jednak, gdy sekwencja A ma więcej elementów niż B? Innymi słowy, jak zachowa się Zip, jeśli nie można dopasować pary ponieważ brakuje odpowiadającego elementu w drugim źródle danych:

internal class Program
{
   public static void Main()
   {
       var a = new[] {"A", "B", "C", "D","E"}.ToObservable();
       var b = new[] {1, 2, 3, 4}.ToObservable();

       var result = a.Zip(b, (first, second) => string.Format("{0}:{1}", first, second));
       result.Subscribe(Console.WriteLine,()=>Console.WriteLine("Koniec"));
       Console.ReadLine();
   }
}

Na ekranie zostanie wyświetlony napis "Koniec” ponieważ, element “E” nie ma odpowiadającego elementu. Observable.Zip po prostu łączy pary po indeksach. Zip czeka aż drugie źródło wygeneruje odpowiedni element. W powyższym przypadku b nie wygeneruje tego elementu a zakończy swoje działanie przez OnCompleted – w takiej sytuacji Zip kończy również wykonywanie. Jeśli element “E” zostałby wygenerowany w 10 sekundzie, a wartość 5 w 20 sekundzie, wtedy Zip czekałby te 10 sekund na wartość. 5. W momencie jednak gdy b wywołał OnCompleted, całość kończy działanie.

5. Observable.CombineLatest – analogiczne do Zip, z tym, że nie czeka na odpowiadający element. Zawsze bierze ostatni dostępny. W powyższym przykładzie, E po prostu zostanie połączone z 4:

internal class Program
{
   public static void Main()
   {
       var a = new[] {"A", "B", "C", "D","E"}.ToObservable();
       var b = new[] {1, 2, 3, 4}.ToObservable();

       var result = a.CombineLatest(b, (first, second) => string.Format("{0}:{1}", first, second));
       result.Subscribe(Console.WriteLine,()=>Console.WriteLine("Koniec"));
       Console.ReadLine();
   }
}

Leave a Reply

Your email address will not be published.