PLINQ (część III) – scalanie oraz zachowanie kolejności

W ostatnich postach przedstawiałem różne scenariusze użycia PLINQ. Ze względu na zrównoleglenie przetwarzania, kolejność na wyjściu nie zawsze będzie taka sama. Najlepiej to rozważyć na przykładzie:

int[] numbers = Enumerable.Range(1, 50).ToArray();
foreach(int number in numbers.AsParallel().Where(n=>n>2))
{
 Console.WriteLine(number);
}

W scenariuszu sekwencyjnym, spodziewalibyśmy się liczb z zakresu 3-50. Skoro sekwencja wejściowa ma uporządkowane liczby od 1-50 to po wykonaniu zapytania oczekujemy po prostu filtracji. W PLINQ jednak jak wiemy, kilka osobnych wątków wykonuje zapytanie i z tego względu nie ma gwarancji, że kolejność będzie zachowana. Na wyjściu oczywiście uzyskamy elementy większe niż 2 ale w losowej kolejności typu 5,6,4,10,3. Jeśli to konieczne, istnieje  sposób na zachowanie kolejności a mianowicie metoda AsOrdered:

int[] numbers = Enumerable.Range(1, 50).ToArray();
foreach(int number in numbers.AsParallel().AsOrdered().Where(n=>n>2))
{
 Console.WriteLine(number);
}

Powinniśmy jednak tak projektować algorytmy, aby unikać potrzeby korzystania z AsOrdered – operacja oczywiście spowoduje zauważalną stratę w wydajności.

Kolejna kwestia to foreach. W poprzednich postach używałem zawsze ForEach. Nie jest to jednak optymalne rozwiązanie. W końcu najpierw dzielimy dane na podgrupy, które później będą wykonywane niezależnie w różnych wątkach. Następnie wywołujemy ForEach, który ma charakter sekwencyjny i zawsze wymaga wykonania całości. Jeśli korzystamy z ForEach, zawsze musimy poczekać, aż wszystkie wątki wykonają się i na końcu trzeba scalić wynik – dość czasochłonne. Na szczęście istnieje funkcja ForAll, która nie czeka do końca a korzysta z danych, które aktualnie są już wygenerowane:

int[] numbers = Enumerable.Range(1, 50).ToArray();
numbers.AsParallel().Where(n=>n>2).ForAll(Console.WriteLine);            

ForAll zatem przetwarza dane, zanim jeszcze wszystkie wątki skończą zadanie.  W ten sposób unikamy ostatniego kroku – scalenia poszczególnych zapytań.

Ponadto istnieje sposób, na manualne określenie metody scalania:

int[] numbers = Enumerable.Range(1, 50).ToArray();
var query = numbers.AsParallel().WithMergeOptions(ParallelMergeOptions.FullyBuffered).Where(n => n > 2);

ParallelMergeOptions przyjmuje następujące wartości:

  1. AutoBuffered – dokonuje buforowania elementów. W praktyce to oznacza, że gdy mamy ForEach, elementy dostępne będą dopiero po jakimś czasie, gdy bufor zapełni się.
  2. FullyBuffered – elementy będą dopiero dostępne gdy wszystkie wątki zostaną wykonane. W międzyczasie będą one buforować dane a na końcu bufor zostanie w całości zwrócony.
  3. NotBuffered – brak bufora. Elementy będą wyświetlane natychmiast gdy są tylko dostępne.
  4. Default – aktualnie jest to AutoBufferred.

Należy jednak pamiętać, że metoda WithMergeOptions to tylko wskazówka dla PLINQ – jeśli w czasie wykonania zapytania okaże się, że żądany typ scalania nie ma sensu to po prostu zostanie on zignorowany. Funkcja ForAll zatem używa NotBuffered ponieważ nic nie scala. Podobnie każda inna funkcja ma skojarzony ze sobą sposób scalania. Funkcja WithMergeOptions powinna być wykorzystana wyłącznie w scenariuszach, gdzie mamy pewność, że domyślne zachowanie nie jest najoptymalniejsze.

PLINQ (część II)–porównanie wydajności prostych zapytań. Wymuszenie PLINQ.

W ostatnim poście omówiłem podstawy PLINQ. Dzisiaj już czysta praktyka. Zacznijmy od prostego zapytania, które może zostać wykonane równolegle:

internal static class Sample
{
   public static void Main()
   {
       IEnumerable<int> numbers = Enumerable.Range(1, 10000000);

       Stopwatch stopwatch = Stopwatch.StartNew();
       IEnumerable<int> subset = numbers.Where(i => i < 10000);

       stopwatch.Stop();
       Console.WriteLine("Time:{0}",stopwatch.ElapsedTicks);
   }
}

