TPL Dataflows – część VIII (przykład z BufferBlock i JoinBlock)

W kilku ostatnich postach pisałem o blokach grupujących. Obiecywałem, że pokaże kilka przykładów ich zastosowania, w szczególności wyjaśniające zachowanie zachłanne i niezachłanne. W dzisiejszym wpisie, zaprezentuję przykład (lekko zmodyfikowany z MSDN) jak JoinBlock i BufferBlock mogą zostać zastosowane.

Wyobraźmy sobie, że mamy kilka źródeł danych. Na przykład: WCF, pliki oraz pamięć. Odczyt z WCF prawdopodobnie będzie najwolniejszy. W zależności od lokalizacji, odczyt pliku prawdopodobnie będzie dużo szybszy. Z kolei czytanie danych z pamięci będzie zdecydowanie najszybsze. Stwórzmy trzy klasy reprezentujące powyższe źródła danych:

internal interface IDataSource
{
    int Read();
}
class MemoryResource:IDataSource
{
   public int Read()
   {
       Console.WriteLine("Odczyt danych z memory resource.");
       Thread.Sleep(1000);

       return 1;
   }
}
class WcfResource : IDataSource
{
   public int Read()
   {
       Console.WriteLine("Odczyt danych z wcf resource.");
       Thread.Sleep(7000);

       return 1;
   }
}
class FileResource : IDataSource
{
   public int Read()
   {
       Console.WriteLine("Odczyt danych z file resource.");
       Thread.Sleep(4000);

       return 1;
   }
}

Następnie, aby wykonać jakąś fikcyjną operację, należy mieć dane z WCFResource oraz MemoryResource lub z FileResource i MemoryResource. Można podzielić logię na dwa JoinBlock. Dane z MemoryBlock zawsze są potrzebne oraz w zależności od kontekstu, wymagane jest pobranie informacji z WcfResource lub MemoryResource.  Ponadto, będziemy musieli kolejkować (buforować) przychodzące dane:

image

Dlaczego musimy korzystać z BufferBlock? Zakładamy, że dane ciągle przychodzą z podanych źródeł i należy je obsłużyć jak najszybciej. Można tego dokonać  wykonując operacje I albo operację II, w zależności jaka para danych przyszła. Wyłącznie MemoryResource może zostać użyty w obydwu operacjach.

Zamieńmy zatem powyższy schemat na kod. Najpierw musimy kolejkować\buforować przychodzące dane:

var memoryResourceBuffer = new BufferBlock<int>();
var networkResourceBuffer = new BufferBlock<int>();
var wcfResourceBuffer = new BufferBlock<int>();

Kolejnym etapem jest stworzenie dwóch JoinBlock i odpowiednie podtrzepienie danych z właśnie utworzonych buforów:

var joinBlock1 = new JoinBlock<int, int>();
memoryResourceBuffer.LinkTo(joinBlock1.Target1);
wcfResourceBuffer.LinkTo(joinBlock1.Target2);
joinBlock1.LinkTo(new ActionBlock<Tuple<int, int>>(data=>Operation1(data)));

var joinBlock2 = new JoinBlock<int, int>();
memoryResourceBuffer.LinkTo(joinBlock2.Target1);
networkResourceBuffer.LinkTo(joinBlock2.Target2);
joinBlock2.LinkTo(new ActionBlock<Tuple<int, int>>(data => Operation2(data)));

//..

private static void Operation1(Tuple<int, int> data)
{
  Thread.Sleep(5000);
  Console.WriteLine("Wykonywanie operacji I");
}
private static void Operation2(Tuple<int, int> data)
{
  Thread.Sleep(5000);
  Console.WriteLine("Wykonywanie operacji II");
}

Każdy JoinBlock (patrz poprzednie posty) eksponuje Target1 oraz Target2, na które wysyłamy dane z buforów. Następnie łączymy wynik, który składa się z danych z obydwu źródeł i wysyłamy je do kolejnego bloku, którym jest ActionBlock: operation1 oraz operation2.

Na końcu musimy “wpompować” dane do systemu np.:

var memoryResource=new MemoryResource();
var wcfResource = new WcfResource();
var fileResource = new FileResource();

for (int i = 0; i < 5; i++)
{
    memoryResourceBuffer.SendAsync(memoryResource.Read());
    networkResourceBuffer.SendAsync(wcfResource.Read());
    wcfResourceBuffer.SendAsync(fileResource.Read());
}

