TPL Dataflows–wprowadzenie (część I)

Biblioteka TPL istnieje już od jakiegoś czasu i raczej jest znana dla większości programistów. W .NET 4.5 Microsoft poszedł jednak o kilka kroków do przodu i dostarczył tzw. TPL DataFlows. Jest on oparty oczywiście na bibliotece TPL, dostarcza jednak kilka bardzo ciekawych klas, przydatnych do modelowania współbieżnego. Czasami algorytmy składają się z kilku “bloków”, które należy ze sobą synchronizować. TPL DataFlows służy do modelowania przepływu między różnymi wątkami. We wczesnych wersjach .NET byliśmy skazani na callback’i i manualne przekazywanie danych. Za pomocą TPL DataFlwos możemy w łatwy sposób przekazać wyjście jednego wątku na wejście drugiego. Część zastosowania będzie pokrywać się zatem z RX Extensions lub po prostu słowem kluczowym async\await.

Zacznijmy od początku – instalacji TPL Dataflows z NuGet:

image

Idea TPL DataFlows polega na konstrukcji bloków, które następnie mogą być przetwarzane równolegle (coś analogicznego do Windows Workflow Foundation). W tym cyklu wpisów, poznamy różne typy bloków. Wszystkie z nich zawsze będą implementowały interfejs IDataFlowBlock:

// Summary:
//     Represents a dataflow block.
public interface IDataflowBlock
{
   // Summary:
   //     Gets a System.Threading.Tasks.Task that represents the asynchronous operation
   //     and completion of the dataflow block.
   //
   // Returns:
   //     The task.
   Task Completion { get; }

   // Summary:
   //     Signals to the System.Threading.Tasks.Dataflow.IDataflowBlock that it should
   //     not accept nor produce any more messages nor consume any more postponed messages.
   void Complete();
   //
   // Summary:
   //     Causes the System.Threading.Tasks.Dataflow.IDataflowBlock to complete in
   //     a System.Threading.Tasks.TaskStatus.Faulted state.
   //
   // Parameters:
   //   exception:
   //     The System.Exception that caused the faulting.
   //
   // Exceptions:
   //   System.ArgumentNullException:
   //     The exception is null.
   void Fault(Exception exception);
}

Funkcja Complete jest wywoływana gdy blok skończył przetwarzanie danych. Fault może zostać wywołany, gdy wystąpił jakiś wyjątek. Właściwość Completion zwraca wątek za pomocą którego można np. sprawdzić czy operacje zostały już wykonane. Upraszczając, poszczególne elementy, mogą akceptować parametry wejściowe oraz zwracać jakiś wynik. Bloki, które wyłącznie przetwarzają dane implementują interfejs ITargetBlock:

// Summary:
//     Represents a dataflow block that is a target for data.
//
// Type parameters:
//   TInput:
//     Specifies the type of data accepted by the System.Threading.Tasks.Dataflow.ITargetBlock<TInput>.This
//     type parameter is contravariant. That is, you can use either the type you
//     specified or any type that is less derived. For more information about covariance
//     and contravariance, see Covariance and Contravariance in Generics.
public interface ITargetBlock<in TInput> : IDataflowBlock
{
   // Summary:
   //     Offers a message to the System.Threading.Tasks.Dataflow.ITargetBlock<TInput>,
   //     giving the target the opportunity to consume or postpone the message.
   //
   // Parameters:
   //   messageHeader:
   //     A System.Threading.Tasks.Dataflow.DataflowMessageHeader instance that represents
   //     the header of the message being offered.
   //
   //   messageValue:
   //     The value of the message being offered.
   //
   //   source:
   //     The System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> offering the message.
   //     This may be null.
   //
   //   consumeToAccept:
   //     Set to true to instruct the target to call System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>.ConsumeMessage()
   //     synchronously during the call to System.Threading.Tasks.Dataflow.ITargetBlock<TInput>.OfferMessage(),
   //     prior to returning System.Threading.Tasks.Dataflow.DataflowMessageStatus.Accepted,
   //     in order to consume the message.
   //
   // Returns:
   //     The status of the offered message. If the message was accepted by the target,
   //     System.Threading.Tasks.Dataflow.DataflowMessageStatus.Accepted is returned,
   //     and the source should no longer use the offered message, because it is now
   //     owned by the target. If the message was postponed by the target, System.Threading.Tasks.Dataflow.DataflowMessageStatus.Postponed
   //     is returned as a notification that the target may later attempt to consume
   //     or reserve the message; in the meantime, the source still owns the message
   //     and may offer it to other blocks.If the target would have otherwise postponed
   //     message, but source was null, System.Threading.Tasks.Dataflow.DataflowMessageStatus.Declined
   //     is instead returned. If the target tried to accept the message but missed
   //     it due to the source delivering the message to another target or simply discarding
   //     it, System.Threading.Tasks.Dataflow.DataflowMessageStatus.NotAvailable is
   //     returned. If the target chose not to accept the message, System.Threading.Tasks.Dataflow.DataflowMessageStatus.Declined
   //     is returned. If the target chose not to accept the message and will never
   //     accept another message from this source, System.Threading.Tasks.Dataflow.DataflowMessageStatus.DecliningPermanently
   //     is returned.
   //
   // Exceptions:
   //   System.ArgumentException:
   //     The messageHeader is not valid.-or-consumeToAccept may only be true if provided
   //     with a non-null source.
   DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept);
}

