Category Archives: Wielowątkowość

AKKA.NET – przełączanie stanów

Zanim przejdziemy do konkretnych problemów, musimy poznać przynajmniej podstawowe elementy AKKA.NET. W poprzednim wpisie opisałem jak definiować wiadomości oraz aktorów. Dzisiaj przejdziemy do kolejnego, bardzo ważnego elementu – przełączanie stanów. Jest to podstawowy element zarówno w modelu aktor, jak i w jakichkolwiek maszynach stanów (FSM). W poprzednim przykładzie przelewu środków z jednego konta na drugie, użyliśmy właśnie przełączania stanów. Dla przypomnienia:

class TransferActor
{
    public void OnTransferMessageReceived(TransferMessage transferMessage)
    {
        ActorsSystem.GetActor(transferMessage.From).Send(new WithdrawMessage(transferMessage.Amount));
         
        Context.Become(AwaitFrom(transferMessage.From,transferMessage.To,transferMessage.Amount));
    }
 
    public void AwaitFrom(string from, string to, int amount)
    {
        ActorsSystem.GetActor(to).Send(new DepositMessage(amount));
        Context.Became(AwaitTo(transferMessage.From, transferMessage.To, transferMessage.Amount));
    }
}

Po otrzymaniu zapytania o przesłaniu pieniędzy, przełączamy się w stan AwaitFrom. Po otrzymaniu potwierdzenia, składamy depozyt i znów czekamy na potwierdzenie.

W AKKA.NET zmiana stanu odbywa się za pomocą metody Become:

    public class SampleActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            Console.WriteLine("Otrzymano wiadomosc ze stanu I {0}",message);
            Become(GoToState2);
        }

        private void GoToState2(object message)
        {
            Console.WriteLine("Otrzymano wiadomosc ze stanu II {0}", message);
        }
    }

Po otrzymaniu pierwszej wiadomości, przełączamy się do stanu drugiego, reprezentowanego przez metodę GoToState2. Od tego momentu wszystkie wiadomości będą obsługiwane przez GoToState2, a nie OnReceive. Odpalmy zatem następujący kod:

            var system = ActorSystem.Create("JakasNazwa");
            var actor1 = system.ActorOf<SampleActor>();


            for (int i = 0; i < 10; i++)
            {
                actor1.Tell(i.ToString());
            }

Na ekranie zobaczymy najpierw tekst “Otrzymano wiadomosc ze stanu I”, a potem 9 razy “Otrzymano wiadomosc ze stanu II”.

Druga przydatna i alternatywna metoda to BecomeStacked. Podobnie jak Become służy do przełączania stanu. Tym razem jednak, stan będzie przechowywany na stosie, zatem będzie można go potem zdjąć i powrócić do poprzedniego. Przykład:

   public class SampleActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            Console.WriteLine("Otrzymano wiadomosc ze stanu I {0}",message);
            BecomeStacked(GoToState2);
        }

        private void GoToState2(object message)
        {
            Console.WriteLine("Otrzymano wiadomosc ze stanu II {0}", message);
            UnbecomeStacked();
        }
    }

Po odpaleniu, na zmianę będziemy mieć stan I oraz II. Oczywiście nie jesteśmy ograniczeni wyłącznie do dwóch stanów.

Akka.net – pierwszy przykład

W ostatnich dwóch wpisach pokazałem zasady działania modelu aktor. W kolejnych postach będę korzystał już z Akka.net zamiast pseudokodu. Dzisiaj czysty opis podstaw API – bez konkretnego problemu do rozwiązania.

Akka.net można zainstalować w formie pakietu Nuget:

Install-Package Akka

Następnie definiujemy wiadomość za pomocą zwyklej klasy typu immutable:

    public class TransferMoney
    {
        public string From { get; private set; }
        public string To { get; private set; }

        public TransferMoney(string from,string to)
        {
            From = @from;
            To = to;
        }
    }

Każdy aktor powinien dziedziczyć np. po ReceiveActor:

    public class TransferMoneyActor : ReceiveActor
    {
        public TransferMoneyActor()
        {
            Receive<TransferMoney>(msg =>
            Console.WriteLine("Transferring money from {0} to {1}", msg.From, msg.To));
        }
    }

Za pomocą Receive definiujemy, jakie wiadomości chcemy obsługiwać. Powyższa klasa zatem będzie odbierać wiadomości typu TransferMoney.
W akka.net aktorzy egzystują w tzw. ActorSystem. Aktorzy z dwóch różnych systemów są od siebie odizolowani. Inicjacja nowego systemu jest prosta:

var system = ActorSystem.Create("JakasNazwa");

Jak wiemy, aktorzy należą do konkretnych systemów, zatem w celu stworzenia aktora należy:

var actor1 = system.ActorOf<TransferMoneyActor>();

Wysłanie wiadomości odbywa się za pomocą metody Tell. Całość wygląda więc następująco:

var system = ActorSystem.Create("JakasNazwa");

var actor1 = system.ActorOf<TransferMoneyActor>();

actor1.Tell(new TransferMoney("nadawca", "odbiorca"));

Z poprzednich wpisów pamiętamy, że wiadomości są kolejkowane i wykonywane asynchronicznie. Stwórzmy zatem następującego aktora:

   public class TransferMoneyActor : ReceiveActor
    {
        public TransferMoneyActor()
        {
            Receive<TransferMoney>(DoWork,shouldHandle:null);
        }

        private void DoWork(TransferMoney msg)
        {
            Console.WriteLine("{0}:{1}",DateTime.Now,Thread.CurrentThread.ManagedThreadId);

            Thread.Sleep(5000);
        }
    }

Jeśli faktycznie wiadomości są kolejkowane i wykonywane jedno po drugim, wtedy powinniśmy zawsze widzieć ten sam identyfikator wątku w odstępach dokładnie 5 sekund. W celu udowodnienia, że zadania są wykonywane asynchronicznie, w pętli wyślemy 10 wiadomości:

            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine("{0}: Wysylanie wiadomosci nr {1}", DateTime.Now,i);
                actor1.Tell(new TransferMoney("nadawca", "odbiorca"));
            }