Jeśli teraz byśmy uruchomili aplikacje to zobaczyliśmy, że wyłącznie operacja I jest wykonywana:

image

Dlaczego? Po prostu domyślnie, wszystkie bloki grupujące są zachłanne i JoinBlockI przyjmuje wszystkie dane, nawet jak nie jest w stanie ich obsłużyć. W końcu po otrzymaniu pierwszej pary z MemoryResource i WcfResource, JoinBlock powinien ustąpić na rzecz drugiego bloku. Aby zmienić zachowanie na niezachłanne wystarczy:

var joinBlock1 = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });

Na ekranie teraz powinniśmy zobaczyć wyniki pojawiające się z dwóch operacji:

image

W trybie zachłannym wszystkie wiadomości są akceptowane, nawet jak nie ma odpowiednika na drugim Target. Innymi słowy, gdy mamy 10 wiadomości na Target I i 0 na Target II, w trybie zachłannym wszystkie 10 zostanie zaakceptowanych. W trybie niezachłannym tylko pierwsza zostanie zaakceptowana, z kolei pozostałe 9 będą miały status “postponed”, co oznacza, że mogą one być wykorzystane przez inny JoinBlock. Dzięki trybowi niezachłannemu, jesteśmy w stanie balansować ruch między tymi dwoma blokami.

Na zakończenie cały listing kodu:

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;


namespace ConsoleApplication4
{
    internal interface IDataSource
    {
         int Read();
    }
    class MemoryResource:IDataSource
    {
        public int Read()
        {
            Console.WriteLine("Odczyt danych z memory resource.");
            Thread.Sleep(1000);

            return 1;
        }
    }
    class WcfResource : IDataSource
    {
        public int Read()
        {
            Console.WriteLine("Odczyt danych z wcf resource.");
            Thread.Sleep(7000);

            return 1;
        }
    }
    class FileResource : IDataSource
    {
        public int Read()
        {
            Console.WriteLine("Odczyt danych z file resource.");
            Thread.Sleep(4000);

            return 1;
        }
    }

    internal class Program
    {
        private static void Main()
        {
            var memoryResourceBuffer = new BufferBlock<int>();
            var networkResourceBuffer = new BufferBlock<int>();
            var wcfResourceBuffer = new BufferBlock<int>();

            var joinBlock1 = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
            memoryResourceBuffer.LinkTo(joinBlock1.Target1);
            wcfResourceBuffer.LinkTo(joinBlock1.Target2);
            joinBlock1.LinkTo(new ActionBlock<Tuple<int, int>>(data=>Operation1(data)));

            var joinBlock2 = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
            memoryResourceBuffer.LinkTo(joinBlock2.Target1);
            networkResourceBuffer.LinkTo(joinBlock2.Target2);
            joinBlock2.LinkTo(new ActionBlock<Tuple<int, int>>(data => Operation2(data)));

            var memoryResource=new MemoryResource();
            var wcfResource = new WcfResource();
            var fileResource = new FileResource();

            for (int i = 0; i < 5; i++)
            {
                memoryResourceBuffer.SendAsync(memoryResource.Read());
                networkResourceBuffer.SendAsync(wcfResource.Read());
                wcfResourceBuffer.SendAsync(fileResource.Read());
            }

            Console.ReadLine();
        } 
        private static void Operation1(Tuple<int, int> data)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Wykonywanie operacji I");
        }
        private static void Operation2(Tuple<int, int> data)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Wykonywanie operacji II");
        }
    }
}

Właściwości tylko do odczytu w WPF

Czasami w WPF zachodzi potrzeba stworzenia dependency property ale tylko do odczytu. Zwykłą właściwość bardzo łatwo zaimplementować tzn.:

public double Area
{
  get 
  {
      return _width*_height;
  }
}

Jeśli korzystamy z binding’u danych, wtedy powyższe rozwiązanie jest niewystarczające i należy skorzystać z dependnecy property. Dla przypomnienia, standardowa deklaracja wygląda następująco:

public class MyStateControl : ButtonBase
{
  public MyStateControl() : base() { }
  public Boolean State
  {
    get { return (Boolean)this.GetValue(StateProperty); }
    set { this.SetValue(StateProperty, value); } 
  }
  public static readonly DependencyProperty StateProperty = DependencyProperty.Register(
    "State", typeof(Boolean), typeof(MyStateControl),new PropertyMetadata(false));
}

