W ostatnim poście pisałem o konwersji zdarzeń .NET do RX. Dziś chciałbym zaprezentować przydatną funkcję, dostępną w rozszerzeniach Observable – Buffer. Służy ona do podzielenia danych w bufory. Załóżmy, że mamy źródło, które generuje dane co 1 sekundę. W każdej sekundzie zatem otrzymujemy jedno powiadomione OnNext. Co w przypadku jednak gdybyśmy chcieli dostawać w każdym powiadomieniu kilka wartości (tablicę elementów) ? Na przykład zamiast 10 OnNext, chcemy dwa powiadomienia, w którym każde zawiera listę 5 elementów.
Do tego właśnie służy Buffer: grupowania kilku powiadomień w jedną kolekcję.
Załóżmy, że mamy następujące źródło danych:
var source=Observable.Interval(TimeSpan.FromSeconds(1));
Co jedną sekundę tworzymy następny element. Gdybyśmy teraz dokonali subskrypcji, otrzymywalibyśmy OnNext dla każdej wartości od tzn.: 1,2,3,4…n. Załóżmy, że w jednym powiadomieniu chcemy mieć 5 liczb:
internal class Program { private static void Main(string[] args) { var source = Observable.Interval(TimeSpan.FromSeconds(1)). Buffer(5); source.Subscribe(OnNext); Console.ReadLine(); } private static void OnNext(IList<long> values) { Console.WriteLine("Nowy bufer"); foreach (var value in values) Console.WriteLine(value); } }
Jako parametr Buffer, podajemy liczbę elementów w buforze. W taki sposób, pierwszy OnNext zostanie wywołany dopiero w momencie gdy 5 elementów będzie gotowych. Po uruchomieniu na wyjściu zobaczymy:
Bez buforu oczywiście sprawa wyglądałby następująco:
internal class Program { private static void Main(string[] args) { var source = Observable.Interval(TimeSpan.FromSeconds(1)); source.Subscribe(OnNext); Console.ReadLine(); } private static void OnNext(long value) { Console.WriteLine(value); } }
Oprócz liczby elementów w jednym buforze można wskazać ile elementów ma być omijanych. Najlepiej to zrozumieć na przykładzie:
internal class Program { private static void Main(string[] args) { var source = Observable.Interval(TimeSpan.FromSeconds(1)). Buffer(count:5,skip:2); source.Subscribe(OnNext); Console.ReadLine(); } private static void OnNext(IList<long> values) { Console.WriteLine("Nowy bufer"); foreach (var value in values) Console.WriteLine(value); } }
W pierwszym buforze mamy zatem wartości 0,1,2,3,4 (nic tutaj nie zmienia się), z kolei drugi, zamiast od 5 zaczyna się od 2 – dwie pierwszy wartości zostały ominięte. Jak widać, domyślnie parametr skip przyjmuje wartość rozmiaru bufora.
Oprócz buforów iloościowych, można tworzyć czasowe:
internal class Program { private static void Main(string[] args) { var source = Observable.Interval(TimeSpan.FromSeconds(1)). Buffer(TimeSpan.FromSeconds(5)); source.Subscribe(OnNext); Console.ReadLine(); } private static void OnNext(IList<long> values) { Console.WriteLine("Nowy bufer"); foreach (var value in values) Console.WriteLine(value); } }
Elementy są generowane co jedną sekundę, z kolei buforowane co 5 sekund. Wynika z tego, że każdy bufor będzie średnio zawierał po 5 elementów. Innymi słowy, po 5 sekundach dopiero zostanie wywołany OnNext, z kolekcją elementów, które zdążyły się wygenerować podczas tych 5 sekund.
Na zakończenie dzisiejszego wpisu małe zadanie. Spróbujmy napisać program, który zlicza liczbę zdarzeń MouseMove w ciągu jednej sekundy. Na podstawie dwóch ostatnich postów, można napisać:
var mouseMove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove"). Buffer(TimeSpan.FromSeconds(1)). Select(e => e.Count). ObserveOn(SynchronizationContext.Current). Subscribe(count => Title = count.ToString());
FromEventPattern konwertuje zdarzenie MouseMove do źródła danych RX. Następnie buforujemy elementy co sekundę. Wynika z tego, że pojedynczy bufor zawiera wszystkie zdarzenia jakie przytrafiły się w przeciągu jednej sekundy. Następnie za pomocą Select wybieramy liczbę elementów w buforze czyli liczbę zdarzeń w ciągu jednej sekundy. ObserveOn służy do synchronizacji – o tym w następnych wpisach. W skrócie, musimy skorzystać z ObserveOn aby nie mieć problemów z wyjątkiem cross-thread operation (aktualizacja interfejsu użytkownika z innego wątku). Następnie w Subscribe możemy wyświetlić liczbę zdarzeń na sekundę.
Do następnego wpisu!