Category Archives: Wielowątkowość

Pętla wykonywana równolegle–część III

W ostatnich wpisach, pokazałem dwa różne podejścia wykonywania pętli równolegle. Każda z nich wciąż ma wady, głównie związane z sytuacją gdzie część logiki blokuje wątki.

W tym poście, pokażę bardziej dynamiczne podejście. Żaden z wątków nie będzie miał z góry przydzielonych elementów na których musi pracować. Zacznijmy po prostu od kodu:

private static void For(Action<int> iteration, int startIndex, int endIndex, int threadsNumber)
{
  const int chunkSize = 4;
  int current = startIndex;
  var countdownEvent = new CountdownEvent(threadsNumber);

  for (int i = 0; i < threadsNumber; i++)
  {
      Task.Factory.StartNew(delegate(object threadIndexState)
      {
          int j;
          while ((j = (Interlocked.Add(ref current, chunkSize) - chunkSize)) < endIndex)
          {
              for (int k = 0; k < chunkSize && j + k < endIndex; k++)
              {
                  iteration(j + k);
              }
          }
          countdownEvent.Signal();
      }, i);
  }
  countdownEvent.Wait();
}

Wciąż mamy zmienną chunkSize, ponieważ optymalniej jest operować na porcjach danych.

W pętli while, zwiększamy indeks current. Każdy wątek, który chce coś wykonać, zwiększa current, tak że kolejny będzie wykonywał już inną porcję danych – nie ma potrzeby synchronizowania porcji, która jest przetwarzana. Raz zaalokowana porcja danych do danego wątku, nie może potem zostać wykonana przez inny wątek.

Warto zauważyć, że im mniejsze porcje danych, tym więcej należy wywołać Interlocked na zmiennej current, co jest sekcją krytyczną i spowalnia aplikację.  Liczba wątków użytych w algorytmie nie jest już tak kluczowym elementem, ponieważ kod jest skalowalny.

Powyższe rozwiązanie nadaje się do użycia wyłącznie w scenariuszu, gdzie z góry znamy rozmiar tablicy. Co jeśli do czynienia mamy np.  z IEnumerable? Nie mamy tam do dyspozycji dostępu przez indeks. Nie da się uzyskać takiej samej wydajności jak w przypadku tablic. Jednym z prostszych rozwiązań jest buforowanie elementów. Czyli jak wątek T1 chce wykonać jakiś kod, to najpierw czyta tą porcję danych z Enumerable i przechowuję ją potem w zwykłej tablicy. Oczywiście wiąże się to z dodatkową sekcją krytyczną oraz blokadą.

Jak dobrać stopień zrównoleglenia?

Zrównoleglenie danego algorytmu to jeszcze nie koniec wyzwań. Pytanie jakie należy postawić, to jak wiele stworzyć wątków? Musimy wziąć pod uwagę synchronizacje i problemy z tym związane.

Jeśli mamy tylko 4 procesory, wtedy tworzenie więcej niż 4 wątków nie przyśpieszy obliczeń, jeśli wszystkie one zawsze będą zajęte. Tworzenie większej liczy wątków niż CPU, ma sens wyłącznie jak część z nich musi czekać na jakieś dane i tym samym, nie wykorzystują one w pełni cykli CPU.

Liczba wątków, zależy od tego jak zaprojektowaliśmy nasz algorytm. Jeśli mamy do dyspozycji komputer z 20 procesorami, nie znaczy to, że 20 wątków zawsze warto tworzyć. W jednym, z ostatnich wpisów pokazałem jakiego typu optymalizacje można osiągnąć. Wniosek był taki, że najczęściej jest to poziom subliniowy. Innymi słowy, jeśli praca na jednym CPU zajęła 10 sekund, to na 5 osiągniemy wynik >2s. Bardzo często jakaś część algorytmu będzie wykonywana równolegle. Oznacza to, że dodając nowe wątki, przyśpieszamy wyłącznie pracę wykonywaną równolegle. Jeśli  10 sekund zajmuje algorytm sekwencyjny, a operacje, które muszą zostać wykonane liniowo zajmują 4 sekundy, to mowa jest tylko o przyśpieszeniu pozostałych operacji, które zajmują 6s.

Prowadzi to do wniosku, że w pewnym momencie, dodawanie nowych rdzeni traci sens, ponieważ wyłącznie sekcja krytyczna będzie wąskim gardłem algorytmu.

Przyśpieszenie algorytmu można wyliczyć za pomocą następującego wzoru:  1 / (S+ (1-S)/P).

S to procent operacji wykonywanych sekwencyjnie (sekcja krytyczna), a 1-S to pozostały procent, przeznaczony na operacje, które mogą zostać wykonane równolegle. P to liczba wątków, czyli stopień zrównoleglenia. Jeśli S=1 to algorytm jest typowo sekwencyjny i nie ważne jak wiele wątków użyjemy, wynik będzie taki sam. Jeśli z kolei 1-S równa się zero, to tworząc 5 nowych wątków, przyśpieszymy algorytm 5-krotnie.