Powyższe, sekwencyjne wykonanie zapytania zajmuje 6500. Następnie spróbujmy użyć PLINQ:

internal static class Sample
{
   public static void Main()
   {
       IEnumerable<int> numbers = Enumerable.Range(1, 10000000);

       Stopwatch stopwatch = Stopwatch.StartNew();
       IEnumerable<int> subset = numbers.AsParallel().Where(i => i < 10000);

       stopwatch.Stop();
       Console.WriteLine("Time:{0}",stopwatch.ElapsedTicks);
   }
}

Wynik to 5000 (na maszynie dwurdzeniowej). Wynik nie powala, ale zależy to oczywiście od liczby rdzeni oraz samego zapytania. Spróbujmy sprawdzić zapytanie, które w Select ma skomplikowaną operację. Wersja sekwencyjna:

internal static class Sample
{
   public static void Main()
   {
       IEnumerable<int> numbers = Enumerable.Range(1, 10000000);

       Stopwatch stopwatch = Stopwatch.StartNew();
       IEnumerable<int> subset = numbers.Where(i => i < 10000).
           Select(TimeConsumingOperation);
      
       stopwatch.Stop();
       Console.WriteLine("Time:{0}",stopwatch.ElapsedTicks);
   }
   private static int TimeConsumingOperation(int number)
   {
       Thread.Sleep(10);
       return number;
   }
}

Wynik to 15 000. Wersja PLINQ:

internal static class Sample
{
   public static void Main()
   {
       IEnumerable<int> numbers = Enumerable.Range(1, 10000000);

       Stopwatch stopwatch = Stopwatch.StartNew();
       IEnumerable<int> subset = numbers.AsParallel().
           Where(i => i < 10000).
           Select(TimeConsumingOperation);
      
       stopwatch.Stop();
       Console.WriteLine("Time:{0}",stopwatch.ElapsedTicks);
   }
   private static int TimeConsumingOperation(int number)
   {
       Thread.Sleep(10);
       return number;
   }
}

W PLINQ na tym samym CPU (2 rdzenie) uzyskano wynik 8500. Widać znaczącą poprawę. Nie ma co się dziwić ponieważ w LINQ TimeConsumingOperation blokuje dalsze przetwarzanie. Na dwóch rdzeniach, całość została podzielona i jest przetwarzana niezależnie. Pamiętajmy, że zyskujemy przez PLINQ najwięcej gdy pojedyncza operacja trwa jak najdłużej. Z tego względu, dla zwykłej filtracji danych nie zawsze PLINQ przynosi dobre efekty. Zawsze należy sprawdzać wydajność PLINQ przez umieszczeniem zapytania w kodzie produkcyjnym.

PLINQ zawsze analizuje zapytanie i sprawdza czy warto je wykonać równolegle. Oznacza to jednak, że gdy zapytanie, które najlepiej wykonać sekwencyjnie, będziemy próbować wykonać równolegle to wiąże to się z pewnym overhead. Sprawdźmy np. poniższy kod:

internal static class Sample
{
   public static void Main()
   {
       int[] numbers = Enumerable.Range(1, 10000000).ToArray();

       Stopwatch stopwatch = Stopwatch.StartNew();
       IEnumerable<int> subset = numbers.Take(1);
      
       stopwatch.Stop();
       Console.WriteLine("Time:{0}",stopwatch.ElapsedTicks);
   }        
}

Prosty Take(1) nie ma sensu wykonywać w PLINQ – nie ma co tutaj zrównoleglić.  W czystym LINQ otrzymano wynik 2500, z kolei w PLINQ 6110:

internal static class Sample
{
   public static void Main()
   {
       int[] numbers = Enumerable.Range(1, 10000000).ToArray();

       Stopwatch stopwatch = Stopwatch.StartNew();
       IEnumerable<int> subset = numbers.AsParallel().Take(1);
      
       stopwatch.Stop();
       Console.WriteLine("Time:{0}",stopwatch.ElapsedTicks);
   }        
}

Powyższe zapytanie mimo, że jest PLINQ, zostało wykonane sekwencyjnie. Wynik jest gorszy ponieważ najpierw trzeba było przeanalizować zapytanie aby podjąć tą decyzje. W PLINQ istnieje jednak sposób na zmuszenie wykonania kodu równolegle. Oczywiście dla powyższego przykładu przyniesie to skutki bardzo negatywne:

internal static class Sample
{
   public static void Main()
   {
       int[] numbers = Enumerable.Range(1, 10000000).ToArray();

       Stopwatch stopwatch = Stopwatch.StartNew();
       IEnumerable<int> subset = numbers.AsParallel().
           WithExecutionMode(ParallelExecutionMode.ForceParallelism).
           Take(1);
      
       stopwatch.Stop();
       Console.WriteLine("Time:{0}",stopwatch.ElapsedTicks);
   }        
}

Wynik to 9000! Funkcja WithExecutionMode przyjmuje jako parametr ParallelExecutionMode, który z kolei ma dwie wartości: Default i ForceParallelism.

Code Review: Timery

Załóżmy, że mamy timer, który co jakiś próbuje połączyć się np. z usługą:

internal static class Sample
{
   public static void Main()
   {
       using (Timer timer = new Timer(Run, null,0,1000))
       {
           Thread.Sleep(500000);
       }
   }
   private static void Run(Object state)
   {
       Console.WriteLine("Operacja, ktora moze potrwac czasami nawet kilka minut.");
   }
}

Powyższy kod w wielu sytuacjach jest poprawny. Należy jednak mieć na uwadze, że operacje takie jak połączenie z bazą danych czy usługą mogą potrwać bardzo długo. Z tego względu, może okazać się, że metoda Run będzie wywołana kilka razy w tym samym czasie. Łatwo przetestować taki scenariusz, dodając Thread.Sleep:

internal static class Sample
{
   public static void Main()
   {
       using (Timer timer = new Timer(Run, null,0,1000))
       {
           Thread.Sleep(500000);
       }
   }
   private static void Run(Object state)
   {
       Console.WriteLine("Przed");
       Thread.Sleep(3000);
       Console.WriteLine("Po");
   }
}

W powyższym przypadku, Run i tak będzie wywoływany co każdą sekundę, skutkując wykonywaniem jednoczesnym kilku Run. Taki przypadek nie jest thread-safe i czasami po prostu zachowanie jest niepożądane – po co łączyć się dwa razy do bazy danych aby odświeżyć jakąś informacje? Jeśli jeden wątek nie może tego zrobić, to nie ma sensu wykonywanie dokładnie tego  samego w drugim. Lepszym podejściem jest uruchomienie Run tylko raz i potem wywoływaniem w Run za każdym razem funkcji Change:

internal static class Sample
{
   private static Timer _timer;
   public static void Main()
   {
       using (_timer = new Timer(Run, null, 1000, Timeout.Infinite))
       {
           Thread.Sleep(500000);
       }
   }
   private static void Run(Object state)
   {
       Console.WriteLine("Przed");
       Thread.Sleep(3000);
       Console.WriteLine("Po");
       _timer.Change(1000, Timeout.Infinite);
   }
}

Pierwsza różnica to utworzenie instancji Timer – zamiast parametru period, teraz ustawiamy dueTime, czyli opóźnienie z jakim ma zostać uruchomiony Timer. Następnie po każdym wykonaniu kodu w Run, wywołujemy Change ponownie z dueTime równym 1000.

PLINQ – wprowadzenie

Chciałbym rozpocząć nowy cykl na blogu, tym razem o PLINQ.  Dzisiaj zaczniemy od podstaw czyli czym jest LINQ oraz kiedy z niego korzystać.

PLINQ to skrót od Parallel Linq czyli są to zapytania wykonywane równolegle. W dzisiejszym świecie, programiści starają się zrównoleglić co tylko jest możliwe. Samodzielne pisanie LINQ w sposób równoległy jest dość niewygodne i dlatego Microsoft wprowadził PLINQ. Należy oczywiście zawsze pamiętać, że próba zrównoleglenia operacji, które muszą po prostu zostać wykonane sekwencyjnie, zwykłe pogarsza wydajność.

Z tego wynika fakt, że możemy zrównoleglić wyłącznie zadania, które da się od siebie oddzielić i można je wykonywać niezależnie od siebie – tzn. zadanie A nie potrzebuje danych z zadania B. Taka sytuacja jest komfortowa i wtedy najwięcej zyskujemy z PLINQ. W przypadku gdy zadanie B musi czekać na wynik z A, należy unikać PLINQ i tą cześć wykonać w sposób całkowicie sekwencyjny. Kluczem do prawidłowej implementacji PLINQ jest zatem podział (partition) zapytania na części. PLINQ wewnętrznie dokonuje analizy zapytania i na podstawie tego, dokonuje wyboru najbardziej optymalnej strategii. Nie chcę omawiać szczegółowo wszystkich metod podziału ponieważ jest to implementacja wewnętrzna i może ona się z czasem zmienić. Myślę jednak, że warto chociaż ogólnikowo przedstawić jak to może wyglądać. Załóżmy, że mamy następujące zapytanie:

 (from employee in employees where CalculateBonus(employee) > 10 select employee.Salary).Sum();

