Artykuł: Optymalizacja kodu C# – część I

Rozpocząłem pisanie nowego cyklu artykułów, tym razem o wydajności w C#. Pierwsza część właśnie została opublikowana i zawiera przede wszystkim wprowadzenie do tematu oraz kilka konkretnych przykładów. Kolejne części już wkrótce i będą prezentowały poszczególne konstrukcje w C#. Na blogu już pisałem niejednokrotnie o wydajności w C#, ale artykuł oprócz tego co już tutaj zostało napisane, zawiera dodatkowe informacje i przykłady. Dla tych co nie czytali blog’a myślę, że taki cykl stanowi po prostu kompendium informacji o wydajności w C#.

Link: http://msdn.microsoft.com/pl-pl/library/optymalizacja-kodu-C-sharp–czesc-1

Reactive Extensions: Zdarzenia w RX

Opisywałem już mechanizm konwersji zdarzeń .NET do IObservable. W RX istnieje dodatkowo nowy mechanizm, mający na celu zastąpić standardowe zdarzenia  .NET – przynajmniej w części przypadków. Zastanówmy się, co jest złego tak naprawdę w obsłudze zdarzeń w .NET?

1. Dość często programiści zapominają usunąć zdarzenie co może skutkować memory leak. Czasami jest ciężko zdefiniować moment, w którym należy usunąć zdarzenie. Pewnym rozwiązaniem problemu może być zastosowanie wzorca weak event pattern ale jak wiemy z archiwalnych wpisów, nie jest to też takie proste.

2. Zdarzenia, które zawierają anonimowe metody są szczególnie trudne do usunięcia. Na przykład:

internal class Program
{
   public static event EventHandler MessageReceived;
   private static void Main(string[] args)
   {
       MessageReceived += (sender,e) => Console.WriteLine(e.ToString());
   }
}

W jaki sposób usunąć powyższe zdarzenie? Niestety trzeba stworzyć tymczasową zmienną i ją przechowywać gdzieś:

public static event EventHandler MessageReceived;

private static void Main(string[] args)
{
  EventHandler action=(sender, e) => Console.WriteLine(e.ToString());

  MessageReceived += action;
  // disposing
  MessageReceived -= action;
}

3.  Jak już wspomniałem i pokazałem w jednym z poprzednich wpisów, nie ma możliwości odczytania poprzednich wartości, jakie zostały wygenerowane przez zdarzenie (np. poprzednia pozycja myszki).

4. Zdarzenia to właściwości a nie obiekty co czasami może być niewygodne.

5. Z punktu pisania testów jednostkowych, zdarzenia również nie są szczególnie łatwe do obsługi.

6. Niestety zdarzeń nie można traktować jako kolekcji, z tego względu nie ma możliwości wykorzystania zapytań  LINQ.

Dość narzekania, czas pokazać mechanizm jaki dostarcza RX. Mam nadzieję, że wszyscy są już obeznani z wzorcem projektowym Observer. Jeśli nie, polecam poczytanie o nim( np. na tym blogu, kiedyś o tym już pisałem). Znając zasady działania wzorca obserwator, mechanizm działania zdarzeń RX nie powinien zaskakiwać. Najważniejszym interfejsem jest ISubject, który wygląda następująco:

public interface ISubject<in TSource, out TResult> 
: IObserver<TSource>, IObservable<TResult>
{ 
}
public interface ISubject<T>  : ISubject<T, T>
{ 
}

Proszę zauważyć, że implementuje on zarówno IObserver jak IObservable. W praktyce oznacza to, że klasy implementujące ISubject, mogą zarówno publikować jak i dokonywać subskrypcji (nasłuchiwać). Przykład:

internal class Program
{
   public static  Subject<string> MessageReceived=new Subject<string>();

   private static void Main(string[] args)
   {            
       MessageReceived.Subscribe(Console.WriteLine);
       MessageReceived.OnNext("Hello World");
       Console.ReadLine();
   }
}

W zwykłych zdarzeniach, operator += służy do podłączenia metod. W RX analogiczną funkcję pełni metoda Subscribe. Zasada działania jest taka sama, jak w przypadku dobrze znanego IObservable ( w końcu ISubject implementuje IObservable). W celu wywołania zdarzenia, korzystamy analogicznie z OnNext. Dzięki RX, łatwo zwolnić zdarzenie:

internal class Program
{
   public static  Subject<string> MessageReceived=new Subject<string>();

   private static void Main(string[] args)
   {            
       IDisposable disposable=MessageReceived.Subscribe(Console.WriteLine);
       MessageReceived.OnNext("Hello World");
       disposable.Dispose();
       MessageReceived.OnNext("You will not see this text");

       Console.ReadLine();
   }
}

Oczywiście, można (a nawet jest to polecane) użyć klauzuli using, dla powyższego przykładu.

internal class Program
{
   public static  Subject<string> MessageReceived=new Subject<string>();

   private static void Main(string[] args)
   {            
       using(MessageReceived.Subscribe(Console.WriteLine))
       {
           MessageReceived.OnNext("Hello World");
       }

       MessageReceived.OnNext("You will not see this text");

       Console.ReadLine();
}

Zdarzenie automatycznie zostanie zwolnione (a wszelkie metody połączonego do niej, usunięte) kiedy zmienna nie ma żadnych referencji. Przypomina to trochę weak event pattern, którego cel był analogiczny – ale wtedy należało wykonać dużo więcej roboty. Dzięki RX mamy możliwość większej kontroli nad przepływem zdarzeń za pomocą metod OnNext, OnError, OnCompleted:

internal class Program
{
   public static Subject<string> MessageReceived = new Subject<string>();

