Category Archives: Patterns & Practices

Code review: Async void

Zaczynamy od razu od kodu:

    class Foo
    {
        public async void DoAsync()
        {
            await Task.Factory.StartNew(() =>
            {
                Thread.Sleep(2000);
                Console.WriteLine("DoAsync...");
            });
        }
    }

Dlaczego powyższy kod jest bardzo niebezpieczny? Nigdy nie należy używać async w połączeniu z void.
Przede wszystkim, użytkownik takiego kodu nie ma możliwości kontroli nad stworzonym wątkiem. Nie wiadomo, kiedy powyższa metoda zakończy się. Wywołanie będzie zatem wyglądać następująco:

    class Program
    {
        static void Main(string[] args)
        {
            Foo foo = new Foo();
            foo.DoAsync();

            Console.WriteLine("End");
            Console.ReadLine();
        }
    }

DoAsync tworzy nowy wątek i wykonuje jakiś kod w osobnym wątku. Ze względu na to, że metoda nie zwraca wątku, nie można użyć await lub Wait(). Jedynie możemy domyślać się, że po pewnym czasie zadanie zostanie wykonane.

Kolejnym problemem jest brak możliwości przetestowania takiej metody. Skoro niemożliwe jest wyznaczenie momentu zakończenia zadania, napisanie poprawnego i stabilnego testu również nie jest łatwe.

Jeszcze większe problemy będziemy mieli w momencie wyrzucenia wyjątku:

    class Foo
    {
        public async void DoAsync()
        {
            await Task.Factory.StartNew(() =>
            {
                Thread.Sleep(2000);
                throw new ArgumentException();
            });
        }
    }

Załóżmy, że kod klienta wygląda następująco:

        static void Main(string[] args)
        {
            Foo foo = new Foo();
            try
            {
                foo.DoAsync();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }

            Console.WriteLine("End");
            Console.ReadLine();
        }

Powyższy kod nigdy nie złapie wyjątku. Ponieważ DoAsync tworzy nowy wątek i nie czeka na jego zakończenie, wyjątek zostanie wyrzucony bezpośrednio do SynchronizationContext. Wątek jest wyrzucany w losowe miejsce, gdzie nie mamy kontroli. Z tego wynika, że wyjątek spowoduje zakończenie procesu. Metoda async void, zatem może zakończyć działanie procesu (poprzez wyrzucenie wyjątku) bez możliwości jego wyłapania – to bardzo niebezpieczne.

Zdefiniujmy teraz klasę foo w prawidłowy sposób:

    class Foo
    {
        public async Task DoAsync()
        {
            await Task.Factory.StartNew(() =>
            {
                Thread.Sleep(2000);
                throw new ArgumentException();
            });
        }
    }