Powyższe zapytanie jest dobre do zrównoleglenia jeśli np. klauzula CalculateBonus lub employee.Salary pochłaniają dużo czasu. W końcu PLINQ mógłby np. cześć pracowników przetwarzać na rdzeniu A a cześć na rdzeniu B a potem tylko na końcu złączyć rezultaty aby policzyć Sum. W jaki sposób  PLINQ rozdziela zadania? Zależy to od typu kolekcji. Wewnętrzna implementacja korzysta np. z Range Partitioning dla tablic oraz list. Jeśli do danego źródła danych można dostać się za pomocą indeksów (T[]), wtedy Range Partitioning rozdziela równomiernie pomiędzy różne wątki np. elementy od 0 do 5 są przetwarzane przez ThreadA, od 6 do 10 przez Thread B itp. Taki komfort niestety można mieć dla kolekcji z góry określonych czyli takich, które mają właściwość Length oraz można do nich dostać się przez indeks.

Jeśli długość nie jest znana wtedy należy skorzystać z bardziej wyrafinowanego algorytmu np. Chunk Partitioning. Najpierw przydzielana jest stała liczba elementów do każdego wątku. Po pierwszej iteracji, jeśli nie wszystkie elementy zostały przetworzone, liczba jest podwajana. Szczegóły wewnętrzne zmieniają się, więc nie wiadomo dokładnie czy ThreadA będzie miał najpierw jeden elementy a potem 2,4,8,16 czy skok jest np. poczwórny. Jedynie co trzeba wiedzieć to fakt, że dla IEnumerable nie można w łatwy sposób dokonać podziału zadań i trzeba wykonywać to heurystycznie.

Z tego co mi wiadomo, PLINQ implementuje jeszcze dwie strategie: Striped Partitioning  oraz Hash Partitioning. Pierwsza z nich jest wykorzystywana dla SkipWhile oraz TakeWhile, druga z kolei dla GroupBy

PLINQ jest dość sprytny i jeśli uzna, że dane zapytanie nie powinno zostać wykonane w sposób równoległy to zostanie one przetworzone w klasyczny sposób. Można oczywiście takie zachowanie zmienić (tzn. zmusić .NET do wykonywania zawsze zapytania równolegle), ale zwykle jest to pożądana decyzja. Najważniejsza będzie dla nas klasa ParallelEnumerable, która dostarcza implementacje oraz rozszerzenia metod dla PLINQ.

W celu wykonania jakiegoś zapytania równolegle, zawsze należy wywołać funkcję AsParallel. Domyślnie wszystkie zapytania to czysty LINQ. Na przykład rozważmy poniższy kod:

var source = Enumerable.Range(1, 10000);
var evenNums = from num in source
               where Compute(num) > 0
               select num;

Wersja PLINQ będzie wyglądała następująco:

var source = Enumerable.Range(1, 10000);
var evenNums = from num in source.AsParallel()
               where Compute(num) > 0
               select num;

Domyślnie PLINQ wykonuje zapytania na wszystkich dostępnych CPU\Cores. Czyli jeśli mamy dwa rdzenie, tylko dwa wątki będą tworzone. Można takie zachowanie przeładować:

 (from employee in employees.AsParallel().WithDegreeOfParallelism(4) where CalculateBonus(employee) > 10 select employee.Salary).Sum();

Funkcja WithDegreeOfParallelism określa ile wątków maksymalnie może zostać stworzonych.

Jak wspomniałem, bardzo ważne jest rozpoznanie sytuacji w których PLINQ jest dobrym rozwiązaniem.

Jeśli mamy jakaś funkcję, której wykonanie zajmuje dużo czasu to jest to pierwszy sygnał do zrównoleglenia czegoś. W pierwszym zapytaniu (na początku wpisu) była użyta funkcja CalculateBonus. Jeśli jest ona czasochłonna to naturalne jest, ze dobrze byłoby ją wykonywać na więcej niż jednym rdzeniu. Załóżmy, że mamy 4 pracowników i wykonanie CalculateBonus zajmuje 1 sekundę. Wtedy poniższe zapytanie zajmie >4s:

 (from employee in employees where CalculateBonus(employee) > 10 select employee.Salary).Sum();

Wersja PLINQ na procesorze z 4 rdzeniami zajmie trochę więcej niż jedna sekunda:

 (from employee in employees.AsParallel() where CalculateBonus(employee) > 10 select employee.Salary).Sum();