   private static void Main(string[] args)
   {
       using (MessageReceived.Subscribe(OnNext,OnCompleted,OnError))
       {
           MessageReceived.OnNext("Processing...");
           try
           {
               // do sth
           }
           catch (Exception e)
           {
               MessageReceived.OnError(e);
           }
           finally
           {
               MessageReceived.OnCompleted();
           }
       }

       Console.ReadLine();
   }
   private static void OnNext(string obj)
   {
   }
   private static void OnCompleted(Exception error)
   {
   }
   private static void OnError()
   {
   }
}

W przypadku standardowych zdarzeń, nie ma oczywiście takiej możliwości. Jeśli ktoś jest zaznajomiony z RX to Subject wygląda bardzo prosto – implementuje on interfejsy, które były bardzo dokładnie omówione w poprzednich wpisach.

Reactive Extensions: TakeUntil, Repeat, dalsza cześć przykładu obsługi zdarzeń

Kilka postów wcześniej, pokazałem jak narysować linię za pomocą RX oraz przechwytywania zdarzeń. Dla przypomnienia udało nam się narysować prostą linie z punktu (0,0) do aktualnej pozycji kursora:

public class MyCanvas : Canvas
{
   private Point _endPoint;

   public MyCanvas()
   {
       var eventsSource =
               Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove").
               Where(e => e.EventArgs.LeftButton == MouseButtonState.Pressed).
               Select(e => e.EventArgs.GetPosition(this));

       eventsSource.Subscribe(pos =>
                                  {
                                      _endPoint = pos;
                                      InvalidateVisual();
                                  });
   }
   protected override void OnRender(DrawingContext dc)
   {
       base.OnRender(dc);
       dc.DrawLine(new Pen(Brushes.Black, 1), new Point(), _endPoint);
   }
}

Następnym celem jest narysowanie linii od punktu gdzie użytkownik naciska lewy przycisk myszy do miejsca gdy zwolnienia myszkę. Dla uproszczenia, na razie nie będziemy rysować linii w czasie rzeczywistym tzn. kiedy użytkownik rusza myszką. Po prostu najpierw pobieramy punkt w zdarzeniu MouseDown, czekamy na MouseUp i po otrzymaniu sygnału rysujemy linie pomiędzy tymi dwoma punktami.

Rozwiązaniem jest użycie funkcji Zip, którą przedstawiałem w poprzednim poście:

var mouseDown =
 Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseDown").
 Select(r => r.EventArgs.GetPosition(this));

var mouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseUp").
Select(r => r.EventArgs.GetPosition(this));

mouseDown.Zip(mouseUp,(a,b)=>new{P1=a,P2=b}).
Subscribe((v) =>
               {
                   start = v.P1;
                   end = v.P2;
                   InvalidateVisual();
               })

Dlaczego nie CombineLatest? Jak wiemy, Zip czeka na drugą wartość. W praktyce oznacza to, gdy zostanie wywołane zdarzenie mouseDown, Zip będzie czekał aż do momentu gdy MouseUp jest wywołane. CombineLatest po prostu użyłoby ostatniego dostępnego zdarzenia (a nie tego po MouseDown), co oczywiście jest całkowicie błędne.

Nie jest jeszcze to idealne rozwiązanie.  Przede wszystkim, gdy użytkownik zwolni myszkę za oknem, wtedy następna narysowana linia będzie nieprawidłowa.  Powrócimy do tego na końcu wpisu ponieważ będzie potrzebne użycie funkcji TakeUntil oraz Repeat. Kolejnym krokiem jest zaimplementowanie rysowania w taki sposób, że linia jest  renderowana już w momencie naciśnięcia przycisku. Innymi słowy, najpierw chcemy czekać na zdarzenie MouseDown, następnie połączyć je z aktualną pozycją kursora, a na końcu zakończyć zapytanie gdy zostanie wywołany MouseUp:

var mouseDown =
      Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseDown").
      Select(r => r.EventArgs.GetPosition(this));
  
  var mouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseUp").
      Select(r => r.EventArgs.GetPosition(this));

  var mouseMove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove").
      Select(r => r.EventArgs.GetPosition(this));