Ktoś mógłby zaproponować takie rozwiązanie:

public class MyStateControl : ButtonBase
{
  public MyStateControl() : base() { }
  public Boolean State
  {
    get { return (Boolean)this.GetValue(StateProperty); }
  }
  public static readonly DependencyProperty StateProperty = DependencyProperty.Register(
    "State", typeof(Boolean), typeof(MyStateControl),new PropertyMetadata(false));
}

Niestety powyższy kod nie zabroni nikomu zmieniania StateProperty. Powyższe gettery\settery to wyłącznie wrapper C#. WPF czy inni programiści, wciąż mogą używać MyStateControl.StateProperty.SetValue w celu modyfikacji danych.

Prawidłowa implementacja powinna używać DependencyPropertyKey  oraz metodę RegisterReadOnly:

internal static readonly DependencyPropertyKey AquariumSizeKey = DependencyProperty.RegisterReadOnly(
  "AquariumSize",
  typeof(double),
  typeof(Aquarium),
  new PropertyMetadata(double.NaN)
);
public static readonly DependencyProperty AquariumSizeProperty =
  AquariumSizeKey.DependencyProperty;
public double AquariumSize
{
  get { return (double)GetValue(AquariumSizeProperty); }
}

Należy pamiętać aby DependencyPropertyKey było typu internal – dzięki temu, nie będzie można zmodyfikować właściwości z spoza klasy. Publiczny dostęp ma wyłącznie DependencyProperty ale próba modyfikacji zakończy się zawsze wyjątkiem. Jedyny zatem sposób na ustawienie wartości to użycie DependencyPropertyKey, który z kolei ma dostęp internal:

this.SetValue(AquariumSizeKey,size);

Poniższy kod z kolei wywoła wyjątek:

this.SetValue(AquariumSizeProperty,size);

TPL Dataflows – część VII (BatchedJoinBlock)

W ostatnich postach pisałem o JoinBlock oraz BatchedBlock czyli o blokach grupujących.  Dzisiaj czas na połączenie tych dwóch konstrukcji, a mianowicie BatchedJoinBlock. Jak sama nazwa sugeruje, block będzie łączył elementy z kilku źródeł w Tuple ale zamiast Tuple<T> (czysty Join) będzie to Tuple<IList<T>> czyli elementy będą dodatkowo buforowane. Zmodyfikujmy ostatni przykład:

internal class Program
{
   private static void Main(string[] args)
   {
       var joinBlock = new BatchedJoinBlock<string, int>(3);
       joinBlock.LinkTo(new ActionBlock<Tuple<IList<string>, IList<int>>>(data => Display(data)));

       joinBlock.Target1.Post("Klucz");
       joinBlock.Target1.Post("Klucz 2");
       joinBlock.Target2.Post(5);
       joinBlock.Target2.Post(6);
       joinBlock.Target1.Post("Klucz 3");

       Console.ReadLine();
   }

   private static void Display(Tuple<IList<string>, IList<int>> data)
   {
       foreach (string key in data.Item1)
           Console.WriteLine(key);

       foreach (int value in data.Item2)
           Console.WriteLine(value);
   }
}

Na ekranie wyświetli się:

image

Dlaczego? Określiliśmy rozmiar porcji na 3 elementy. Pierwszym elementem jest “Klucz”, drugim “Klucz2” a ostatnim 5. Przekazany rozmiar w konstruktorze zatem określa liczbę wszystkich właściwości Tuple.

Myślę, że taki krótki opis wystarczy. Jeśli ktoś rozumie BatchedBlock oraz JoinBlock (opisane w poprzednich postach) z pewnością zrozumie BatchedJoinBlock – po prostu blok zwraca Tuple<IList<T>…>.

W kolejnym wpisie postaram się już o jakiś sensowniejszy przykład z użyciem grupujących bloków.

TPL Dataflows – część VI (JoinBlock)

W poprzednim wpisie pisałem o pierwszym bloku grupującym – BatchBlock. Dziś czas przyszedł na kolejny element, tym razem JoinBlock. JoinBlock grupuje elementy podane na wejście w formie Tuple (pisałem o tej klasie kiedyś na blogu). Jeśli zatem podamy na wejście dwa integer’y, na wyjściu pojawi się Tuple<int,int>.