W przypadku jednak, gdy zapytanie jest proste wtedy nie warto korzystać z PLINQ. Zmodyfikujmy trochę powyższy przykład:

 (from employee in employees where employee.Bonus > 10 select employee.Salary).Sum();

Jeśli właściwości Bonus, Salary nie są zbyt czasochłonne a zwracają jedynie czystą wartość, wtedy nie uzyskamy już takich dobrych rezultatów. Oczywiście dla tablic (gdzie rozmiar jest z góry znany) prawdopodobnie można zaobserwować małą optymalizację ale nie jest to przypadek wpisujący się w dobre zastosowanie PLINQ.

Następna kwestia to liczba dostępnych rdzeni. Dzisiaj na szczęście chyba wszystkie komputery mają już po kilka rdzeni. Jeśli jednak, zdarzyłoby się, że przyjdzie nam uruchamiać nasz kod na komputerach wyłącznie z jednym procesorem\rdzeniem wtedy PLINQ to po prostu zmarnowanie czasu.

Najlepsze rezultaty osiąga się, gdy kolejność elementów nie ma znaczenia. Oczywiście różne elementy są przetwarzane na różnych rdzeniach zatem kolejność domyślnie może zostać niezachowana. Istnieje funkcja AsOrdered (o niej w przyszłych wpisach) ale powoduje to utratę wydajności, podobnie jak OrderBy. Podobnie, użycie zwykłego foreach również powoduje zakłócenie wielowątkowości ponieważ co prawda zapytanie samo w sobie, zostanie wykonane równolegle, ale już foreach będzie musiał czekać na całość kolekcji.

Wydajność: spinning a synchronizacja kernel

Wielokrotnie pisałem o różnych metodach definiowania sekcji krytycznej w kodzie. Do dyspozycji mamy spinning, który nie usypia wątku. Tak naprawdę dla systemu Windows, taki wątek wciąż istnieje i wykonuje pracę – innymi słowy marnuje  czas CPU. Jeśli chcemy zatrzymać wątek na krótko wtedy jest to bardzo wydajne ponieważ nie musimy zmieniać kontekstu (BARDZO kosztowne), korzystać z funkcji Windows (spinning to czysta metoda .NET) czy planować (scheduling) następnych wątków. Jeśli mechanizmy takie jak spinning czy semafor nie są jasne, koniecznie zachęcam do przeczytania poprzednich wpisów. Dzisiaj wyłącznie zaprezentuję pewny test, który pokaże różnice w liczbach – część teoretyczna została już omówiona kiedyś na tym blogu.

Test:

internal static class Sample
{
   public static void Main()
   {
       Task.Factory.StartNew(Run);
       Console.ReadLine();
   }
   private static void Run()
   {
       int value = 0;
       const Int32 iterations = 10000000;

       // bez synchronizacji
       Stopwatch sw = Stopwatch.StartNew();
       for (int i = 0; i < iterations; i++)
           value++;

       Console.WriteLine("Bez synchronizacji:{0}", sw.ElapsedMilliseconds);

       // spinlock
       sw.Restart();
       value = 0;
       SpinLock spinLock = new SpinLock();
       for (int i = 0; i < iterations; i++)
       {
           bool lockTaken = false;
           try
           {
               spinLock.Enter(ref lockTaken);
               value++;
           }
           finally
           {
               if (lockTaken)
                   spinLock.Exit();
           }
       }
       Console.WriteLine("Synchronizacja spinlock:{0}", sw.ElapsedMilliseconds);

       // semafor (kernel)
       sw.Restart();
       value = 0;
       Semaphore semaphore=new Semaphore(1,1);
       for (int i = 0; i < iterations; i++)
       {
           semaphore.WaitOne();
               value++;
           semaphore.Release();
       }
       Console.WriteLine("Synchronizacja lock: {0}", sw.ElapsedMilliseconds);
   }
}

Wynik to:

  1. Bez synchronizacji: 35
  2. Spinning: 1586
  3. Wywołanie funkcji systemowej: 21604

Spinning jest dużo szybszy gdy nie ma konfliktów. W powyższym przykładzie wyłącznie jeden wątek próbuje w tym samym czasie wejść do sekcji krytycznej. Oczywiście w praktyce w takiej sytuacji nie korzystalibyśmy z synchronizacji. Celem wpisu jest jednak pokazanie, że dla krótkotrwałych blokad, szybszy jest spinning. Semaphore to funkcja systemowa (kernel) i  nawet same wywołanie jest wolniejsze. Ponadto powoduje ona uśpienie wątku, zmianę kontekstu i ponowne planowanie. W jednym z następnych wpisów, przyjrzymy się lock, który jest tak naprawdę dość skomplikowany.