  mouseDown.CombineLatest(mouseMove, (a, b) => new { P1 = a, P2 = b }).TakeUntil(mouseUp).Repeat().
      Subscribe((a) =>
                    {
                        start = a.P1;
                        end = a.P2;
                        InvalidateVisual();
                    });

CombineLatest łączy zdarzenie zawierające pozycje kursora w momencie naciśnięcia przycisku (MouseDown,a) oraz również gdy użytkownik rusza myszą (MouseMove,b). Następnie chcemy te wartości przetwarzać aż do momentu MouseUp. Funkcja TakeUntil publikuje wartości aż do momentu zdarzenia przekazanego jako parametr – w tym przypadku jest to MouseUp. W momencie gdy MouseUp zostanie wywołany, wtedy zostaje wykonane OnCompleted i źródło danych kończy generowanie elementów. Oczywiście jedna sekwencja MouseDown, MouseMove, MouseUp nie jest wystarczająca i dlatego na końcu wywołujemy Repeat. Powoduje to, że cała sekwencja będzie powtarzana nieskończoną liczbę razy. Jeśli TakeUntil wciąż nie jest zrozumiały, proszę przeanalizować poniższy kod:

var cancelEvent = Observable.FromEventPattern(this,”Cancel”);

var query = readDataSource.TakeUntil(cancelEvent);

Źródło readDataSource będzie generowało elementy aż do momentu przyjścia zdarzenia Cancel. TakeUnti zachowuje się po prostu jak zawór, który jest zakręcany gdy inne źródło wygeneruje element.

Na zakończenie, powróćmy do poprzedniego problemu o rysowaniu linii pomiędzy mouseDown oraz MouseUp. Mając już wiedzę o Repeat oraz TakeUntil możemy napisać:

var mouseDown =
    Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseDown").
    Select(r => r.EventArgs.GetPosition(this));

var mouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseUp").
    Select(r => r.EventArgs.GetPosition(this));

mouseDown.
    TakeUntil(mouseUp).
    CombineLatest(mouseUp,(a,b)=>new{P1=a,P2=b}).
    Take(1).
    Repeat().
    Subscribe((a) =>
                {
                    start = a.P1;
                    end = a.P2;
                    InvalidateVisual();
                });

Po co nam Take(1)? Bez tego CombineLatest nigdy nie zakończy działania – będzie czekał na nowe elementy. Take(1) bierze po prostu pierwszą parę punktów i powoduje opublikowanie danych.

Reactive Extensions: Scalanie źródeł danych

W dzisiejszym wpisie znów wracamy do tematu RX. Postaram się wyjaśnić jak można ze sobą połączyć kilka IObservable. W RX istnieje naprawdę wiele metod umożliwiających wykonanie tego i na początku może wydawać się to dość skomplikowane, ze względu na liczbę sposobów w jaki można to wykonać.

1. Observable.Amb – zawsze zwraca wyłącznie tą sekwencje, która została jako pierwsza wygenerowana. Jeśli zatem mamy MouseMove i MouseUp wtedy zostanie zwrócone te zdarzenie, które jako pierwsze miało miejsce. Przykład:

internal class Program
{
   public static void Main()
   {
       var fasterSeq = Observable.Interval(TimeSpan.FromSeconds(1));
       var slowerSeq = Observable.Interval(TimeSpan.FromSeconds(5));

       var result = slowerSeq.Amb(fasterSeq);
       result.Subscribe(Console.WriteLine);
       Console.ReadLine();
   }  
}

2. Observable.Merge – scala ze sobą elementy (pary) z dwóch źródeł. Jeśli zatem pierwsza sekwencja ma wartości A,B,C,D a druga 1,2,3,4,5,6 to na wyjściu będzie A,1,B,2,C,3,D,4,5,6. Przykład:

public static void Main()
{
  var a = new []{"A", "B", "C", "D"}.ToObservable();
  var b = new[] {"1", "2", "3", "4", "5", "6"}.ToObservable();

  var result = a.Merge(b);
  result.Subscribe(Console.WriteLine);
  Console.ReadLine();
} 

3. Observable.Concat – na pierwszy rzut oka wygląda podobnie jak Observable.Merge. Jeśli scalane są źródła A,B,C,D z 1,2,3,4,5,6 na wyjściu będzie A,B,C,D,1,2,3,4,5,6. Najpierw zatem zostanie wyświetlona pierwsza sekwencja a potem druga. Przykład:

internal class Program
{
   public static void Main()
   {
       var a = new []{"A", "B", "C", "D"}.ToObservable();
       var b = new[] {"1", "2", "3", "4", "5", "6"}.ToObservable();

       var result = a.Concat(b);
       result.Subscribe(Console.WriteLine);
       Console.ReadLine();
   }  
}

Jakie to ma znaczenie w praktyce? Zależy od typu źródła. W powyższych przykładach nie ma większej różnicy ponieważ wszystkie IObservable zawsze zwrócą taki sam zbiór danych, niezależny od aktualnego czasu. Jeśli oczywiście generowane dane zależałby od czasu (np. zdarzenia) wtedy wynik mógłby zawierać inne elementy. Ponadto, w przypadku Concat może okazać się przydatna metoda SubscribeOn, o której będę pisał w innym poście, poświęconym synchronizacji w RX. W skrócie pisząc, gdy korzystamy z Concat, najpierw RX podpina się do źródła A, generuje wszystkie elementy a dopiero potem dokonuje subskrypcji do B. Z tego wynika, że źródło B dziedziczy kontekst (wątek) po A ponieważ, to w A wywoływany jest Subscribe do B. Jeśli to jest niezrozumiałe, to w jednym z następnych postów zostanie to wyjaśnione w szczegółach…

4. Observable.Zip – ten typ scalania trochę różni się od powyższych. Wszystkie przedstawione wcześniej sposoby, polegały na zwróceniu IObservable zawierającego ten sam typ. Jeśli zatem scalano dwa źródła danych zawierające po 2 elementy, wynikiem była taka sama sekwencja ale zawierająca po prostu 4 elementy (4 powiadomienia po subskrypcji). Obserbable.Zip polega na łączeniu w pary elementów z A i B. Oczywiście nic nie stoi na przeszkodzie aby elementy były innych typów. Jeśli zatem mamy “A”,”B”,”C”,”D” oraz 1,2,4 wtedy możemy to połączyć w nową sekwencje, zawierającą również 4 elementy, ale będącą po prostu rezultatem połączenia dwóch IObservable np.:

internal class Program
{
   public static void Main()
   {
       var a = new[] {"A", "B", "C", "D"}.ToObservable();
       var b = new[] {1, 2, 3, 4}.ToObservable();

       var result = a.Zip(b, (first, second) => string.Format("{0}:{1}", first, second));
       result.Subscribe(Console.WriteLine);
       Console.ReadLine();
   }
}

Na ekranie wyświetli się A:1,B:2,C:3,D:4. Co w przypadku jednak, gdy sekwencja A ma więcej elementów niż B? Innymi słowy, jak zachowa się Zip, jeśli nie można dopasować pary ponieważ brakuje odpowiadającego elementu w drugim źródle danych:

internal class Program
{
   public static void Main()
   {
       var a = new[] {"A", "B", "C", "D","E"}.ToObservable();
       var b = new[] {1, 2, 3, 4}.ToObservable();

       var result = a.Zip(b, (first, second) => string.Format("{0}:{1}", first, second));
       result.Subscribe(Console.WriteLine,()=>Console.WriteLine("Koniec"));
       Console.ReadLine();
   }
}

Na ekranie zostanie wyświetlony napis "Koniec” ponieważ, element “E” nie ma odpowiadającego elementu. Observable.Zip po prostu łączy pary po indeksach. Zip czeka aż drugie źródło wygeneruje odpowiedni element. W powyższym przypadku b nie wygeneruje tego elementu a zakończy swoje działanie przez OnCompleted – w takiej sytuacji Zip kończy również wykonywanie. Jeśli element “E” zostałby wygenerowany w 10 sekundzie, a wartość 5 w 20 sekundzie, wtedy Zip czekałby te 10 sekund na wartość. 5. W momencie jednak gdy b wywołał OnCompleted, całość kończy działanie.

5. Observable.CombineLatest – analogiczne do Zip, z tym, że nie czeka na odpowiadający element. Zawsze bierze ostatni dostępny. W powyższym przykładzie, E po prostu zostanie połączone z 4:

internal class Program
{
   public static void Main()
   {
       var a = new[] {"A", "B", "C", "D","E"}.ToObservable();
       var b = new[] {1, 2, 3, 4}.ToObservable();

       var result = a.CombineLatest(b, (first, second) => string.Format("{0}:{1}", first, second));
       result.Subscribe(Console.WriteLine,()=>Console.WriteLine("Koniec"));
       Console.ReadLine();
   }
}

Code Review: Garbage Collector a zmienne lokalne w metodach

Co wyświetli poniższy fragment kodu?

internal class Program
{
   public static void Main()
   {            
       var timer = new Timer(TimerCallback, null, 0, 1000);            
       Console.ReadLine();
   }
   private static void TimerCallback(Object o)
   {
       Console.WriteLine("Callback: " + DateTime.Now);
   }
}

Powyższy konstruktor uruchamia timer i spodziewalibyśmy się, że na ekranie po prostu będą wyświetlane kolejne callbacki. W praktyce jednak dokonywana jest pewna optymalizacja, która ma fatalne efekty. Można zauważyć, że zmienna timer nie jest nigdzie tak naprawdę wykorzystywana. Z tego względu, powyższy kod można zapisać również jako (po optymalizacji w release):

internal class Program
{
   public static void Main()
   {            
       new Timer(TimerCallback, null, 0, 1000);            
       Console.ReadLine();
   }
   private static void TimerCallback(Object o)
   {
       Console.WriteLine("Callback: " + DateTime.Now);
   }
}

Jakie to przyniesie skutki? GC może zwolnić pamięć przeznaczoną na timer ponieważ nie ma do niej referencji. W wyniku czego, TimerCallback nie będzie wywoływany. Można to zaobserwować uruchamiając poniższy kod w release:

internal class Program
{
   public static void Main()
   {            
       var timer = new Timer(TimerCallback, null, 0, 1000);            
       Console.ReadLine();
   }
   private static void TimerCallback(Object o)
   {
       Console.WriteLine("Callback: " + DateTime.Now);
       GC.Collect();
   }
}

W trybie debug co sekundę będzie wyświetlany komunikat. W trybie release, ze względu na optymalizacje opisane wyżej, zostanie wyświetlony tylko raz TimerCallback. Oczywiście jeśli mamy pecha może ogóle nie zostać wyświetlony. W jaki sposób napisać powyższy kod, który działa dobrze nawet w trybie release?

Można oczywiście zadeklarować zmienną jako pole klasy:

internal class Program
{
   private static Timer _timer;