Spójrzmy na przykład. Zakładamy, że 90% operacji muszą zostać wykonane sekwencyjnie. Podstawiając do wzoru, otrzymamy następujące rezultaty:

P

Przyśpieszenie

1 1
2

1.052631579

3

1.071428571

4

1.081081081

5

1.086956522

Jak widać, dodawanie kolejnych rdzeni nie ma zbytniego sensu. Należy jednak pamiętać, że jest to wielkie przybliżenie. W praktyce mamy do czynienia z dodatkowymi czynnikami. Każdy nowy rdzeń to nie tylko dodatkowe cykle, ale również pamięć podręczna. Nie wzięliśmy również pod uwagę, że część kodu może zostać wykonywana dłużej niż inna. Nie da się tego tak łatwo opisać, że 80% kodu można zrównoleglić. Co jeśli czas algorytmu zależy od danych wejściowych? Ma na myśli sytuacje, gdzie np. przetworzenie pierwszej części tablicy zajmuje więcej niż drugiej?

Wzór to wyłącznie przybliżenie ale daje nam konkretną informację na temat korzyści płynących z zrównoleglenia danego algorytmu.

Pętla wykonywana równolegle–część II

W poprzednim poście pisałem  o statycznej dekompozycji tablicy na kilka wątków. Główną wadą podejścia było przypuszczenie, że wszystkie iteracje są tak samo skomplikowane.

W niektórych algorytmach należy znaleźć element spełniający podane wymagania. Wyobraźmy sobie, że mamy 100 elementową tablicę i dzielimy ją na 10 wątków. Ponadto element szukany znajduje się pod indeksem 9. Wniosek taki, że NIC nie zyskamy ze zrównoleglenia. Dziewięć wątków będzie szukało w złym miejscu, a pierwszy z nich będzie wykonywał prace jak w podejściu sekwencyjnym.

Powyższe problemy można łatwo ominąć. Zamiast statycznie przydzielać wątki do iteracji, możemy wykonać to w sposób bardziej dynamiczny. Wystarczy, że iteracje będą przetwarzane na zmianę przez różne wątki.

Załóżmy, że tablica ma 10 elementów a mamy do dyspozycji 3 wątki. Możemy zatem do T1 przydzielić elementy 1,4,7,10, do T2 2,5,8 a do T3 3,6,9. Innymi słowy wątek wykonuje swoją porcję a następnie pomija wszystkie elementy, które przydzielone są do innego wątku. Oczywiście wspomniane porcję danych nie muszą być równe jednej iteracji. Może być, że pierwsze 5 elementów jest przydzielonych do T1, następne 5 do T2 itd.

Mała zmiana a zlikwiduje problemy wspomniane na początku wpisu. Implementacja jest również prosta:

private static void Main(string[] args)
{
  For(Console.WriteLine, 1, 10, 3);
  Console.WriteLine("Koniec");
}

private static void For(Action<int> iteration, int startIndex, int endIndex, int threadsNumber)
{
  const int chunkSize = 4;

  var countdownEvent = new CountdownEvent(threadsNumber);

  for (int i = 0; i < threadsNumber; i++)
  {
      Task.Factory.StartNew(delegate(object threadIndexState)
      {
          int start = startIndex + (int)threadIndexState * chunkSize;

          for (int j = start; j < endIndex; j += chunkSize * threadsNumber)
          {
              for (int k = 0; k < chunkSize && j + k < endIndex; k++)
              {
                  iteration(j + k);
              }
          }
          countdownEvent.Signal();
      }, i);
  }
  countdownEvent.Wait();
}

Kod jest podobny do tego z poprzedniego wpisu. Tak naprawdę wciąż jest to statyczna dekompozycja. Każdy wątek ma z góry określone iteracje do przetworzenia ale nie są one przylegające do siebie – co niweluje wiele problemów. Wciąż jednak musimy znać rozmiary tablicy – dla IEnumerable podejście nie sprawdzi się ponieważ nie mamy tam do dyspozycji indeksów.

Jaki wpływ na wydajność ma programowanie współbieżne

Dzisiaj kilka rozważań na temat korzyści płynących z wielowątkowości. Zastanówmy się, jak  bardzo może nam pomóc albo zaszkodzić wprowadzenie nowych wątków w aplikacji.

Jeśli wykonanie danej pracy na jednym procesorze zajmuje T(1) a wykonanie jej na n procesorach zajmuje T(n) wtedy możemy oszacować korzyści płynące z nowych wątków.

W przypadku gdy T(1)/T(n) daje wynik <1 wtedy mamy do czynienia z powolnieniem – im więcej wątków wprowadzamy tym wolniejszy algorytm otrzymujemy. Oczywiście w takim scenariuszu nie ma sensu wprowadzać dodatkowych wątków. Niektóre rzeczy muszą zostać po prostu wykonane sekwencyjne ze względu na swoją specyfikę – np. dalsze obliczenia bazują na poprzednich.