Jeśli są synchronicznie wykonywane, wtedy po wyświetleniu tekstu “Wysylanie wiadomosci nr…”, powinniśmy zobaczyć numer wątku. Jeśli z kolei wykonywane są asynchronicznie, tak jak tego spodziewamy się, wtedy metoda Tell powinna wyłącznie umieścić wiadomość w kolejce.

Screenshot potwierdzający założenia:

tasks

W następnym wpisie, zajmiemy się znów jakimś problemem wielowątkowym, który najpierw rozwiążemy w “klasyczny sposób” z użyciem blokad, a później za pomocą modelu aktor.

Wielowątkowość: przykład modelu aktor

W ostatnim wpisie przedstawiłem zasadę działania modelu aktor. Zachęcam do przeczytania poprzedniego wpisu ponieważ dzisiaj skupię się na przykładzie, a nie podstawach teoretycznych. Jeśli poprzedni wpis nie był do końca zrozumiały, zachęcam do przeanalizowania przykładu z tego wpisu i potem powrócenia do poprzedniego postu – wtedy myślę, że wiele zagadnień będzie prostsze w zrozumieniu.

Poniższe przykłady należy traktować jako pseudokod. Stanowią one szkic wzorca aktor, a nie jego implementację. Do implementacji w kolejnych wpisach będę używał akka.net, ale moim zdaniem najważniejsze jest zrozumienie zasad, a nie nauczenie się kolejnego framework’a. Z tego względu, bardziej będę skupiał się na rozwiązywaniu różnych problemów wielowątkowych (np.problem ucztujących filozofów), a nie dokumentacji API.

Załóżmy, że chcemy rozwiązać klasyczny problem przelewu pieniędzy z jednego konta na drugie. W najprostszej postaci, będziemy mieli następującą klasę:

     class BankAccount
     {
         private int _Balance;


         public void Deposit(int amount)
         {
             _Balance += amount;
         }

         public void Withdraw(int amount)
         {
             if (amount <= _Balance)
                 _Balance -= amount;
             else
                 throw new ArgumentOutOfRangeException();
         }
     }

Oczywiście powyższy kod nie jest thread-safe, dlatego należy użyć blokady:

     class BankAccount
     {
         private int _Balance;
         private object _sync=new object();

         public void Deposit(int amount)
         {
             lock(_sync)
             {
               _Balance += amount;
             }
         }

         public void Withdraw(int amount)
         {
             lock(_sync)
             {
                 if (amount <= _Balance)
                    _Balance -= amount;
                 else
                    throw new ArgumentOutOfRangeException();
             } 
         }
     }

Kolejne zadanie to przetransferowanie pieniędzy z jednego konta na drugie. Klasyczne, błędne rozwiązanie to:

lock(accountA)
{
   lock(accountB)
   {
        accountA.Withdraw(5);
        accountB.Deposit(5);
   }
}

Oczywiście powyższy kod zakończy się deadlock, jeśli w tym samym czasie będziemy chcieli przelać pieniądze z konta A do B oraz z B do A.  Prawidłowe rozwiązanie to np. sortowanie blokad, przedstawione tutaj.

Wróćmy jednak do wzorca aktor. Stanowi on po prostu wyższy poziom abstrakcji dla wątków. Z poprzedniego wpisu wiemy, że aktorzy komunikują się za pomocą wiadomości, tak jak np. instancje w systemie kolejkowym. Zdefiniujmy zatem dwie wiadomości, dla depozytu i wycofywania środków:

     public class DepositMessage
     {
         public int Amount { get; }

         public DepositMessage(int amount)
         {
             Amount = amount;
         }
     }

     public class WithdrawMessage
     {
         public int Amount { get; }

         public WithdrawMessage(int amount)
         {
             Amount = amount;
         }
     }

Proszę zauważyć, że są one immutable – zawsze chcemy uniknąć współdzielenia stanu między różnymi aktorami. Następnie aktor, będzie obsługiwał wiadomości w sposób asynchroniczny:

     class BankAccountActor
     {
         private int _balance;

         public void OnReceive(object message)
         {
             if (message is WithdrawMessage)
             {
                 var withdrawMessage = ((WithdrawMessage)message);
                 if (withdrawMessage.Amount <= _balance)
                     _balance -= withdrawMessage.Amount;
             }

             if (message is DepositMessage)
             {
                 _balance += ((DepositMessage)message).Amount;
             }
         }
     }

Metoda OnReceive będzie wywoływana przez framework, w momencie otrzymania konkretnej wiadomości. Jak wspomniałem, przypomina to klasyczny system kolejkowy, ale OnReceive zawsze MUSI być wykonywane jedno po drugim. Jeśli dwie wiadomości przyjdą w tym samym czasie, mamy zagwarantowane, że OnReceive nie będzie wykonywane równocześnie z dwóch różnych wątków. Obsługa zatem może wyglądać następująco:


while(true)
{
    var message = blockingCollection.Dequeue();
    actor.OnReceive(message);
}

Z tego względu, nie musimy umieszczać w tych metodach żadnych blokad (brak współdzielonego stanu).
Następnie chcemy mieć możliwość transferu środków z jednego konta do drugiego. Zdefiniujmy zatem kolejną wiadomość:

     class TransferMessage
     {
         public string From { get; }
         public string To { get; }
         public int Amount { get; }

         public TransferMessage(string from, string to, int amount)
         {
             From = @from;
             To = to;
             Amount = amount;
         }
     }

Aktorzy mogą tworzyć hierarchie, w której jeden aktor zarządza kolejnymi. W naszym przypadku będziemy mieli dwa typy aktorów: TransferMoneyActor oraz BankAccountActor. Pierwszy z nich służy do koordynowania przepływu środków.