Tak samo jak BatchBlock, JoinBlock działa w dwóch trybach – greedy oraz non-greedy. Zasada jest taka sama – zachęcam do przeczytania poprzedniego postu. W skrócie, w trybie zachłannym wszystkie wiadomości są akceptowane od razu, z kolei w niezachłannym są one “odraczane” co może umożliwić innym blokom ich użycie. Za kilka postów planuje napisać jakiś przykład pokazujący tą różnicę.

W przeciwieństwie do poprzednich elementów, JoinBlock nie implementuje interfejsu ITargetBlock a eksponuje właściwości Target1, Target2, które z  kolei implementują ITargetBlock. Przejdźmy więc do przykładu:

class Program
{
   private static void Main(string[] args)
   {
       var joinBlock = new JoinBlock<string, int>();
       joinBlock.LinkTo(new ActionBlock<Tuple<string, int>>(data=>Display(data)));

       joinBlock.Target1.Post("Klucz");                        
       joinBlock.Target1.Post("Klucz 2");
       joinBlock.Target2.Post(5);            
       joinBlock.Target2.Post(6);
       joinBlock.Target1.Post("Klucz 3");


       Console.ReadLine();
   }
   private static void Display(Tuple<string,int> data)
   {
       Console.WriteLine(data);
   }
}

Na ekranie najpierw wyświetli się (Klucz,5), potem (Klucz 2,6), a na końcu JoinBlock będzie czekał na wartość integer aby połączyć ją z Klucz3.

TPL Dataflows – część V (BatchBlock)

Dzisiaj pierwszy post o tzw. grouping block czyli blokach grupujących. Ich zasada jest prosta – grupują dane z różnych źródeł w sposób zależny już od konkretnego bloku. W tej części zajmiemy się BatchBlock, który przychodzące dane buforuje, a następnie przesuwa je na wyjście w zdefiniowanych porcjach.

BatchBlock działa w dwóch trybach: greedy i non-greedy. W przypadku implementacji zachłannej, wszystko co pojawia się na wejściu jest akceptowane i przekazywane na wyjście gdy uzbiera się określona liczba elementów. W przypadku trybu niezachłannego, wyłącznie dane, które mają określoną liczbę elementów będą akceptowane. Innymi słowy, jeśli zdefiniujemy batch jako 10 elementów, wtedy w trybie niezachłannym wyłącznie tablice o długości 10 lub więcej będą akceptowane, a reszta będzie uznawana jako postponed. Z kolei w zachłannym trybie, wszystko będzie akceptowane, buforowane i przekazane na wyjście gdy bufor odpowiednio zapełni się.

Domyślnie, BatchBlock działa w sposób zachłanny. Przejdźmy do przykładu:

class Program
{
   private static void Main(string[] args)
   {
       var batchBlock = new BatchBlock<int>(5);
       batchBlock.LinkTo(CreateActionBlock("Output A"));

       for (int i = 0; i < 7; i++)
           batchBlock.Post(i);

       Console.ReadLine();
   }
   private static ActionBlock<int[]> CreateActionBlock(string label)
   {
       return new ActionBlock<int[]>(delegate(int[] batch)
           {
               Console.WriteLine(label);

               foreach (var item in batch)
               {
                   Console.WriteLine(item);
               }
           });
   }
}

W konstruktorze definiujemy rozmiar batch’a (porcji) na 5 elementów. Następnie podłączamy do niego zwykłą akcję wyświetlającą wyjście. Pamiętajmy, że wyjście BatchBlock to tablica elementów czyli zbuforowane dane. Jeśli ustawiamy batch size na 5 wtedy, wyjście zawsze będzie stanowić tablica 5-elementowa. W powyższym przykładzie, mimo, że dodaliśmy 7 elementów to i tak 5 tylko zostanie wyświetlonych na ekranie. Zmieńmy teraz tryb na niezachłanny, aby zobaczyć w praktyce różnicę w działaniu:

class Program
{
   private static void Main(string[] args)
   {
       var batchBlock = new BatchBlock<int>(5, new GroupingDataflowBlockOptions {Greedy = false});
       batchBlock.LinkTo(CreateActionBlock("Output A"));            

       for (int i = 0; i < 5; i++)
           batchBlock.Post(i);

       Console.ReadLine();
   }