Dużo częściej w praktyce mamy do czynienia gdy T(1)/T(n) < n. Oznacza to, że gdy praca na jednym rdzeniu zajęła 5 sekund, wtedy na 5 rdzeniach zajmie np. 1.2 sekundy. Czyli wydajność ma charakter subliniowy. Bardzo często spotykamy się z tym ponieważ musimy spędzić trochę czasu na synchronizację między wieloma wątkami. W sortowaniu przez scalanie, niektóre części muszą zostać wykonanie sekwencyjnie (scalanie). Wtedy, wprowadzając 5 nowych wątków, nie przyśpieszymy algorytmu dokładnie o 5 razy.

Jeśli np. problemy są kompletnie od siebie niezależne wtedy mamy szanse uzyskać przyśpieszenie liniowe. Chyba łatwo to zrozumieć. Jak tylko możliwe jest rozdzielenie problemu na kompletnie niezależne podproblemy, wtedy każdy z nich może zostać wykonany równolegle – nie potrzebujemy czasu na kod sekwencyjny. Sytuacja w praktyce dość rzadko spotykana.

Przyśpieszenie podliniowe i liniowe są łatwe w wyobrażeniu. Czy istnieje jednak grupa problemów, w której możemy zyskać nadliniowo? Innymi słowy, jeśli na jednym wątku zadanie zajęło 5 sekund to czy na 5 wątkach możemy to wykonać w mniej niż jedną sekundę?

Do rozwiązania niektórych problemów wątki mogą ze sobą współpracować, aby dostarczyć nadliniowe przyśpieszenie. Wyobraźmy sobie, że przeszukujemy tablicę aby znaleźć element spełniający podane kryteria. Jeśli podzielimy zadanie na kilka wątków to będziemy przeszukiwać różne rejony tablicy jednocześnie. Co jeśli taki element zajmuje się gdzieś na końcu tablicy? W podejściu sekwencyjnym musielibyśmy przeszukać wszystkie elementy a w równoległym ostatni wątek znajdzie wartość bardzo szybko.

Więcej procesorów to również więcej zasobów a konkretniej pamięci podręcznej. Pisałem kilka wpisów wcześniej, że dzisiaj bardzo wielką rolę odgrywa jak korzystamy z pamięci i ile mamy tzw. cache misses.  Wiadomo, że każdy procesor posiada pamięć podręczna a zatem gdy mamy ich więcej to mniej danych musimy przechowywać bezpośrednio w pamięci operacyjnej, która jest dużo wolniejsza.

Pętla wykonywana równolegle–statyczne przydzielanie wątków

W .NET istnieje metoda do wykonywania pętli równolegle. Pisałem ogólne o niej kilka miesięcy temu. Temat jest jednak dużo bardziej skomplikowany i z pewnością należy zrozumieć różne podejścia do problemu.

Przed zrównolegleniem pętli, należy zastanowić się czy na prawdę przyniesie to pozytywne efekty. Złe rozpoznanie przypadku spowoduje znaczącą degradację wydajności. Zastanówmy się na co należy zwracać uwagę:

  1. Czy poszczególne elementy tablicy można przetwarzać w sposób bezpieczny (thread-safe). Jeśli nie, wtedy musielibyśmy użyć elementów synchronizacyjnych co mija się z celem.
  2. Czy kolejność z jaką przetwarzamy elementy ma znaczenie? Jeśli drugi element tablicy musi zostać koniecznie wykonany po pierwszym, zrównoleglenie nie ma sensu.
  3. Jakie są relacje między poszczególnymi iteracjami? W przypadku, gdy kolejne iteracje muszą operować na danych z poprzednich, nie będziemy mieli żadnych korzyści z optymalizacji.

Zadanie to wykonanie pętli przez wszystkie elementy w kolekcji z użyciem wątków. Pierwsze pytanie jakie nasuwa się to ile wątków chcemy stworzyć? W jaki sposób je przydzielić do zadań?

Istnieje wiele rozwiązań, ale zacznijmy od najprostszego. Podzielmy tablicę na kilka równych części, w zależności od liczby dostępnych wątków. Jeśli mamy 5 wątków, to każdy z nich będzie przetwarzał 20 elementów dla tablicy 100 elementowej.

Powstały kod wygląda prosto, a mianowicie:

private static void Main(string[] args)
{
  For(Console.WriteLine,1,10,3);
  Console.WriteLine("Koniec");
}

private static void For(Action<int> iteration, int startIndex, int endIndex, int threadsNumber)
{
  int chunkSize = (endIndex - startIndex)/threadsNumber;

  var countdownEvent = new CountdownEvent(threadsNumber);

  for (int i = 0; i < threadsNumber; i++)
  {
      Task.Factory.StartNew(delegate(object threadIndexState)
      {
          int threadIndex = (int)threadIndexState;
          int threadStartIndex = threadIndex * chunkSize + startIndex;
          int threadEndIndex = threadIndex == threadsNumber - 1 ? endIndex : threadStartIndex + chunkSize;

          for (int j = threadStartIndex; j < threadEndIndex; j++)
          {
              iteration(j);
          }
          countdownEvent.Signal();
      }, i);
  }
  countdownEvent.Wait();
}

