Dzisiaj wracamy do tematu TPL Dataflows. W ostatniej części zajęliśmy się m.in. BroadcastBlock, który jest jednym z bloków buforujących. Dla przypomnienia przykład:
class Program { private static void Main(string[] args) { BroadcastBlock<double> broadcastBlock=new BroadcastBlock<double>(i=>i); broadcastBlock.LinkTo(CreateTransformBlock()); broadcastBlock.LinkTo(CreateTransformBlock()); broadcastBlock.LinkTo(CreateTransformBlock()); broadcastBlock.Post(1); broadcastBlock.Post(0); broadcastBlock.Post(3.14); Console.ReadLine(); } private static TransformBlock<double, double> CreateTransformBlock() { var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i)); var actionBlock = new ActionBlock<double>(i => Console.WriteLine(i)); sinBlock.LinkTo(actionBlock); return sinBlock; } }
Normalnie jeśli blok A wysyła dane do bloku B, to po zaakceptowaniu ich przez blok B, są one usuwane z bloku A. BroadcastBlock ma jednak bufor, który umożliwia przechowanie tych informacji i wysłanie ich od kolejnych elementów. W DataFlows do dyspozycji mamy jeszcze dwa inne bloki buforujące.
Kolejnym blokiem jest zatem BufferBlock. W przeciwieństwie do BroadcastBlock, przechowuje on również dane historyczne a nie tylko najnowsze. Drugą różnicą w stosunku do BroadcastBlock jest fakt, że gdy target zaakceptuje wiadomość, nie będą one już rozsyłane do kolejnych bloków. Pierwszą różnicę pokazuje następujący kod:
BufferBlock<double> bufferBlock = new BufferBlock<double>(); BroadcastBlock<double> broadcastBlock = new BroadcastBlock<double>(i=>i); bufferBlock.Post(0); bufferBlock.Post(5); broadcastBlock.Post(0); broadcastBlock.Post(5); Thread.Sleep(1000); Console.WriteLine("Buffer block:"); Console.WriteLine(bufferBlock.Receive()); Console.WriteLine(bufferBlock.Receive()); Console.WriteLine("Broadcast block:"); Console.WriteLine(broadcastBlock.Receive()); Console.WriteLine(broadcastBlock.Receive());
BroadtcastBlock zwróci dwa razy “5” ponieważ przechowuje on wyłącznie ostatnie wartości. BufferBlock z kolei ma pełną historie danych. Druga różnica polega na tym, że gdy jakiś blok zaakceptuje dane, wtedy BufferBlocked nie wysyła ich do kolejnych bloków:
BufferBlock<double> bufferBlock = new BufferBlock<double>(); BroadcastBlock<double> broadcastBlock = new BroadcastBlock<double>(i=>i); ActionBlock<double> actionBlock1=new ActionBlock<double>(i=>Console.WriteLine("A:{0}",i)); ActionBlock<double> actionBlock2=new ActionBlock<double>(i=>Console.WriteLine("B:{0}",i)); bufferBlock.LinkTo(actionBlock1); bufferBlock.LinkTo(actionBlock2); broadcastBlock.LinkTo(actionBlock1); broadcastBlock.LinkTo(actionBlock2); bufferBlock.Post(0); bufferBlock.Post(5); broadcastBlock.Post(0); broadcastBlock.Post(5);
Broadcast wyśle wiadomości zarówno do actionBlock1 jak i do actionBlock2. Z kolei bufferBlock wyłącznie do actionblock1 – wysyłane one są do pierwszego bloku, który zaakceptuje dane.
Ostatnim blokiem jest WriteOnceBlock. Zachowuje się on jak BroadcastBlock, z tym, że można zapisać do niego tylko raz jakąś informacje (analogiczne do słowa kluczowego readonly w c#). Przykład (MSDN):
// Create a WriteOnceBlock<string> object. var writeOnceBlock = new WriteOnceBlock<string>(null); // Post several messages to the block in parallel. The first // message to be received is written to the block. // Subsequent messages are discarded. Parallel.Invoke( () => writeOnceBlock.Post("Message 1"), () => writeOnceBlock.Post("Message 2"), () => writeOnceBlock.Post("Message 3")); // Receive the message from the block. Console.WriteLine(writeOnceBlock.Receive()); /* Sample output: Message 2 */
W powyższym przypadku Message2 była zapisana jako pierwsza i kolejne “posty” są po prostu ignorowane. W następnych wpisach postaram się podać jakieś bardziej praktyczne przykłady zastosowania BufferBlock i WriteOnceBlock.