   private static ActionBlock<int[]> CreateActionBlock(string label)
   {
       return new ActionBlock<int[]>(delegate(int[] batch)
           {                    
               Console.WriteLine(label);

               foreach (var item in batch)
               {
                   Console.WriteLine(item);
               }
           });
   }
}

Na ekranie nic nie zobaczymy. Dlaczego? Jak wspomniałem, niezachłanna implementacja nie zaakceptuje wiadomości jeśli bufor nie zapełni się. W tym problem, że używając Post, nigdy nie uda nam się tego osiągnąć, ponieważ Post traktuje opóźnienie w przetwarzaniu danych jako ich odrzucenie. W dokumentacji znajdziemy następującą informację:

“For target blocks that support postponing offered messages, or for blocks that may do more processing in their Post implementation, consider using SendAsync, which will return immediately and will enable the target to postpone the posted message and later consume it after SendAsync returns.”

Zamieniając Post na SendAsync ujrzymy output na ekranie.  Wiadomość nie zostanie ani odrzucona ani zaakceptowana – po prostu w późniejszym czasie będzie mogła zostać przetworzona. Jeśli jest to jeszcze niejasne nie ma co się martwić – w następnych postach pokażę bardziej praktyczny przykład gdzie ustawienie trybu non-greedy jest przydatne. Po dzisiejszym wpisie należy wiedzieć, że tryb niezachłanny ustawia status jako postponed – czyli opóźnienie w przetwarzaniu, z kolei tryb zachłanny akceptuje natychmiast wszystkie dane.

AutoResetEvent\ManualResetEvent–synchronizacja między procesami

AutoResetEvent\ManualResetEvent może być używany do synchronizacji międzyprocesowej tak samo jak np. mutex. Posiada podobny zestaw metod do tworzenia obiektu z nazwą oraz późniejszego jego otwierania.

Aby móc go użyć do synchronizacji międzyprocesowej należy oczywiście nadać obiektowi nazwę – tak samo jak to jest z Mutex. W tym problem, że konstruktory ManualResetEvent czy AutoResetEvent nie przyjmują takich parametrów. Zaglądając jednak do dokumentacji dowiemy się, że:

public sealed class ManualResetEvent : EventWaitHandle

public sealed class AutoResetEvent : EventWaitHandle

Obie klasy dziedziczą po EventWaitHandle, którego konstruktor wygląda następująco:

public EventWaitHandle(bool initialState,EventResetMode mode);

public EventWaitHandle(bool initialState,EventResetMode mode,string name);

public EventWaitHandle(bool initialState,EventResetMode mode,string name,out bool createdNew);

public EventWaitHandle(bool initialState,EventResetMode mode,string name,out bool createdNew,EventWaitHandleSecurity eventSecurity);

Tak naprawdę ManualResetEvent oraz AutoResetEvent to wrappery, ułatwiające pracę ze zdarzeniami. Możemy bezpośrednio użyć EventWaitHandle i przekazać  nazwę. Stwórzmy więc proces A, który użyje takiego konstruktora:

internal class Program
{
   public static void Main()
   {
       var manualResetEvent = new EventWaitHandle(false, EventResetMode.ManualReset, "TestEvent");
       Console.WriteLine("Utworzono obiekt EventWaitHandle");
       manualResetEvent.WaitOne();
   }
}

Następnie w procesie B, chcemy odczytać ten obiekt i np. zamknąć aplikację, gdy on istnieje (używaliśmy mutex’ów do tego samego celu):

class Program
{
   static void Main(string[] args)
   {
       try
       {
           EventWaitHandle eventHandle = EventWaitHandle.OpenExisting("TestEvent");
       }
       catch(WaitHandleCannotBeOpenedException)
       {
           Console.WriteLine("Nie udało się otworzyć obiektu.");
           return;
       }
       Console.WriteLine("Obiekt znaleziony.");                
   }
}

Gdy nie ma obiektu o podanej nazwie wyrzucany jest wyjątek WaitHandleCannotBeOpenedException. W przeciwnym wypadku możemy korzystać z niego, tak jakby został on utworzony w tym samym procesie (Set, Wait itp.).

Co jeśli wywołamy kontruktory tworzące ten sam obiekt w dwóch różnych procesach? Tzn.:

// Proces A
internal class Program
{
   public static void Main()
   {
       var manualResetEvent = new EventWaitHandle(false, EventResetMode.ManualReset, "TestEvent");
       manualResetEvent.WaitOne();
       Console.WriteLine("Sygnal odebrany.");
   }
}
// Proces B
class Program
{
   static void Main(string[] args)
   {
       var manualResetEvent = new EventWaitHandle(false, EventResetMode.ManualReset, "TestEvent");
       manualResetEvent.Set();
       Console.WriteLine("Sygnal wyslany.");
   }
}

Proces B, automatycznie otworzy obiekt zainicjalizowany w procesie A.  W wielu przypadkach to jest logiczne i pożądane rozwiązanie ale co w przypadku gdy Proces A utworzył ManualResetEvent, z kolei proces B AutoResetEvent?

// Proces A
internal class Program
{
   public static void Main()
   {
       var manualResetEvent = new EventWaitHandle(false, EventResetMode.ManualReset, "TestEvent");
       manualResetEvent.WaitOne();
       Console.WriteLine("Sygnal odebrany.");
   }
}
// Proces B
class Program
{
   static void Main(string[] args)
   {
       var manualResetEvent = new EventWaitHandle(false, EventResetMode.AutoReset, "TestEvent");
       manualResetEvent.Set();
       Console.WriteLine("Sygnal wyslany.");
   }
}

Z opisanych przyczyn powinniśmy unikać takiej konstrukcji ponieważ nie wiemy, czy zdarzenie zostało utworzone czy otworzone – stąd nie mamy pojęcia o jego typie, stanie itp. Istnieje inny konstruktor, który poinformuje nas, jaka operacja została właściwie wykonana:

public EventWaitHandle(
    bool initialState,
    EventResetMode mode,
    string name,
    out bool createdNew
)

Parametr createNew określa, czy zdarzenie jest nowe czy zostało tylko otworzone. Pamiętajmy, że gdy zdarzenie jest otwierane przez proces B, to nieważne jaki stan (lub typ) przekażemy – zawsze będzie używany ten pierwotny z procesu A.

Code review: pula wątków a maksymalna liczba wątków

W ostatnim poście wspomniałem o minimalnej liczbie wątków. Istnieje również górny próg, określający ile maksymalnie może zostać stworzonych wątków. Zbyt niski próg oraz zła architektura może spowodować bardzo trudny w znalezieniu błąd a mianowicie deadlock. Wyobraźmy sobie następującą sekwencję zdarzeń:

  1. Wątek T0 (lub główny, nie ma znaczenia) dodaje zadanie do puli.
  2. Stworzone zadanie tworzy n nowych zadań.
  3. T0 czeka aż wszystkie n zadań zostanie wykonanych (wait).

Następnie przyjmijmy, że w tych n wątkach, któryś czeka na zakończenie jakiegoś innego. Jeśli wątek numer n-2 czeka na n-1 to dojdzie do deadlock. W przypadku, gdy limit jest ustawiony na 50, a zadań do wykonania jest 70 z tym, że np. zadanie numer 60 czeka na wykonanie 61 to będziemy mieli do czynienia z zakleszczeniem. Jeśli jest to niejasne, zobrazujmy to kodem:

class Program
{
   static ManualResetEvent _blocker=new ManualResetEvent(false);

   static void Main(string[] args)
   {
       int maximal, temp;
       ThreadPool.GetMaxThreads(out maximal, out temp);
       Console.WriteLine("Max threads: {0}",maximal);

       for (int i = 0; i < maximal + 1; i++)
       {
           ThreadPool.QueueUserWorkItem(Run);
       }
       ThreadPool.QueueUserWorkItem(Signal);
       Console.ReadLine();
   }
   private static void Signal(object state)
   {
       Console.WriteLine("Signaling");
       _blocker.Set();
   }
   private static void Run(object state)
   {
       _blocker.WaitOne();
       Console.WriteLine("Run");
   }
}

W .NET 2.0 i 3.5 domyślnie maksymalna liczba wątków na procesor (rdzeń) to 250. W .NET 1.0 było to tylko 25. Powyższy kod najpierw tworzy wątki, które dopiero zostaną wykonane gdy ManualResetEvent wyśle sygnał. W tym problem, że wyczerpie to limit wolnych wątków i Signal nigdy nie zostanie wykonany.

Od wersji .NET 4.0 ta liczba ma charakter dynamiczny. W zależności od dostępnych zasobów może zostać zwiększona lub zmniejszona (mam na myśli wciąż wartość domyślną).