CountdownEvent używamy w celu blokowania funkcji, aż wszystkie iteracje zostaną wykonane. Kod jest bardzo prosty ale w niektórych przypadkach stanowi optymalne rozwiązanie. Zastanówmy się, kiedy powyższe, statyczne rozwiązanie nie sprawdzi się:

  1. Nie znamy rozmiaru kolekcji. W przypadku tablic, zawsze wiemy jaki jest całkowity rozmiar kolekcji. Wiele źródeł danych to jednak IEnumerable, który może być np. nieskończony. W takiej sytuacji nie jesteśmy w stanie dokonać statycznej dekompozycji, jak to zostało powyżej pokazane.
  2. Powyższa dekompozycja zakłada, że wszystkie iteracje mają taką samą liczbę operacji. A co jeśli iteracja zależy np. od indeksu? Mam ma myśli, że niektóre iteracje mogą mieć złożoność obliczeniową dużo wyższą niż pozostałe. W takim scenariuszu, statyczne przydzielenie wątków do iteracji nie jest optymalne. Co jeśli 5 pierwszych iteracji wykonają się po jednej sekundzie, a ostatnie 5 zajmą po kilka godzin?

Kolejnym problemem jest dobranie optymalnej liczby wątków wykonującej operacje. Jeśli mamy do dyspozycji 4 procesory wtedy nie ma sensu tworzyć 20 wątków ponieważ zmarnujemy czas na zmianę kontekstu a operacje  i tak będą mogły być wykonywane wyłącznie na 4 CPU.

Czy to znaczy, że dla 4 rdzeniowego procesora, 4 wątki wykonujące pętle są optymalne? Niestety nie… System Windows jak wiemy może wywłaszczyć wątki i jest to zjawisko bardzo częste. W OS jest masa wątków, które mają dużo wyższy priorytet np. te  wykonujące GC. Jeśli zatem stworzyliśmy 4 wątki i jeden lub więcej zostały zablokowane to marnujemy trochę zasoby.  Inny scenariusz to sytuacja gdzie wykorzystujemy zdarzenia do blokowania wątków. Jeśli każda iteracja musi czekać na jakieś dane wtedy zdecydowanie lepiej stworzyć więcej wątków, ponieważ część z nich i tak będzie zablokowana na jakiś czas. Zjawisko blokowania wątków jest powszechne i odbywa się niezależnie od nas (system decyduje o tym).

Ze względu na możliwość blokowania, rozsądnym podejściem jest przekazanie Environment.ProcessorCount*2 jako liczby wątków. Wtedy zwiększamy szansę, że w przypadku blokowania któryś z wątków, duża cześć danych jest wciąż przetwarzana.

Ponadto powyższy kod można lekko zoptymalizować ale o tym pisałem już tutaj:

http://www.pzielinski.com/?p=1738

Anulowanie wątków a synchronizacja

W .NET można anulować wątki za pomocą tokena. Oczywiście nie należy używać metody Abort czy Cancel, ale o tym już wiele razy pisałem – w skrócie nie wiadomo kiedy taki wątek zostanie przerwany. Użycie tokena jest proste tzn. (przykład MSDN):

class Program
{
    static void Main()
    {

        var tokenSource2 = new CancellationTokenSource();
        CancellationToken ct = tokenSource2.Token;

        var task = Task.Factory.StartNew(() =>
        {

            // Were we already canceled?
            ct.ThrowIfCancellationRequested();

            bool moreToDo = true;
            while (moreToDo)
            {
                // Poll on this property if you have to do 
                // other cleanup before throwing. 
                if (ct.IsCancellationRequested)
                {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                }

            }
        }, tokenSource2.Token); // Pass same token to StartNew.

        tokenSource2.Cancel();

        // Just continue on this thread, or Wait/WaitAll with try-catch: 
        try
        {
            task.Wait();
        }
        catch (AggregateException e)
        {
            foreach (var v in e.InnerExceptions)
                Console.WriteLine(e.Message + " " + v.Message);
        }

        Console.ReadKey();
    }
}

Należy sprawdzać czy flaga IsCancellationRequested jest ustawiona i wtedy odpowiednio zareagować. Daje nam to pełną kontrolę nad tym, kiedy wątek zakończy działanie.

Sprawa prosta. Ale co jeśli w naszej logice musimy czekać na jakieś inne wątki? Jeśli mamy obiekty synchronizacyjne wtedy sprawa nieco komplikuje się .  Wyobraźmy sobie taką pętle:


while (true)
{
    _event.Wait(); 
    // czekaj na jakies zdarzenie
    // wykonanie pracy
    if (ct.IsCancellationRequested)
    {
        ct.ThrowIfCancellationRequested();
    }
}

Powyższy kod może być implementacją wzorca producent\konsument. Jeden wątek czeka na porcje danych a drugi generuje te dane. Każdy z nich ma ten sam token sterujący. Co jeśli producent prawidłowo zostanie anulowany a następnie powyższy wątek utknie na _event.Wait? Jeśli wspieramy anulowanie wątków musimy być szczególnie ostrożni, gdy używamy jakichkolwiek mechanizmów synchronizacji.