Z kolei bloki będące źródłem danych implementują ISourceBlock:

// Summary:
//     Represents a dataflow block that is a source of data.
//
// Type parameters:
//   TOutput:
//     Specifies the type of data supplied by the System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>.This
//     type parameter is covariant. That is, you can use either the type you specified
//     or any type that is more derived. For more information about covariance and
//     contravariance, see Covariance and Contravariance in Generics.
public interface ISourceBlock<out TOutput> : IDataflowBlock
{
   // Summary:
   //     Called by a linked System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to
   //     accept and consume a System.Threading.Tasks.Dataflow.DataflowMessageHeader
   //     previously offered by this System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>.
   //
   // Parameters:
   //   messageHeader:
   //     The System.Threading.Tasks.Dataflow.DataflowMessageHeader of the message
   //     being consumed.
   //
   //   target:
   //     The System.Threading.Tasks.Dataflow.ITargetBlock<TInput> consuming the message.
   //
   //   messageConsumed:
   //     true if the message was successfully consumed; otherwise, false.
   //
   // Returns:
   //     The value of the consumed message. This may correspond to a different System.Threading.Tasks.Dataflow.DataflowMessageHeader
   //     instance than was previously reserved and passed as the messageHeader to
   //     System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>.ConsumeMessage(). The
   //     consuming System.Threading.Tasks.Dataflow.ITargetBlock<TInput> must use the
   //     returned value instead of the value passed as messageValue through System.Threading.Tasks.Dataflow.ITargetBlock<TInput>.OfferMessage().If
   //     the message requested is not available, the return value will be null.
   //
   // Exceptions:
   //   System.ArgumentException:
   //     The messageHeader is not valid.
   //
   //   System.ArgumentNullException:
   //     The target is null.
   [SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "2#")]
   TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed);
   //
   // Summary:
   //     Links the System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> to the specified
   //     System.Threading.Tasks.Dataflow.ITargetBlock<TInput>.
   //
   // Parameters:
   //   target:
   //     The System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to which to connect
   //     this source.
   //
   //   linkOptions:
   //     A System.Threading.Tasks.Dataflow.DataflowLinkOptions instance that configures
   //     the link.
   //
   // Returns:
   //     An IDisposable that, upon calling Dispose, will unlink the source from the
   //     target.
   //
   // Exceptions:
   //   System.ArgumentNullException:
   //     target is null (Nothing in Visual Basic) or linkOptions is null (Nothing
   //     in Visual Basic).
   IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions);
   //
   // Summary:
   //     Called by a linked System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to
   //     release a previously reserved System.Threading.Tasks.Dataflow.DataflowMessageHeader
   //     by this System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>.
   //
   // Parameters:
   //   messageHeader:
   //     The System.Threading.Tasks.Dataflow.DataflowMessageHeader of the reserved
   //     message being released.
   //
   //   target:
   //     The System.Threading.Tasks.Dataflow.ITargetBlock<TInput> releasing the message
   //     it previously reserved.
   //
   // Exceptions:
   //   System.ArgumentException:
   //     The messageHeader is not valid.
   //
   //   System.ArgumentNullException:
   //     The target is null.
   //
   //   System.InvalidOperationException:
   //     The target did not have the message reserved.
   void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target);
   //
   // Summary:
   //     Called by a linked System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to
   //     reserve a previously offered System.Threading.Tasks.Dataflow.DataflowMessageHeader
   //     by this System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>.
   //
   // Parameters:
   //   messageHeader:
   //     The System.Threading.Tasks.Dataflow.DataflowMessageHeader of the message
   //     being reserved.
   //
   //   target:
   //     The System.Threading.Tasks.Dataflow.ITargetBlock<TInput> reserving the message.
   //
   // Returns:
   //     true if the message was successfully reserved; otherwise, false.
   //
   // Exceptions:
   //   System.ArgumentException:
   //     The messageHeader is not valid.
   //
   //   System.ArgumentNullException:
   //     The target is null.
   bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target);
}