   public static void Main()
   {
       _timer = new Timer(TimerCallback, null, 0, 1000);
       Console.ReadLine();
   }

   private static void TimerCallback(Object o)
   {
       Console.WriteLine("Callback: " + DateTime.Now);
       GC.Collect();
   }
}

Dużo lepiej w tym przypadku jest jednak wywołanie dispose po ReadLine:

internal class Program
{
   public static void Main()
   {            
       var timer = new Timer(TimerCallback, null, 0, 1000);            
       Console.ReadLine();
       timer.Dispose();
   }
   private static void TimerCallback(Object o)
   {
       Console.WriteLine("Callback: " + DateTime.Now);
       GC.Collect();
   }
}

W tym przypadku nie zostanie dokonana optymalizacja ponieważ zmienna jest wykorzystywana. Analogiczne rozwiązanie to:

internal class Program
{
   public static void Main()
   {            
       using(new Timer(TimerCallback, null, 0, 1000))
       {
           Console.ReadLine();
       }
   }
   private static void TimerCallback(Object o)
   {
       Console.WriteLine("Callback: " + DateTime.Now);
       GC.Collect();
   }
}

GC może zwolnić obiekt, nawet jeśli wykonuje jego metodę. Trzeba zdać sobie sprawę, że obiekt może zostać “zebrany” w momencie gdy jest niepotrzebny. GC i JIT współpracują ze sobą i w trakcie wykonywania metody, w dowolnym momencie może zostać zmieniony stos, usuwając jakąś nieużywaną referencje. Na przykład:

class SampleClass
{
    ~SampleClass()
   {
       Console.WriteLine("Obiekt jest usuwany.");
   }
   public void Print()
   {
       GC.Collect();
       GC.WaitForFullGCComplete();
       GC.WaitForPendingFinalizers();
       Console.WriteLine("Test");
   }
}
internal class Program
{
   public static void Main()
   {
       SampleClass sampleClass=new SampleClass();            
       sampleClass.Print();
   }  
}

Spodziewalibyśmy się na wyjściu najpierw “Test” a potem dopiero “Obiekt jest usuwany”. Proszę jednak zauważyć, że wskaźnik this w metodzie Print nie jest nigdzie używany i już przed linią Console.WriteLine(“test”) może SampleClass zostać zwolniony. Ostatni moment kiedy jest wymagany dostęp do SampleClass jest w momencie wywołania metody Print. Jednakże gdyby Print używał this w Console.WriteLine wtedy na wyjściu pokaże się najpierw “Test” a potem dopiero “Obiekt jest usuwany”:

class SampleClass
{
    ~SampleClass()
   {
       Console.WriteLine("Obiekt jest usuwany.");
   }
   public void Print()
   {
       GC.Collect();
       GC.WaitForFullGCComplete();
       GC.WaitForPendingFinalizers();
       Console.WriteLine("Test {0}",GetType());
   }
}
internal class Program
{
   public static void Main()
   {
       SampleClass sampleClass=new SampleClass();            
       sampleClass.Print();
   }  
}

Zaskakujące! Może wydawać się to dziwne ale jeszcze raz powtórzę: stos może być zmieniany w każdym momencie dzięki dokonywanym optymalizacjom. Jeśli tylko dany obiekt jest nieużywany, może on zostać wtedy usunięty nawet gdy wywoływana jest na nim w danym momencie metoda!

ASP.NET MVC – przekierowanie do powrotnej strony, luka w bezpieczeństwie

Ciekawostka: http://www.asp.net/mvc/tutorials/security/preventing-open-redirection-attacks

W ASP.NET MVC 1.0 oraz 2.0 istnieje pewna luka, umożliwiająca przeprowadzanie ataku phishing. Zwykle returnUrl w QueryString zawiera powrotny adres, ale można go zmienić na dowolny a ASP.NET MVC 1.0 i 2.0 nie sprawdza czy link należy do tej samej domeny. Więcej informacji w powyższym linku.

Reactive Extensions– Observable.FromAsyncPattern, dalsza część przykładu

W poprzednim poście pokazałem jak korzystać z funkcji FromAsyncPattern na przykładzie usługi sieciowej. Dzisiaj zaprezentuję kilka dodatkowych funkcji. Najpierw zdefiniujmy co chcemy uzyskać:

  1. Użytkownik może wpisać szukaną frazę w pole edycyjne.
  2. Usługa sieciowa ma za zadanie wyszukanie fraz wpisanych w pole zdefiniowane w punkcie 1.
  3. Wyłącznie frazy dłuższe niż 3 znaki mają być przetwarzane.
  4. Jeśli użytkownik wpisze dwa razy tą samą frazę to tylko pierwsza ma zostać wysłana do usługi (optymalizacja).
  5. Zdarzenie TextChanged jest wywoływane dla każdego wpisanego znaku. Z tego względu, poprzednie rozwiązanie wysyłało dużo niepotrzebnych zapytań. W celu optymalizacji wprowadzimy opóźnienie takie jakie jest np. w Intellisense – propozycje nie są wyświetlane za każdym razem gdy jest wpisywany pojedynczy znak ale dopiero gdy użytkownik chwilę odczeka.

Klasyczne rozwiązanie problemu, bez użycia RX mogłoby wyglądać następująco:

public partial class MainWindow : Window
{
   private string _previousText;
   private DateTime _lastRead;
   private const long MinimalTimeBetweenRequests = 1000*5;

   public MainWindow()
   {
       InitializeComponent();
   }
   private void TextBoxTextChanged(object sender, TextChangedEventArgs e)
   {
       TimeSpan timeDiff = DateTime.Now - _lastRead;
       if (termTextBox.Text.Length >= 3 && _previousText != termTextBox.Text && timeDiff.TotalMilliseconds > MinimalTimeBetweenRequests)
       {
           DictServiceSoapClient client = new DictServiceSoapClient("DictServiceSoap");
           _lastRead = DateTime.Now;
           client.BeginMatchInDict("wn", termTextBox.Text, "prefix", SearchCallback, client);
       }
       _previousText = termTextBox.Text;
   }
   private void SearchCallback(IAsyncResult result)
   {
       var client = (DictServiceSoapClient) result.AsyncState;
       DictionaryWord[] words = client.EndMatchInDict(result);            
       Dispatcher.Invoke(new Action(() => UpdateResults(words)));
   }
   private void UpdateResults(IEnumerable<DictionaryWord> words)
   {
       StringBuilder resultsBuilder = new StringBuilder();
       foreach (var word in words)
       {
           resultsBuilder.Append(word.Word);
           resultsBuilder.AppendLine();
       }
       results.Text = resultsBuilder.ToString();
   }
}

Szczególnie nie lubię callback’ów i definiowania pól w klasie, które naprawdę wykorzystywane są tylko w jednym miejscu.  Należy dążyć do sytuacji gdzie jest jak najmniej pól – upraszcza to czytanie kodu. Jeśli to tylko możliwe lepiej korzystać z zasobów lokalnych.

RX znacząco uprości powyższy problem. Najpierw warto jednak przyjrzeć się następującym metodom:

  1. Throttle – tłumi przetwarzanie danych. Dzięki temu łatwo zrealizować wymaganie numer 5. Wystarczy wywołać tą metodę z odpowiednim parametrem (czas) a RX zajmie się resztą.
  2. DistinctUntilChanged – zwraca wyłącznie jednorazowe, unikatowe wartości. Jeśli zdarzenie TextChanged generuje “Piotr”,”Paweł’,”Paweł na wyjściu pojawi się “Piotr”,”Paweł”. Z kolei jeśli zdarzenia dostarczają “Piotr”,”Paweł”,”Piotr” na wyjściu ukażę się identyczna sekwencja. Z tego względu DistinctUntilChanged nie jest tym samym co Distinct.

Zatem rozwiązanie za pomocą RX może wyglądać następująco:

public partial class MainWindow : Window
{
   public MainWindow()
   {
       InitializeComponent();

       DictServiceSoapClient client = new DictServiceSoapClient("DictServiceSoap");

       var dictionarySource =
           Observable.FromAsyncPattern<string, string, string, DictionaryWord[]>(client.BeginMatchInDict,
                                                                                 client.EndMatchInDict);
       var inputSource =
           Observable.FromEventPattern<TextChangedEventArgs>(termTextBox, "TextChanged").Select(i =>
                                                                                                ((TextBox) i.Sender)
                                                                                                    .Text);

       inputSource.Throttle(TimeSpan.FromSeconds(1)).
                   DistinctUntilChanged().
                   Where(input => input.Length >= 3).
                   SelectMany(input => dictionarySource("wn", input, "prefix")).
                   ObserveOn(SynchronizationContext.Current).
                   Subscribe(UpdateResults, () => MessageBox.Show("Completed"));
   }
   private void UpdateResults(IEnumerable<DictionaryWord> words)
   {
       StringBuilder resultsBuilder = new StringBuilder();
       foreach (var word in words)
       {
           resultsBuilder.Append(word.Word);
           resultsBuilder.AppendLine();
       }
       results.Text = resultsBuilder.ToString();
   }
}

Dzięki RX programista może opisywać swoją intencję za pomocą jednego zdania. Taki kod jest czytelniejszy ponieważ nie trzeba skakać z jednego miejsca do drugiego – wszystko jest zawarte w tym zdaniu.

Reactive Extensions–Observable.FromAsyncPattern

W wcześniejszych wersjach .NET Framework metody asynchroniczne implementujące wzorzec Begin\End były bardzo powszechne w użyciu. RX posiada metody pomocnicze, umożliwiające konwersję asynchronicznego źródła danych do IObservable.

Rozważmy to na przykładzie. Załóżmy, że mamy web service, zawierający jakieś metody. Można oczywiście dla takiego serwisu wygenerować asynchroniczne metody. Jeśli chcecie popraktykować możecie skorzystać z tej, darmowej usługi:

http://services.aonaware.com/DictService/DictService.asmx

Oczywiście dodając w Visual Studio referencję do usługi należy zaznaczyć pole Generate Asynchronous operations:

image

 

Zostaną m.in. wygenerowane metody BeginMatchInDict oraz EndMatchInDict. Powyższa usługa pozwala na wyszukiwanie słów w słowniku. MatchInDict posiada kilka parametrów przyjmujących np. słowo kluczowe, typ wyszukiwania czy typ słownika. Chcąc z nich skorzystać bez użycia RX należy zdefiniować własny callback:

private void TextBoxTextChanged(object sender, TextChangedEventArgs e)
{  
    DictServiceSoapClient client = new DictServiceSoapClient("DictServiceSoap"); 
    client.BeginMatchInDict("wn", termTextBox.Text, "prefix", SearchCallback, client);  
}
private void SearchCallback(IAsyncResult result)
{
    var client = (DictServiceSoapClient) result.AsyncState;
    DictionaryWord[] words = client.EndMatchInDict(result);            
    UpdateResults(words);
}

Za pomocą RX można ten sam efekt uzyskać korzystając z Observable.FromAsyncPattern:

DictServiceSoapClient client = new DictServiceSoapClient("DictServiceSoap");

var dictionarySource =
Observable.FromAsyncPattern<string, string, string, DictionaryWord[]>(client.BeginMatchInDict,
                                                             client.EndMatchInDict);




var inputSource =
Observable.FromEventPattern<TextChangedEventArgs>(termTextBox, "TextChanged").Select(i =>
                                                                            ((TextBox) i.Sender)
                                                                                .Text);

inputSource.SelectMany(input => dictionarySource("wn", input, "prefix")).
ObserveOn(SynchronizationContext.Current).Subscribe(UpdateResults);

Na początku tworzymy dwa źródła danych. Jedno dla metod asynchronicznych drugie dla zdarzenia TextChanged obiektu TextBox, w którym użytkownik może wpisać szukaną frazę. Następnie za pomocą LINQ, przechodzimy przez każdy element inputSource (czyli przez każde wpisane słowo) i i wywołujemy dictionarySource. Wynikiem będzie oczywiście lista zawierająca szukane słowa. W następnym poście rozwinę trochę powyższy przykład aby pokazać, że posiadanie IObservable ułatwia znacząco przetwarzanie danych.

Reactive Extensions: Observable.Buffer

W ostatnim poście pisałem o konwersji zdarzeń .NET do RX. Dziś chciałbym zaprezentować przydatną funkcję, dostępną w rozszerzeniach Observable – Buffer. Służy ona do podzielenia danych w bufory. Załóżmy, że mamy źródło, które generuje dane co 1 sekundę. W każdej sekundzie zatem otrzymujemy jedno powiadomione OnNext. Co w przypadku jednak gdybyśmy chcieli dostawać w każdym powiadomieniu kilka wartości (tablicę elementów) ? Na przykład zamiast 10 OnNext, chcemy dwa powiadomienia, w którym każde zawiera listę 5 elementów.

Do tego właśnie służy Buffer: grupowania kilku powiadomień w jedną kolekcję.

Załóżmy, że mamy następujące źródło danych:

var source=Observable.Interval(TimeSpan.FromSeconds(1));

Co jedną sekundę tworzymy następny element. Gdybyśmy teraz dokonali subskrypcji, otrzymywalibyśmy OnNext dla każdej wartości od tzn.: 1,2,3,4…n. Załóżmy, że w jednym powiadomieniu chcemy mieć 5 liczb:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1)).
           Buffer(5);
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(IList<long> values)
   {
       Console.WriteLine("Nowy bufer");
       foreach (var value in values)
           Console.WriteLine(value);
   }
}