Jeśli nasz obiekt synchronizujący nie wspiera anulowania wtedy możemy skorzystać WaitHandle.WaitAny. Metoda ta czeka aż jakieś zdarzenia zostanie ustawione. Token również eksponuje WaitHandle, zatem możemy powyższy kod przepisać na:

   int eventThatSignaledIndex =
                WaitHandle.WaitAny(new WaitHandle[] { _event, token.WaitHandle },
                                    new TimeSpan(0, 0, 20));

Metoda zwraca indeks zdarzenia, które zostało odebrane. Jeśli zatem token ustawił zdarzenie to oczywiście przerywamy pracę. Pełny przykład (MSDN):

class CancelOldStyleEvents
{
    // Old-style MRE that doesn't support unified cancellation. 
    static ManualResetEvent mre = new ManualResetEvent(false);

    static void Main()
    {
        var cts = new CancellationTokenSource();

        // Pass the same token source to the delegate and to the task instance.
        Task.Run(() => DoWork(cts.Token), cts.Token);
        Console.WriteLine("Press s to start/restart, p to pause, or c to cancel.");
        Console.WriteLine("Or any other key to exit.");

        // Old-style UI thread. 
        bool goAgain = true;
        while (goAgain)
        {
            char ch = Console.ReadKey().KeyChar;

            switch (ch)
            {
                case 'c':
                    cts.Cancel();
                    break;
                case 'p':
                    mre.Reset();
                    break;
                case 's':
                    mre.Set();
                    break;
                default:
                    goAgain = false;
                    break;
            }

            Thread.Sleep(100);
        }
    }

    static void DoWork(CancellationToken token)
    {
        while (true)
        {
            // Wait on the event if it is not signaled. 
            int eventThatSignaledIndex =
                WaitHandle.WaitAny(new WaitHandle[] { mre, token.WaitHandle },
                                    new TimeSpan(0, 0, 20));

            // Were we canceled while waiting? 
            if (eventThatSignaledIndex == 1)
            {
                Console.WriteLine("The wait operation was canceled.");
                throw new OperationCanceledException(token);

            }
            // Were we canceled while running? 
            else if (token.IsCancellationRequested)
            {
                Console.WriteLine("I was canceled while running.");
                token.ThrowIfCancellationRequested();

            }
            // Did we time out? 
            else if (eventThatSignaledIndex == WaitHandle.WaitTimeout)
            {
                Console.WriteLine("I timed out.");
                break;
            }
            else
            {
                Console.Write("Working... ");
                // Simulating work.
                Thread.SpinWait(5000000);
            }
        }
    }
}

Cześć obiektów wspiera bezpośrednio tokeny i zamiast WaitAny możemy (przykład z MSDN):

static void DoWork(CancellationToken token)
{

   while (true)
   {
       if (token.IsCancellationRequested)
       {
           Console.WriteLine("Canceled while running.");
           token.ThrowIfCancellationRequested();
       }

       // Wait on the event to be signaled  
       // or the token to be canceled, 
       // whichever comes first. The token 
       // will throw an exception if it is canceled 
       // while the thread is waiting on the event. 
       try
       {
           // mres is a ManualResetEventSlim
           mres.Wait(token);
       }
       catch (OperationCanceledException)
       {
           // Throw immediately to be responsive. The 
           // alternative is to do one more item of work, 
           // and throw on next iteration, because  
           // IsCancellationRequested will be true.
           Console.WriteLine("The wait operation was canceled.");
           throw;
       }

       Console.Write("Working...");
       // Simulating work.
       Thread.SpinWait(500000);
   }
}

Po prostu metoda Wait przyjmuje jako parametr token do anulowania. W przypadku, gdy token zostanie ustawiony to Wait wyrzuci OperactionCanceledException.

Wniosek z wpisu jest taki, że przed użyciem tokenu należy szczególnie przyjrzeć się używanym metodom synchronizacyjnym ponieważ łatwo o zakleszczenie i wyciek pamięci.

Alokacja pamięci a false sharing

Kiedyś pisałem już o false sharing. Jeśli problem nie jest znany, najpierw zachęcam do przeczytania tego wpisu, ponieważ nie będę tutaj pisał o teoretycznych zagadnieniach:

http://www.pzielinski.com/?p=1489

Oprócz wyjaśnienia podstaw, podałem przykład struktury danych składających się z dwóch Int32. Pokazałem również jakie pułapki czekają nas przy pracy z tablicami. To zadziwiające, że kolejność w jakiej przeglądamy tablicę ma tak ogromne znaczenie w wydajności (kod może być nawet kilkakrotnie wolniejszy). Oczywiście mowa tutaj tylko o wielowątkowym dostępie.

Dzisiaj kolejny przykład, na który możemy natrafić każdego dnia, pisząc kod wielowątkowy.

