Reactive Extensions: Observable.Buffer

W ostatnim poście pisałem o konwersji zdarzeń .NET do RX. Dziś chciałbym zaprezentować przydatną funkcję, dostępną w rozszerzeniach Observable – Buffer. Służy ona do podzielenia danych w bufory. Załóżmy, że mamy źródło, które generuje dane co 1 sekundę. W każdej sekundzie zatem otrzymujemy jedno powiadomione OnNext. Co w przypadku jednak gdybyśmy chcieli dostawać w każdym powiadomieniu kilka wartości (tablicę elementów) ? Na przykład zamiast 10 OnNext, chcemy dwa powiadomienia, w którym każde zawiera listę 5 elementów.

Do tego właśnie służy Buffer: grupowania kilku powiadomień w jedną kolekcję.

Załóżmy, że mamy następujące źródło danych:

var source=Observable.Interval(TimeSpan.FromSeconds(1));

Co jedną sekundę tworzymy następny element. Gdybyśmy teraz dokonali subskrypcji, otrzymywalibyśmy OnNext dla każdej wartości od tzn.: 1,2,3,4…n. Załóżmy, że w jednym powiadomieniu chcemy mieć 5 liczb:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1)).
           Buffer(5);
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(IList<long> values)
   {
       Console.WriteLine("Nowy bufer");
       foreach (var value in values)
           Console.WriteLine(value);
   }
}

Jako parametr Buffer, podajemy liczbę elementów w buforze. W taki sposób, pierwszy OnNext zostanie wywołany dopiero w momencie gdy 5 elementów będzie gotowych. Po uruchomieniu na wyjściu zobaczymy:

image

Bez buforu oczywiście sprawa wyglądałby następująco:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1));
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(long value)
   {
       Console.WriteLine(value);
   }
}

image

Oprócz liczby elementów w jednym buforze można wskazać ile elementów ma być omijanych. Najlepiej to zrozumieć na przykładzie:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1)).
           Buffer(count:5,skip:2);
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(IList<long> values)
   {
       Console.WriteLine("Nowy bufer");
       foreach (var value in values)
           Console.WriteLine(value);
   }
}

image

W pierwszym buforze mamy zatem wartości 0,1,2,3,4 (nic tutaj nie zmienia się), z kolei drugi, zamiast od 5 zaczyna się od 2 – dwie pierwszy wartości zostały ominięte. Jak widać, domyślnie parametr skip przyjmuje wartość rozmiaru bufora.

Oprócz buforów iloościowych, można tworzyć czasowe:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1)).
           Buffer(TimeSpan.FromSeconds(5));
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(IList<long> values)
   {
       Console.WriteLine("Nowy bufer");
       foreach (var value in values)
           Console.WriteLine(value);
   }
}

image

Elementy są generowane co jedną sekundę, z kolei buforowane co 5 sekund. Wynika z tego, że każdy bufor będzie średnio zawierał po 5 elementów. Innymi słowy, po 5 sekundach dopiero zostanie wywołany OnNext, z kolekcją elementów, które zdążyły się wygenerować podczas tych 5 sekund.

Na zakończenie dzisiejszego wpisu małe zadanie. Spróbujmy napisać program, który zlicza liczbę zdarzeń MouseMove w ciągu jednej sekundy.  Na podstawie dwóch ostatnich postów, można napisać:

 var mouseMove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove").
                Buffer(TimeSpan.FromSeconds(1)).
                Select(e => e.Count).
                ObserveOn(SynchronizationContext.Current).
                Subscribe(count => Title = count.ToString());

FromEventPattern konwertuje zdarzenie MouseMove do źródła danych RX. Następnie buforujemy elementy co sekundę. Wynika z tego, że pojedynczy bufor zawiera wszystkie zdarzenia jakie przytrafiły się w przeciągu jednej sekundy. Następnie za pomocą Select wybieramy liczbę elementów w buforze czyli liczbę zdarzeń w ciągu jednej sekundy. ObserveOn służy do synchronizacji – o tym w następnych wpisach. W skrócie, musimy skorzystać z ObserveOn aby nie mieć problemów z wyjątkiem cross-thread operation (aktualizacja interfejsu użytkownika z innego wątku). Następnie w Subscribe możemy wyświetlić liczbę zdarzeń na sekundę.

Do następnego wpisu!

Leave a Reply

Your email address will not be published.