Dzięki temu, że zwracamy Task, możliwe jest użycie słowa kluczowego await:

       private static async Task TestAsync()
        {
            Foo foo = new Foo();
            try
            {
                await foo.DoAsync();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }

Wyjątek zostanie wyłapany tak jak tego spodziewamy się.
Prawie zawsze zatem należy zwracać Task albo Task<T> – nawet jeśli wersja synchroniczna była typu void. Wyjątek stanowią event handler’y. Wynika to z definicji delegaty EventHandler. Warto jednak zwrócić uwagę, że handler’ów nie testujemy bezpośrednio oraz nie ma sensu owijać ich w try-catch. Jakakolwiek obsługa błędów jest w wewnątrz handler’a. Z tego względu jest to dopuszczalne i bezpieczne.

Wyjątku nie stanowią testy jednostkowe. Poniższy test jest również błędny:

[Test]
public async void Should_...().
{
      await _sut.DoAsync();

      Assert....
}

Prawidłowy test to oczywiście:

[Test]
public async Task Should_...().
{
      await _sut.DoAsync();

      Assert....
}

Framework nUnit w pewnej wersji (nie pamiętaj dokładnie, możliwe, że w 2.6) wspierał async void. Autorzy musieli napisać sporo kodu, aby móc obsługiwać takie testy. Od wersji 3.0 jednak, ta funkcjonalność (która była trochę hack’iem) została usunięta. W wersji 3.0 wyłącznie testy async task są wspierane i mogą być uruchamiane.

Programowanie reaktywne

Zanim będę kontynuował serię o AKKA.NET, warto zapoznać się z podstawami programowania reaktywnego. Pozwoli to później zrozumieć, w jaki sposób AKKA.NET implementuje założenia programowania reaktywnego.
Dzisiaj zatem przedstawienię tzw. “The Reactive Manifesto”, którego pełną treść można znaleźć tutaj. Moim zdaniem jednak, manifest może wydawać się trochę skomplikowany i dlatego zdecydowałem się wyjaśnić to po swojemu.

Zastanówmy się po co nam kolejny “typ” programowania? Co jest złego z naszym starym, klasycznym podejściem klient-serwer? Jak nie trudno domyślić się, wymagania i oczekiwania dla dzisiejszych systemów są inne niż te 10 lat temu. Zamiast pojedynczych serwerów, mamy całe klastry. Przez ogólną dostępność komputerów (PC, Mobile itp.), przetwarzamy dane nie w gigabajtach ale w terabajtach. Model oprogramowania także zmienił się. Większość systemów stanowią usługi, a nie oprogramowanie desktopow’e, pakowane w kartony i sprzedawane w sklepach. Oczekiwania są, aby te usługi działały jak najdłużej bez żadnych usterek. Oczywiście ciężko dostarczyć oprogramowanie, które działa 100% czasu, ale możliwe jest uzyskanie niezawodności np. w 99% czasu. Dostępność różnych urządzeń dostępowych, typu tablet czy telefon komórkowy, powoduje, że użytkownicy nie chcą czekać kilku sekund na odpowiedź od serwera, a mieć rezultat w przeciągu milisekund.

Jak sama nazwa mówi, programowanie reaktywne, cechuje się, że poszczególne komponenty w odpowiedni sposób reagują (react). Na co zatem nasze systemy powinny reagować?

Zdarzenia
Przede wszystkim powinny reagować na zdarzenia. Systemy reaktywne opierają się na zdarzeniach, a nie na zapytaniach typu klient-serwer. W nServiceBus czy AKKA.NET mamy właśnie zdarzenia w postaci asynchronicznych wiadomości. Aktor wysyłając wiadomość do innego aktora, nie blokuje wykonywania procesu. Komponenty komunikujące się za pomocą zdarzeń są powiązane ze sobą w luźny sposób. Jeśli klasa A komunikuję się z klasą B, nie musimy przechowywać żadnej referencji z instancji A do B. Jedyną, luźną zależnością jest wiadomość\zdarzenie.

Dane, obciążenie
Systemy reaktywne muszą również reagować na dane, które przekazywane mogą być z różną intensywnością. Innymi słowy, system powinien być skalowalny. Powinien odpowiednio reagować na małą liczbę zapytań, jak i bardzo dużą. W przypadku niewystarczających zasobów, system powinien zareagować poprzez skalowanie. Jednym z typów skalowania (tzw. scaling-up), jest użycie więcej pamięci czy rdzeni procesora, poprzez np. wielowątkowość. Czasami jednak taka skalowalność jest niewystarczająca i trzeba szukać zasobów na zewnątrz (tzw. scaling out). Zasoby na zewnątrz to oczywiście kolejny serwer dołączony do klastra.

Ogromną rolę w reagowaniu na obciążenie odgrywają wcześniej opisane zdarzenia. Skoro dwie klasy nie są ze sobą mocno powiązane, wtedy nie współdzielą ze sobą żadnego stanu. Co za tym idzie, nic nie stoi na przeszkodzie, aby jedna z klas była wykonywana na kompletnie innym komputerze. Jeśli cały stan zawarty jest w wiadomości, nie ma dla nas różnicy czy przekażemy go w tym samym procesie np. do innego wątku,czy wyślemy przez sieć do innego klastra. Taka właściwość nazywa się “location transparency”, ponieważ raz zaimplementowana logika, może być skalowana bez żadnych zmian w kodzie. AKKA.NET posiada tą właściwość. Implementując jakiś algorytm za pomocą aktorów, można część z nich umieścić na różnych komputerach (tzw. remote actor). Lokalizacja aktorów zatem nie ma znaczenia – mogą znajdować się w tym samym procesie albo na innych komputerach, a i tak zaimplementowany algorytm będzie miał taki sam kod.

Wyjątki i błędy
Systemy reaktywne powinny również reagować w odpowiedni sposób na wszelkie błędy. AKKA.NET posiada szereg strategii obsługi wyjątków o których napiszę w przyszłych postach. System jednak nie powinien przestawać działać w momencie, gdy mało istotny błąd miał miejsce. Dokonuje się tego np. poprzez izolację. Jeśli aktor A ma jakieś błędy, można go odseparować od reszty, aby nie popsuć stanu aplikacji. Innymi słowy, system powinien wrócić do działania w momencie wystąpienia błędu (failure receovery). Jeśli np. nie można połączyć się z jakąś usługą, można spróbować ponownie za kilka sekund, zamiast całkowicie anulować operację.
Aktorzy w AKKA.NET tworzą hierarchie. Jeśli jeden z aktorów ma błędy, jego rodzic decyduje, co z błędem należy zrobić. Można np. powtórzyć operację, zrestartować aktora lub przekazać odpowiedzialność do kolejnego aktora, który zdecyduje co należy zrobić z węzłami podrzędnymi.
Jak widzimy, dobrą obsługę błędów również uzyskujemy za pomocą wspomnianych zdarzeń. Izolacja czy replikacja nie byłoby możliwa bez nich. Za pomocą zdarzeń, można komunikować się między aktorami. Jeśli błąd wystąpił w klastrze A, można spróbować w innym klastrze. Stan nie jest współdzielony, zatem odseparowanie pojedynczego węzła jest łatwe i nie powoduje proliferacji problemu do innych aktorów\węzłów.

Responsywność
Ostatnią, najłatwiejszą w zrozumieniu cechą, jest responsywność. Systemy reaktywne powinny reagować na użytkowników. Jeśli operacja jest czasochłonna, należy powiadomić o tym użytkownika. Znowu uzyskujemy to za pomocą zdarzeń. Zamiast wysłania pojedynczego zapytania i czekania aż operacja wykona się, korzystamy z asynchronicznych wiadomości, które nie blokują wykonywania.
Oczywiście responsywność jest również uzyskana za pomocą skalowalności i poprawnej obsługi błędu. Jeśli jest duża liczba zapytań, dzięki skalowalności system nie powinien być powolny. Obsługa błędów, zagwarantuje, że użytkownik zawsze jest świadom co się dzieje i nie zastanie np. pustej strony, gdy operacja nie powiodła się.

Spójrzmy zatem na słynny diagram, który można znaleźć na stronie “The Reactive Manifesto” (źródło: http://www.reactivemanifesto.org:

“Message-driven” to wspomniane zdarzenia, wiadomości.
“Responsive” to oczywiście responsywność systemu.
“Resilent” to prawidłowa obsługa błędów. Innymi słowy, system powinien być elastyczny (resilent) na błędy i dostosowywać się do sytuacji.
“Elastic” to skalowalność. System powinien być na tyle elastyczny, aby obsługiwać zarówno małą liczbę zapytań jak i bardzo dużą.

Ponadto z oznaczeń na rysunku, możemy zaobserwować:
– Dzięki zdarzeniom uzyskujemy zarówno elastyczność\skalowalność (location transparency), responsywność (nie blokujemy wywołań) oraz Resilent (izolacja błędów, replikacja w innym klastrze).
– Skalowalność (elastic) oraz obsługa błędów (resilence) są ze sobą tak naprawdę powiązane. Gdyby nie skalowalność i location transparency, nie moglibyśmy na przykład wykonać operacji w innym klastrze. Gdyby, nie prawidłowa obsługą błędów, awaria w innych punkcie systemu, zniszczyła by wszystkie węzły, niwelując korzyści ze skalowalności.
– Responsywność nie byłaby możliwa dzięki skalowalności (wydajność), jak i prawidłowej obsługi błędów (nie zostawianie użytkownika bez odpowiedzi w przypadku błędów).

AKKA.NET – przełączanie stanów

Zanim przejdziemy do konkretnych problemów, musimy poznać przynajmniej podstawowe elementy AKKA.NET. W poprzednim wpisie opisałem jak definiować wiadomości oraz aktorów. Dzisiaj przejdziemy do kolejnego, bardzo ważnego elementu – przełączanie stanów. Jest to podstawowy element zarówno w modelu aktor, jak i w jakichkolwiek maszynach stanów (FSM). W poprzednim przykładzie przelewu środków z jednego konta na drugie, użyliśmy właśnie przełączania stanów. Dla przypomnienia:

class TransferActor
{
    public void OnTransferMessageReceived(TransferMessage transferMessage)
    {
        ActorsSystem.GetActor(transferMessage.From).Send(new WithdrawMessage(transferMessage.Amount));
         
        Context.Become(AwaitFrom(transferMessage.From,transferMessage.To,transferMessage.Amount));
    }
 
    public void AwaitFrom(string from, string to, int amount)
    {
        ActorsSystem.GetActor(to).Send(new DepositMessage(amount));
        Context.Became(AwaitTo(transferMessage.From, transferMessage.To, transferMessage.Amount));
    }
}

Po otrzymaniu zapytania o przesłaniu pieniędzy, przełączamy się w stan AwaitFrom. Po otrzymaniu potwierdzenia, składamy depozyt i znów czekamy na potwierdzenie.

W AKKA.NET zmiana stanu odbywa się za pomocą metody Become:

    public class SampleActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            Console.WriteLine("Otrzymano wiadomosc ze stanu I {0}",message);
            Become(GoToState2);
        }

        private void GoToState2(object message)
        {
            Console.WriteLine("Otrzymano wiadomosc ze stanu II {0}", message);
        }
    }