Najpierw implementujemy obsługę wiadomości TransferMessage:

     class TransferActor
     {
         public void OnTransferMessageReceived(TransferMessage transferMessage)
         {
             ActorsSystem.GetActor(transferMessage.From).Send(new WithdrawMessage(transferMessage.Amount));
             
             Context.Become(AwaitFrom(transferMessage.From,transferMessage.To,transferMessage.Amount));
         }

W momencie otrzymania TransferMessage, zostanie wysłana wiadomość do aktora, który reprezentuje konto nadawcy. Pamiętajmy, że wszystkie operacje są asynchroniczne, zatem stanowią model “fire&forget”. TransferActor jednak musi dowiedzieć się, czy środki zostały prawidłowo zdjęte z konta nadawcy. Z tego względu, jedną z bardzo ważnych właściwości aktorów jest zmiana kontekstu. W powyższym przykładzie chcemy zmienić kontekst w tryb oczekiwania na odpowiedź od nadawcy. Służy zwykle do tego metoda “Become”. Aktor zatem staje się aktorem oczekującym na odpowiedź od nadawcy. Odpowiedź przyjdzie oczywiście w formie kolejnej wiadomości:

     class MoneyWithdrawn
     {
         public ActorRef ActorRef { get;  }
         public int Amount { get;  }

         public MoneyWithdrawn(ActorRef actorRef,int amount)
         {
             ActorRef = actorRef;
             Amount = amount;
         }
     }

Następnie w momencie potwierdzenia wycofania pieniędzy, możemy wysłać wiadomość w celu umieszczenia środków na innym koncie:

     class TransferActor
     {
         public void OnTransferMessageReceived(TransferMessage transferMessage)
         {
             ActorsSystem.GetActor(transferMessage.From).Send(new WithdrawMessage(transferMessage.Amount));
             
             Context.Become(AwaitFrom(transferMessage.From,transferMessage.To,transferMessage.Amount));
         }

         public void AwaitFrom(string from, string to, int amount)
         {
             ActorsSystem.GetActor(to).Send(new DepositMessage(amount));
             Context.Became(AwaitTo(transferMessage.From, transferMessage.To, transferMessage.Amount));
         }

Analogicznie, aktor przechodzi w kolejny stan, oczekiwania na potwierdzenie złożenia depozytu. Potwierdzenie przyjdzie w formie kolejnej wiadomości:

     class TransferActor
     {
         public void OnTransferMessageReceived(TransferMessage transferMessage)
         {
            ActorsSystem.GetActor(transferMessage.From).Send(new WithdrawMessage(transferMessage.Amount));
             
             Context.Become(AwaitFrom(transferMessage.From,transferMessage.To,transferMessage.Amount));
         }

         public void AwaitFrom(string from, string to, int amount)
         {
             ActorsSystem.GetActor(to).Send(new DepositMessage(amount));
             Context.Became(AwaitTo(transferMessage.From, transferMessage.To, transferMessage.Amount));
         }

         public void AwaitTo(string from, string to, int amount)
         {
             Context.Finished();
         }

Widzimy, że każda operacja jest atomowa (pod warunkiem, że przetwarzanie wiadomości nie jest współbieżne). To bardzo ważna cecha systemów opartych na aktorach – należy rozszerzać hierarchie o tyle poziomów, aby konkretne zadanie było łatwe w implementacji. Przez “łatwe” mam na myśli sytuację, w której nie musimy korzystać z blokad.

Model dla prostych problemów (takich jak powyższy) jest moim zdaniem zła praktyką i przykładem over-engineering’u. Dla bardziej skomplikowanych problemów, znacząco to ułatwia zapobiegnięcie zakleszczeniom. Tak jak wspomniałem, aktor to pewien poziom abstrakcji. Ta abstrakcja daje nam ogromne możliwości skalowania – od problemu rozwiązywanego współbieżnie na np. 4 procesorach do środowiska opartego na wielu komputerach połączonych w sieć. Jeśli aktor jest abstrakcyjny, nic nie stoi na przykładzie, aby umieścić go na osobnym komputerze i przesyłać wiadomości za pomocą TCP. Jak widać, można skalować rozwiązanie od jednego procesu po wiele usług webowych komunikujących się dowolnymi sposobami (HTTP, systemy kolejkowy, TCP itp.). Użycie prostej blokady jest dobre, ale nie posiada żadnej abstrakcji – ogranicza nas do jednego procesu.

Wielowątkowość: Wzorzec aktor (actor based programming)

W kolejnych wpisach chciałbym opisać framework akka.net. Zanim jednak przejdę do opisu API, warto poświęcić chwilę (myślę, że około dwa wpisy) na zasadę działania “actor model”.

Aktor jest modelem budowania aplikacji wielowątkowych. Powstał w celu ułatwienia synchronizacji między różnymi wątkami. Programiści piszący aplikacje wielowątkowe zwykle korzystają z klasycznych blokad (lock) w celu opisania sekcji krytycznej. W wielu sytuacjach jest to najlepszy i najprostszy sposób. Niestety dla dużych i skomplikowanych systemów, utrzymywanie takiego kodu jest bardzo trudne, mozolne i niezwykłe podatne na powstanie deadlock lub livelock.

Dzięki odpowiedniemu podziałowi kodu, można uniknąć powyższych problemów na poziomie projektu klas. Wspomniany aktor to nic innego jak klasa, która spełnia pewne wymagania:
– przechowuje stan (zawiera np. pola lub właściwości)
– implementuje logikę (zawiera zatem metody)
– jest reprezentowana przez wątek
– komunikuje się z innymi aktorami za pomocą asynchronicznych wiadomości
– wiadomości nie mogą być modyfikowalne (zatem są “immutable”).
– aktor może przetwarzać wyłącznie jedną wiadomość danym momencie – pozostałe są kolejkowanie
– stan aktora nie może być modyfikowany bezpośrednio przez zewnętrzne obiekty

Myślę, że to najważniejsze właściwości modelu. W świecie C#, aktor będzie zatem klasą wykonywaną na osobnym wątku albo zadaniu (task – TPL). Taka klasa nie będzie eksponowała setter’ów. Wszystkie właściwości mogą być tylko do odczytu. Musimy zagwarantować, że aktor nie jest modyfikowany przez cokolwiek innego. Stan aktora za to może być modyfikowany przez niego samego.

Kluczową rolę pełnią tutaj asynchroniczne wiadomości. Jeśli ktoś jest zaznajomiony z nServiceBus czy nawet opisanym w zeszłym tygodniu Hangfire, powinien rozumieć systemy kolejkowe. W najprostszej postaci, wspomniana klasa (aktor) będzie zawierała kolekcję odebranych wiadomości. Następnie w wątku, będą one zdejmowane i przetwarzane jedna po drugim. Konieczne jest, aby dany aktor, przetwarzał wyłącznie jedną wiadomość w dowolnym czasie. Dzięki temu, nie musimy martwić się o synchronizację. Mamy zagwarantowane zatem:
– jeden aktor to wyłącznie jeden wątek
– stan aktora nie jest modyfikowany na zewnątrz
– żadne blokady nie są wymagane.

To bardzo ułatwia pracę. Nie musimy korzystać z żadnych blokad, ponieważ dany kod jest wykonywany wyłącznie przed jeden wątek. Sekcje krytyczną zastąpiono zatem asynchronicznymi wiadomościami. Jeśli wątek A chce odczytać albo zmienić stan wątku B, wykonuje to przez asynchroniczne wiadomości.

Zwykle systemy tworzą hierarchie aktorów, uformowane w postaci drzew. Każdy aktor może mieć swojego rodzica (zarządcę). Zwykle dany problem rozbija się na taką liczbę aktorów, aby pojedynczy aktor mógł wykonywać kod bez żadnej synchronizacji. Jeśli dany problem składa się z operacji wymagających sekcji krytycznych, wtedy rozdzielamy go jeszcze bardziej, tworząc kolejny poziom w drzewie aktorów.

Najtrudniejszym element w wielowątkowości jest modyfikacja stanu współdzielonego. W przypadku aktorów, takiego stanu po prostu nie ma. Każdy aktor pracuje niezależnie od siebie. Jeśli dane jednego aktora potrzebne są przez drugiego, przesyłane są w formie niemodyfikowalnych, asynchronicznych wiadomości.

Dla prostych problemów, model może okazać się zbyt skomplikowany. Czasami łatwiej jest stworzyć sekcję krytyczną w formie blokady, niż implementować kilka klas komunikujących się za pomocą wiadomości. W przyszłym poście, pokażę realny problem, zaimplementowany najpierw za pomocą blokady lock, a potem w w postaci aktorów.

Code Review: Metody asynchroniczne z async oraz oczekiwanie na rezultat

Coraz więcej API dostarcza asynchroniczne wersje metod. Niektóre z nich, idą o krok dalej i w ogóle nie posiadają synchronicznej wersji. Załóżmy, że zewnętrzna biblioteka ma następującą metodę:

 async Task<string> FetchDataAsync() {...}

Często jednak nie potrzebujemy korzystać z wersji async i tylko ona komplikuje sprawę. W powyższym przypadku moglibyśmy pokusić się o:

  string data=dataProvider.FetchDataAsync().Result;

  Console.WriteLine(data);

Niestety powyższy kod może być niebezpieczny i wywołać w niektórych sytuacjach deadlock. Jeśli nie wiemy, jak została zaimplementowana metoda FetchDataAsync bardzo łatwo popełnić błąd. Załóżmy, że ciało metody wygląda następująco:

    public async Task<string> FetchDataAsync()
    {
         await DoSomethingAsync();

         return "Hello World";
    }
    private Task DoSomethingAsync()
    {
         return Task.Run(() => Thread.Sleep(2000));
    } 

Kiedyś na blogu pokazywałem już podobny przykład odnoście wydajności async\await. Jeśli wywołujemy await, pobierany jest kontekst przed wejściem w nowy wątek. Dzięki temu, po zakończeniu wątku, czyli w momencie wyjścia z await, wiadomo jaki wątek powinien kontynuować operację. Wynika to z faktu, że w większości przypadków, użytkownik spodziewa się, że wątek przed await i po jest taki sam. Rozważmy kod:

string data = await GetInfo();
_textBox.Text = data;

Gdyby po wyjściu await, kontekst wykonywania nie przechodził w ten przed wywołaniem wątku, wtedy aktualizacja interfejsu nie powiodłaby się – zawsze należy go aktualizować z wątku głównego.

Wracając do przykładu z deadlock. W celu powrócenia do wątku głównego, po wywołaniu “await DoSomethingAsync();”, należy oczywiście uzyskać do niego dostęp. Problem w tym, że będzie on ciągle zajęty. Wywołanie “string data=dataProvider.FetchDataAsync().Result;” blokuje wątek główny do momentu zakończenia operacji. Operacja oczywiście nigdy nie zakończy się ponieważ czeka ona na na dostęp do wątku głównego blokowanego przez “Task.Result”.

Zachowanie różni się w zależności od typu aplikacji. W przypadku aplikacji konsolowej, nie ma kontekstu synchronizacyjnego (TaskScheduler) więc operacja nie zakończy się deadlock’iem. Jeśli piszemy np. aplikację ASP.NET lub WPF wtedy doświadczymy opisanych wyżej problemów.

Istnieje możliwość wyłączenia mechanizmu powracania do poprzedniego kontekstu. Służy do tego ConfigureAwait(false);

        public async Task<string> FetchDataAsync()
        {
            await DoSomethingAsync().ConfigureAwait(false);

            return "Hello World";
        }

Po wywołaniu ConfigureAwait(false) nie będziemy mieli problemów z deadlock, ponieważ żaden kontekst nie będzie pobierany. Oczywiście nie jest to eleganckie rozwiązanie. ConfigureAwait jest jednak bardzo przydatny ze względów wydajnościowych – np. gdy w pętli wywołujemy await, wtedy zwykle nie ma sensu pobierać kontekstu.

Jeśli zatem wywołujemy metodę async, powinniśmy używać await, inaczej istnieje duże ryzyko, że kod po prostu zawiesi się.

Różnica między Task.Run, a Task.Factory.StartNew

W .NET 4.5 pojawiła się metoda Task.Run. Z przyzwyczajenia jednak przez długi czas używałem tylko Task.Factory.StartNew. Obie metody służą do stworzenia nowego wątku i natychmiastowego jego uruchomienia. Sposób wywołania wygląda bardzo podobnie:

var t1=Task.Run(()=>Method());
var t2 = Task.Factory.StartNew(Method);

Zajrzyjmy do zdekompilowanego kodu Task.Run:

    [__DynamicallyInvokable]
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static Task Run(Action action)
    {
      StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
      return Task.InternalStartNew((Task) null, (Delegate) action, (object) null, new CancellationToken(), TaskScheduler.Default, TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None, ref stackMark);
    }

Oznacza to, że Task.Run to nic innego jak:

var t2 = Task.Factory.StartNew(Method, CancellationToken.None,TaskCreationOptions.DenyChildAttach,TaskScheduler.Default);

Spróbujmy jednak rozszyfrować co powyższe parametry oznaczają. W przypadku CancellationToken.None sprawa jest oczywista – po prostu nie przekazujemy własnego tokena.

Następny parametr to TaskCreationOptions.DenyChildAttach. Opisałem go w poprzednim w poście w szczegółach – służy do tworzenia wątków macierzystych, które są po prostu niezależne od pozostałych wątków.

TaskScheduler.Default wymaga więcej uwagi. Często przekazuje się TaskScheduler.Default albo TaskScheduler.Current. Pierwszy z nich zwraca domyślny scheduler, drugi z kolei aktualny czyli ten ustawiony w danym wątku. Zaglądając do TaskScheduler.Current zobaczymy

        public static TaskScheduler Current
        { 
            get
            { 
                Task currentTask = Task.InternalCurrent; 

                if (currentTask != null) 
                {
                    return currentTask.ExecutingTaskScheduler;
                }
                else 
                {
                    return TaskScheduler.Default; 
                } 
            }
        } 

Innymi słowy, jeśli aktualny wątek nie ma żadnego schedulera, wtedy TaskScheduler.Default zostanie zwrócony. Wynika z tego, że TaskScheduler.Current i TaskScheduler.Default mają różne wartości, wyłącznie gdy aktualny wątek ma już ustawiony jakiś scheduler.

Default oznacza, że następujący obiekt zostanie zwrócony:

 private static TaskScheduler s_defaultTaskScheduler = new ThreadPoolTaskScheduler(); 

Klasa ThreadPoolTaskScheduler używa standardowej puli wątków zaimplementowanej w .NET. Domyślny scheduler, to ten operujący bezpośrednio na puli wątków .NET. Warto pamiętać, że ta pula używa wewnętrznie dwóch typów kolejek do przechowywania wątków. GlobalQueue zawiera referencje do wątków macierzystych (patrz poprzedni wpis). LocalQueue zawiera z kolei wątki stworzone w kontekście innych wątków. Wynika to z właściwości opisanych w poprzednim wpisie i z tego względu, LocalQueue jest powiązany z wątkiem macierzystym.

Kiedy zatem chcemy używać Default lub Current? Klasycznym jest odświeżanie interfejsu. Zwykle pierwszy wątek będzie wykonywał czasochłonną operację, a potem chcemy wykonać zadanie na wątku UI, w celu odświeżenia interfejsu. Przykład:

Task.Factory.StartNew(RunLongLastingOperation, CancellationToken.None, TaskCreationOptions.DenyChildAttach,TaskScheduler.Default).
ContinueWith(UpdateUI, TaskScheduler.FromCurrentSynchronizationContext());

RunLongLastingOperation będzie wykonana na wątku z puli. Potem, UpdateUI użyje schedulera bazującego na kontekście synchronizacyjnym, czyli umieści zadanie na wątku UI. Następnie załóżmy, że w UpdateUI tworzymy kolejny wątek za pomocą TaskScheduler.Current:

private void UpdateUI()
{
    Task.Factory.StartNew(AnotherOperation,CancellationToken.None, TaskCreationOptions.DenyChildAttach,TaskScheduler.Current);
}

AnotherOperation w tym przypadku, zostanie wykonany na tym samym wątku co UpdateUI, czyli wątku UI. Jeśli chcemy mieć serie typu czasochłonna operacja, aktualizacja UI i znów czasochłonna operacja, nie możemy korzystać z TaskScheduler.Current bo w powyższym przypadku będzie to po prostu wątek UI. Z tego względu, musimy skorzystać z puli wątków czyli TaskScheduler.Default:

private void UpdateUI()
{
    Task.Factory.StartNew(AnotherOperation,CancellationToken.None, TaskCreationOptions.DenyChildAttach,TaskScheduler.Default);
}

Co nam daje zatem Task.Run? Tworzy on zawsze wątek na puli (TaskScheduler.Default), który jest niezależny od rodzica(DenyChildAttach). Jeśli chcemy stworzyć wątek, który coś robi w tle, wtedy Task.Run jest naturalnym wyborem. W przypadku Task.Factory.StartNew zostanie przekazany domyślnie TaskCreationOptions.None, co spowoduje, że wątek nie jest niezależny.

W praktyce, Task.Run prawdopodobnie powinien być najczęściej wykorzystywany. Z tego względu, twórcy .NET dodali skrót w formie Task.Run do najczęściej wykorzystywanych parametrów.

Tworzenie wątków: TaskCreationOptions.DenyChildAttach, TaskCreationOptions.AttachedToParent

Tworząc nowe zadania (wątki) za pomocą TPL, możemy przekazać parametry AttachedToParent lub DenyChildAttach. Określają one, czy wątek powinien być podłączony do rodzica czy nie. W dzisiejszym wpisie postaram wyjaśnić się, czym one różnią się.
Parametry definiują relację wątku z nadrzędnym wątkiem. Jeśli wątek A, tworzy kolejny wątek B, wtedy za pomocą powyższych wartości możemy określić relacje wątku B z A.

Spróbujmy zatem wyjaśnić jak ta relacja wpływa na zachowanie wątków.

Jeśli wątek B(podrzędny) jest podłączony do rodzica (AttachedToParent), to wątek A (macierzysty) zawsze będzie czekał na wykonanie B. Innymi słowy, wątek A nie zostanie uznany za zakończony, dopóki nie skończy działania wątek B. Najpierw uruchomimy kod, z opcją DenyChildAttach:

            var parent = Task.Factory.StartNew(
                   () =>
                   {
                       Task.Factory.StartNew(() =>
                       {
                           Thread.Sleep(1000);
                           Console.WriteLine("B");
                       }, TaskCreationOptions.DenyChildAttach);
                   });

            parent.Wait();
            Console.WriteLine("Parent thread finished.");
            Console.ReadLine();

Na ekranie prawdopodobnie zobaczymy następującą sekwencję:
1. Parent thread finished.
2. B

Wątek macierzysty (parent), nie czeka na skończenie wątku B. Wywołanie Result albo Wait, przestanie blokować jak tylko wykona się wątek główny, bez wątków zagnieżdżonych.
Zmieńmy teraz na AttachedToParent:

            var parent = Task.Factory.StartNew(
                   () =>
                   {
                       Task.Factory.StartNew(() =>
                       {
                           Thread.Sleep(1000);
                           Console.WriteLine("B");
                       }, TaskCreationOptions.AttachedToParent);
                   });

            parent.Wait();
            Console.WriteLine("Parent thread finished.");
            Console.ReadLine();

Kolejność oczywiście będzie odwrotna, a mianowicie:
1. B
2. Parent thread finished.

Kolejna różnica wynika z obsługi błędów. Rozważmy następujący kod:

            var parent = Task.Factory.StartNew(
                   () =>
                   {
                       Task.Factory.StartNew(() =>
                       {
                           Thread.Sleep(1000);
                           throw new Exception("B");
                       }, TaskCreationOptions.AttachedToParent);
                   });

            parent.Wait();
            Console.WriteLine("Parent thread finished.");
            Console.ReadLine();

Wywołanie Task.Wait (powyższy przykład), albo Task.Result spowoduje wychwycenie wyjątku wyrzuconego przez podrzędne wątki. W przypadku DetachedChild, musielibyśmy napisać obsługę błędu w ciele wątku macierzystego ponieważ Wait\Result wyrzuci wyłącznie wyjątek, jeśli był on wyrzucony bezpośrednio w wątku macierzystym.

Kolejna różnica to anulowanie wątków. Zacznijmy od przypadku, kiedy wątek podrzędny typu Attached, anuluje wykonanie:

            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;

            var parent = Task.Factory.StartNew(
                   () =>
                   {
                       Task.Factory.StartNew(() =>
                       {
                           Thread.Sleep(1000);
                           token.ThrowIfCancellationRequested();

                       }, token, TaskCreationOptions.AttachedToParent, TaskScheduler.Default);
                    
                   }, token);


            Thread.Sleep(5);
            parent.Wait(token);
            parent.Wait();

            Console.WriteLine(parent.Status);
            Console.ReadLine();

W takim przypadku, wyjątek (TaskCancellationException) zostanie przekazany macierzystemu wątkowi, co oznacza, że Wait na głównym wątku wyrzuci TaskCancellationException.

Nieco inaczej sytuacja wygląda w przypadku wątków Detached:

            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;

            var parent = Task.Factory.StartNew(
                   () =>
                   {
                       Task.Factory.StartNew(() =>
                       {
                           Thread.Sleep(1000);
                           token.ThrowIfCancellationRequested();

                       }, token, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
                    
                   }, token);


            Thread.Sleep(5);
            tokenSource.Cancel();
            parent.Wait(token);

            Console.WriteLine(parent.Status);
            Console.ReadLine();

Jeśli wątek macierzysty kończy się przed anulowaniem wątku podrzędnego, wtedy TaskCancellationException nie zostanie wyrzucony przy wykonywaniu Wait. Wynika to z faktu przedstawionego na początku wpisu – wątek macierzysty nie czeka na wątki podrzędne, zatem taka informacja jest po prostu niedostępna.

Jeśli wątek podrzędny zostanie anulowany przed zakończeniem się wątku macierzystego, to TaskCancellationException zostanie wyrzucony, pomimo tego, że jest to wątek Detached:

            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;

            var parent = Task.Factory.StartNew(
                   () =>
                   {
                       Task.Factory.StartNew(() =>
                       {
                           token.ThrowIfCancellationRequested();

                       }, token, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
                    Thread.Sleep(100);
                   }, token);


            tokenSource.Cancel();
            parent.Wait(token);

            Console.WriteLine(parent.Status);
            Console.ReadLine();

Mam nadzieję, że powyższe przykłady były przydatne i rozjaśniły temat, a nie jeszcze bardzo skomplikowały go 🙂

Dobre praktyki – wydajność async\await dla skomplikowanych wyrażeń

O dobrych praktykach async\await pisałem już wielokrotnie. Dzisiaj zacznijmy od code review:

private static async void Test()
{
  int result = SyncFunction1() + SyncFunction2()*await TestAsync() + SyncFunction3();
}

private static int SyncFunction1()
{
  // jakas logika
  return 1;
}
private static int SyncFunction2()
{
  // jakas logika
  return 1;
}

private static int SyncFunction3()
{
  // jakas logika
  return 1;
}
private static async Task<int> TestAsync()
{
  return 5;
}

W powyższym kodzie mamy skomplikowane wyrażenie, które wygląda na synchroniczne:

int result = SyncFunction1() + SyncFunction2()*await TestAsync() + SyncFunction3();

Wiemy, że tak naprawdę musi zostać wygenerowana maszyna stanów. Powyższy przykład zawiera również wyrażenie, które jest przetwarzane standardowo od lewej do prawej strony, z uwzględnieniem nawiasów i kolejności wykonywania operatorów. Gdyby nie kod asynchroniczny, byłoby to łatwe – po prostu na stosie kolejne wyniki byłyby przechowywane.

Standardowy stos w async nie może zostać użyty bo po wskoczeniu w metodę asynchroniczną całość zostałaby wyczyszczona.

Z tego względu w .NET zaimplementowano tzw. stack spilling. Polega to na przeniesieniu wartości, które normalnie znajdowałby się na stosie, na stertę (stack->heap). Ma to oczywiście pewne konsekwencje takie jak dodatkowa alokacja obiektów czy boxing. Każdy zaalokowany obiekt referencyjny posiada kilka dodatkowych pól w przeciwieństwie do bardziej oszczędnych struktur. Ponadto GC będzie miał dodatkową robotę i powoduje to najwięcej strat. Przyjrzyjmy się wygenerowanej maszynie stanów:

[CompilerGenerated]
[StructLayout(LayoutKind.Auto)]
private struct <Test>d__0 : IAsyncStateMachine
{
public int <>1__state;
public AsyncVoidMethodBuilder <>t__builder;
public int <result>5__1;
private TaskAwaiter<int> <>u__$awaiter2;
private object <>t__stack;
void IAsyncStateMachine.MoveNext()
{
    try
    {
        int num = this.<>1__state;
        if (num != -3)
        {
            int arg_BF_0;
            int arg_BE_0;
            TaskAwaiter<int> taskAwaiter;
            if (num != 0)
            {
                int expr_1E = arg_BF_0 = Program.SyncFunction1();
                int expr_23 = arg_BE_0 = Program.SyncFunction2();
                taskAwaiter = Program.TestAsync().GetAwaiter();
                if (!taskAwaiter.IsCompleted)
                {
                    Tuple<int, int> tuple = new Tuple<int, int>(expr_1E, expr_23);
                    this.<>t__stack = tuple;
                    this.<>1__state = 0;
                    this.<>u__$awaiter2 = taskAwaiter;
                    this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter<int>, Program.<Test>d__0>(ref taskAwaiter, ref this);
                    return;
                }
            }
            else
            {
                Tuple<int, int> tuple = (Tuple<int, int>)this.<>t__stack;
                arg_BF_0 = tuple.Item1;
                arg_BE_0 = tuple.Item2;
                this.<>t__stack = null;
                taskAwaiter = this.<>u__$awaiter2;
                this.<>u__$awaiter2 = default(TaskAwaiter<int>);
                this.<>1__state = -1;
            }
            int arg_BE_1 = taskAwaiter.GetResult();
            taskAwaiter = default(TaskAwaiter<int>);
            int num2 = arg_BF_0 + arg_BE_0 * arg_BE_1 + Program.SyncFunction3();
            this.<result>5__1 = num2;
        }
    }
    catch (Exception exception)
    {
        this.<>1__state = -2;
        this.<>t__builder.SetException(exception);
        return;
    }
    this.<>1__state = -2;
    this.<>t__builder.SetResult();
}
[DebuggerHidden]
void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine param0)
{
    this.<>t__builder.SetStateMachine(param0);
}
}

Na początku widzimy deklaracje pól wewnętrznych maszyny:

public int <>1__state;
public AsyncVoidMethodBuilder <>t__builder;
public int <result>5__1;
private TaskAwaiter<int> <>u__$awaiter2;
private object <>t__stack;

Wszystkie zmienne (numer stanu, buiilder, awaiter) były już wcześniej omawiane na blogu. Nowością jest _stack czyli wspomniany stos przechowywany na stercie (proszę zauważyć, że jest to zawsze typ referencyjny).

W programowaniu asynchronicznym, jeśli potrzebujemy użyć stosu (evaluation stack) umieszczamy go właśnie w tym polu (__stack). W większości wypadków będzie to lista wartości i wtedy do __stack przypisuje się Tuple<…> ze wszystkimi częściowymi wynikami.

W pierwszym stanie wykonujemy wszystko co jest po lewej stronie await, czyli SyncFunction1 oraz SyncFunction2:

int expr_1E = arg_BF_0 = Program.SyncFunction1();
int expr_23 = arg_BE_0 = Program.SyncFunction2();
taskAwaiter = Program.TestAsync().GetAwaiter();
if (!taskAwaiter.IsCompleted)
{
    Tuple<int, int> tuple = new Tuple<int, int>(expr_1E, expr_23);
    this.<>t__stack = tuple;
    this.<>1__state = 0;
    this.<>u__$awaiter2 = taskAwaiter;
    this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter<int>, Program.<Test>d__0>(ref taskAwaiter, ref this);
    return;
}

Wynik na stosie jest przechowywany jako Tuple<int,int>:

Tuple<int, int> tuple = new Tuple<int, int>(expr_1E, expr_23);
this.<>t__stack = tuple;

Jest to pierwsza “zbędna” alokacja. W kolejnym stanie zdejmujemy wartości ze stosu, uzyskując wynik SyncFunction1 oraz SyncFunction2. Na tym etapie mamy   również wynik asynchronicznej funkcji TestAsync:

Tuple<int, int> tuple = (Tuple<int, int>)this.<>t__stack;
    arg_BF_0 = tuple.Item1;
    arg_BE_0 = tuple.Item2;
    this.<>t__stack = null;
    taskAwaiter = this.<>u__$awaiter2;
    this.<>u__$awaiter2 = default(TaskAwaiter<int>);
    this.<>1__state = -1;
}
int arg_BE_1 = taskAwaiter.GetResult();
taskAwaiter = default(TaskAwaiter<int>);
int num2 = arg_BF_0 + arg_BE_0 * arg_BE_1 + Program.SyncFunction3();
this.<result>5__1 = num2;

Przyjrzyjmy się teraz sytuacji, gdzie niezbędne jest posiadanie więcej niż tylko dwóch stanów:

int result = await TestAsync()*SyncFunction1() + SyncFunction2()*(await TestAsync() + SyncFunction3()*await TestAsync());

Jeśli zajrzymy do IlSpy, zobaczymy masę alokacji m.in:

Tuple<int, int> tuple = new Tuple<int, int>(expr_AA, expr_AB);

Tuple<int, int, int, int> tuple2 = new Tuple<int, int, int, int>(arg_162_0, arg_162_1, expr_139, expr_146);

“Prosty” przykład, może poskutkować wieloma alokacji. Z tego względu, czasami lepiej wywołania asynchroniczne uszeregować poza wyrażeniem tzn.:

int resultAsync = await TestAsync();
int result = SyncFunction1() + SyncFunction2()*resultAsync;

Jeśli możemy kilka metod asynchronicznych wykonać w tym samym czasie, wtedy dużo lepiej jest użyć Task.WhenAll.

Powyższy kod nie doprowadzi do stack spilling, a wyniki tymczasowe będą przechowywane w zwyczajnych zmiennych lokalnych. Skutek będzie taki, że maszyna stanów będzie posiadała wiele pól, ale jest to lepsze w wielu przypadkach niż wspomniane alokacje (czas + overhead). Z drugiej strony, pola w maszynie stanów będą w pamięci aż do jej zakończenia, co może być czasochłonne (programowanie asynchroniczne). Wartości ze sterty zawsze można zwolnić, np. podczas przechodzenia z jednego stanu w drugi.

Visual Studio 2013 – debugowanie asynchronicznego kodu

W VS 2013 usprawniono debugowanie kodu asynchronicznego. Wszyscy jesteśmy przyzwyczajeni już do async\await. Znacząco to ułatwia wykonywanie operacji asynchronicznych. Niestety, debugowanie w VS 2012 jest dość uciążliwe. Załóżmy, że mamy kod z wieloma metodami asynchronicznymi, które z kolei są pozagnieżdżane. W przypadku wyrzucenia wyjątku lub ustawienia breakpoint’a, call stack nie zawierał żadnych informacji. Przetestujmy opisany problem  na następującym kodzie:

public partial class MainWindow : Window
{
   public MainWindow()
   {
       InitializeComponent();
       DoSomething();
   }   
   private async void DoSomething()
   {
       await RunAsync();
   }

   private async Task RunAsync()
   {
       await Task.Delay(100);
       
       await DownloadNumberAsync();
   }
   private Task<int> DownloadNumberAsync()
   {
       return Task<int>.Factory.StartNew(DownloadNumber);
   }
   private int DownloadNumber()
   {   
       return 1;
   }
}

W Visual Studio 2012, gdy ustawimy breakpoint na linię await DownloadNumberAsync, call stack wyglądał następująco:

image

Z kolei w VS 2013 mamy pełny stos:

image

Dzięki ulepszeniom w VS 2013 i Windows 8.1, dużo łatwiej zrozumieć kod asynchroniczny. Wcześniej call stack pokazywał wyłącznie ostatnią metodę i nie wiadomo było, jak ona została wywołana. Na blogu kiedyś opisywałem internale async\await. Wiemy, że jest tam w rzeczywistości maszyna stanu, oparta na callbackach. Z tego względu, w poprzednich wersjach VS, nie wiadomo było jak metoda była wywołana. VS 2013 rozpoznaje konstrukcje async\await i można już je zaprezentować w sposób, który wynika z kodu c#, a nie implementacji w CLR.

Wielowątkowość a CultureInfo

CultureInfo zawiera informacje regionalne, przydatne, jeśli chcemy dostosować naszą aplikację do różnym krajów. Używamy tej klasy m.in. do określenia formatowania liczb, dat czy po prostu języka w jakim wyświetlamy tekst.

Jeśli korzystamy z domyślnych ustawień regionalnych, wtedy poniższy kod wyświetli prawidłową wartość CultureInfo:

static void Main(string[] args)
{
  DisplayCulture();
  Task.Factory.StartNew(DisplayCulture);

  Console.ReadLine();
}

private static void DisplayCulture()
{
  Console.WriteLine(CultureInfo.CurrentCulture.ToString());
}

Często jednak w aplikacji można zmienić język i tym samym CultureInfoUI. Nierzadko popełnianym błędem jest po prostu ustawienie CultureInfo dla aktualnego wątku:

class Program
{
   static void Main(string[] args)
   {
       Thread.CurrentThread.CurrentCulture=new CultureInfo("en-US");
       DisplayCulture();
       Task.Factory.StartNew(DisplayCulture);

       Console.ReadLine();
   }

   private static void DisplayCulture()
   {
       Console.WriteLine(Thread.CurrentThread.CurrentCulture);
   }
}

Kod zmienia kulturę wyłącznie aktualnego wątku – w tym przypadku głównego. Wszelkie inne wątki, będą używały kultury domyślnej z ustawień regionalnych. W celu zsynchronizowania wątków musimy sami przekazywać kulturę i ustawiać ją ręcznie w każdym z wątków.

Jest to dość czasochłonne i nudne zajęcie więc istnieją pewne usprawnienia od .NET 4.5:

class Program
{
   static void Main(string[] args)
   {
       CultureInfo.DefaultThreadCurrentCulture=new CultureInfo("en-US");
       DisplayCulture();
       Task.Factory.StartNew(DisplayCulture);

       Console.ReadLine();
   }

   private static void DisplayCulture()
   {
       Console.WriteLine(Thread.CurrentThread.CurrentCulture);
   }
}

W .NET 4.5 dodano DefaultThreadCurrentCulture. Ustawienie tej właściwości spowoduje, że wszelkie wątki będą korzystały ze wskazanej kultury. Przed .NET 4.5 trzeba było ręcznie to ustawiać albo zastosować hack, który znajdziecie tutaj:

http://blog.rastating.com/setting-default-currentculture-in-all-versions-of-net/

Kod:

public void SetDefaultCulture(CultureInfo culture)  
{
    Type type = typeof(CultureInfo);

    try
    {
        type.InvokeMember("s_userDefaultCulture",
                            BindingFlags.SetField | BindingFlags.NonPublic | BindingFlags.Static,
                            null,
                            culture,
                            new object[] { culture });

        type.InvokeMember("s_userDefaultUICulture",
                            BindingFlags.SetField | BindingFlags.NonPublic | BindingFlags.Static,
                            null,
                            culture,
                            new object[] { culture });
    }
    catch { }

    try
    {
        type.InvokeMember("m_userDefaultCulture",
                            BindingFlags.SetField | BindingFlags.NonPublic | BindingFlags.Static,
                            null,
                            culture,
                            new object[] { culture });

        type.InvokeMember("m_userDefaultUICulture",
                            BindingFlags.SetField | BindingFlags.NonPublic | BindingFlags.Static,
                            null,
                            culture,
                            new object[] { culture });
    }
    catch { }
}

Warto zwrócić na to uwagę, ponieważ wyświetlanie np. dat w wątku głównym w inny sposób niż w pozostałych wątkach, nie jest dobrym pomysłem.