Alokacja pamięci w .NET jest zaawansowanym mechanizmem i bierze pod uwagę architekturę CPU. Jak napisałem (patrz powyższy post), zmienne będące blisko siebie w programie, również zostaną zaalokowane blisko siebie w pamięci (patrz przykład dwóch Int32). W większości przypadków, ma  to korzystny wpływ na wydajność. Alokacja obiektów na stercie, bierze pod uwagę wątek, z którego jest alokowana. Ma to bardzo pozytywne skutki, ponieważ dzięki temu nie mamy problemów ze wspomnianym false sharing i cache misses. Obiekty z różnych miejsc, zostaną zadeklarowane w możliwie dalekich od siebie miejscach. Napiszmy dwa programy, jeden mający problemy z false sharing:

internal class CacheLineSample
{
   public int Number { get; set; }
}

internal class Program
{
   private static void Main(string[] args)
   {
       var stopwatch = Stopwatch.StartNew();

       var data = new[] {new CacheLineSample(), new CacheLineSample(), new CacheLineSample(), new CacheLineSample()};

       Task t1 = Task.Factory.StartNew(() => Run(0, data));
       Task t2 = Task.Factory.StartNew(() => Run(1, data));
       Task t3 = Task.Factory.StartNew(() => Run(2, data));
       Task t4 = Task.Factory.StartNew(() => Run(3, data));

       Task.WaitAll(t1, t2, t3, t4);
       stopwatch.Stop();
       Console.WriteLine(stopwatch.ElapsedMilliseconds);
   }

   private static void Run(int index,CacheLineSample[] data)
   {
       CacheLineSample cacheLineSample = data[index];

       for (int i = 0; i < 10000000; i++)
       {
           cacheLineSample.Number++;
       }
   }
}

Deklarujemy tutaj pamięć w głównym wątku. A co za tym idzie, naturalne jest, że GC będzie spodziewał się, że elementy tablicy powinny być zadeklarowane koło siebie w pamięci. Następnie w różnych wątkach, zwiększamy licznik – każdy wątek operuje na osobnej klasie, której obiekt należy do wspólnej tablicy. Na swoim komputerze dostałem wynik ~550 ms.

Następnie spróbujmy zoptymalizować kod tak, że alokacja będzie dokonywana na osobnym wątkach – a co za tym idzie, logiczne jest umieszczenie obiektów w różnych miejscach w pamięci:

internal class CacheLineSample
{
    public int Number { get; set; }
}

internal class Program
{
    private static void Main(string[] args)
    {
        var stopwatch = Stopwatch.StartNew();

        var data = new CacheLineSample[4];

        Task t1 = Task.Factory.StartNew(() => Run(0, data));
        Task t2 = Task.Factory.StartNew(() => Run(1, data));
        Task t3 = Task.Factory.StartNew(() => Run(2, data));
        Task t4 = Task.Factory.StartNew(() => Run(3, data));

        Task.WaitAll(t1, t2, t3, t4);
        stopwatch.Stop();
        Console.WriteLine(stopwatch.ElapsedMilliseconds);
    }

    private static void Run(int index, CacheLineSample[] data)
    {
        CacheLineSample cacheLineSample = data[index] = new CacheLineSample();

        for (int i = 0; i < 10000000; i++)
        {
            cacheLineSample.Number++;
        }
    }
}

Otrzymany wynik to ~160 ms. Optymalizacja jest znacząca i nie należy jej bagatelizować. Każdy kto interesuje się procesorami wie jakie są trendy na rynku od wielu lat. Czasy kiedy co miesiąc wychodził procesor z szybszym zegarem już dawno minęły. Kolejną rewolucją w wydajności było doczepianie kilka dodatkowych rdzeni, co umożliwia nam realną pracę w środowisku wielowątkowym. Szybko jednak okazało się, że prawdziwą przeszkodzą jest dostęp do pamięci. Procesory były na tyle szybkie, że optymalizacje należało skierować w stronę buforowania i  unikania cache misses a nie na częstotliwość zegara czy nawet liczbę rdzeni. Problem false sharing jest znakomitym przykładem, jak dysponując szybkim procesorem, można zdegradować wydajność poprzez problemy z buforowaniem.

Code Review: Wykonywanie wielu zadań i czekanie na wynik

Wielowątkowość jest na tyle łatwo dostępna, że programiści próbują zrównoleglić jak największą liczbę zadań. Przykład:

internal class Program
{
   private static void Main(string[] args)
   {
       Task.WaitAll(Task.Factory.StartNew(Run1), Task.Factory.StartNew(Run2), Task.Factory.StartNew(Run3));
   }

   private static void Run1()
   {
   }

   private static void Run2()
   {            
   }

   private static void Run3()
   {

   }
}

W tym konkretnym przykładzie, lepsze byłoby prawdopodobnie Parallel.Invoke. Ale nie o to mi chodzi. Nie chciałem pokazywać realnego przykładu, ponieważ zbyt skomplikowałoby to post. Załóżmy, że musimy wykonać kilka zadań równoległe a potem czekać na wynik. Prostym sposobem można zoptymalizować kod, wykonując jedno zadanie na wątku macierzystym. Poprawiony kod:

Task t1 = Task.Factory.StartNew(Run1);
Task t2 = Task.Factory.StartNew(Run2);
Run3();
Task.WaitAll(t1,t2);

Skoro i tak musimy blokować macierzysty wątek, to nie ma potrzeby tworzyć kolejnego na Run3. Lepiej wykorzystać macierzysty wątek, niż marnować go na czekanie. Oszczędzamy tutaj zmianę kontekstu, co w systemach low-latency\high frequency może mieć znaczenie. Dla wielu biznesowych aplikacji, nie ma to dużego wpływu. Wciąż jednak, lepiej pisać kod wydajniejszy, zwłaszcza, że w tym przypadku jest to po prostu łatwiejsze.

Zagrożenia w wielowątkowości: Two-Step Dance

Dzisiaj o kolejnym, mało znanym, ale bardzo powszechnym zagrożeniu w środowisku współbieżnym. Zjawisko opisane w tym poście jest ściśle powiązane z lock convoy i stampede, które opisywałem w zeszłym tygodniu.

Problem jest bardzo prosty. Budzimy wątek za pomocą Pulse\Set, ale wątek i tak nic sensowego nie może zrobić, ponieważ dany zasób jest wciąż zablokowany, co skutkuje, że taki wątek znów zostanie uśpiony. Zarys:

void T1()
{
    lock (Sync)
    {
        blocker.Set();
    }
}
void T2()
{
    blocker.WaitOne();
    
    lock (Sync)
    {
    }
}

Zmienna blocker to jakakolwiek klasa implementująca WaitHandle. Skutek będzie taki, że T1 wyśle sygnał, T2 zostanie obudzony ale wtedy będzie chciał wejść do sekcji krytycznej, co poskutkuje, że znów zostanie uśpiony. Następnie T1 zwolni blokadę i T2 może zostać ponownie obudzony. Co prawda, to nie jest zagrożenie z pokroju deadlock, ponieważ aplikacja będzie działać. W świecie wielowątkowym, chcemy jednak aby wszystko działało optymalnie. Z tego względu, nie możemy sobie pozwolić na niepotrzebne zmiany kontekstu, spowodowane przez wybudzenie i uśpienie po chwili jakiegoś wątku.

Z tego co wyczytałem, na jedno rdzeniowych procesorach to jest jeszcze gorzej. W momencie gdy wysyłamy sygnał za pomocą Set, Windows zwiększy priorytet T2 tak, że T2 na pewno wywłaszczy T1. Proszę zauważyć, że nie zawsze mamy wpływ na ten problem. Domyślna implementacja condition variables w CLR, spowoduje two-step dance:

public void Thread1()
{
  Monitor.Enter(_lock);

  while (!_condition)
  {
      Monitor.Wait(_lock);
  }
  _condition = false;
  Monitor.Exit(_lock);
}

public void Thread2()
{
  Monitor.Enter(_lock);

  _condition = true;

  Monitor.Pulse(_lock);

  Monitor.Exit(_lock);
}

Niestety, aby korzystać z Pulse, musimy być w sekcji krytycznej – taka jest po prostu implementacja CLR. Nie mamy na to wpływu. Mimo wszystko, powinniśmy analizować nasz kod na bieżącą i wyłapywać niepotrzebne zmiany kontekstu, spowodowane przez two-step dance.

Zagrożenia w programowaniu współbieżnym: lock convoy oraz stampede

O zakleszczeniu czy zagłodzeniu, każdy programista słyszał, nawet nie mając styczności z programowaniem wielowątkowym. Opisany w poprzednim poście przykład livelock jest bardziej wyrafinowanym problemem.

Dzisiaj z kolei o kilku innych błędach popełnianych w środowisku wielowątkowym.

Lock Convoy  – występuje, gdy mamy za dużo wątków próbujących założyć blokadę. Załóżmy, że napisaliśmy następującą funkcję:

private void DoAsync()
{
    lock(_sync)
    {
        // jakis kod
        Thread.Sleep(1000);
    }
    // jakis kod 
}

Wykonanie jej zajmie oczywiście więcej niż jedna sekunda. Co gorsze, mamy tam sekcję krytyczną, która zawsze będzie wykonywana sekwencyjnie. Jeśli nowe wątki będą wywoływać DoAsync z wyższą częstotliwością niż może nastąpić zwolnienie blokady, wtedy do czynienia będziemy mieli z lock convoy. Lock convoy to po prostu akumulacja wątków, które blokują zasoby, ale nie mogą wykonać kodu, ponieważ uzyskanie dostępu do sekcji krytycznej jest niemożliwe.

Lock Convoy może zrujnować całkowicie system. Jeśli występuje, po jakimś czasie po prostu zabraknie zasobów. Dlatego ważne jest monitorowanie (np. za pomocą performance counters) liczby wątków tworzonych w danym systemie.