Jako parametr Buffer, podajemy liczbę elementów w buforze. W taki sposób, pierwszy OnNext zostanie wywołany dopiero w momencie gdy 5 elementów będzie gotowych. Po uruchomieniu na wyjściu zobaczymy:

image

Bez buforu oczywiście sprawa wyglądałby następująco:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1));
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(long value)
   {
       Console.WriteLine(value);
   }
}

image

Oprócz liczby elementów w jednym buforze można wskazać ile elementów ma być omijanych. Najlepiej to zrozumieć na przykładzie:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1)).
           Buffer(count:5,skip:2);
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(IList<long> values)
   {
       Console.WriteLine("Nowy bufer");
       foreach (var value in values)
           Console.WriteLine(value);
   }
}

image

W pierwszym buforze mamy zatem wartości 0,1,2,3,4 (nic tutaj nie zmienia się), z kolei drugi, zamiast od 5 zaczyna się od 2 – dwie pierwszy wartości zostały ominięte. Jak widać, domyślnie parametr skip przyjmuje wartość rozmiaru bufora.

Oprócz buforów iloościowych, można tworzyć czasowe:

internal class Program
{
   private static void Main(string[] args)
   {
       var source = Observable.Interval(TimeSpan.FromSeconds(1)).
           Buffer(TimeSpan.FromSeconds(5));
       source.Subscribe(OnNext);

       Console.ReadLine();

   }
   private static void OnNext(IList<long> values)
   {
       Console.WriteLine("Nowy bufer");
       foreach (var value in values)
           Console.WriteLine(value);
   }
}

image

Elementy są generowane co jedną sekundę, z kolei buforowane co 5 sekund. Wynika z tego, że każdy bufor będzie średnio zawierał po 5 elementów. Innymi słowy, po 5 sekundach dopiero zostanie wywołany OnNext, z kolekcją elementów, które zdążyły się wygenerować podczas tych 5 sekund.

Na zakończenie dzisiejszego wpisu małe zadanie. Spróbujmy napisać program, który zlicza liczbę zdarzeń MouseMove w ciągu jednej sekundy.  Na podstawie dwóch ostatnich postów, można napisać:

 var mouseMove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove").
                Buffer(TimeSpan.FromSeconds(1)).
                Select(e => e.Count).
                ObserveOn(SynchronizationContext.Current).
                Subscribe(count => Title = count.ToString());

FromEventPattern konwertuje zdarzenie MouseMove do źródła danych RX. Następnie buforujemy elementy co sekundę. Wynika z tego, że pojedynczy bufor zawiera wszystkie zdarzenia jakie przytrafiły się w przeciągu jednej sekundy. Następnie za pomocą Select wybieramy liczbę elementów w buforze czyli liczbę zdarzeń w ciągu jednej sekundy. ObserveOn służy do synchronizacji – o tym w następnych wpisach. W skrócie, musimy skorzystać z ObserveOn aby nie mieć problemów z wyjątkiem cross-thread operation (aktualizacja interfejsu użytkownika z innego wątku). Następnie w Subscribe możemy wyświetlić liczbę zdarzeń na sekundę.