Po otrzymaniu pierwszej wiadomości, przełączamy się do stanu drugiego, reprezentowanego przez metodę GoToState2. Od tego momentu wszystkie wiadomości będą obsługiwane przez GoToState2, a nie OnReceive. Odpalmy zatem następujący kod:

            var system = ActorSystem.Create("JakasNazwa");
            var actor1 = system.ActorOf<SampleActor>();


            for (int i = 0; i < 10; i++)
            {
                actor1.Tell(i.ToString());
            }

Na ekranie zobaczymy najpierw tekst “Otrzymano wiadomosc ze stanu I”, a potem 9 razy “Otrzymano wiadomosc ze stanu II”.

Druga przydatna i alternatywna metoda to BecomeStacked. Podobnie jak Become służy do przełączania stanu. Tym razem jednak, stan będzie przechowywany na stosie, zatem będzie można go potem zdjąć i powrócić do poprzedniego. Przykład:

   public class SampleActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            Console.WriteLine("Otrzymano wiadomosc ze stanu I {0}",message);
            BecomeStacked(GoToState2);
        }

        private void GoToState2(object message)
        {
            Console.WriteLine("Otrzymano wiadomosc ze stanu II {0}", message);
            UnbecomeStacked();
        }
    }

Po odpaleniu, na zmianę będziemy mieć stan I oraz II. Oczywiście nie jesteśmy ograniczeni wyłącznie do dwóch stanów.

Znajdowanie brakującej biblioteki

Szukanie lokalizacji danej biblioteki może być skomplikowane. W zależności od skonfigurowanego binding’u, inne foldery są przeszukiwanie. Wyjątek jaki dostaniemy w przypadku braku jednej z bibliotek jest następujący:

Could not load file or assembly 'nazwa' or one of its dependencies. 
The system cannot find the file specified.

Problem stanowi druga część – “or one of its dependencies’. Wyjątek nie zawsze powie nam, której biblioteki brakuje nam. Jeszcze większe problemy natrafimy, gdy użyjemy natywnej referencji.

Ostatnio miałem problemy z pewnym kodem i użyłem programu Process Monitor. Aplikacja może nie jest zbyt intuicyjna, ale pozwala w dość szybki sposób prześledzić wszystkie wiązania i brakujące biblioteki.

Domyślnie pokazuje wszystkie procesy, zatem po jej odpaleniu, ustawiłem filtr pokazujący wyłącznie program, który chcę przeanalizować (w tym przypadku ConsoleApplication6.exe). Z menu głównego wybieramy Filter->Filter…:

filter1

Ponadto ustawiłem dwa dodatkowe filtry. Jeden, wyświetla wyłącznie zdarzenia, które w rezultacie mają “NAME NOT FOUND”. Drugi filtr wyświetli wyłącznie wpisy, w których ścieżka kończy się na “dll”. Przykład:

filter1

Dzięki temu, łatwiej będzie nam przeanalizować wszystkie zdarzenia, które mają miejsce w danym procesie. Bez tego, ciężko byłoby wszystko zrozumieć.

Po uruchomieniu ConsoleApplication6, Process Monitor powie nam, których bibliotek nie udało załadować się:

filter1

W tym przypadku (nie trudno domyślić się, sztucznie stworzonym) brakuje ClassLibrary2. W prawdziwych systemach, zwykle mamy bardzo wiele zależności i nie jest wtedy już to takie proste do zgadnięcia bez Process Monitor.

Wielowątkowość: przykład modelu aktor

W ostatnim wpisie przedstawiłem zasadę działania modelu aktor. Zachęcam do przeczytania poprzedniego wpisu ponieważ dzisiaj skupię się na przykładzie, a nie podstawach teoretycznych. Jeśli poprzedni wpis nie był do końca zrozumiały, zachęcam do przeanalizowania przykładu z tego wpisu i potem powrócenia do poprzedniego postu – wtedy myślę, że wiele zagadnień będzie prostsze w zrozumieniu.

