TPL Dataflows–część II (TransformBlock i BroadcastBlock)

W ostatnim poście zajęliśmy się wprowadzeniem do TPL Dataflows. Użyliśmy ActionBlock do implementacji wzorca producent\konsument. Dzisiaj dołączymy kolejne bloki, aby pokazać na czym polega tworzenie współbieżnych algorytmów w TPL.

ActionBlock przetwarzał wyłącznie dane – nie zwracał żadnego rezultatu. Innymi słowy, przyjmował parametry wejściowe ale zwracał wyłącznie void. TransformBlock implementuje zarówno ITargetBlock jak i ISourceBlock – stanowi również źródło danych. Rozważmy przykład:

private static void Main(string[] args)
{
  var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i));

  sinBlock.Post(1);
  sinBlock.Post(0);
  sinBlock.Post(3.14);
  sinBlock.Complete();
  
  for (int i = 0; i < 3;i++ )
  {
      Console.WriteLine(sinBlock.Receive());
  }
}

Funkcje Post i Complete powinny być znane z poprzedniego wpisu. Proszę zauważyć, że TransformBlock, w przeciwieństwie do ActionBlock przyjmuje dwa typy generyczne. Jeden to parametr wejściowy a drugi to oczywiście typ wartości zwracanej. Wartości wygenerowane na wyjściu można odbierać synchronicznie za pomocą Receiver – funkcja blokuje wykonanie aż do momentu zwrócenia wartości. Można również przekazać timeout w Receive. Innym sposobem odbierania danych to TryReceive:

for (int i = 0; i <5 ;i++ )
{
 double value;
 if (sinBlock.TryReceive(out value))
     Console.WriteLine(value);
}

Ponadto można użyć słowa await, aby odebrać dane w pełni asynchronicznie:

double resultReceived = await sinBlock.ReceiveAsync();

Jeśli chcemy otrzymać wszystkie dostępne dane wtedy należy użyć TryReceiveAll:

IList<double> results;
sinBlock.TryReceiveAll(out results);

Istnieje naprawdę wiele sposobów odbierania danych – zachęcam do przejrzenia dostępnych funkcji. W praktyce jednak, dużo częściej będziemy chcieli przekazać wyjście TransformBlock na wejście ActionBlock – to tak naprawdę jest przyczyna dla której TPL Dataflows został zaimplementowany. Załóżmy, że TransformBlock liczy funkcję sinus, z kolei ActionBlock ma zadanie wyświetlenie tej wartości:

private static void Main(string[] args)
{
  var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i));
  var actionBlock = new ActionBlock<double>(i=>Console.WriteLine(i));

  sinBlock.LinkTo(actionBlock);

  sinBlock.Post(1);
  sinBlock.Post(0);
  sinBlock.Post(3.14);
  sinBlock.Complete();
  
  sinBlock.Completion.Wait();
}

Wystarczy użyć LinkTo w celu połączenia dwóch bloków ze sobą.  Istnieje kilka przeładowań LinkTo. Jednym z nich, jest możliwość przekazania filtru:

sinBlock.LinkTo(actionBlock,i=>i>5);

Takim sposobem, tylko wartości większe niż 5 będą przekazywane do następnego bloku.

Ostatnim blokiem do omówienia w tym poście jest BroadcastBlock, służący do klonowania wiadomości i rozsyłania ich. Przykład:

class Program
{
   private static void Main(string[] args)
   {
       BroadcastBlock<double> broadcastBlock=new BroadcastBlock<double>(i=>i);

       broadcastBlock.LinkTo(CreateTransformBlock());
       broadcastBlock.LinkTo(CreateTransformBlock());
       broadcastBlock.LinkTo(CreateTransformBlock());

       broadcastBlock.Post(1);
       broadcastBlock.Post(0);
       broadcastBlock.Post(3.14);

       Console.ReadLine();
   }

   private static TransformBlock<double, double> CreateTransformBlock()
   {
       var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i));
       var actionBlock = new ActionBlock<double>(i => Console.WriteLine(i));
       sinBlock.LinkTo(actionBlock);

       return sinBlock;
   }
}

W konstruktorze BroadCastBlock definiujemy funkcje klonowania elementów – w naszym przypadku jest to po prostu zwrócenie tej samej wartości. W przypadku typów referencyjnych, być może będziemy chcieli dokonać własnej kopii obiektów, aby poszczególne bloki nie operowały na tym samym obiekcie.

Czy TransformBlock może również wywołać kilka razy LinkTo? Jak się okazuje, nic nie stoi na przeszkodzie, aby napisać:

private static void Main(string[] args)
{             
  var sinBlock = new TransformBlock<double, double>(i => Math.Sin(i));
  var actionBlock1 = new ActionBlock<double>(i => Console.WriteLine(i));
  var actionBlock2 = new ActionBlock<double>(i => Console.WriteLine(i));
  var actionBlock3 = new ActionBlock<double>(i => Console.WriteLine(i));
  
  sinBlock.LinkTo(actionBlock1);
  sinBlock.LinkTo(actionBlock2);
  sinBlock.LinkTo(actionBlock3);

  sinBlock.Post(1);

  Console.ReadLine();
}  

Czy oznacza to, że TransformBlock zachowa się tak samo jak BroadCastBlock? Tylko BroadcastBlock wyśle wiadomości do wszystkich podłączonych elementów. TransformBlock, wyśle do pierwszego, który zaakceptuje wiadomość. Na ekranie zatem wyświetli się jedna wartość a nie trzy – pierwszy blok zaakceptował wiadomość i następnie nie będzie ona rozsyłana do kolejnych bloków. Z kolei taki kod, wyświetli 3 wiadomości:

private static void Main(string[] args)
{             
  var sinBlock = new BroadcastBlock<double>(i => Math.Sin(i));
  var actionBlock1 = new ActionBlock<double>(i => Console.WriteLine(i));
  var actionBlock2 = new ActionBlock<double>(i => Console.WriteLine(i));
  var actionBlock3 = new ActionBlock<double>(i => Console.WriteLine(i));
  
  sinBlock.LinkTo(actionBlock1);
  sinBlock.LinkTo(actionBlock2);
  sinBlock.LinkTo(actionBlock3);

  sinBlock.Post(1);

  Console.ReadLine();
}  

W kolejnym poście omówię kolejne bloki, które podobnie jak BroadcastBlock, potrafią buforować dane i rozsyłać je do podłączonych elementów.

Leave a Reply

Your email address will not be published.