TPL Dataflows – część VIII (przykład z BufferBlock i JoinBlock)

W kilku ostatnich postach pisałem o blokach grupujących. Obiecywałem, że pokaże kilka przykładów ich zastosowania, w szczególności wyjaśniające zachowanie zachłanne i niezachłanne. W dzisiejszym wpisie, zaprezentuję przykład (lekko zmodyfikowany z MSDN) jak JoinBlock i BufferBlock mogą zostać zastosowane.

Wyobraźmy sobie, że mamy kilka źródeł danych. Na przykład: WCF, pliki oraz pamięć. Odczyt z WCF prawdopodobnie będzie najwolniejszy. W zależności od lokalizacji, odczyt pliku prawdopodobnie będzie dużo szybszy. Z kolei czytanie danych z pamięci będzie zdecydowanie najszybsze. Stwórzmy trzy klasy reprezentujące powyższe źródła danych:

internal interface IDataSource
{
    int Read();
}
class MemoryResource:IDataSource
{
   public int Read()
   {
       Console.WriteLine("Odczyt danych z memory resource.");
       Thread.Sleep(1000);

       return 1;
   }
}
class WcfResource : IDataSource
{
   public int Read()
   {
       Console.WriteLine("Odczyt danych z wcf resource.");
       Thread.Sleep(7000);

       return 1;
   }
}
class FileResource : IDataSource
{
   public int Read()
   {
       Console.WriteLine("Odczyt danych z file resource.");
       Thread.Sleep(4000);

       return 1;
   }
}

Następnie, aby wykonać jakąś fikcyjną operację, należy mieć dane z WCFResource oraz MemoryResource lub z FileResource i MemoryResource. Można podzielić logię na dwa JoinBlock. Dane z MemoryBlock zawsze są potrzebne oraz w zależności od kontekstu, wymagane jest pobranie informacji z WcfResource lub MemoryResource.  Ponadto, będziemy musieli kolejkować (buforować) przychodzące dane:

image

Dlaczego musimy korzystać z BufferBlock? Zakładamy, że dane ciągle przychodzą z podanych źródeł i należy je obsłużyć jak najszybciej. Można tego dokonać  wykonując operacje I albo operację II, w zależności jaka para danych przyszła. Wyłącznie MemoryResource może zostać użyty w obydwu operacjach.

Zamieńmy zatem powyższy schemat na kod. Najpierw musimy kolejkować\buforować przychodzące dane:

var memoryResourceBuffer = new BufferBlock<int>();
var networkResourceBuffer = new BufferBlock<int>();
var wcfResourceBuffer = new BufferBlock<int>();

Kolejnym etapem jest stworzenie dwóch JoinBlock i odpowiednie podtrzepienie danych z właśnie utworzonych buforów:

var joinBlock1 = new JoinBlock<int, int>();
memoryResourceBuffer.LinkTo(joinBlock1.Target1);
wcfResourceBuffer.LinkTo(joinBlock1.Target2);
joinBlock1.LinkTo(new ActionBlock<Tuple<int, int>>(data=>Operation1(data)));

var joinBlock2 = new JoinBlock<int, int>();
memoryResourceBuffer.LinkTo(joinBlock2.Target1);
networkResourceBuffer.LinkTo(joinBlock2.Target2);
joinBlock2.LinkTo(new ActionBlock<Tuple<int, int>>(data => Operation2(data)));

//..

private static void Operation1(Tuple<int, int> data)
{
  Thread.Sleep(5000);
  Console.WriteLine("Wykonywanie operacji I");
}
private static void Operation2(Tuple<int, int> data)
{
  Thread.Sleep(5000);
  Console.WriteLine("Wykonywanie operacji II");
}

Każdy JoinBlock (patrz poprzednie posty) eksponuje Target1 oraz Target2, na które wysyłamy dane z buforów. Następnie łączymy wynik, który składa się z danych z obydwu źródeł i wysyłamy je do kolejnego bloku, którym jest ActionBlock: operation1 oraz operation2.