Code Review: Asynchroniczne strumienie danych

Operacje na plikach mogą być bardzo czasochłonne. Z tego względu, dobrym zwyczajem jest umieszczenie kodu w osobnym wątku. Często popełnianym błędem jest samodzielne tworzenie wątku:

internal static class Sample
{        
   public static void Main()
   {
       var reader = new FileStream(@"c:\setup\1.txt", FileMode.Open);
       Task.Factory.StartNew(()=>ReadAsync(reader));
   }
   private static void ReadAsync(Stream reader)
   {
       byte[]buffer=new byte[100];
       reader.Read(buffer, 0, 100);
       reader.Close();
   }
}

Pomijam już kwestie obsługi błędów w powyższym kodzie oraz zamykanie strumienia. Skupmy się wyłącznie na wielowątkowości.  Dlaczego powyższy kod jest tak bardzo zły? Podobnym błędem byłoby stworzenie własnej delegaty oraz wywołanie na niej BeginInvoke. Prześledźmy jak wygląda synchroniczne wywołanie operacji na strumieniach w Windows:

  1. Użytkownik wywołuje metodę FileStream.Read.
  2. FileStream to tak naprawdę wrapper na zasoby niezarządzane. Pod spodem kryje się zwykła funkcja Win32 (ReadFile).
  3. ReadFile (Win32) stworzy strukturę IRP (Input\Output Request Package). Zawiera ona informacje o tym co chcemy przeczytać (offset, length itp.).
  4. Następnie Kernel przekazuje IRP do danego urządzenia. Każde urządzenie ma kolejkę IRP. Po jakimś czasie, przekazany IRP zostanie zdjęty z kolejki a dane zostaną przeczytane. W tym momencie, wywołanie synchroniczne zostanie uśpione przez Windows. Jest to bardzo korzystne, ponieważ w momencie umieszczenia IRP, Windows wie, że wątek nie ma nic do roboty i go usypia. Niestety wciąż marnujemy zasoby na call stack, ponieważ w końcu taki wątek musi zostać potem obudzony.

Co jest więc złego w powyższym kodzie? Tworzymy nowy wątek ale tak naprawdę będzie on uśpiony i będzie pochłaniał zasoby. Należy pamiętać, że czasami urządzenie może być zajęte i przeczytanie danych zajmie trochę czasu. Szczególnie gdy mamy do czynienia ze strumieniem sieciowym (NetworkStream) albo danymi na np. DVD. Strumienie są bardzo generycznym pojęciem.

Prawidłowa implementacja asynchronicznych operacji zawsze powinna wykorzystywać dostępne metody typu BeginRead. Przyjrzyjmy się jednak kolejnemu rozwiązaniu, które wciąż jest niepoprawne:

internal static class Sample
{
   private static FileStream _reader;

   public static void Main()
   {
       _reader = new FileStream(@"c:\setup\1.txt", FileMode.Open);
       byte[] buffer = new byte[100];
       _reader.BeginRead(buffer, 0, 100, Callback,buffer);
       Console.ReadKey();
   }

   private static void Callback(IAsyncResult ar)
   {
       int result=_reader.EndRead(ar);
       byte[] buffer = (byte[]) ar.AsyncState;
       _reader.Dispose();
       _reader = null;
   }       
}

Dlaczego wciąż jest to złe? Musimy koniecznie przekazać flagę FileOptions.Asynchronous w momencie gdy jest tworzony strumień:

internal static class Sample
{
   private static FileStream _reader;

   public static void Main()
   {
       _reader = new FileStream(@"c:\setup\1.txt", FileMode.Open,FileOptions.Asynchronous);
       byte[] buffer = new byte[100];
       _reader.BeginRead(buffer, 0, 100, Callback,buffer);
       Console.ReadKey();
   }

   private static void Callback(IAsyncResult ar)
   {
       int result=_reader.EndRead(ar);
       byte[] buffer = (byte[]) ar.AsyncState;
       _reader.Dispose();
       _reader = null;
   }       
}

Należy zawsze przekazać powyższą flagę ponieważ bez niej, .NET symuluje tylko asynchroniczne zachowanie. W rzeczywistości zostanie stworzony wątek, który będzie marnował zasoby aż do momentu przetworzenia zapytania. Z powyższą flagą, w momencie wysłania IRP, wątek kończy swoje działanie. Następnie, gdy urządzenie przeczyta dane, Kernel\sterownik urządzenia wywoła przekazany callback. Żaden dodatkowy wątek nie musi zostać tworzony – dla strumieni istnieje specjalna infrastruktura zaimplementowana przez Windows i sterownik urządzenia.

