W kilku ostatnich postach pisałem o blokach grupujących. Obiecywałem, że pokaże kilka przykładów ich zastosowania, w szczególności wyjaśniające zachowanie zachłanne i niezachłanne. W dzisiejszym wpisie, zaprezentuję przykład (lekko zmodyfikowany z MSDN) jak JoinBlock i BufferBlock mogą zostać zastosowane.
Wyobraźmy sobie, że mamy kilka źródeł danych. Na przykład: WCF, pliki oraz pamięć. Odczyt z WCF prawdopodobnie będzie najwolniejszy. W zależności od lokalizacji, odczyt pliku prawdopodobnie będzie dużo szybszy. Z kolei czytanie danych z pamięci będzie zdecydowanie najszybsze. Stwórzmy trzy klasy reprezentujące powyższe źródła danych:
internal interface IDataSource { int Read(); } class MemoryResource:IDataSource { public int Read() { Console.WriteLine("Odczyt danych z memory resource."); Thread.Sleep(1000); return 1; } } class WcfResource : IDataSource { public int Read() { Console.WriteLine("Odczyt danych z wcf resource."); Thread.Sleep(7000); return 1; } } class FileResource : IDataSource { public int Read() { Console.WriteLine("Odczyt danych z file resource."); Thread.Sleep(4000); return 1; } }
Następnie, aby wykonać jakąś fikcyjną operację, należy mieć dane z WCFResource oraz MemoryResource lub z FileResource i MemoryResource. Można podzielić logię na dwa JoinBlock. Dane z MemoryBlock zawsze są potrzebne oraz w zależności od kontekstu, wymagane jest pobranie informacji z WcfResource lub MemoryResource. Ponadto, będziemy musieli kolejkować (buforować) przychodzące dane:
Dlaczego musimy korzystać z BufferBlock? Zakładamy, że dane ciągle przychodzą z podanych źródeł i należy je obsłużyć jak najszybciej. Można tego dokonać wykonując operacje I albo operację II, w zależności jaka para danych przyszła. Wyłącznie MemoryResource może zostać użyty w obydwu operacjach.
Zamieńmy zatem powyższy schemat na kod. Najpierw musimy kolejkować\buforować przychodzące dane:
var memoryResourceBuffer = new BufferBlock<int>(); var networkResourceBuffer = new BufferBlock<int>(); var wcfResourceBuffer = new BufferBlock<int>();
Kolejnym etapem jest stworzenie dwóch JoinBlock i odpowiednie podtrzepienie danych z właśnie utworzonych buforów:
var joinBlock1 = new JoinBlock<int, int>(); memoryResourceBuffer.LinkTo(joinBlock1.Target1); wcfResourceBuffer.LinkTo(joinBlock1.Target2); joinBlock1.LinkTo(new ActionBlock<Tuple<int, int>>(data=>Operation1(data))); var joinBlock2 = new JoinBlock<int, int>(); memoryResourceBuffer.LinkTo(joinBlock2.Target1); networkResourceBuffer.LinkTo(joinBlock2.Target2); joinBlock2.LinkTo(new ActionBlock<Tuple<int, int>>(data => Operation2(data))); //.. private static void Operation1(Tuple<int, int> data) { Thread.Sleep(5000); Console.WriteLine("Wykonywanie operacji I"); } private static void Operation2(Tuple<int, int> data) { Thread.Sleep(5000); Console.WriteLine("Wykonywanie operacji II"); }
Każdy JoinBlock (patrz poprzednie posty) eksponuje Target1 oraz Target2, na które wysyłamy dane z buforów. Następnie łączymy wynik, który składa się z danych z obydwu źródeł i wysyłamy je do kolejnego bloku, którym jest ActionBlock: operation1 oraz operation2.
Na końcu musimy “wpompować” dane do systemu np.:
var memoryResource=new MemoryResource(); var wcfResource = new WcfResource(); var fileResource = new FileResource(); for (int i = 0; i < 5; i++) { memoryResourceBuffer.SendAsync(memoryResource.Read()); networkResourceBuffer.SendAsync(wcfResource.Read()); wcfResourceBuffer.SendAsync(fileResource.Read()); }
Jeśli teraz byśmy uruchomili aplikacje to zobaczyliśmy, że wyłącznie operacja I jest wykonywana:
Dlaczego? Po prostu domyślnie, wszystkie bloki grupujące są zachłanne i JoinBlockI przyjmuje wszystkie dane, nawet jak nie jest w stanie ich obsłużyć. W końcu po otrzymaniu pierwszej pary z MemoryResource i WcfResource, JoinBlock powinien ustąpić na rzecz drugiego bloku. Aby zmienić zachowanie na niezachłanne wystarczy:
var joinBlock1 = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
Na ekranie teraz powinniśmy zobaczyć wyniki pojawiające się z dwóch operacji:
W trybie zachłannym wszystkie wiadomości są akceptowane, nawet jak nie ma odpowiednika na drugim Target. Innymi słowy, gdy mamy 10 wiadomości na Target I i 0 na Target II, w trybie zachłannym wszystkie 10 zostanie zaakceptowanych. W trybie niezachłannym tylko pierwsza zostanie zaakceptowana, z kolei pozostałe 9 będą miały status “postponed”, co oznacza, że mogą one być wykorzystane przez inny JoinBlock. Dzięki trybowi niezachłannemu, jesteśmy w stanie balansować ruch między tymi dwoma blokami.
Na zakończenie cały listing kodu:
using System; using System.Threading; using System.Threading.Tasks.Dataflow; namespace ConsoleApplication4 { internal interface IDataSource { int Read(); } class MemoryResource:IDataSource { public int Read() { Console.WriteLine("Odczyt danych z memory resource."); Thread.Sleep(1000); return 1; } } class WcfResource : IDataSource { public int Read() { Console.WriteLine("Odczyt danych z wcf resource."); Thread.Sleep(7000); return 1; } } class FileResource : IDataSource { public int Read() { Console.WriteLine("Odczyt danych z file resource."); Thread.Sleep(4000); return 1; } } internal class Program { private static void Main() { var memoryResourceBuffer = new BufferBlock<int>(); var networkResourceBuffer = new BufferBlock<int>(); var wcfResourceBuffer = new BufferBlock<int>(); var joinBlock1 = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false }); memoryResourceBuffer.LinkTo(joinBlock1.Target1); wcfResourceBuffer.LinkTo(joinBlock1.Target2); joinBlock1.LinkTo(new ActionBlock<Tuple<int, int>>(data=>Operation1(data))); var joinBlock2 = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false }); memoryResourceBuffer.LinkTo(joinBlock2.Target1); networkResourceBuffer.LinkTo(joinBlock2.Target2); joinBlock2.LinkTo(new ActionBlock<Tuple<int, int>>(data => Operation2(data))); var memoryResource=new MemoryResource(); var wcfResource = new WcfResource(); var fileResource = new FileResource(); for (int i = 0; i < 5; i++) { memoryResourceBuffer.SendAsync(memoryResource.Read()); networkResourceBuffer.SendAsync(wcfResource.Read()); wcfResourceBuffer.SendAsync(fileResource.Read()); } Console.ReadLine(); } private static void Operation1(Tuple<int, int> data) { Thread.Sleep(5000); Console.WriteLine("Wykonywanie operacji I"); } private static void Operation2(Tuple<int, int> data) { Thread.Sleep(5000); Console.WriteLine("Wykonywanie operacji II"); } } }