Inną ciekawostką jest, że kiedyś systemy Windows, planowały wykonywanie wątków, w takiej samej kolejności jakie one chciały wejść w sekcję krytyczną. Jeśli zatem T1 wykonuje sekcję krytyczną, a T2 oraz T3 kolejno chciały do niej wejść, wtedy OS gwarantował, że T2 najpierw uzyska dostęp, a potem dopiero T3. Jaki to ma związek z Lock Convoy? Proszę zauważyć, że pomiędzy zwolnieniem locka a przyznaniem go uśpionemu wątkowi, wykonanych musi być wiele cyklów procesora. Z tego względu, jeśli mamy opisaną powyżej sytuację (T1 wychodzi z sekcji krytycznej a T2,T3 są uśpione), dużo lepiej nadać dostęp wątkowi, który jest już wykonywany. Załóżmy, że T4 nie wymaga zmiany kontekstu i chce akurat wejść do sekcji krytycznej trzymanej przez T1 – dużo szybciej będzie mu nadać dostęp niż zmieniać kontekst dla T2. Jaka jest wada jednak takiego rozwiązania? Oczywiście może dojść do zagłodzenia T2 i T3.

Kolejnym zagrożeniem to stampede, występującym również, gdy korzystamy np. z metody Monitor.PulseAll. Przykład:

class ConditionPatternSample
{
   private readonly Object _lock = new Object();
   private Boolean _condition = false;

   public void Thread1()
   {
       Monitor.Enter(_lock);

       while (!_condition)
       {
           Monitor.Wait(_lock);
       }
       _condition = false;
       Monitor.Exit(_lock);
   }

   public void Thread2()
   {
       Monitor.Enter(_lock);

       _condition = true;

       Monitor.PulseAll(_lock);

       Monitor.Exit(_lock);
   }
}

Powyższy kod, prezentuje sprawdzanie flagi w środowisku wielowątkowym. Używamy tutaj PulseAll, który wybudza wszystkie wątki. Jaki jest tego skutek? Jeśli mamy 100 wątków czekających w metodzie Thread1, zostaną one wszystkie wybudzone. A co potem? Jak widać, są one w sekcji krytycznej i znów 99 będzie musiało zostać uśpionych. Skutek tego jest dość katastrofalny – mnóstwo zmian kontekstu, która jest bardzo kosztowna.

Klasa Monitor ma dwie kolejki, wątków gotowych do uzyskania blokady oraz czekających na Pulse. W momencie, gdy wykonujemy metodę Monitor.Enter, przeniesiony on jest do kolejki “ready”. W przypadku wykonania Monitor.Wait jest on przeniesiony do “waiting queue”. Z kolei Pulse przenosi tylko pierwszy wątek z waiting queue  z powrotem do ready queue. PulseAll  za to przeniesie wszystkie wątki, nie tylko ten pierwszy. Różnica jest taka, że wątki mogą uzyskać dostęp do sekcji, wyłącznie gdy są w ready queue.

Ktoś może słusznie zadać pytanie. Po co wywoływać PulseAll, skoro zarówno Pulse jak i PulseAll muszą być umieszczone pomiędzy Monitor.Enter a Monitor.Exit? W końcu zawsze tylko jeden wątek będzie mógł wykonywać daną operację. Czy nie będzie to w praktyce to samo? Pulse budzi jeden wątek i jest on wykonywany. PulseAll budzi wszystkie, ale tylko jeden naturalnie będzie wykonany. Nie lepiej nie używać po prostu PulseAll i tym sposobem unikać zawsze stapmede? Wszystko zależy od konkretnego scenariusza. Jeśli wszystkie wątki sprawdzają ten sam warunek (jak wyżej), nie ma sensu korzystać z PulseAll, ponieważ kod może zostać wykonany przez jakikolwiek wątek. Inna sprawa, gdy każdy wątek czeka na inny warunek. Wtedy jeśli będziemy budzić wątek po wątku, wykonamy masę niepotrzebnych zmian kontekstu. Wybudzimy np. T1, który sprawdzi warunek i znów uśpi się. PulseAll z kolei wybudzi wszystkie i każdy z nich sprawdzi czy spełnia dany warunek. PulseAll gwarantuje nam, że wszystkie wątki sprawdzą czy spełniają dany warunek. Gdybyśmy wywołali Pulse jeden wątek sprawdziłby warunek i w przypadku jego niespełnienia zostałby przeniesiony znów do waiting queue, co spowoduje, że wątki, które mogłyby coś wykonać, nie zostaną odpalone.

Podsumowując: PulseAll może prowadzić do stampedee. Czasami po prostu jest to wymagane, aby poprawnie zaimplementować algorytm  – przykład sprawdzania różnych warunków w środowisku równoległym. Monitor.Pulse z kolei może pogorszyć lock convoy. Wyobraźmy sobie, że budzimy wątek, on sprawdza warunek i znów musimy go uśpić – będziemy potrzebować więcej zmian kontekstu. Problem z Monitor.Pulse i  lock convoy to tam sama sytuacja jak z opisanym na początku szeregowaniem wątku – czasami lepiej dać dostęp do sekcji krytycznej wątkowi, który nie potrzebuje zmiany kontekstu i jest gotowy uzyskać dostęp od razu.