Oczywiście są bloki, które implementują obydwa interfejsy– przetwarzają dane oraz są również źródłem danych. Stwórzmy zatem pierwszą aplikację korzystającą z TPL DataFlows. Użyjemy bloku ActionBlock<T>, który implementuje ITargetBlock – akceptuje dane ale nie jest źródłem danych:

class Program
{        
   static void Main(string[] args)
   {
       var actionBlock = new ActionBlock<int>(a => Console.WriteLine(a));

       for (int i = 0; i < 10; i++)
       {
           actionBlock.Post(i);
       }
       Console.ReadLine();
   }
}

Powyższy blok, po prostu wyświetla przekazany parametr. Funkcja Post, wysyła dane do bloku w sposób “synchroniczny”. W celu wysłania danych całkowicie asynchronicznie należy użyć SendAsync:

class Program
{        
   static void Main(string[] args)
   {
       var actionBlock = new ActionBlock<int>(a => Console.WriteLine(a));

       for (int i = 0; i < 10; i++)
       {
           actionBlock.SendAsync(i);
       }
       Console.ReadLine();
   }
}

Jaka jest różnica? Obie funkcje wysyłają tylko dane – przetwarzanie zawsze jest wykonywane w innym wątku w ActionBlock. Jeśli zatem oprócz wyświetlenia tekstu umieścilibyśmy Thread.Sleep(1000), zarówno Post jak i SendAsync nie blokowałyby wykonania. Synchroniczność\asynchroniczność w tym przypadku dotyczy jedynie w jaki sposób dane są wpompowane do bloku a nie jak są przetwarzane.

Operacje są zatem wykonywane w osobnym wątku. Można nawet zdecydować ile wątków powinno przetwarzać dane:

var actionBlock = new ActionBlock<int>(a => Console.WriteLine(a),new new ExecutionDataflowBlockOptions(){MaxDegreeOfParallelism = 4});

Domyślnie MaxDegreeOfParallelism równy jest 1, co w praktyce oznacza, że operacje wykonywane są sekwencyjnie. Po wpompowaniu wszystkich danych można zakończyć przetwarzanie funkcją Complete:

static void Main(string[] args)
{
  var actionBlock = new ActionBlock<int>(a => Console.WriteLine(a),new ExecutionDataflowBlockOptions(){MaxDegreeOfParallelism = 4});

  for (int i = 0; i < 10; i++)
  {
      actionBlock.SendAsync(i);                
  }
  actionBlock.Complete();
  actionBlock.Completion.Wait();  
  Console.WriteLine("Koniec");
}

Wait po prostu czeka, aż wszystkie elementy zostaną przetworzone. Można “czekać”  również w sposób asynchroniczny:

await actionBlock.Completion;

Przedstawione informacje, póki co nie są czymś, co może w praktyce ułatwić nam pracę. Cała siła TPL DataFlows to modelowanie procesów za pomocą workflow. Aby to uczynić musimy oczywiście stworzyć więcej bloków i połączyć je ze sobą. Wyświetlanie tekstu w sposób asynchroniczny można przecież wykonywać chociażby w zwykłym TPL – w przyszłym wpisie dowiemy się już jak łączyć ze sobą różne bloki w całość. Dzisiaj tak naprawdę zaimplementowaliśmy wzorzec producent\konsument. SendAsync\Post produkuje dane, z kolei ActionBlock konsumuje je.

Leave a Reply

Your email address will not be published.