Na końcu musimy “wpompować” dane do systemu np.:

var memoryResource=new MemoryResource();
var wcfResource = new WcfResource();
var fileResource = new FileResource();

for (int i = 0; i < 5; i++)
{
    memoryResourceBuffer.SendAsync(memoryResource.Read());
    networkResourceBuffer.SendAsync(wcfResource.Read());
    wcfResourceBuffer.SendAsync(fileResource.Read());
}

Jeśli teraz byśmy uruchomili aplikacje to zobaczyliśmy, że wyłącznie operacja I jest wykonywana:

image

Dlaczego? Po prostu domyślnie, wszystkie bloki grupujące są zachłanne i JoinBlockI przyjmuje wszystkie dane, nawet jak nie jest w stanie ich obsłużyć. W końcu po otrzymaniu pierwszej pary z MemoryResource i WcfResource, JoinBlock powinien ustąpić na rzecz drugiego bloku. Aby zmienić zachowanie na niezachłanne wystarczy:

var joinBlock1 = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });

Na ekranie teraz powinniśmy zobaczyć wyniki pojawiające się z dwóch operacji:

image

W trybie zachłannym wszystkie wiadomości są akceptowane, nawet jak nie ma odpowiednika na drugim Target. Innymi słowy, gdy mamy 10 wiadomości na Target I i 0 na Target II, w trybie zachłannym wszystkie 10 zostanie zaakceptowanych. W trybie niezachłannym tylko pierwsza zostanie zaakceptowana, z kolei pozostałe 9 będą miały status “postponed”, co oznacza, że mogą one być wykorzystane przez inny JoinBlock. Dzięki trybowi niezachłannemu, jesteśmy w stanie balansować ruch między tymi dwoma blokami.

Na zakończenie cały listing kodu:

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;


namespace ConsoleApplication4
{
    internal interface IDataSource
    {
         int Read();
    }
    class MemoryResource:IDataSource
    {
        public int Read()
        {
            Console.WriteLine("Odczyt danych z memory resource.");
            Thread.Sleep(1000);

            return 1;
        }
    }
    class WcfResource : IDataSource
    {
        public int Read()
        {
            Console.WriteLine("Odczyt danych z wcf resource.");
            Thread.Sleep(7000);

            return 1;
        }
    }
    class FileResource : IDataSource
    {
        public int Read()
        {
            Console.WriteLine("Odczyt danych z file resource.");
            Thread.Sleep(4000);

            return 1;
        }
    }

    internal class Program
    {
        private static void Main()
        {
            var memoryResourceBuffer = new BufferBlock<int>();
            var networkResourceBuffer = new BufferBlock<int>();
            var wcfResourceBuffer = new BufferBlock<int>();

            var joinBlock1 = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
            memoryResourceBuffer.LinkTo(joinBlock1.Target1);
            wcfResourceBuffer.LinkTo(joinBlock1.Target2);
            joinBlock1.LinkTo(new ActionBlock<Tuple<int, int>>(data=>Operation1(data)));

            var joinBlock2 = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
            memoryResourceBuffer.LinkTo(joinBlock2.Target1);
            networkResourceBuffer.LinkTo(joinBlock2.Target2);
            joinBlock2.LinkTo(new ActionBlock<Tuple<int, int>>(data => Operation2(data)));

            var memoryResource=new MemoryResource();
            var wcfResource = new WcfResource();
            var fileResource = new FileResource();

            for (int i = 0; i < 5; i++)
            {
                memoryResourceBuffer.SendAsync(memoryResource.Read());
                networkResourceBuffer.SendAsync(wcfResource.Read());
                wcfResourceBuffer.SendAsync(fileResource.Read());
            }

            Console.ReadLine();
        } 
        private static void Operation1(Tuple<int, int> data)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Wykonywanie operacji I");
        }
        private static void Operation2(Tuple<int, int> data)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Wykonywanie operacji II");
        }
    }
}

Leave a Reply

Your email address will not be published.