TPL Dataflows – część IV (przykład z BufferBlock)

W ostatnim wpisie pokazałem dostępne bloki buforujące. Dzisiaj zajmiemy się prostym przykładem, który jest bardziej praktyczny  od tego przedstawionego w poprzednim poście. Załóżmy, że piszemy system, który składa się z kilku wątków przetwarzających. Każdy z nich pełni rolę konsumenta – przetwarza dane. Chcemy to tak zoptymalizować, aby nowe dane były wysyłane wyłącznie do jak najmniej zajętych węzłów. Oczywiście temat jest bardziej skomplikowany niż może wydawać się, ale dzisiaj pokażemy jak można do tego celu wykorzystać BufferBlock.

image

Producentem oczywiście będzie BufferBlock. Z kolei konsumenci to dowolne akcje – może to być TransformBlock czy ActionBlock. W tym przykładzie chcemy wyświetlić wyłącznie dane więc ActionBlock jest wystarczający.

Pierwsza wersja algorytmu mogłaby wyglądać następująco:

class Program
{
   private static void Main(string[] args)
   {
       BufferBlock<int> bufferBlock = new BufferBlock<int>();

       bufferBlock.LinkTo(CreateActionBlock("A"));
       bufferBlock.LinkTo(CreateActionBlock("B"));
       bufferBlock.LinkTo(CreateActionBlock("C"));

       for (int i = 0; i < 10; i++)
           bufferBlock.SendAsync(i);

       Console.ReadLine();
   }
   private static ActionBlock<int> CreateActionBlock(string actionName)
   {
       var actionBlock = new ActionBlock<int>(i => Consume(actionName, i));
       return actionBlock;
   }
   private static void Consume(string actionName,int i)
   {       
       Console.WriteLine("{0}:{1}", actionName, i);
       Thread.Sleep(1000); // symulacja przetwarzania danych   
   }
}

Wiemy, że BufferBlock dostarczy dane tylko do pierwszego bloku, który je zaakceptuje (w przeciwieństwie do BroadcastBlock). Jeśli zatem ActionA obsłuży dane, nie są one potem duplikowane w ActionB i ActionC (patrz poprzedni post jeśli jest to niezrozumiałe). W tym przypadku jest to dokładnie zachowanie jakiego oczekujemy. Jeśli ActionA nie może obsłużyć zapytania (bo jest zajęty przetwarzaniem innego), wtedy BufferBlock spróbuje dostarczyć dane do kolejnego bloku  ActionB. Po uruchomieniu powyższego kodu dostaniemy jednak następujące wartości:

 

image

Co to oznacza? Wszystkie zapytania zostały skierowane do węzła A. Oczywiście nie o to nam chodziło. Przetworzenie każdej danej zajmuje >1000ms (patrz funkcja Sleep) zatem spodziewalibyśmy się, ze na wyjściu ujrzymy coś w stylu A,B,C,A,B,C. Spowodowane to jest domyślnymi ustawieniami. Możemy ustawić właściwość BoundedCapacity określającą co ma się stać z wiadomościami, których nie można natychmiast obsłużyć. Domyślna wartość to –1, co oznacza, że wszystkie wiadomości będą kolejkowane. My zmienimy oczywiście na 1:

private static ActionBlock<int> CreateActionBlock(string actionName)
{
    var options = new ExecutionDataflowBlockOptions {BoundedCapacity = 1};
    var actionBlock = new ActionBlock<int>(i => Consume(actionName, i),options);            
    return actionBlock;
}

Teraz na wyjściu otrzymamy:

image

W następnym poście o kolejnej grupie bloków oraz o ich chciwości.

Leave a Reply

Your email address will not be published.