TPL Dataflows – część III (bloki buforujące)

Dzisiaj wracamy do tematu TPL Dataflows. W ostatniej części zajęliśmy się m.in. BroadcastBlock, który jest jednym z bloków buforujących. Dla przypomnienia przykład:

class Program
{
   private static void Main(string[] args)
   {
       BroadcastBlock<double> broadcastBlock=new BroadcastBlock<double>(i=>i);

       broadcastBlock.LinkTo(CreateTransformBlock());
       broadcastBlock.LinkTo(CreateTransformBlock());
       broadcastBlock.LinkTo(CreateTransformBlock());

       broadcastBlock.Post(1);
       broadcastBlock.Post(0);
       broadcastBlock.Post(3.14);

       Console.ReadLine();
   }

   private static TransformBlock<double, double> CreateTransformBlock()
   {
       var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i));
       var actionBlock = new ActionBlock<double>(i => Console.WriteLine(i));
       sinBlock.LinkTo(actionBlock);

       return sinBlock;
   }
}

Normalnie jeśli blok A wysyła dane do bloku B, to po zaakceptowaniu ich przez blok B, są one usuwane z bloku A. BroadcastBlock ma jednak bufor, który umożliwia przechowanie tych informacji i wysłanie ich od kolejnych elementów. W DataFlows do dyspozycji mamy jeszcze dwa inne bloki buforujące.

Kolejnym blokiem jest zatem BufferBlock. W przeciwieństwie do BroadcastBlock, przechowuje on również dane historyczne a nie tylko najnowsze. Drugą różnicą w stosunku do BroadcastBlock jest fakt, że gdy target zaakceptuje wiadomość, nie będą one już rozsyłane do kolejnych bloków. Pierwszą różnicę pokazuje następujący kod:

BufferBlock<double> bufferBlock = new BufferBlock<double>();
BroadcastBlock<double> broadcastBlock = new BroadcastBlock<double>(i=>i);

bufferBlock.Post(0);
bufferBlock.Post(5);

broadcastBlock.Post(0);
broadcastBlock.Post(5);


Thread.Sleep(1000);

Console.WriteLine("Buffer block:");
Console.WriteLine(bufferBlock.Receive());
Console.WriteLine(bufferBlock.Receive());

Console.WriteLine("Broadcast block:");
Console.WriteLine(broadcastBlock.Receive());
Console.WriteLine(broadcastBlock.Receive());  

BroadtcastBlock zwróci dwa razy “5” ponieważ przechowuje on wyłącznie ostatnie wartości. BufferBlock z kolei ma pełną historie danych. Druga różnica polega na tym, że gdy jakiś blok zaakceptuje dane, wtedy BufferBlocked nie wysyła ich do kolejnych bloków:

BufferBlock<double> bufferBlock = new BufferBlock<double>();
BroadcastBlock<double> broadcastBlock = new BroadcastBlock<double>(i=>i);

ActionBlock<double> actionBlock1=new ActionBlock<double>(i=>Console.WriteLine("A:{0}",i));
ActionBlock<double> actionBlock2=new ActionBlock<double>(i=>Console.WriteLine("B:{0}",i));

bufferBlock.LinkTo(actionBlock1);
bufferBlock.LinkTo(actionBlock2);

broadcastBlock.LinkTo(actionBlock1);
broadcastBlock.LinkTo(actionBlock2);

bufferBlock.Post(0);
bufferBlock.Post(5);

broadcastBlock.Post(0);
broadcastBlock.Post(5);

Broadcast wyśle wiadomości zarówno do actionBlock1 jak i do actionBlock2. Z kolei bufferBlock wyłącznie do actionblock1 – wysyłane one są do pierwszego bloku, który zaakceptuje dane.

Ostatnim blokiem jest WriteOnceBlock. Zachowuje się on jak BroadcastBlock, z tym, że można zapisać do niego tylko raz jakąś informacje (analogiczne do słowa kluczowego readonly w c#). Przykład (MSDN):

// Create a WriteOnceBlock<string> object. 
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first  
// message to be received is written to the block.  
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */

W powyższym przypadku Message2 była zapisana jako pierwsza i kolejne “posty” są po prostu ignorowane. W następnych wpisach postaram się podać jakieś bardziej praktyczne przykłady zastosowania BufferBlock i WriteOnceBlock.

Leave a Reply

Your email address will not be published.