Poniższe przykłady należy traktować jako pseudokod. Stanowią one szkic wzorca aktor, a nie jego implementację. Do implementacji w kolejnych wpisach będę używał akka.net, ale moim zdaniem najważniejsze jest zrozumienie zasad, a nie nauczenie się kolejnego framework’a. Z tego względu, bardziej będę skupiał się na rozwiązywaniu różnych problemów wielowątkowych (np.problem ucztujących filozofów), a nie dokumentacji API.

Załóżmy, że chcemy rozwiązać klasyczny problem przelewu pieniędzy z jednego konta na drugie. W najprostszej postaci, będziemy mieli następującą klasę:

     class BankAccount
     {
         private int _Balance;


         public void Deposit(int amount)
         {
             _Balance += amount;
         }

         public void Withdraw(int amount)
         {
             if (amount <= _Balance)
                 _Balance -= amount;
             else
                 throw new ArgumentOutOfRangeException();
         }
     }

Oczywiście powyższy kod nie jest thread-safe, dlatego należy użyć blokady:

     class BankAccount
     {
         private int _Balance;
         private object _sync=new object();

         public void Deposit(int amount)
         {
             lock(_sync)
             {
               _Balance += amount;
             }
         }

         public void Withdraw(int amount)
         {
             lock(_sync)
             {
                 if (amount <= _Balance)
                    _Balance -= amount;
                 else
                    throw new ArgumentOutOfRangeException();
             } 
         }
     }

Kolejne zadanie to przetransferowanie pieniędzy z jednego konta na drugie. Klasyczne, błędne rozwiązanie to:

lock(accountA)
{
   lock(accountB)
   {
        accountA.Withdraw(5);
        accountB.Deposit(5);
   }
}

Oczywiście powyższy kod zakończy się deadlock, jeśli w tym samym czasie będziemy chcieli przelać pieniądze z konta A do B oraz z B do A.  Prawidłowe rozwiązanie to np. sortowanie blokad, przedstawione tutaj.

Wróćmy jednak do wzorca aktor. Stanowi on po prostu wyższy poziom abstrakcji dla wątków. Z poprzedniego wpisu wiemy, że aktorzy komunikują się za pomocą wiadomości, tak jak np. instancje w systemie kolejkowym. Zdefiniujmy zatem dwie wiadomości, dla depozytu i wycofywania środków:

     public class DepositMessage
     {
         public int Amount { get; }

         public DepositMessage(int amount)
         {
             Amount = amount;
         }
     }

     public class WithdrawMessage
     {
         public int Amount { get; }

         public WithdrawMessage(int amount)
         {
             Amount = amount;
         }
     }

Proszę zauważyć, że są one immutable – zawsze chcemy uniknąć współdzielenia stanu między różnymi aktorami. Następnie aktor, będzie obsługiwał wiadomości w sposób asynchroniczny:

     class BankAccountActor
     {
         private int _balance;

         public void OnReceive(object message)
         {
             if (message is WithdrawMessage)
             {
                 var withdrawMessage = ((WithdrawMessage)message);
                 if (withdrawMessage.Amount <= _balance)
                     _balance -= withdrawMessage.Amount;
             }

             if (message is DepositMessage)
             {
                 _balance += ((DepositMessage)message).Amount;
             }
         }
     }

Metoda OnReceive będzie wywoływana przez framework, w momencie otrzymania konkretnej wiadomości. Jak wspomniałem, przypomina to klasyczny system kolejkowy, ale OnReceive zawsze MUSI być wykonywane jedno po drugim. Jeśli dwie wiadomości przyjdą w tym samym czasie, mamy zagwarantowane, że OnReceive nie będzie wykonywane równocześnie z dwóch różnych wątków. Obsługa zatem może wyglądać następująco:


while(true)
{
    var message = blockingCollection.Dequeue();
    actor.OnReceive(message);
}

Z tego względu, nie musimy umieszczać w tych metodach żadnych blokad (brak współdzielonego stanu).
Następnie chcemy mieć możliwość transferu środków z jednego konta do drugiego. Zdefiniujmy zatem kolejną wiadomość:

     class TransferMessage
     {
         public string From { get; }
         public string To { get; }
         public int Amount { get; }

         public TransferMessage(string from, string to, int amount)
         {
             From = @from;
             To = to;
             Amount = amount;
         }
     }

Aktorzy mogą tworzyć hierarchie, w której jeden aktor zarządza kolejnymi. W naszym przypadku będziemy mieli dwa typy aktorów: TransferMoneyActor oraz BankAccountActor. Pierwszy z nich służy do koordynowania przepływu środków.