Problem może się wydawać niewarty uwagi ale w praktyce bardzo łatwo go popełnić. Jeśli korzystamy z zewnętrznych komponentów nie mamy pojęcia jak bardzo pula jest obciążona. Co jeśli mamy dostępne tylko 2 wątki? W takiej sytuacji, może okazać się, że bardzo prosty  problem spowoduje deadlock.

Powyższy przykład pokazuje również problem zaprezentowany w poprzednim wpisie – zbyt niska wartość minimalnej liczby wątków. Stworzenie 250 wątków zajmie ponad kilka minut!

Wątki z puli–optymalna liczba wątków.

Pula wątków to specjalny mechanizm zaimplementowany w CLR, mający na celu ponowne używanie tych samych wątków. W dzisiejszym wpisie chciałbym wyjaśnić co to jest optymalna liczba wątków i jaki ona ma wpływ na wydajność. Rozważmy następujący kod:

internal class Program
{
   public static void Main()
   {
       for (int i = 0; i < 10; i++)
       {
           Stopwatch stopwatch = Stopwatch.StartNew();
           ThreadPool.QueueUserWorkItem(Run, stopwatch);
       }
       Console.ReadLine();
   }
   private static void Run(object state)
   {
       Stopwatch stopwatch = (Stopwatch) state;

       Console.WriteLine(stopwatch.ElapsedMilliseconds);
       Thread.Sleep(10000);
   }
}

Na ekranie (procesor dwurdzeniowy) zobaczymy:

image

Dlaczego tworzenie kolejnych wątków (oprócz dwóch pierwszych) jest tak wolne? Zasada jest prosta – gdy liczba utworzonych wątków przekroczy optymalną wartość wtedy kolejne będą spowalniane tak, aby wyłącznie jeden został tworzony w ciągu 500 milisekund. Stąd możemy zaobserwować odstępy półsekundowe.Co jest zatem tym optymalnym progiem? Domyślnie jest to Max(MinThreads, CPU_CORES) czyli jest to albo liczba rdzeni  (procesorów) albo minimalna liczba wątków, którą można samemu zdefiniować. Domyślnie (dla aplikacji desktop)  minimalna liczba wątków to zero stąd została użyta w moim przypadku wartość 2 (liczba rdzeni).

Można oczywiście ustawić samemu minimalną liczbę wątków:

ThreadPool.SetMinThreads(10, 0);

Wtedy zobaczymy:

image

Code Review: wątki z puli oraz modyfikacja ich stanu

Dzisiaj bardzo krótka notka, mająca na celu przestrzec przed modyfikacją jakichkolwiek właściwości wątku, który pochodzi z puli. Bardzo łatwo zmienić jego stan poprzez ustawienie nowego priorytetu albo zmianę kultury. Inny przykład to TLS o którym już pisałem na blogu. Dlaczego jest to tak złe?

private void Run()
{
  Thread.CurrentThread.Priority = ThreadPriority.Highest;
}

Musimy zdać sobie sprawę, że takowe wątki wyłącznie wypożyczamy. Ktoś na forum porównał to do wypożyczalni samochodów. Gdy wypożyczamy samochód nie możemy go przemalować i potem zwrócić jakby nic nie stało się. Podobnie jest z wątkami. Za kilka sekund może on wykonywać kompletnie inną logikę, niezależną od nas.

Błędy powstałe przez łamanie tej zasady są bardzo trudne w znalezieniu – zależą od historii callback’ow wykonanych przez dany wątek. Jeśli na produkcji pojawi się takowy problem będzie to bardzo mozolne poszukiwanie przyczyny.

Implementacja CLR zapewnia, że nazwa wątku, priorytet, typ (IsBackground) są zawsze resetowane po zwróceniu go z powrotem do puli. Mimo to, uważam, że nie powinniśmy na tym polegać i nigdy nie powinniśmy modyfikować stanu wątku, nawet w .NET (dla programistów WinApi jest to niekwestionowana zasada). Pomimo, że CLR zawiera kilka udoskonaleń to i tak nie zresetuje stanu TLS ponieważ jest to zbyt bardzo czasochłonne. Z tego wynika, że lepiej nie ryzykować i nie zmieniać stanu – nie zawsze wiemy co jest resetowane w aktualnej wersji .NET.