Kolejna część artykułu o GC – tym razem o zasobach niezarządzanych. Zapraszam do lektury:
http://msdn.microsoft.com/pl-pl/library/garbage-collector-cz-3-zasoby-niezarzadzane
Kolejna część artykułu o GC – tym razem o zasobach niezarządzanych. Zapraszam do lektury:
http://msdn.microsoft.com/pl-pl/library/garbage-collector-cz-3-zasoby-niezarzadzane
W ostatnim wpisie pokazałem dostępne bloki buforujące. Dzisiaj zajmiemy się prostym przykładem, który jest bardziej praktyczny od tego przedstawionego w poprzednim poście. Załóżmy, że piszemy system, który składa się z kilku wątków przetwarzających. Każdy z nich pełni rolę konsumenta – przetwarza dane. Chcemy to tak zoptymalizować, aby nowe dane były wysyłane wyłącznie do jak najmniej zajętych węzłów. Oczywiście temat jest bardziej skomplikowany niż może wydawać się, ale dzisiaj pokażemy jak można do tego celu wykorzystać BufferBlock.
Producentem oczywiście będzie BufferBlock. Z kolei konsumenci to dowolne akcje – może to być TransformBlock czy ActionBlock. W tym przykładzie chcemy wyświetlić wyłącznie dane więc ActionBlock jest wystarczający.
Pierwsza wersja algorytmu mogłaby wyglądać następująco:
class Program { private static void Main(string[] args) { BufferBlock<int> bufferBlock = new BufferBlock<int>(); bufferBlock.LinkTo(CreateActionBlock("A")); bufferBlock.LinkTo(CreateActionBlock("B")); bufferBlock.LinkTo(CreateActionBlock("C")); for (int i = 0; i < 10; i++) bufferBlock.SendAsync(i); Console.ReadLine(); } private static ActionBlock<int> CreateActionBlock(string actionName) { var actionBlock = new ActionBlock<int>(i => Consume(actionName, i)); return actionBlock; } private static void Consume(string actionName,int i) { Console.WriteLine("{0}:{1}", actionName, i); Thread.Sleep(1000); // symulacja przetwarzania danych } }
Wiemy, że BufferBlock dostarczy dane tylko do pierwszego bloku, który je zaakceptuje (w przeciwieństwie do BroadcastBlock). Jeśli zatem ActionA obsłuży dane, nie są one potem duplikowane w ActionB i ActionC (patrz poprzedni post jeśli jest to niezrozumiałe). W tym przypadku jest to dokładnie zachowanie jakiego oczekujemy. Jeśli ActionA nie może obsłużyć zapytania (bo jest zajęty przetwarzaniem innego), wtedy BufferBlock spróbuje dostarczyć dane do kolejnego bloku ActionB. Po uruchomieniu powyższego kodu dostaniemy jednak następujące wartości:
Co to oznacza? Wszystkie zapytania zostały skierowane do węzła A. Oczywiście nie o to nam chodziło. Przetworzenie każdej danej zajmuje >1000ms (patrz funkcja Sleep) zatem spodziewalibyśmy się, ze na wyjściu ujrzymy coś w stylu A,B,C,A,B,C. Spowodowane to jest domyślnymi ustawieniami. Możemy ustawić właściwość BoundedCapacity określającą co ma się stać z wiadomościami, których nie można natychmiast obsłużyć. Domyślna wartość to –1, co oznacza, że wszystkie wiadomości będą kolejkowane. My zmienimy oczywiście na 1:
private static ActionBlock<int> CreateActionBlock(string actionName) { var options = new ExecutionDataflowBlockOptions {BoundedCapacity = 1}; var actionBlock = new ActionBlock<int>(i => Consume(actionName, i),options); return actionBlock; }
Teraz na wyjściu otrzymamy:
W następnym poście o kolejnej grupie bloków oraz o ich chciwości.
Dzisiaj wracamy do tematu TPL Dataflows. W ostatniej części zajęliśmy się m.in. BroadcastBlock, który jest jednym z bloków buforujących. Dla przypomnienia przykład:
class Program { private static void Main(string[] args) { BroadcastBlock<double> broadcastBlock=new BroadcastBlock<double>(i=>i); broadcastBlock.LinkTo(CreateTransformBlock()); broadcastBlock.LinkTo(CreateTransformBlock()); broadcastBlock.LinkTo(CreateTransformBlock()); broadcastBlock.Post(1); broadcastBlock.Post(0); broadcastBlock.Post(3.14); Console.ReadLine(); } private static TransformBlock<double, double> CreateTransformBlock() { var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i)); var actionBlock = new ActionBlock<double>(i => Console.WriteLine(i)); sinBlock.LinkTo(actionBlock); return sinBlock; } }
Normalnie jeśli blok A wysyła dane do bloku B, to po zaakceptowaniu ich przez blok B, są one usuwane z bloku A. BroadcastBlock ma jednak bufor, który umożliwia przechowanie tych informacji i wysłanie ich od kolejnych elementów. W DataFlows do dyspozycji mamy jeszcze dwa inne bloki buforujące.
Kolejnym blokiem jest zatem BufferBlock. W przeciwieństwie do BroadcastBlock, przechowuje on również dane historyczne a nie tylko najnowsze. Drugą różnicą w stosunku do BroadcastBlock jest fakt, że gdy target zaakceptuje wiadomość, nie będą one już rozsyłane do kolejnych bloków. Pierwszą różnicę pokazuje następujący kod:
BufferBlock<double> bufferBlock = new BufferBlock<double>(); BroadcastBlock<double> broadcastBlock = new BroadcastBlock<double>(i=>i); bufferBlock.Post(0); bufferBlock.Post(5); broadcastBlock.Post(0); broadcastBlock.Post(5); Thread.Sleep(1000); Console.WriteLine("Buffer block:"); Console.WriteLine(bufferBlock.Receive()); Console.WriteLine(bufferBlock.Receive()); Console.WriteLine("Broadcast block:"); Console.WriteLine(broadcastBlock.Receive()); Console.WriteLine(broadcastBlock.Receive());
BroadtcastBlock zwróci dwa razy “5” ponieważ przechowuje on wyłącznie ostatnie wartości. BufferBlock z kolei ma pełną historie danych. Druga różnica polega na tym, że gdy jakiś blok zaakceptuje dane, wtedy BufferBlocked nie wysyła ich do kolejnych bloków:
BufferBlock<double> bufferBlock = new BufferBlock<double>(); BroadcastBlock<double> broadcastBlock = new BroadcastBlock<double>(i=>i); ActionBlock<double> actionBlock1=new ActionBlock<double>(i=>Console.WriteLine("A:{0}",i)); ActionBlock<double> actionBlock2=new ActionBlock<double>(i=>Console.WriteLine("B:{0}",i)); bufferBlock.LinkTo(actionBlock1); bufferBlock.LinkTo(actionBlock2); broadcastBlock.LinkTo(actionBlock1); broadcastBlock.LinkTo(actionBlock2); bufferBlock.Post(0); bufferBlock.Post(5); broadcastBlock.Post(0); broadcastBlock.Post(5);
Broadcast wyśle wiadomości zarówno do actionBlock1 jak i do actionBlock2. Z kolei bufferBlock wyłącznie do actionblock1 – wysyłane one są do pierwszego bloku, który zaakceptuje dane.
Ostatnim blokiem jest WriteOnceBlock. Zachowuje się on jak BroadcastBlock, z tym, że można zapisać do niego tylko raz jakąś informacje (analogiczne do słowa kluczowego readonly w c#). Przykład (MSDN):
// Create a WriteOnceBlock<string> object. var writeOnceBlock = new WriteOnceBlock<string>(null); // Post several messages to the block in parallel. The first // message to be received is written to the block. // Subsequent messages are discarded. Parallel.Invoke( () => writeOnceBlock.Post("Message 1"), () => writeOnceBlock.Post("Message 2"), () => writeOnceBlock.Post("Message 3")); // Receive the message from the block. Console.WriteLine(writeOnceBlock.Receive()); /* Sample output: Message 2 */
W powyższym przypadku Message2 była zapisana jako pierwsza i kolejne “posty” są po prostu ignorowane. W następnych wpisach postaram się podać jakieś bardziej praktyczne przykłady zastosowania BufferBlock i WriteOnceBlock.
ReaderWriterLockSlim jest klasą, która ma zastąpić ReadWriterLock, znanego ze starych wersji framework’a. Ale zacznijmy od początku…
Dlaczego zwykły lock nie zawsze jest wystarczający? ReaderWriterLockSlim pracuje w trzech trybach:
Innymi słowy ReaderWriterLockSlim może nadać dostęp wielu wątkom jednocześnie, pod warunkiem, że nie modyfikują one danych – w końcu jeśli tylko czytają jest to bezpieczne. Jeśli tylko jakiś wątek będzie chciał zmodyfikować dane, wtedy konieczne jest nałożenie mutual lock i wyłączenie wszystkich shared\upgrade.
Rozważmy przykład z MSDN:
public class SynchronizedCache { private ReaderWriterLockSlim cacheLock = new ReaderWriterLockSlim(); private Dictionary<int, string> innerCache = new Dictionary<int, string>(); public string Read(int key) { cacheLock.EnterReadLock(); try { return innerCache[key]; } finally { cacheLock.ExitReadLock(); } } public void Add(int key, string value) { cacheLock.EnterWriteLock(); try { innerCache.Add(key, value); } finally { cacheLock.ExitWriteLock(); } } }
Funkcja Read może zostać wywołana jednocześnie z wielu wątków. Z kolei, jeśli ktoś będzie chciał wywołać Add, wtedy Read nie jest możliwe. Proszę zauważyć jak to znacząco może zwiększyć wydajność. Klasycznym rozwiązaniem byłoby umieszczenie Read w lock. Niestety w takiej sytuacji, tylko jeden wątek mógłby czytać dane, jeśli nawet nie są one aktualnie modyfikowane.
Ktoś może zadać pytanie, po co nam jakieś blokady gdy chcemy czytać tylko dane? Czy odczyt nie jest zawsze bezpieczny? Odczyt jest bezpieczny jeśli akurat dane nie są modyfikowane. Co jeśli np. Add dodał klucz do słownika a nie ustawił jeszcze danych? W końcu nie są to operacje atomowe. W takim przypadku, brak jakiejkolwiek blokady skutkowałby odczytem nieprawidłowych (częściowych) danych.
Istnieje jeszcze jeden scenariusz. Po odczycie danych, może okazać się, że niezbędna jest ich modyfikacja. W takim przypadku używamy trzeciego trybu – upgrade. Załóżmy, że chcemy zaimplementować funkcje AddOrUpdate. Nie chcemy przy typ blokować całego kodu. Możemy w końcu najpierw odczytać dane i jeśli wartość jest taka sama jak parametr wejściowy wtedy w ogóle nic nie musimy robić. Bardzo to zoptymalizuje wywołania, które nie zmieniają stanu:
public AddOrUpdateStatus AddOrUpdate(int key, string value) { cacheLock.EnterUpgradeableReadLock(); try { string result = null; if (innerCache.TryGetValue(key, out result)) { if (result == value) { return AddOrUpdateStatus.Unchanged; } else { cacheLock.EnterWriteLock(); try { innerCache[key] = value; } finally { cacheLock.ExitWriteLock(); } return AddOrUpdateStatus.Updated; } } else { cacheLock.EnterWriteLock(); try { innerCache.Add(key, value); } finally { cacheLock.ExitWriteLock(); } return AddOrUpdateStatus.Added; } } finally { cacheLock.ExitUpgradeableReadLock(); } }
EnterUpgradeableReadLock wchodzi w tryb Upgradeable. W przypadku takiej blokady, żaden z mutual lock nie może być nałożony. Dozwolone jednak są reader’y, które aktualnie są już nałożone – nowe nie mogą mieć już miejsca. Następnie po przeczytaniu stanu można zdecydować, czy chcemy uzyskać dostęp wyłączony czy nie. Jeśli tak, po prostu wywołujemy EnterWriteLock. Proszę zauważyć, że upgradeable lock nie służy do modyfikacji danych! Wyłącznie bezpieczne w nim jest przeczytanie stanu i zdecydowanie następnie czy chcemy przełączyć się z powrotem do read czy jednak do writer. Wynika z tego, że powinniśmy jak najszybciej podjąć taką decyzje – upgradeable tym różni się od read, że nie dopuszcza nowych reader’ow.
Podejrzewam, ze tryb upgradeable wprowadził trochę zamieszania. W następnym wpisie wyjaśnię różnice między starym ReadWriterLock a nowym i przy okazji stanie się jasne, dlaczego ten tryb jest konieczny.
Klasa Monitor to chyba najpopularniejszy, najłatwiejszy i często najlepszy sposób synchronizacji danych w .NET. Większość programistów używa słowa kluczowego lock zamiast bezpośrednio Monitor.Enter. W większości przypadków jest to poprawne i zdecydowanie najbardziej przejrzyste. Dzisiaj chciałbym przyjrzeć się kilku sposobom konstrukcji Monitor.Enter\MonitorExit. Pierwszy, zdecydowanie najgorszy to:
Monitor.Enter(_sync); // sekcja krytyczna tutaj Monitor.Exit(_sync);
W powyższym kodzie brakuje obsługi błędów. Kolejnym sposobem jest:
Monitor.Enter(_sync); try { //sekcja krytyczna tutaj } finally { Monitor.Exit(_sync); }
Co w przedstawionym kodzie jest nie złego? Obsługujemy wyjątki – to jest na plus. Niestety, co jeśli jakiś wyjątek asynchroniczny (opisany kilka postów wcześniej ThreadAbortException) będzie miał miejsce między try a Monitor.Enter? Nigdy nie wyjedziemy do bloku try-catch a co tym idzie, nigdy nie zostanie wywołany Monitor.Exit. Spowoduje to na końcu deadlock ponieważ blokada nigdy nie zostanie zwolniona.
Przed C# 4.0, lock generował dokładnie taką konstrukcję. Skompilujemy następujący lock w .NET 3.5:
lock(_sync) { // sekcja krytyczna tutaj }
Na wyjściu będziemy mieli:
private static void Main(string[] args) { object CS$2$0000; Monitor.Enter(CS$2$0000 = _sync); Label_000E: try { goto Label_001A; } finally { Label_0012: Monitor.Exit(CS$2$0000); } Label_001A: return; }
W rzeczywistości CLR rozpoznane powyższy wzorzec i od kilku wersji wyjątki asynchroniczne nie będą miały miejsca między Enter a try. Microsoft miał świadomość, że wiele osób polega na takim wzorcu (lub na słowie lock) i sztucznie zapobiega jakimkolwiek instrukcjom między Enter a try.
Ktoś może zapytać, dlaczego nie umieścić Enter w try-catch? Spróbujmy:
try { Monitor.Enter(_sync); // sekcja krytyczna tutaj } finally { Monitor.Exit(_sync); }
Co prawda, nie ma możliwości, że Monitor.Exit nie zostanie wywołany. Jeśli wyjątek asynchroniczny będzie miał miejsce zaraz po Monitor.Enter i tak zostanie wywołana klauzula finally. Kod jednak jest nieodporny na inny scenariusz.Co jeśli wyjątek spowoduje funkcja Monitor.Enter? Wtedy wejdziemy do finally, spróbujemy wywołać Exit na blokadzie, która nie została uruchomiona, co spowoduje kolejny wyjątek.
Z tego względu lepszym rozwiązaniem jest:
bool lockTaken = false; try { Monitor.Enter(_sync, ref lockTaken); // ... } finally { if (lockTaken) Monitor.Exit(_sync); }
Jeśli skompilujemy w C# 4.0 konstrukcje ze słowem kluczowym lock dostaniemy właśnie powyższy kod:
lock(_sync) { // sekcja krytyczna tutaj } Reflector: private static unsafe void Main(string[] args) { bool <>s__LockTaken0; object CS$2$0000; bool CS$4$0001; <>s__LockTaken0 = 0; Label_0003: try { Monitor.Enter(CS$2$0000 = _sync, &<>s__LockTaken0); goto Label_0026; } finally { Label_0016: if ((<>s__LockTaken0 == 0) != null) { goto Label_0025; } Monitor.Exit(CS$2$0000); Label_0025:; } Label_0026: return; }
Kod gwarantuje, że zawsze wyjdziemy z sekcji krytycznej, nigdy nie wywołamy Exit gdy nie został prawidłowo wykonany Enter. Czy to znaczy, że nasz kod jest perfekcyjny? Moim zdaniem nie i osobiście staram się unikać lock w kodzie w wielu scenariuszach. Co jeśli nasz kod wywołał wyjątek w sekcji krytycznej? Używając lock, natychmiast wywołamy Exit POZWALAJĄC innym wątkom na dostęp do tych danych. Skoro wystąpił jakiś wyjątek to oznacza, że coś nie poszło po naszej myśli i być może mamy do czynienia z nieprawidłowym stanem aplikacji. W takiej sytuacji lepiej jest mieć deadlock, niż pozwolić innym wątkom zepsuć jeszcze bardziej nasze dane.
Drugi problem z lock to brak timeout. Moim zdaniem lepiej przekazać jakiś timeout, po którym wątek nie będzie blokował już więcej wykonania. Używając metody Enter.TryEnter mamy kontrole, ile czasu będziemy próbować wejść do sekcji krytycznej. Oczywiście, należy dobrze zaprojektować algorytm abyśmy mogli w łatwy sposób wykryć deadlocki a nie po prostu je ignorować.
Jakiś czas temu zapowiadałem drugą część artykułu o GC. Tym razem będzie o różnych trybach GC i kiedy z jakiego należy korzystać, tak, aby aplikacja zachowywała się płynnie oraz sprawiała wrażenie, że działa w czasie rzeczywistym. Zapraszam do lektury!
http://msdn.microsoft.com/pl-pl/library/garbage-collector-cz-2
W ostatnim poście zajęliśmy się wprowadzeniem do TPL Dataflows. Użyliśmy ActionBlock do implementacji wzorca producent\konsument. Dzisiaj dołączymy kolejne bloki, aby pokazać na czym polega tworzenie współbieżnych algorytmów w TPL.
ActionBlock przetwarzał wyłącznie dane – nie zwracał żadnego rezultatu. Innymi słowy, przyjmował parametry wejściowe ale zwracał wyłącznie void. TransformBlock implementuje zarówno ITargetBlock jak i ISourceBlock – stanowi również źródło danych. Rozważmy przykład:
private static void Main(string[] args) { var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i)); sinBlock.Post(1); sinBlock.Post(0); sinBlock.Post(3.14); sinBlock.Complete(); for (int i = 0; i < 3;i++ ) { Console.WriteLine(sinBlock.Receive()); } }
Funkcje Post i Complete powinny być znane z poprzedniego wpisu. Proszę zauważyć, że TransformBlock, w przeciwieństwie do ActionBlock przyjmuje dwa typy generyczne. Jeden to parametr wejściowy a drugi to oczywiście typ wartości zwracanej. Wartości wygenerowane na wyjściu można odbierać synchronicznie za pomocą Receiver – funkcja blokuje wykonanie aż do momentu zwrócenia wartości. Można również przekazać timeout w Receive. Innym sposobem odbierania danych to TryReceive:
for (int i = 0; i <5 ;i++ ) { double value; if (sinBlock.TryReceive(out value)) Console.WriteLine(value); }
Ponadto można użyć słowa await, aby odebrać dane w pełni asynchronicznie:
double resultReceived = await sinBlock.ReceiveAsync();
Jeśli chcemy otrzymać wszystkie dostępne dane wtedy należy użyć TryReceiveAll:
IList<double> results; sinBlock.TryReceiveAll(out results);
Istnieje naprawdę wiele sposobów odbierania danych – zachęcam do przejrzenia dostępnych funkcji. W praktyce jednak, dużo częściej będziemy chcieli przekazać wyjście TransformBlock na wejście ActionBlock – to tak naprawdę jest przyczyna dla której TPL Dataflows został zaimplementowany. Załóżmy, że TransformBlock liczy funkcję sinus, z kolei ActionBlock ma zadanie wyświetlenie tej wartości:
private static void Main(string[] args) { var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i)); var actionBlock = new ActionBlock<double>(i=>Console.WriteLine(i)); sinBlock.LinkTo(actionBlock); sinBlock.Post(1); sinBlock.Post(0); sinBlock.Post(3.14); sinBlock.Complete(); sinBlock.Completion.Wait(); }
Wystarczy użyć LinkTo w celu połączenia dwóch bloków ze sobą. Istnieje kilka przeładowań LinkTo. Jednym z nich, jest możliwość przekazania filtru:
sinBlock.LinkTo(actionBlock,i=>i>5);
Takim sposobem, tylko wartości większe niż 5 będą przekazywane do następnego bloku.
Ostatnim blokiem do omówienia w tym poście jest BroadcastBlock, służący do klonowania wiadomości i rozsyłania ich. Przykład:
class Program { private static void Main(string[] args) { BroadcastBlock<double> broadcastBlock=new BroadcastBlock<double>(i=>i); broadcastBlock.LinkTo(CreateTransformBlock()); broadcastBlock.LinkTo(CreateTransformBlock()); broadcastBlock.LinkTo(CreateTransformBlock()); broadcastBlock.Post(1); broadcastBlock.Post(0); broadcastBlock.Post(3.14); Console.ReadLine(); } private static TransformBlock<double, double> CreateTransformBlock() { var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i)); var actionBlock = new ActionBlock<double>(i => Console.WriteLine(i)); sinBlock.LinkTo(actionBlock); return sinBlock; } }
W konstruktorze BroadCastBlock definiujemy funkcje klonowania elementów – w naszym przypadku jest to po prostu zwrócenie tej samej wartości. W przypadku typów referencyjnych, być może będziemy chcieli dokonać własnej kopii obiektów, aby poszczególne bloki nie operowały na tym samym obiekcie.
Czy TransformBlock może również wywołać kilka razy LinkTo? Jak się okazuje, nic nie stoi na przeszkodzie, aby napisać:
private static void Main(string[] args) { var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i)); var actionBlock1 = new ActionBlock<double>(i => Console.WriteLine(i)); var actionBlock2 = new ActionBlock<double>(i => Console.WriteLine(i)); var actionBlock3 = new ActionBlock<double>(i => Console.WriteLine(i)); sinBlock.LinkTo(actionBlock1); sinBlock.LinkTo(actionBlock2); sinBlock.LinkTo(actionBlock3); sinBlock.Post(1); Console.ReadLine(); }
Czy oznacza to, że TransformBlock zachowa się tak samo jak BroadCastBlock? Tylko BroadcastBlock wyśle wiadomości do wszystkich podłączonych elementów. TransformBlock, wyśle do pierwszego, który zaakceptuje wiadomość. Na ekranie zatem wyświetli się jedna wartość a nie trzy – pierwszy blok zaakceptował wiadomość i następnie nie będzie ona rozsyłana do kolejnych bloków. Z kolei taki kod, wyświetli 3 wiadomości:
private static void Main(string[] args) { var sinBlock = new BroadcastBlock<double>(i => Math.Sin(i)); var actionBlock1 = new ActionBlock<double>(i => Console.WriteLine(i)); var actionBlock2 = new ActionBlock<double>(i => Console.WriteLine(i)); var actionBlock3 = new ActionBlock<double>(i => Console.WriteLine(i)); sinBlock.LinkTo(actionBlock1); sinBlock.LinkTo(actionBlock2); sinBlock.LinkTo(actionBlock3); sinBlock.Post(1); Console.ReadLine(); }
W kolejnym poście omówię kolejne bloki, które podobnie jak BroadcastBlock, potrafią buforować dane i rozsyłać je do podłączonych elementów.
Biblioteka TPL istnieje już od jakiegoś czasu i raczej jest znana dla większości programistów. W .NET 4.5 Microsoft poszedł jednak o kilka kroków do przodu i dostarczył tzw. TPL DataFlows. Jest on oparty oczywiście na bibliotece TPL, dostarcza jednak kilka bardzo ciekawych klas, przydatnych do modelowania współbieżnego. Czasami algorytmy składają się z kilku “bloków”, które należy ze sobą synchronizować. TPL DataFlows służy do modelowania przepływu między różnymi wątkami. We wczesnych wersjach .NET byliśmy skazani na callback’i i manualne przekazywanie danych. Za pomocą TPL DataFlwos możemy w łatwy sposób przekazać wyjście jednego wątku na wejście drugiego. Część zastosowania będzie pokrywać się zatem z RX Extensions lub po prostu słowem kluczowym async\await.
Zacznijmy od początku – instalacji TPL Dataflows z NuGet:
Idea TPL DataFlows polega na konstrukcji bloków, które następnie mogą być przetwarzane równolegle (coś analogicznego do Windows Workflow Foundation). W tym cyklu wpisów, poznamy różne typy bloków. Wszystkie z nich zawsze będą implementowały interfejs IDataFlowBlock:
// Summary: // Represents a dataflow block. public interface IDataflowBlock { // Summary: // Gets a System.Threading.Tasks.Task that represents the asynchronous operation // and completion of the dataflow block. // // Returns: // The task. Task Completion { get; } // Summary: // Signals to the System.Threading.Tasks.Dataflow.IDataflowBlock that it should // not accept nor produce any more messages nor consume any more postponed messages. void Complete(); // // Summary: // Causes the System.Threading.Tasks.Dataflow.IDataflowBlock to complete in // a System.Threading.Tasks.TaskStatus.Faulted state. // // Parameters: // exception: // The System.Exception that caused the faulting. // // Exceptions: // System.ArgumentNullException: // The exception is null. void Fault(Exception exception); }
Funkcja Complete jest wywoływana gdy blok skończył przetwarzanie danych. Fault może zostać wywołany, gdy wystąpił jakiś wyjątek. Właściwość Completion zwraca wątek za pomocą którego można np. sprawdzić czy operacje zostały już wykonane. Upraszczając, poszczególne elementy, mogą akceptować parametry wejściowe oraz zwracać jakiś wynik. Bloki, które wyłącznie przetwarzają dane implementują interfejs ITargetBlock:
// Summary: // Represents a dataflow block that is a target for data. // // Type parameters: // TInput: // Specifies the type of data accepted by the System.Threading.Tasks.Dataflow.ITargetBlock<TInput>.This // type parameter is contravariant. That is, you can use either the type you // specified or any type that is less derived. For more information about covariance // and contravariance, see Covariance and Contravariance in Generics. public interface ITargetBlock<in TInput> : IDataflowBlock { // Summary: // Offers a message to the System.Threading.Tasks.Dataflow.ITargetBlock<TInput>, // giving the target the opportunity to consume or postpone the message. // // Parameters: // messageHeader: // A System.Threading.Tasks.Dataflow.DataflowMessageHeader instance that represents // the header of the message being offered. // // messageValue: // The value of the message being offered. // // source: // The System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> offering the message. // This may be null. // // consumeToAccept: // Set to true to instruct the target to call System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>.ConsumeMessage() // synchronously during the call to System.Threading.Tasks.Dataflow.ITargetBlock<TInput>.OfferMessage(), // prior to returning System.Threading.Tasks.Dataflow.DataflowMessageStatus.Accepted, // in order to consume the message. // // Returns: // The status of the offered message. If the message was accepted by the target, // System.Threading.Tasks.Dataflow.DataflowMessageStatus.Accepted is returned, // and the source should no longer use the offered message, because it is now // owned by the target. If the message was postponed by the target, System.Threading.Tasks.Dataflow.DataflowMessageStatus.Postponed // is returned as a notification that the target may later attempt to consume // or reserve the message; in the meantime, the source still owns the message // and may offer it to other blocks.If the target would have otherwise postponed // message, but source was null, System.Threading.Tasks.Dataflow.DataflowMessageStatus.Declined // is instead returned. If the target tried to accept the message but missed // it due to the source delivering the message to another target or simply discarding // it, System.Threading.Tasks.Dataflow.DataflowMessageStatus.NotAvailable is // returned. If the target chose not to accept the message, System.Threading.Tasks.Dataflow.DataflowMessageStatus.Declined // is returned. If the target chose not to accept the message and will never // accept another message from this source, System.Threading.Tasks.Dataflow.DataflowMessageStatus.DecliningPermanently // is returned. // // Exceptions: // System.ArgumentException: // The messageHeader is not valid.-or-consumeToAccept may only be true if provided // with a non-null source. DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept); }
Z kolei bloki będące źródłem danych implementują ISourceBlock:
// Summary: // Represents a dataflow block that is a source of data. // // Type parameters: // TOutput: // Specifies the type of data supplied by the System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>.This // type parameter is covariant. That is, you can use either the type you specified // or any type that is more derived. For more information about covariance and // contravariance, see Covariance and Contravariance in Generics. public interface ISourceBlock<out TOutput> : IDataflowBlock { // Summary: // Called by a linked System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to // accept and consume a System.Threading.Tasks.Dataflow.DataflowMessageHeader // previously offered by this System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. // // Parameters: // messageHeader: // The System.Threading.Tasks.Dataflow.DataflowMessageHeader of the message // being consumed. // // target: // The System.Threading.Tasks.Dataflow.ITargetBlock<TInput> consuming the message. // // messageConsumed: // true if the message was successfully consumed; otherwise, false. // // Returns: // The value of the consumed message. This may correspond to a different System.Threading.Tasks.Dataflow.DataflowMessageHeader // instance than was previously reserved and passed as the messageHeader to // System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>.ConsumeMessage(). The // consuming System.Threading.Tasks.Dataflow.ITargetBlock<TInput> must use the // returned value instead of the value passed as messageValue through System.Threading.Tasks.Dataflow.ITargetBlock<TInput>.OfferMessage().If // the message requested is not available, the return value will be null. // // Exceptions: // System.ArgumentException: // The messageHeader is not valid. // // System.ArgumentNullException: // The target is null. [SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "2#")] TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed); // // Summary: // Links the System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> to the specified // System.Threading.Tasks.Dataflow.ITargetBlock<TInput>. // // Parameters: // target: // The System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to which to connect // this source. // // linkOptions: // A System.Threading.Tasks.Dataflow.DataflowLinkOptions instance that configures // the link. // // Returns: // An IDisposable that, upon calling Dispose, will unlink the source from the // target. // // Exceptions: // System.ArgumentNullException: // target is null (Nothing in Visual Basic) or linkOptions is null (Nothing // in Visual Basic). IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions); // // Summary: // Called by a linked System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to // release a previously reserved System.Threading.Tasks.Dataflow.DataflowMessageHeader // by this System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. // // Parameters: // messageHeader: // The System.Threading.Tasks.Dataflow.DataflowMessageHeader of the reserved // message being released. // // target: // The System.Threading.Tasks.Dataflow.ITargetBlock<TInput> releasing the message // it previously reserved. // // Exceptions: // System.ArgumentException: // The messageHeader is not valid. // // System.ArgumentNullException: // The target is null. // // System.InvalidOperationException: // The target did not have the message reserved. void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target); // // Summary: // Called by a linked System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to // reserve a previously offered System.Threading.Tasks.Dataflow.DataflowMessageHeader // by this System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. // // Parameters: // messageHeader: // The System.Threading.Tasks.Dataflow.DataflowMessageHeader of the message // being reserved. // // target: // The System.Threading.Tasks.Dataflow.ITargetBlock<TInput> reserving the message. // // Returns: // true if the message was successfully reserved; otherwise, false. // // Exceptions: // System.ArgumentException: // The messageHeader is not valid. // // System.ArgumentNullException: // The target is null. bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target); }
Oczywiście są bloki, które implementują obydwa interfejsy– przetwarzają dane oraz są również źródłem danych. Stwórzmy zatem pierwszą aplikację korzystającą z TPL DataFlows. Użyjemy bloku ActionBlock<T>, który implementuje ITargetBlock – akceptuje dane ale nie jest źródłem danych:
class Program { static void Main(string[] args) { var actionBlock = new ActionBlock<int>(a => Console.WriteLine(a)); for (int i = 0; i < 10; i++) { actionBlock.Post(i); } Console.ReadLine(); } }
Powyższy blok, po prostu wyświetla przekazany parametr. Funkcja Post, wysyła dane do bloku w sposób “synchroniczny”. W celu wysłania danych całkowicie asynchronicznie należy użyć SendAsync:
class Program { static void Main(string[] args) { var actionBlock = new ActionBlock<int>(a => Console.WriteLine(a)); for (int i = 0; i < 10; i++) { actionBlock.SendAsync(i); } Console.ReadLine(); } }
Jaka jest różnica? Obie funkcje wysyłają tylko dane – przetwarzanie zawsze jest wykonywane w innym wątku w ActionBlock. Jeśli zatem oprócz wyświetlenia tekstu umieścilibyśmy Thread.Sleep(1000), zarówno Post jak i SendAsync nie blokowałyby wykonania. Synchroniczność\asynchroniczność w tym przypadku dotyczy jedynie w jaki sposób dane są wpompowane do bloku a nie jak są przetwarzane.
Operacje są zatem wykonywane w osobnym wątku. Można nawet zdecydować ile wątków powinno przetwarzać dane:
var actionBlock = new ActionBlock<int>(a => Console.WriteLine(a),new new ExecutionDataflowBlockOptions(){MaxDegreeOfParallelism = 4});
Domyślnie MaxDegreeOfParallelism równy jest 1, co w praktyce oznacza, że operacje wykonywane są sekwencyjnie. Po wpompowaniu wszystkich danych można zakończyć przetwarzanie funkcją Complete:
static void Main(string[] args) { var actionBlock = new ActionBlock<int>(a => Console.WriteLine(a),new ExecutionDataflowBlockOptions(){MaxDegreeOfParallelism = 4}); for (int i = 0; i < 10; i++) { actionBlock.SendAsync(i); } actionBlock.Complete(); actionBlock.Completion.Wait(); Console.WriteLine("Koniec"); }
Wait po prostu czeka, aż wszystkie elementy zostaną przetworzone. Można “czekać” również w sposób asynchroniczny:
await actionBlock.Completion;
Przedstawione informacje, póki co nie są czymś, co może w praktyce ułatwić nam pracę. Cała siła TPL DataFlows to modelowanie procesów za pomocą workflow. Aby to uczynić musimy oczywiście stworzyć więcej bloków i połączyć je ze sobą. Wyświetlanie tekstu w sposób asynchroniczny można przecież wykonywać chociażby w zwykłym TPL – w przyszłym wpisie dowiemy się już jak łączyć ze sobą różne bloki w całość. Dzisiaj tak naprawdę zaimplementowaliśmy wzorzec producent\konsument. SendAsync\Post produkuje dane, z kolei ActionBlock konsumuje je.
Serialziacja jest dobrze znanym tematem. Jakiś czas temu pisałem, jak oddelegować serializację jednego obiektu do drugiego. Pokazałem to na przykładzie wzorca singleton – w tamtym przypadku chcieliśmy oddelegować serializację do IObjectReference, który zwracał po prostu zawsze tą samą instancję.
Dziś trochę inny scenariusz. Załóżmy, że w plikach, zawsze chcemy trzymać czas w UTC a nie w konkretnej strefie. Ponadto nie mamy dostępu ani do kodu źródłowego DateTime ani nie chcemy korzystać z DateTimeOffset. Rozwiązaniem będzie użycie ISerializationSurrogate – interfejsu odpowiedzialnego za podstawienie logiki serializacji dla konkretnego typu. Gdybyśmy mieli dostęp do kodu źródłowego moglibyśmy po prostu zaimplementować interfejs ISerializable. W naszym przypadku nie mamy takiego komfortu albo po prostu nie chcemy mieszać logiki biznesowej z mechanizmem serializacji.
Zacznijmy od szablonu, na którym będziemy testować nasze rozwiązanie:
private static void Main(string[] args) { var timestamp=new DateTime(2010,1,1,11,50,0); using (MemoryStream stream = new MemoryStream()) { serializer.Serialize(stream, timestamp); stream.Position = 0; DateTime timestamp2 = (DateTime)serializer.Deserialize(stream); Console.WriteLine(timestamp2); } }
Datetime nie przechowuje informacji o danej strefie czasowej – stanowi zwykły kontener na daty. Co prawda, jest tam pole oznaczające czy mamy do czynienia z UTC czy lokalną strefą ale nie definiuje to dokładnego przesunięcia. Napiszmy zatem obiekt zastępczy, który będzie zapisywał do bazy to co dokładnie chcemy:
public class TimestampSurrogate : ISerializationSurrogate { public void GetObjectData(object obj, SerializationInfo info, StreamingContext context) { info.AddValue("Date", ((DateTime)obj).ToUniversalTime().ToString("u")); } public object SetObjectData(object obj, SerializationInfo info, StreamingContext context, ISurrogateSelector selector) { return DateTime.ParseExact(info.GetString("Date"), "u", null).ToLocalTime(); } }
Interfejs implementuje dwie metody w zależności czy obiekt jest właśnie zapisywany czy odczytywany. Konstrukcja powinna być już znana ponieważ przypomina np. tą przedstawioną w poście o serializacji singleton. Klasa SerializationInfo (parametr wejściowy) nie jest niczym nowym – służy do zapisu\odczytu danych ze strumienia. Metoda GetObjectData ma za zadanie zapisanie obiektu do strumienia. Za pomocą SerializationInfo.AddValue dodajemy pole “Date”, którego wartością jest czas w formacie UTC. Parametr wejściowy obj to obiekt, dla którego TimestampSurrogate jest zastępczy. Zatem jak najbardziej możliwe jest w naszym przypadku zrzutowanie obj do DateTime – chcemy stworzyć obiekt surrogate dla DateTime.
Analogicznie sprawa wygląda z SetObjectData. SetObjectData wywoływany jest w momencie deserializacji i ma za zadanie wypełnienie przetwarzanego obiektu danymi w prawidłowym formacie. W naszym przypadku chcemy odczytać zapisaną datę w formacie UTC (info.GetString(“Date”)) a następnie skonwertować do strefy lokalnej.
Proszę zauważyć, że w SetObjectData nie wykorzystujemy parametru obj, którym jest dany obiekt. Przed wywołaniem SetObjectData jest utworzona instancja danego obiektu (DateTime). Utworzona instancja ma jednak pewne specyficzne cechy. Wszystkie jej pola są ustawione na NULL albo 0. Konstruktor również nie jest wywołany. Zadaniem SetObjectData jest wypełnienie tych pól prawidłowymi wartościami. W naszym przypadku mamy do czynienia z Immutable Object więc nie ma to sensu – tworzymy i zwracamy całkowicie nowy obiekt.
Na końcu w jakiś sposób musimy powiedzieć serializatorowi, że chcemy używać danego surrogate. Służy do tego SurrogateSelctor, który definiuje mapowanie pomiędzy obiektami a ich surrogate:
private static void Main(string[] args) { var timestamp=new DateTime(2010,1,1,11,50,0); var serializer = new SoapFormatter(); SurrogateSelector surrogateSelector = new SurrogateSelector(); surrogateSelector.AddSurrogate(typeof(DateTime), serializer.Context, new TimestampSurrogate()); serializer.SurrogateSelector = surrogateSelector; using (MemoryStream stream = new MemoryStream()) { serializer.Serialize(stream, timestamp); stream.Position = 0; DateTime timestamp2 = (DateTime)serializer.Deserialize(stream); Console.WriteLine(timestamp2); } }
Warto jeszcze raz podkreślić, że SurrogateSelector zachowuje się dokładnie jak mapowanie – możemy wielokrotnie wywoływać AddSurrogate aby określić relacje między obiektami a ich surrogate. Dlaczego w AddSurrogate przekazujemy kontekst? Kontekst zawiera informacje o typie serializacji – czyli np. zawiera informacje o tym czy zapisujemy do pliku, pamięci czy sieci. Dzięki temu, możemy użyć różnego surrogate w zależności od danego “kontekstu” zapisu. Jedną z wartości Context.State może być: CrossProcess , CrossMachine, File, Persistence, Remoting , Other , Clone, CrossDomain. Więcej informacji można znaleźć tutaj.
Przykład:
ss.AddSurrogate(typeof(Employee),new StreamingContext(StreamingContextStates.Remoting),new EmployeeSerializationSurrogate());
Ponadto, oprócz kilku mapowań istnieje możliwość definicji nawet kilka ISurrogateSelector. Jeśli jest ktoś zainteresowany, to odsyłam do dokumentacji – tutaj chciałbym tylko zaznaczyć, że metody ChainSelector oraz GetNextSelector będą nas interesować:
public interface ISurrogateSelector { void ChainSelector(ISurrogateSelector selector); ISurrogateSelector GetNextSelector(); ISerializationSurrogate GetSurrogate(Type type, StreamingContext context,out ISurrogateSelector selector); }
Kilka wpisów wcześniej pisałem, dlaczego należy unikać funkcji Suspend. Dzisiaj przyszła kolej na metodę Abort, która również jest sygnałem, że zaprojektowana architektura jest po prostu zła. Aby zrozumieć, dlaczego Abort jest tak niebezpieczny, należy poznać najpierw zasadę jego działania. Wywołując Abort, wyrzucany jest tzw. asynchroniczny wyjątek ThreadAbortException. Dlaczego asynchroniczny? Ponieważ może on zostać wstrzyknięty w “dowolne” miejsce w kodzie. Istnieją pewne zasady, kiedy dokładnie może on zostać wyrzucony ale o tym później. Najpierw spróbujmy udowodnić, że faktycznie takowy wyjątek istnieje:
internal class Program { private static void Main(string[] args) { Task.Factory.StartNew(Run); Console.Read(); } private static void Run() { while (true) { Console.WriteLine("Iteracja..."); try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException e) { Console.WriteLine("Zlapano ThreadAbortException"); } } } }
Powyższa funkcja złapie wyjątek. ThreadAbortException jest specjalnym wyjątkiem ponieważ jest wyrzucany asynchronicznie oraz nie da się go zignorować – automatycznie jest zawsze ponownie wyrzucony przez CLR. Pomimo, że mamy catch i tak wątek będzie zatrzymany. Na końcu bowiem, zawsze jest rethrow. Aby anulować Abort, należy wywołać funkcję ResetAbort – wtedy wyjątek nie będzie ponownie wyrzucany:
internal class Program { private static void Main(string[] args) { Task.Factory.StartNew(Run); Console.Read(); } private static void Run() { while (true) { Console.WriteLine("Iteracja..."); try { try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException e) { Console.WriteLine("Zlapano ThreadAbortException"); } } catch(ThreadAbortException e) { Console.WriteLine("Wyjatek ponownie został wyrzucony."); Thread.ResetAbort(); } } } }
Powyższy drugi catch, zostanie wywołany ponieważ jak napisałem, CLR automatycznie zrobi rethrow. Abort jest bardziej inteligentniejszy od Suspend i nie zostanie wyrzucony wyjątek jeśli aktualnie wykonywany jest kod:
W takich przypadkach, wątek zostanie dopiero przerwany, gdy IP opuści te klauzule. W praktyce jednak istnieje wiele niebezpiecznych sytuacji np.:
IntPtr pointer = AllocateUnmanagedCode(); try { // jakas logika } finally { Free(pointer); }
Co w przypadku gdy wyjątek zostanie wyrzucony po alokacji zasobów ale przed przypisaniem ich do pointer? Zasoby nigdy nie zostaną zwolnione. Klauzula finally oczywiście nie zostanie wywołana ani destruktor IntPtr. Problem z abort jest taki, że nie wiadomo kiedy zostanie wywołany. Jeśli stanie się to w sytuacji gdy np. otwieramy plik, ale przed przypisaniem do wskaźnika, wtedy plik zostanie otwarty mimo, że wątek już nie pracuje. Bardzo trudno byłoby zaprojektować kod odporny na ThreadAbortException i z tego względu należy unikać asynchronicznych wyjątków.