Do następnego wpisu!

Reactive Extensions: Konwersja zdarzeń .NET do RX

Po przeczytaniu poprzednich wpisów, zasada działania i zastosowanie RX powinny być już jasne. Jeśli tak nie jest, koniecznie zachęcam do przeczytania tamtych postów. Jak wspomniałem, RX to ujednolicony model, umożliwiający korzystanie z kolekcji typu “push-based” w jednakowy sposób. Dzisiaj pokażę jak sprawa wygląda dla zdarzeń czyli jak skonwertować EventHandler do IObervable.

Oczywiście kluczem do rozwiązania jest klasa Observable, która zawiera mnóstwo rozszerzeń dla interfejsów IObservable\IObserver. Znajduje się tam również metoda FromEventPattern, odpowiedzialna właśnie za konwersje zdarzenia do IObservable. Istnieje kilka sygnatur i używamy ich w zależności od tego jaki sposób bardziej odpowiada nam (od strony dobrych praktyk):

// I
var inputSource =
Observable.FromEventPattern<TextChangedEventArgs>(termTextBox, "TextChanged");

// II
var movingEvents = 
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(h => this.MouseMove += h, h => this.MouseMove -= h); 

Sposób pierwszy polega na przekazaniu instancji obiektu zawierającego zdarzenie i nazwy tego zdarzenia w formie string. Druga sygnatura przyjmuje delegaty odpowiedzialne za dodanie oraz usunięcie zdarzenia.  Spróbujmy więc napisać pierwszy kod, który coś robi. Załóżmy, że mamy pole edycyjne w aplikacji WPF i chcemy wyświetlać komunikat, gdy wprowadzony tekst jest dłuższy niż 5 znaków:

var eventSource = Observable.
                  FromEventPattern<TextChangedEventArgs>(textbox1, "TextChanged").
                  Select(e => ((TextBox) e.Sender).Text).
                  Where(text => text.Length > 5);

eventSource.Subscribe(text=>MessageBox.Show(text));

Wywołanie FromEventPattern zwraca IObservable dla zdarzenia o nazwie TextChanged. Następnie za pomocą LINQ zwracamy tylko tekst – nie potrzebujemy informacji o Sender czy EventArgs. Za pomocą Where nakładamy filtr. Proszę zauważyć, że w taki sposób korzystanie z zdarzeń niczym nie różni się od zwykłej kolekcji. Nie musimy martwić się już o buforowanie poprzednich elementów ponieważ traktujemy serie zdarzeń jak najzwyklejszą kolekcję danych.

Kolejnym przykładem może być rysowanie linii. W dzisiejszym poście ograniczymy się do narysowania linii od punktu (0,0) do aktualnej pozycji kursora, pod warunkiem, że lewy przycisk myszy jest wciśnięty. W tym celu stworzono kontrolkę dziedziczącą po Canvas:

public class MyCanvas : Canvas
{
   private Point _endPoint;

   public MyCanvas()
   {
       var eventsSource =
               Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove").
               Where(e => e.EventArgs.LeftButton == MouseButtonState.Pressed).
               Select(e => e.EventArgs.GetPosition(this));

       eventsSource.Subscribe(pos =>
                                  {
                                      _endPoint = pos;
                                      InvalidateVisual();
                                  });
   }     
   protected override void OnRender(DrawingContext dc)
   {
       base.OnRender(dc);
       dc.DrawLine(new Pen(Brushes.Black, 1), new Point(), _endPoint);
   }
}

Efekt końcowy:

image

Miałem zamiar w dzisiejszym wpisie pokazać bardziej praktyczny mechanizm drag&drop w RX ale stwierdziłem, że musi najpierw powstać jeszcze jeden post o funkcjach służących do scalania dwóch różnych źródeł. W ten sposób będziemy mogli połączyć MouseDown z MouseMove i na końcu znów z MouseUp.

Oprócz łatwej możliwości dostępu do dowolnych zdarzeń (brak konieczności buforowania), bardzo lubię fakt, że Subscribe zwraca tak naprawdę IDisposable:

using(eventsSource.Subscribe(pos =>
                             {
                                 _endPoint = pos;
                                 InvalidateVisual();
                             }))
{
      // zdarzenie podlaczone
}
  // tutaj zdarzenie jest juz odlaczone poniewaz nastapilo wywolanie IDisposable.Dispose()

Jak widać, łatwo sprzątać po sobie zasoby. Zdarzenia są zdecydowanie w .NET zbyt trudne do usuwania i często powodują memory leak. Pisałem już kiedyś o tzw. weak event pattern, ale osobiście preferuje RX. To nie koniec przygody ze zdarzeniami w RX – za kilka postów przedstawię kolejny mechanizm, przeznaczony wyłącznie dla zdarzeń.

Warto pamiętać, że RX dostarcza oprócz OnNext, takie metody jak OnCompleted oraz OnEror co ułatwia obsługę błędów. Dzięki LINQ łatwo tworzyć skomplikowane zapytania czy transformacje typu scalenie dwóch zdarzeń (o tym w następnym wpisie).