Najpierw implementujemy obsługę wiadomości TransferMessage:

     class TransferActor
     {
         public void OnTransferMessageReceived(TransferMessage transferMessage)
         {
             ActorsSystem.GetActor(transferMessage.From).Send(new WithdrawMessage(transferMessage.Amount));
             
             Context.Become(AwaitFrom(transferMessage.From,transferMessage.To,transferMessage.Amount));
         }

W momencie otrzymania TransferMessage, zostanie wysłana wiadomość do aktora, który reprezentuje konto nadawcy. Pamiętajmy, że wszystkie operacje są asynchroniczne, zatem stanowią model “fire&forget”. TransferActor jednak musi dowiedzieć się, czy środki zostały prawidłowo zdjęte z konta nadawcy. Z tego względu, jedną z bardzo ważnych właściwości aktorów jest zmiana kontekstu. W powyższym przykładzie chcemy zmienić kontekst w tryb oczekiwania na odpowiedź od nadawcy. Służy zwykle do tego metoda “Become”. Aktor zatem staje się aktorem oczekującym na odpowiedź od nadawcy. Odpowiedź przyjdzie oczywiście w formie kolejnej wiadomości:

     class MoneyWithdrawn
     {
         public ActorRef ActorRef { get;  }
         public int Amount { get;  }

         public MoneyWithdrawn(ActorRef actorRef,int amount)
         {
             ActorRef = actorRef;
             Amount = amount;
         }
     }

Następnie w momencie potwierdzenia wycofania pieniędzy, możemy wysłać wiadomość w celu umieszczenia środków na innym koncie:

     class TransferActor
     {
         public void OnTransferMessageReceived(TransferMessage transferMessage)
         {
             ActorsSystem.GetActor(transferMessage.From).Send(new WithdrawMessage(transferMessage.Amount));
             
             Context.Become(AwaitFrom(transferMessage.From,transferMessage.To,transferMessage.Amount));
         }

         public void AwaitFrom(string from, string to, int amount)
         {
             ActorsSystem.GetActor(to).Send(new DepositMessage(amount));
             Context.Became(AwaitTo(transferMessage.From, transferMessage.To, transferMessage.Amount));
         }

Analogicznie, aktor przechodzi w kolejny stan, oczekiwania na potwierdzenie złożenia depozytu. Potwierdzenie przyjdzie w formie kolejnej wiadomości:

     class TransferActor
     {
         public void OnTransferMessageReceived(TransferMessage transferMessage)
         {
            ActorsSystem.GetActor(transferMessage.From).Send(new WithdrawMessage(transferMessage.Amount));
             
             Context.Become(AwaitFrom(transferMessage.From,transferMessage.To,transferMessage.Amount));
         }

         public void AwaitFrom(string from, string to, int amount)
         {
             ActorsSystem.GetActor(to).Send(new DepositMessage(amount));
             Context.Became(AwaitTo(transferMessage.From, transferMessage.To, transferMessage.Amount));
         }

         public void AwaitTo(string from, string to, int amount)
         {
             Context.Finished();
         }

Widzimy, że każda operacja jest atomowa (pod warunkiem, że przetwarzanie wiadomości nie jest współbieżne). To bardzo ważna cecha systemów opartych na aktorach – należy rozszerzać hierarchie o tyle poziomów, aby konkretne zadanie było łatwe w implementacji. Przez “łatwe” mam na myśli sytuację, w której nie musimy korzystać z blokad.

Model dla prostych problemów (takich jak powyższy) jest moim zdaniem zła praktyką i przykładem over-engineering’u. Dla bardziej skomplikowanych problemów, znacząco to ułatwia zapobiegnięcie zakleszczeniom. Tak jak wspomniałem, aktor to pewien poziom abstrakcji. Ta abstrakcja daje nam ogromne możliwości skalowania – od problemu rozwiązywanego współbieżnie na np. 4 procesorach do środowiska opartego na wielu komputerach połączonych w sieć. Jeśli aktor jest abstrakcyjny, nic nie stoi na przykładzie, aby umieścić go na osobnym komputerze i przesyłać wiadomości za pomocą TCP. Jak widać, można skalować rozwiązanie od jednego procesu po wiele usług webowych komunikujących się dowolnymi sposobami (HTTP, systemy kolejkowy, TCP itp.). Użycie prostej blokady jest dobre, ale nie posiada żadnej abstrakcji – ogranicza nas do jednego procesu.

Wielowątkowość: Wzorzec aktor (actor based programming)

W kolejnych wpisach chciałbym opisać framework akka.net. Zanim jednak przejdę do opisu API, warto poświęcić chwilę (myślę, że około dwa wpisy) na zasadę działania “actor model”.

Aktor jest modelem budowania aplikacji wielowątkowych. Powstał w celu ułatwienia synchronizacji między różnymi wątkami. Programiści piszący aplikacje wielowątkowe zwykle korzystają z klasycznych blokad (lock) w celu opisania sekcji krytycznej. W wielu sytuacjach jest to najlepszy i najprostszy sposób. Niestety dla dużych i skomplikowanych systemów, utrzymywanie takiego kodu jest bardzo trudne, mozolne i niezwykłe podatne na powstanie deadlock lub livelock.

Dzięki odpowiedniemu podziałowi kodu, można uniknąć powyższych problemów na poziomie projektu klas. Wspomniany aktor to nic innego jak klasa, która spełnia pewne wymagania:
– przechowuje stan (zawiera np. pola lub właściwości)
– implementuje logikę (zawiera zatem metody)
– jest reprezentowana przez wątek
– komunikuje się z innymi aktorami za pomocą asynchronicznych wiadomości
– wiadomości nie mogą być modyfikowalne (zatem są “immutable”).
– aktor może przetwarzać wyłącznie jedną wiadomość danym momencie – pozostałe są kolejkowanie
– stan aktora nie może być modyfikowany bezpośrednio przez zewnętrzne obiekty

Myślę, że to najważniejsze właściwości modelu. W świecie C#, aktor będzie zatem klasą wykonywaną na osobnym wątku albo zadaniu (task – TPL). Taka klasa nie będzie eksponowała setter’ów. Wszystkie właściwości mogą być tylko do odczytu. Musimy zagwarantować, że aktor nie jest modyfikowany przez cokolwiek innego. Stan aktora za to może być modyfikowany przez niego samego.

Kluczową rolę pełnią tutaj asynchroniczne wiadomości. Jeśli ktoś jest zaznajomiony z nServiceBus czy nawet opisanym w zeszłym tygodniu Hangfire, powinien rozumieć systemy kolejkowe. W najprostszej postaci, wspomniana klasa (aktor) będzie zawierała kolekcję odebranych wiadomości. Następnie w wątku, będą one zdejmowane i przetwarzane jedna po drugim. Konieczne jest, aby dany aktor, przetwarzał wyłącznie jedną wiadomość w dowolnym czasie. Dzięki temu, nie musimy martwić się o synchronizację. Mamy zagwarantowane zatem:
– jeden aktor to wyłącznie jeden wątek
– stan aktora nie jest modyfikowany na zewnątrz
– żadne blokady nie są wymagane.

To bardzo ułatwia pracę. Nie musimy korzystać z żadnych blokad, ponieważ dany kod jest wykonywany wyłącznie przed jeden wątek. Sekcje krytyczną zastąpiono zatem asynchronicznymi wiadomościami. Jeśli wątek A chce odczytać albo zmienić stan wątku B, wykonuje to przez asynchroniczne wiadomości.

Zwykle systemy tworzą hierarchie aktorów, uformowane w postaci drzew. Każdy aktor może mieć swojego rodzica (zarządcę). Zwykle dany problem rozbija się na taką liczbę aktorów, aby pojedynczy aktor mógł wykonywać kod bez żadnej synchronizacji. Jeśli dany problem składa się z operacji wymagających sekcji krytycznych, wtedy rozdzielamy go jeszcze bardziej, tworząc kolejny poziom w drzewie aktorów.

Najtrudniejszym element w wielowątkowości jest modyfikacja stanu współdzielonego. W przypadku aktorów, takiego stanu po prostu nie ma. Każdy aktor pracuje niezależnie od siebie. Jeśli dane jednego aktora potrzebne są przez drugiego, przesyłane są w formie niemodyfikowalnych, asynchronicznych wiadomości.

Dla prostych problemów, model może okazać się zbyt skomplikowany. Czasami łatwiej jest stworzyć sekcję krytyczną w formie blokady, niż implementować kilka klas komunikujących się za pomocą wiadomości. W przyszłym poście, pokażę realny problem, zaimplementowany najpierw za pomocą blokady lock, a potem w w postaci aktorów.

Code Review: Metody asynchroniczne z async oraz oczekiwanie na rezultat

Coraz więcej API dostarcza asynchroniczne wersje metod. Niektóre z nich, idą o krok dalej i w ogóle nie posiadają synchronicznej wersji. Załóżmy, że zewnętrzna biblioteka ma następującą metodę:

 async Task<string> FetchDataAsync() {...}

Często jednak nie potrzebujemy korzystać z wersji async i tylko ona komplikuje sprawę. W powyższym przypadku moglibyśmy pokusić się o:

  string data=dataProvider.FetchDataAsync().Result;

  Console.WriteLine(data);

Niestety powyższy kod może być niebezpieczny i wywołać w niektórych sytuacjach deadlock. Jeśli nie wiemy, jak została zaimplementowana metoda FetchDataAsync bardzo łatwo popełnić błąd. Załóżmy, że ciało metody wygląda następująco:

    public async Task<string> FetchDataAsync()
    {
         await DoSomethingAsync();

         return "Hello World";
    }
    private Task DoSomethingAsync()
    {
         return Task.Run(() => Thread.Sleep(2000));
    } 

Kiedyś na blogu pokazywałem już podobny przykład odnoście wydajności async\await. Jeśli wywołujemy await, pobierany jest kontekst przed wejściem w nowy wątek. Dzięki temu, po zakończeniu wątku, czyli w momencie wyjścia z await, wiadomo jaki wątek powinien kontynuować operację. Wynika to z faktu, że w większości przypadków, użytkownik spodziewa się, że wątek przed await i po jest taki sam. Rozważmy kod:

string data = await GetInfo();
_textBox.Text = data;

Gdyby po wyjściu await, kontekst wykonywania nie przechodził w ten przed wywołaniem wątku, wtedy aktualizacja interfejsu nie powiodłaby się – zawsze należy go aktualizować z wątku głównego.

Wracając do przykładu z deadlock. W celu powrócenia do wątku głównego, po wywołaniu “await DoSomethingAsync();”, należy oczywiście uzyskać do niego dostęp. Problem w tym, że będzie on ciągle zajęty. Wywołanie “string data=dataProvider.FetchDataAsync().Result;” blokuje wątek główny do momentu zakończenia operacji. Operacja oczywiście nigdy nie zakończy się ponieważ czeka ona na na dostęp do wątku głównego blokowanego przez “Task.Result”.

Zachowanie różni się w zależności od typu aplikacji. W przypadku aplikacji konsolowej, nie ma kontekstu synchronizacyjnego (TaskScheduler) więc operacja nie zakończy się deadlock’iem. Jeśli piszemy np. aplikację ASP.NET lub WPF wtedy doświadczymy opisanych wyżej problemów.

Istnieje możliwość wyłączenia mechanizmu powracania do poprzedniego kontekstu. Służy do tego ConfigureAwait(false);

        public async Task<string> FetchDataAsync()
        {
            await DoSomethingAsync().ConfigureAwait(false);

            return "Hello World";
        }

Po wywołaniu ConfigureAwait(false) nie będziemy mieli problemów z deadlock, ponieważ żaden kontekst nie będzie pobierany. Oczywiście nie jest to eleganckie rozwiązanie. ConfigureAwait jest jednak bardzo przydatny ze względów wydajnościowych – np. gdy w pętli wywołujemy await, wtedy zwykle nie ma sensu pobierać kontekstu.

Jeśli zatem wywołujemy metodę async, powinniśmy używać await, inaczej istnieje duże ryzyko, że kod po prostu zawiesi się.

Producent-konsument w C# – BlockingCollection

BlockingCollection jest specjalną kolekcją danych, przygotowaną do implementacji wzorca producent-konsument. Nakład pracy do implementacji tego wzorca jest minimalny z BlockingCollection. Nie musimy martwić się o synchronizację, sekcję krytyczną czy deadlock. Zacznijmy od razu od przykładu.
Producent będzie wyglądać następująco:

       private static void Produce(BlockingCollection<int> buffer)
        {
            for (int i = 0; i < 100; i++)
            {
                Console.WriteLine("Producing {0}", i);
                Thread.Sleep(10);

                buffer.Add(i);
            }

            buffer.CompleteAdding();
        }

Jak widzimy, implementacja producenta to nic innego jak dodawanie danych do kolekcji. Metoda Add jest thread-safe więc nie musimy używać lock. Ponadto robi to bardzo optymalnie, ponieważ nie jest to prosty mechanizm polegający po prostu na wstawieniu lock’a. Zaglądając do implementacji Add, zobaczymy między innymi spinning. BlockingCollection należy to tzw. concurrent collections o których już pisałem na blogu. Oznacza to, że jakiekolwiek operacje są zaimplementowane w ten sposób, aby unikać blokad. Zostało to osiągnięte na poziomie projektu (np. kilka mini-kolekcji w środku dla różnych wątków), jak i używania spinningu, gdy wiadomo, że zbyt długo nie będzie trzeba czekać.

Metoda CompleteAdding kończy produkcję i konsumenci nie będą już dłużej czekać. Musimy ją wywołać na końcu ponieważ w przeciwnym wypadku, konsumenci będą myśleli, że produkcja ciągle trwa i należy wciąż czekać.
Przyjrzyjmy się teraz konsumpcji:

        private static void Consume(BlockingCollection<int> buffer)
        {
            foreach (var i in buffer.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming {0}.", i);
                Thread.Sleep(20);
            }
        }

Kluczem jest metoda GetConsumingEnumerable. W bezpieczny sposób usuwa one dane z bufora. Jeśli w buforze nic nie ma po prostu wątek będzie blokowany. Iterator zakończy dopiero działanie, gdy producent wywoła CompleteAdding. W przeciwnym wypadku, foreach będzie zdejmował dane z kolekcji, lub blokował wywołanie w oczekiwaniu na więcej danych. Jeśli zajrzyjmy do implementacji wewnętrznej, znowu znajdziemy semafory i SpinWait.

Tak naprawdę, do najprostszej implementacji nic więcej już nie potrzebujemy. Całość wygląda zatem następująco:

class Program
    {
        static void Main(string[] args)
        {
            var buffer=new BlockingCollection<int>();

            var producerTask = Task.Run(() => Produce(buffer));
            var consumeTask = Task.Run(() => Consume(buffer));

            Task.WaitAll(producerTask, consumeTask);
        }

        private static void Produce(BlockingCollection<int> buffer)
        {
            for (int i = 0; i < 100; i++)
            {
                Console.WriteLine("Producing {0}", i);
                Thread.Sleep(10);

                buffer.Add(i);
            }

            buffer.CompleteAdding();
        }

        private static void Consume(BlockingCollection<int> buffer)
        {
            foreach (var i in buffer.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming {0}.", i);
                Thread.Sleep(20);
            }
        }
    }

W praktyce jednak trzeba rozważyć kilka innych “drobiazgów”. Co jeśli wyjątek zdarzy się podczas konsumpcji danych? Producent wciąż będzie generował dane, co przecież zwykle nie ma sensu i spowoduje memory leak. Dlatego lepiej napisać obsługę błędów:

       private static void Consume(BlockingCollection<int> buffer)
        {
            try
            {
                foreach (var i in buffer.GetConsumingEnumerable())
                {
                    Console.WriteLine("Consuming {0}.", i);
                    Thread.Sleep(20);
                    throw new Exception();
                }
            }
            catch
            {
                buffer.CompleteAdding();
                throw;
            }
        }

W momencie wystąpienia błędu, wywołujemy CompleteAdding, co spowoduje, że próba dodania nowych danych przez producenta zakończy się wyjątkiem InvalidOperationException i zakończeniem produkcji.
Analogicznie, warto dostać klauzule try-finally w producencie:

        private static void Produce(BlockingCollection<int> buffer)
        {
            try
            {
                for (int i = 0; i < 100; i++)
                {
                    Console.WriteLine("Producing {0}", i);
                    Thread.Sleep(10);

                    buffer.Add(i);
                }
            }
            finally
            {
                buffer.CompleteAdding();
            }

        }

W przypadku producenta, chcemy wywołać CompleteAdding zarówno w przypadku powodzenia (aby konsument już dłużej nie czekał), jak i wyjątku.
CompleteAdding tak naprawdę korzysta z CancellationToken, który jest znany nam z TPL. Prawdopodobnie warto również dodać obsługę wyjątków InvalidOperationException, tak aby dwukrotnie nie wywoływać CompleteAdding.

Inna bardzo ważna obserwacja to przypadek, gdy konsument jest dużo wolniejszy niż producent. Załóżmy, że wyprodukowanie zajmuje jedną sekundę, a konsumpcja 10. W tym przypadku, po długim przetwarzaniu możemy mieć do czynienia z ogromną alokacją pamięci ponieważ producent będzie ciągle dodawał dane, a konsument nie nadąży z usuwaniem ich.

BlockingCollection w bardzo prosty sposób rozwiązuje ten problem poprzez wprowadzenie maksymalnego limitu “porcji” w kolekcji. Wystarczy, w konstruktorze przekazać maksymalną pojemność:

var buffer = new BlockingCollection<int>(boundedCapacity: 10);

Po przekroczeniu limitu, metoda Add nie wyrzuci wyjątku, a po prostu będzie blokowała wywołanie za pomocą wspomnianych wcześniej technik (spinning, locking etc).

Porównywanie znaków, ToUpper, string.IndexOf oraz StringComparison.Ordinal

Resharper daje naprawdę cenne wskazówki. Nie wszystkie są oczywiste i czasami należy zagłębić się w temat. Jedną z takich wskazówek jest używanie IndexOf wraz z StringComparison.Ordinal.

Załóżmy, że mamy następujący kod:

string text = "test";
Console.WriteLine(text.IndexOf("est"));

Resharper zasugeruje konwersję do:

string text = "test";
Console.WriteLine(text.IndexOf("est", StringComparison.Ordinal));

Dlaczego?
Jeśli nie przekażemy ustawień regionalnych jawnie, wtedy domyślnie aktualna zostanie użyta.
Czasami oczywiście dokładnie tego chcemy i dlatego domyślnie przekazywany jest StringComparison.CurrentCulture.

W niektórych sytuacjach może spowodować to bardzo irytujące problemy. Każdy język ma pewne zasady, które nie zawsze pokrywają się z intuicją osoby piszącącej kod. Najsłynniejszym chyba przykładem jest język turecki i litera ‘i’.

Czego byśmy spodziewali się po poniższym kodzie?

string text = "some text this";
Console.WriteLine(text.ToUpper().IndexOf("THIS")); 

Naturalne wydaje się, że po wywołaniu ToUpper, tekst “THIS” zostanie znaleziony (na pozycji 10). Zmieńmy kulturę na tr-TR (Turcja):

Thread.CurrentThread.CurrentCulture=new CultureInfo("tr-TR");
string text = "some text this";
Console.WriteLine(text.ToUpper().IndexOf("THIS"));

Na ekranie zobaczymy -1. W języku tureckim, wielka litera ‘i’ to İ, a nie ‘I’.

Inny przykład to w niemieckim litery ‘ß’ oraz ‘ss’, które będą traktowane jako takie same, jeśli ustawimy odpowiednio kulturę.

Ciekawy przykład, znalazłem również na StackOverflow:

//SOURCE: http://stackoverflow.com/a/10941507
var s1 = "é"; //é as one character (ALT+0233)
var s2 = "é"; //'e', plus combining acute accent U+301 (two characters)

Console.WriteLine(s1.IndexOf(s2, StringComparison.Ordinal)); //-1
Console.WriteLine(s1.IndexOf(s2, StringComparison.InvariantCulture)); //0
Console.WriteLine(s1.IndexOf(s2, StringComparison.CurrentCulture)); //0

Problem w tym, że nie mamy pojęcia na jakim komputerze nasz kod jest wykonywany. Musimy przyjmować, że może być to dowolna kultura. Jeśli zatem, porównujemy angielskie czy polskie znaki, wtedy nie chcemy korzystać z ustawień regionalnych. Co jeśli aplikacja webowa jest hostowana na niemieckim serwerze, a porównujemy w niej wyłącznie polskie słowa, bo np. baza danych i aplikacja jest wykorzystywana wyłącznie w Polsce. Z tego względu, bezpieczniejszą opcją jest StringComparison.Ordinal, która porównuje wartości liczbowe znaków (np. kod ASCII).

Kilka porad na temat usług REST

O usługach REST, które dzisiaj są wszechobecne pisałem już wiele razy np. tutaj.  Dzisiaj chciałbym napisać krótkie podsumowanie w formie porad i antywzorców. Zaczynamy:

1. Nigdy nie używaj czasowników w URI.

Przykład błędnych linków:

GET: localhost\persons\1\UpdateEmail?email=’…’

Jedynym dozwolonym czasownikiem w adresie to HTTP verb. Całość linku to nic innego jak hierarchia zasobów. Poprawna aktualizacja adresu email może wyglądać zatem następująco:

PUT: localhost\persons\1\email

Metoda PUT oznacza, że mamy do czynienia z modyfikacją danych. UpdateEmail to nazwa metody, a nie zasobu co jest sprzeczne z REST. Adres email’a w przypadku PUT można umieścić w ciele zapytania.

Analogicznie jeśli chcemy odczytać dane osoby, nie powinniśmy:

GET localhost\persons\1\getdata

Poprawny adres to:

GET localhost\persons\1\data

2. REST != RPC.  

Osoby zaczynające z REST, często próbują korzystać z tego typu usług tak samo jak np. z WCF. Klasyczne RPC (Remote-Procedure-Call) polega na wykonaniu konkretnej metody na usłudze. REST to hierarchia zasobów  więc powinniśmy tworzyć linki w postaci:

localhost/persons/1/address/town

Widzimy, że operujemy tutaj na kilku poziomach. Niepoprawną z kolei postacią jest:

localhost/persons/gettown?id=1

Wynika to nie tylko z użycia czasownika w nazwie, ale również w operowaniu bezpośrednio na korzeniu zasobów, zamiast na wykorzystaniu całej dostępnej hierarchii.

3. Korzystaj z dostępnych metod  HTTP.

Jeśli chcemy usunąć zasób wtedy:

DELETE local/host/persons/1

Początkujące osoby zbyt często korzystają wyłącznie z GET. Jeśli usługa ma realizować zadania CRUD, wtedy zdecydowanie potrzebujemy POST,GET,PUT,DELETE.

4. Korzystaj z dostępnych statusów HTTP.

Status 200 (OK) zdecydowanie nie jest jedynym z którego możemy korzystać. Jeśli operacja nie uda się, wtedy oprócz zwrócenia błędu w ciele HTTP, należy zwrócić również kod np. 500 (Internal Error) albo 400 (Bad Request) w zależności od kontekstu. Kody od 400 to błędy spowodowane przez klienta (np. nieprawidłowe zapytanie), z kolei od 500 to błędy spowodowane przez serwer.

Jeśli tworzymy nową encję za pomocą HTTP POST, można zwrócić 201 (Created) zamiast po prostu 200 (OK), który bardziej nadaję się na zapytania HTTP GET.

Inny przykład to kod 404 (Not found). Jeśli użytkownik wywoła localhost/persons/5, a osoba o identyfikatorze 5 nie istnieje, wtedy zamiast 200 należy zwrócić kod 404 .

5. QueryString

Klasyczne parametry QueyString wciąż są dozwolone, ale nie powinny określać one zasobów. Innymi słowy, złą praktyką jest:

localhost/persons?id=1

Identyfikator w tym przypadku jest ściśle powiązany z zasobem. Dopuszczalne jest za to używanie QueryString do określenia np. sortowania:

lcoalhost/persons?sort=desc

Podobnie sprawa wygląda z filtrowaniem czy z innymi parametrami, które nie określają zasobów a po prostu np. ich sposób wyświetlania.

6. Zwrócenie stanu obiektu po aktualizacji.

Utworzenie zasobu lub jego aktualizacja (POST, PUT) powinny zwracać jego reprezentacje albo link, który umożliwia odczytanie pełnego stanu. Innymi słowy, po utworzeniu zasobu, ciało odpowiedzi nie może być puste. Zwykle zwraca się mapę linków\operacji, które można wykonać na nowo utworzonym zasobie.

7.  Usługa powinna być bezstanowa.

Umożliwia to łatwiejsze skalowanie jak i cachowanie. Oznacza to, że wywołanie danej akcji, nie powinno zależeć od wykonania wcześniejszych operacji. Oczywiście mowa o stanie usługi, a nie aplikacji, gdzie stan stanowi zestaw reguł biznesowych. Z tego względu, autoryzacja nie powinna bazować na ciasteczkach czy sesjach ponieważ łamią one tą zasadę. Lepiej przesyłać dany token w każdym zapytaniu. Innymi słowy, każde zapytanie powinno dostarczyć wszystkie niezbędne dane do wykonania akcji.