CPU, caching a wydajność.

W celu optymalizacji każdy procesor posiada swój cache. Temat jest dosyć rozbudowany bo zwykłe cache jest podzielony na kilka warstw aby przyśpieszyć dostęp do niego. W dzisiejszym w poście chciałbym wprowadzić pojęcie cache line co jest tak naprawdę po prostu wpisem w pamięci podręcznej. Jeśli procesor czyta jakieś dane to umieszcza je w cache line. Cache line to nie tylko jedna, pojedyncza zmienna a na przykład 64 bajty. Jeśli zatem czytamy pojedynczą zmienną Int32,  w rzeczywistości procesor przeczyta również zmienną przylegającą do Int32 (tak aby razem przeczytać 64).

Takie zachowanie bardzo często ma pozytywne skutki – bardzo prawdopodobne, że w praktyce będziemy chcieli skorzystać z dwóch, przylegających do siebie zmiennych jednocześnie. Wadą, mającą ogromny wpływ na wydajność, jest sytuacja kiedy wątek 1 chce przeczytać pierwszą zmienną a wątek 2, drugą.  Niestety gdy dwa rdzenie chcą mieć dostęp do tych samych danych, wtedy CPU musi więcej wykonać roboty aby przekazać tą wartość w bezpieczny sposób. Chcemy zatem unikać scenariuszy, gdzie dwie zmienne z tego samego cache line, będą potrzebne dwóm różnym rdzeniom procesora (dla operacji write). Rozważmy następującą klasę:

class CacheLineSample
{
   public Int32 Integer1;
   public Int32 Integer2;
}

Mój CPU posiada L1 równy 128KB a CacheLine 64 KB. Oznacza to, że oba pola powyższej klasy mogą znaleźć się w tym samym cache. Napiszmy teraz program, który tworzy dwa wątki (potencjalnie mogą być wykonywane na różnych rdzeniach) i każdy z nich będzie zwiększał wartość jednego z pól:

class Program
{
   private static CacheLineSample _sample=new CacheLineSample();
   static void Main(string[] args)
   {
       var stopwatch=Stopwatch.StartNew();
       Task t1 = Task.Factory.StartNew(()=>TestField(0));
       Task t2 = Task.Factory.StartNew(() => TestField(1));
       Task.WaitAll(t1, t2);
       stopwatch.Stop();
       Console.WriteLine(stopwatch.ElapsedTicks);
   }
   private static void TestField(int index)
   {
       for (int i = 0; i < 100000000; i++)
       {
           if (index == 0)
               _sample.Integer1++;
           else
               _sample.Integer2++;
       }
   }
}

Następnie odpalmy ten sam program, ale modyfikując naszą klasę następująco:

[StructLayout(LayoutKind.Explicit)]
class CacheLineSample
{
   [FieldOffset(0)]
   public Int32 Integer1;
   [FieldOffset(64)]
   public Int32 Integer2;
}

Zauważymy, że w zależności od kompilatora, procesora, różnica może być 2 lub nawet 5 krotna! W momencie gdy oba rdzenie dokonają cache wtedy aktualizacja zmiennych jest bardzo czasochłonna ponieważ należy o tym powiadomić drugi rdzeń. Powyższy mechanizm nazywa się “false sharing” ponieważ z punktu widzenia programisty, żadne zmienne nie są dzielone (czego należy unikać w programowaniu wielowątkowym) ale z punktu widzenia CPU są one niestety współdzielone. Każda operacja zapisu powoduje, że cache jest nieważny oraz czas jest marnowany na odpowiednią synchronizację. Inny przykład to pętla po tablicy dwuwymiarowej:

for (int row = 0; row < N; row++) 
{
    for (int column = 0; column < N; column++) 
    {
        array[row, column] = GetRandomValue();
    }
}

for (int column = 0; column < N; column++) 
{
    for (int row = 0; row < N; row++) 
    {
        array[row, column] = GetRandomValue();
    }
}

Pierwsza wersja może okazać się znacząco szybsza niż druga. Dlaczego? W pamięci tablicy dwuwymiarowej, dane ułożone są wierszami. Oznacza to, że w pierwszym rozwiązaniu czytając jedną wartość, umieszczamy w cache dwie wartości i nie musimy potem w następnej iteracji robić tego samego. Druga pętla z kolei, zawsze będzie aktualizować cache i wykona 2x więcej operacji na cache niż pierwsze rozwiązanie.

Należy również uważać na tablice jednowymiarowe. Rozmiar w nich jest przechowywany na początku. Każda próba dostępu do tablicy również powoduje użycie Length w celu sprawdzenia czy nie wykracza to poza zasięg. Z tego względu, gdy jeden wątek modyfikuje początek tablicy a drugi np. koniec, to pomimo, że operują na odległych od siebie miejscach w pamięci, wciąż ma miejsce false sharing.

Opisane problemy są bardzo trudne w identyfikacji.  Jeśli jednak wydajność jest krytyczna, warto monitorować (za pomocą performance counter) liczbę tzw. L1 misses, określającą ile razy trzeba było przeładować pamięć podręczną CPU.

Serializacja obiektów typu singleton\NULL object

Czasami zachodzi potrzeba serializacji obiektów, które powinny mieć  maksymalnie jedną kopie w tym samym AppDomain. Klasycznym przykładem jest System.DBNull, którego deklaracja wygląda następująco:

[SerializableAttribute]
[ComVisibleAttribute(true)]
public sealed class DBNull : ISerializable, 
    IConvertible

Załóżmy, że mamy klasę, w której jedna z właściwości ma wartość System.DBNull. Oczywiście podczas serializacji i potem deserializacji nie chcemy tworzyć nowej instancji DBNull – jest to sprzeczne z założeniem wzorca singleton albo special case pattern.

Na szczęście  istnieje sposób w .NET na prawidłową serializację singletów ale trzeba najpierw zaimplementować interfejs ISerializable. Zacznijmy jednak od przykładu, który pokaże nam jeszcze raz problem:

[Serializable]
public sealed class Singleton
{
   private static readonly Singleton _instance = new Singleton();
   private Singleton() { }

   public static Singleton Instance { get { return _instance; } }  
}    
class Program
{
   static void Main(string[] args)
   {
       BinaryFormatter serializer = new BinaryFormatter();
       
       Singleton singleton1 = Singleton.Instance;

       using (MemoryStream stream = new MemoryStream())
       {
           serializer.Serialize(stream,singleton1);
           stream.Position = 0;
           Singleton singleton2 = (Singleton)serializer.Deserialize(stream);
           Debug.Assert(ReferenceEquals(singleton1,singleton2));
       }
   }
}

Powyższy assert zakończy się niepowodzeniem ponieważ po serializacji stworzony zostanie nowy obiekt. W celu rozwiązania problemu najpierw trzeba zaimplementować interfejs ISerializable, który umożliwia własną serializację:

[Serializable]
public sealed class Singleton:ISerializable
{
   private static readonly Singleton _instance = new Singleton();
   private Singleton() { }

   public static Singleton Instance { get { return _instance; } }

   public void GetObjectData(SerializationInfo info, StreamingContext context)
   {
       info.SetType(typeof(SingletonSerializationHelper));
   }
}

W metodzie GetObjectData możemy umieścić kod odpowiedzialny za serializację. W naszym przypadku chcemy wywołać tylko metodę SetType, która deleguję serialziację do innej klasy. Innymi słowy, Serializer zamiast zapisywać obiekt Singleton, przejdzie do SingletonSerializationHelper. Przyjrzyjmy się teraz SingletonSerializationHelper:

[Serializable]
public sealed class SingletonSerializationHelper:IObjectReference
{
   public object GetRealObject(StreamingContext context)
   {
       return Singleton.Instance;
   }
}

Interfejs IObjectRefernece oznacza, że obiekt, który jest właśnie zapisywany, stanowi tak naprawdę referencję na inny obiekt. Po uruchomieniu przykładu, asercja zakończy się sukcesem. Microsoft jednak poleca aby wszelkie implementacje ISerializable wyglądały następująco:

[Serializable]
public sealed class Singleton:ISerializable
{
   private static readonly Singleton _instance = new Singleton();
   private Singleton() { }

   public static Singleton Instance { get { return _instance; } }

   [SecurityPermissionAttribute(SecurityAction.Demand, SerializationFormatter = true)]
   public void GetObjectData(SerializationInfo info, StreamingContext context)
   {
       info.SetType(typeof(SingletonSerializationHelper));
   }
} 

W powyższym kodzie dodałem tylko atrybut SecurityPermissionAttribute. Oczywiście serializacja wyłącznie singletona nie ma żadnego sensu ponieważ po co zapisywać coś, co jest zawsze takie same? W praktyce jednak często pola innych obiektów zawierają referencję np. do wspomnianego System.DBNull. Jeśli implementujemy własny Special Case pattern, warto rozważyć również interfejs ISerializable jeśli serializacja wchodzi w grę.