Category Archives: Programowanie rozproszone

CONSUMER DRIVEN CONTRACTS w PACT-NET – implementacja testów dostawcy

W poprzednim wpisie przedstawiłem framework PACT.NET. Zdołaliśmy zaimplementować testy po stronie konsumentów. Wynikiem tego, był wygenerowany plik JSON zawierający nagrane testy. Dla przypomnienia, test po stronie konsumenta wyglądał następująco:

[TestFixture]
public class ConsumerTests
{
    private IMockProviderService _mockProviderService;
    private IPactBuilder _pactProvider;
 
    [OneTimeSetUp]
    public void OneTimeSetUp()
    {
        _pactProvider = new PactBuilder()
            .ServiceConsumer("ConsumerName")
            .HasPactWith("ProviderName");
 
        _mockProviderService = _pactProvider.MockService(3423,false);
    }
 
    [OneTimeTearDown]
    public void OneTimeTearDown()
    {
        _pactProvider.Build();
    }
 
    [Test]
    public void Should_Return_FirstName()
    {
        //Arrange
        _mockProviderService.Given("there are some persons data")
            .UponReceiving("a request to retrieve all persons")
            .With(new ProviderServiceRequest
            {
                Method = HttpVerb.Get,
                Path = "/api/persons"     
            })
            .WillRespondWith(new ProviderServiceResponse
            {
                Status = 200,
                Headers = new Dictionary<string, string>
                {
                    { "Content-Type", "application/json; charset=utf-8" }
                },
                Body = new[]
                {
                    new
                    {
                        FirstName = "Piotr",
                    }
                }
            });
 
        var consumer = new PersonsApiClient("http://localhost:3423");
 
        //Act
        var persons = consumer.GetAllPersons();
 
        //Assert
        CollectionAssert.IsNotEmpty(persons);
        CollectionAssert.AllItemsAreNotNull(persons.Select(x=>x.FirstName));
 
        _mockProviderService.VerifyInteractions();
    }
}

Z kolei wygenerowany JSON:

{
  "provider": {
    "name": "ProviderName"
  },
  "consumer": {
    "name": "ConsumerName"
  },
  "interactions": [
    {
      "description": "a request to retrieve all persons",
      "provider_state": "there are some persons data",
      "request": {
        "method": "get",
        "path": "/api/persons"
      },
      "response": {
        "status": 200,
        "headers": {
          "Content-Type": "application/json; charset=utf-8"
        },
        "body": [
          {
            "FirstName": "Piotr"
          }
        ]
      }
    }
  ],
  "metadata": {
    "pactSpecificationVersion": "1.1.0"
  }
}

Przejdźmy zatem do testu po stronie dostawcy:

    [TestFixture]
    public class ProducerTests
    {
        [Test]
        public void Ensure_That_Consumer_Requirements_Are_Met()
        {
            var config = new PactVerifierConfig();
            IPactVerifier pactVerifier = new PactVerifier(() => { }, () => { }, config);

            pactVerifier
                .ProviderState(
                    "there are some persons data",
                    setUp: DataSetup,tearDown:DataTearDown);
       
            using (var testServer = TestServer.Create<Startup>())
            {
                pactVerifier
                   .ServiceProvider("Persons API", testServer.HttpClient)
                   .HonoursPactWith("Consumer")
                   .PactUri(@"consumername-providername.json")
                   .Verify();
            }
        }

        private void DataTearDown()
        {
        }


        private void DataSetup()
        {
        }

    }

W celu uruchomienia serwera, używamy Microsoft.Owin.Testing czyli klasy TestServer. W momencie odpalenia testu, zostanie zatem stworzony serwer z naszą aplikacją. Następnie “odgrywany” będzie test wygenerowany przez konsumenta. Operujemy na prawdziwych danych, a nie na mock’ach, stąd metody DataSetup oraz DataTearDown w praktyce zawierają odpowiedni kod, dodający oraz usuwający dane testowe.

Po wykonaniu testu, zostanie wygenerowany plik persons_api_verifier.log:

2016-06-01 19:54:01.894 +01:00 [Debug] Verifying a Pact between ConsumerName and ProviderName
  Given there are some persons data
    a request to retrieve all persons
      with GET /api/persons
        returns a response which
          has status code 200
          includes headers
            'Content-Type' with value application/json; charset=utf-8
          has a matching body (FAILED - 1)

Failures:

1) Expected: [
  {
    "FirstName": "Piotr"
  }
], Actual: [
  {
    "FirstName": "Piotr",
    "LastName": "Zielinski"
  },
  {
    "FirstName": "first name",
    "LastName": "last name"
  }
]

Widzimy, że test zakończył się niepowodzeniem, ponieważ oczekiwana zawartość Content jest inna niż ta zwrócona przez prawdziwy serwis.

Za pomocą PathVerifier definiujemy, które testy chcemy wykonać. W naszym przypadku jest tylko jeden. W praktyce definiuje się wiele stanów i kilka plików JSON.

Consumer driven contracts – pact-net

O Consumer-Driven-Contracts pisałem już tutaj. Sam koncept jest dosyć prosty, ale bez odpowiednich narzędzi może być mozolny w wdrożeniu.
Pack-net, jak nie trudno się domyślić jest implementacją biblioteki pact w C#. Kod źródłowy możną znaleźć na GitHub.

Wiemy, aby zaimplementować Consumer-Driven-Contracts (CDC) musimy napisać testy zarówno po stronie konsumenta jak i dostawcy. Zwykle, konsumenci nie testują prawdziwej usługi, a jedynie operują na mock’ach. Następnie dostawca, wykona wszystkie testy konsumenckie na prawdziwej usłudzę. Sztuczka polega na tym, aby w jakiś sposób przekazać do dostawcy testy zdefiniowane przez konsumenta. Nie chcemy generować biblioteki DLL i jej przekazywać. Pact wygeneruje za nas plik JSON, ze wszystkimi testami. Podsumowując za pomocą Pact:

1. Konsumenci definiują klasyczne testy jednostkowe. Wykonywane są one w izolacji na mock’u usługi.
2. Testy jednostkowe są “nagrywane” i  zapisywane w pliku JSON.
3. Plik (lub pliki w przypadku wielu konsumentów) JSON  są odczytywane przez producenta. Nagrane testy są odtwarzane, ale będą już wykonywane na prawdziwej usłudze.

Jeśli tworzymy usługę, wtedy krok trzeci gwarantuje nam, że spełniamy wszystkie wymogi konsumentów. Jeśli konsument A potrzebuje np. pole FirstName w odpowiedzi, wtedy zdefiniuje to za pomocą testu jednostkowego, a dostawca następnie zweryfikuje to.
Pact dostarcza łatwy interfejs i nie musimy się martwić np. odpaleniem serwer’a web w testach dostawcy, które zawsze wykonywane są na prawdziwej usłudze. Podobnie, dzięki Pact łatwo tworzyć mock’a usługi. Po prostu wystarczy zdefiniować oczekiwane nagłówki czy zawartość ciała.

Myślę, że najłatwiej będzie to zrozumieć na przykładzie. Załóżmy, że tworzymy usługę zwracającą listę osób. Nasza solucja zatem będzie składać się z:
1. Provider.Web – usługa
2. Provider.Web.Tests – testy dostawcy (wykonywane na prawdziwej usłudze)
3. Consumer – prosty klient łączący się z usługą
4. Consumer.Tests – testy konsumenckie (wykonywane na mock’u)

W dzisiejszym wpisie zajmiemy się punktami 1,3 oraz 4, a następnym razem zajmiemy się testami dostawcy.

Zacznijmy zatem od dostawcy (punkt 1):

    public class PersonsController : ApiController
    {
        public PersonInfo[] GetAll()
        {
            var person1 = new PersonInfo {FirstName = "Piotr", LastName = "Zielinski"};
            var person2 = new PersonInfo { FirstName = "first name", LastName = "last name" };

            return new PersonInfo[] {person1, person2};
        }
    }

    public class PersonInfo
    {
        public string FirstName { get; set; }
        public string LastName { get; set; }
    }

Zwykłe WebApi, którego odpowiedź wygląda następująco:

<ArrayOfPersonInfo xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.datacontract.org/2004/07/Provider.Web.Controllers">
<PersonInfo>
<FirstName>Piotr</FirstName>
<LastName>Zielinski</LastName>
</PersonInfo>
<PersonInfo>
<FirstName>first name</FirstName>
<LastName>last name</LastName>
</PersonInfo>
</ArrayOfPersonInfo>

Następnie zdefiniujemy klienta (konsumenta – punkt 3):

   public class PersonsApiClient
    {
        public string BaseUri { get; set; }

        public PersonsApiClient(string baseUri)
        {
            BaseUri = baseUri;
        }

        public PersonInfo[] GetAllPersons()
        {
            var client = new RestClient(BaseUri);
            var persons = client.Get<List<PersonInfo>>(new RestRequest("api/persons")).Data;

            return persons.ToArray();
        }
    }

Ostatni etap w dzisiejszym wpisie to zdefiniowanie testu konsumenta (punkt 4):

    [TestFixture]
    public class ConsumerTests
    {
        private IMockProviderService _mockProviderService;
        private IPactBuilder _pactProvider;

        [OneTimeSetUp]
        public void OneTimeSetUp()
        {
            _pactProvider = new PactBuilder()
                .ServiceConsumer("ConsumerName")
                .HasPactWith("ProviderName");

            _mockProviderService = _pactProvider.MockService(3423,false);
        }

        [OneTimeTearDown]
        public void OneTimeTearDown()
        {
            _pactProvider.Build();
        }

        [Test]
        public void Should_Return_FirstName()
        {
            //Arrange
            _mockProviderService.Given("there are some persons data")
                .UponReceiving("a request to retrieve all persons")
                .With(new ProviderServiceRequest
                {
                    Method = HttpVerb.Get,
                    Path = "/api/persons"      
                })
                .WillRespondWith(new ProviderServiceResponse
                {
                    Status = 200,
                    Headers = new Dictionary<string, string>
                    {
                        { "Content-Type", "application/json; charset=utf-8" }
                    },
                    Body = new[]
                    {
                        new
                        {
                            FirstName = "Piotr",
                        }
                    }
                });

            var consumer = new PersonsApiClient("http://localhost:3423");

            //Act
            var persons = consumer.GetAllPersons();

            //Assert
            CollectionAssert.IsNotEmpty(persons);
            CollectionAssert.AllItemsAreNotNull(persons.Select(x=>x.FirstName));

            _mockProviderService.VerifyInteractions();
        }
    }

Za pomocą klasy PactBuilder budujemy mock usługi czyli IMockProviderService. Następnie definiujemy wejścia i wyjścia mocka. Widzimy, że jeśli zapytanie GET przyjdzie na /api/persons wtedy zwracamy jeden obiekt, zawierający imię. Za pomocą MockProvider możemy zasymulować dowolne zachowanie usługi. W naszym przypadku jest to po prostu GET i odpowiedź. W bardziej zaawansowanych przypadkach dochodzą do tego parametry query, nagłówki, metody POST, DELETE itp.

Mock zwróci zatem tylko jeden obiekt, zawierający imię Za pomocą następnych asercji, zweryfikujemy, że faktycznie imię musi zawierać jakąś treść. Jeśli pole FirstName na tym etapie jest puste, oznacza to, że implementacja klienta jest błędną – np. deserialziacja nie działa tak jak należy. Widzimy, że port 3423 został dobrany losowo, ponieważ nie ma to znaczenia dla mock’a.
VerifyInteractions sprawdzi również tzw. interakcje. W powyższym przypadku oznacza to, że dokładnie jedno zapytanie GET “api/persons” zostało wysłane do usługi. Jeśli nasz klient zawierałby błąd, np. dwukrotne wysłanie zapytania, wtedy test również zakończyłby się błędem.

Po wykonaniu testu, zostanie on nagrany i zapisany w pliku “consumername-providername.json”:

{
  "provider": {
    "name": "ProviderName"
  },
  "consumer": {
    "name": "ConsumerName"
  },
  "interactions": [
    {
      "description": "a request to retrieve all persons",
      "provider_state": "there are some persons data",
      "request": {
        "method": "get",
        "path": "/api/persons"
      },
      "response": {
        "status": 200,
        "headers": {
          "Content-Type": "application/json; charset=utf-8"
        },
        "body": [
          {
            "FirstName": "Piotr"
          }
        ]
      }
    }
  ],
  "metadata": {
    "pactSpecificationVersion": "1.1.0"
  }
}

W kolejnym wpisie zajmiemy się implementacją testów dostawcy, które będą wykorzystywać właśnie wygenerowany plik JSON, w celu wykonania takiego samego testu na prawdziwej usłudze.

AKKA.NET – zdalni aktorzy

Jak wspomniałem w jednym z wcześniejszych już wpisów, nie ma znaczenia, gdzie aktor jest zlokalizowany. Dzięki AKKA.NET jest to szczegół  konfiguracyjny. Jeśli pewnego dnia, stwierdzimy, że wykonywanie obliczeń na jednym komputerze nie wystarcza, wtedy po prostu  zmieniamy konfigurację, aby hostować danego aktora gdzieś indziej. Framework zadba o komunikację (TCP) między węzłami znajdującymi się w innych sieciach. W ten sposób, bardzo łatwo jest skalować logikę w następujący sposób: wątek->proces->komputer->sieć komputerów.

W dokumentacji znajdziemy szczegóły, ale moim zdaniem brakuje tam prostego przykładu polegającego na wysłaniu wiadomości z węzła A do B.

Zacznijmy od stworzenia struktury projektu. Warto stworzyć jedną bibliotekę, która będzie zawierać wyłącznie kod aktorów. W naszym przypadku będzie to EchoServerActor:

    public class EchoServerActor : ReceiveActor
    {
        public EchoServerActor()
        {
            Receive<EchoMsg>(msg =>
            {
                Sender.Tell($"Server:{AppDomain.CurrentDomain.FriendlyName},{msg.Text}");
            });
        }
    }

    public class EchoMsg
    {
        public string Text { get; set; }
    }

Jak widać, kod nie różni się niczym od implementacji “lokalnych” aktorów. Po odebraniu wiadomości, wyświetlamy na ekranie nazwę domeny oraz przesłaną wiadomość. Wyświetlenie aktualnej domeny będzie pomocne, w analizie gdzie kod został tak naprawdę wykonany. EchoServeActor będzie służył nam jako “serwer”. W praktyce, komunikacja odbywa się klient-klient i nie należy projektować systemów w sposób scentralizowany. Awaria jednego aktora nie powinna spowodować paraliżu całego systemu. W poście jednak, chcemy napisać jak najprostszy fragment kodu, stąd te uproszczenie.

Jako “klient” posłuży nam następująca klasa:

    public class EchoReceiverActor : ReceiveActor
    {
        public EchoReceiverActor()
        {
            Receive<string>(msg =>
            {
                Console.WriteLine($"Received on {AppDomain.CurrentDomain.FriendlyName}:{msg}");
            });

            Receive<EchoMsg>(msg =>
            {
                var remoteActor = Context.ActorSelection("akka.tcp://Server@localhost:8081/user/EchoServerActor");
                remoteActor.Tell(msg);
            });
        }
    }

Po odebraniu wiadomości “EchoMsg” uzyskujemy referencję za pomocą ścieżki (więcej szczegółów tutaj). W praktyce, nie chcemy umieszczać w kodzie aktora, informacji o jego lokalizacji – powinno to mieć miejsce np. w pliku konfiguracyjnym. Powyższy kod jest zatem złapaniem bardzo ważnej zasady o neutralności fizycznej lokalizacji aktora. W ramach wpisu chcę napisać kod jednak jak najprościej tylko można. W każdym razie, po odebraniu EchoMsg, EchoReceiverActor wyśle wiadomość do zdalnego aktora.

Z kolei jeśli przyjdzie wiadomość w formie czystego tekstu (string), wtedy wyświetlamy ją. Nie trudno domyślić się, że w naszym przykładzie taka wiadomość będzie pochodzić od zdalnego aktora.

Innymi słowy, najpierw po stronie klienta wysyłamy EchoMsg do EchoReceiverActor. Aktor z kolei prześle tą wiadomość zdalnie do EchoServerActor, który z kolei odpowie tekstem do EchoReceiverActor.

Przejdźmy teraz do konfiguracji “serwera”:

         var config = ConfigurationFactory.ParseString(@"
akka {  
    actor {
        provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
    }
    remote {
        helios.tcp {
            transport-class = ""Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote""
            applied-adapters = []
            transport-protocol = tcp
            port = 8081
            hostname = localhost
        }
    }
}
");

        using (var system = ActorSystem.Create("Server", config))
        {
           system.ActorOf<EchoServerActor>("EchoServerActor");

           Console.ReadLine();
        }

Konfiguracja w prawdziwych projektach umieszczana jest w App\Web.Config, tak aby mnożna ją było zmienić bez potrzeby rekompilacji. Widzimy, że serwer będzie nasłuchiwać na porcie 8081. Zostanie również stworzony system o nazwie “Server” oraz pojedynczy aktor o nazwie “EchoServerActor”.

Klient z kolei wygląda następująco:

            Console.ReadLine();

            var config = ConfigurationFactory.ParseString(@"
akka {  
    actor {
        provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
    }
    remote {
        helios.tcp {
            transport-class = ""Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote""
		    applied-adapters = []
		    transport-protocol = tcp
		    port = 0
		    hostname = localhost
        }
    }
}
");

    using (var system = ActorSystem.Create("Client", config))
    {
         var receiver = system.ActorOf(Props.Create<EchoReceiverActor>());

         receiver.Tell(new EchoMsg { Text = "Hello world" });
         Console.ReadLine();
    }

W przypadku “klienta” nie musimy określać portu. Jak wiemy, wszystko działa na zasadzie klient-klient, ale w kodzie sami nie musimy nic samemu wysyłać do klienta – stąd nie ma to znaczenia. Wartość zero oznacza, że port zostanie wybrany automatycznie. Następnie wysyłamy wiadomośc EchoMsg do aktora EchoReceiverActor. Jak wiemy z powyższego kodu, EchoReceiverActor jest hostowany w systemie “Client”. Następnie wyśle on wiadomość do zdalnego systemu “Server”.

Po uruchomieniu serwera, zobaczymy, że faktycznie system Server nasłuchuje na 8081:

1

Z kolei po uruchomieniu klienta, zobaczymy, że nasłuchuje on na automatycznie wybranym porcie 8591:

2

Z powyższego screenu również widać, że wiadomość została z powrotem przesłana do klienta. Dzięki wyświetleniu nazwy domeny, widzimy kiedy wiadomość została odebrana przez serwer (echo) i przesłana do kienta.

AKKA.NET – Przykład obsługi błędów

W poprzednim wpisie pokazałem, w jaki sposób możemy zaprojektować obsługę błędów. Jak widać mamy do dyspozycji sporo opcji. Z punktu widzenia AKKA.NET nie jest to jednak tak skomplikowane. Wystarczy przeładować jedną metodę i zwrócić odpowiedni obiekt.

Tak jak w poprzednim wpisie będziemy testować kod na następującym “systemie”:

controller

Dla przypomnienia nasz ApplicationUserActor wygląda następująco:

public class ApplicationUserActor : UntypedActor
    {
        private readonly string _userName;

        public ApplicationUserActor(string userName)
        {
            _userName = userName;
        }

        protected override void OnReceive(object message)
        {
            Console.WriteLine("Received by {0}: {1}", _userName, message);
        }

        protected override void PreStart()
        {
            Console.WriteLine("{0}: PreStart",_userName);

          
            base.PreStart();
        }

        protected override void PostStop()
        {
            Console.WriteLine("{0}: PostStop", _userName);
            base.PostStop();
        }

        protected override void PreRestart(Exception reason, object message)
        {
            Console.WriteLine("{0}: PreRestart", _userName);
            base.PreRestart(reason, message);
        }

        protected override void PostRestart(Exception reason)
        {
            Console.WriteLine("{0}: PostRestart", _userName);
            base.PostRestart(reason);
        }
    }

Póki co niewiele mamy tam kodu – głównie hooking, które pomogą nam w zrozumieniu propagacji błędów.
Zmodyfikujmy metodę OnReceived tak, aby zasymulować wyjątek:

        protected override void OnReceive(object message)
        {
            if (message.ToString() == "error")
                throw new ArgumentException();

            Console.WriteLine("Received by {0}: {1}", _userName, message);
        }

W celu zdefiniowania obsługi błędów wystarczy przeciążyć metodę SupervisorStrategy aktora zarządzającego. Jeśli chcemy więc obsłużyć wyjątek w ApplicationUserActor, wtedy węzeł zarządzający (rodzic) to ApplicationUserControllerActor. Kod:

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy((exception) =>
            {
                if (exception is ArgumentException)
                    return Directive.Restart;

                return Directive.Escalate;
            });
        }

W przykładzie wybraliśmy strategię OneForOneStrategy, którą opisałem już w poprzednim wpisie. W skrócie, rodzeństwo węzła, który spowodował wyjątek, nie będzie odgrywało tutaj żadnej roli. Wystarczy, że przekażemy wyrażenie lambda, które określa co należy zrobić w zależności od typu wyjątku. W powyższym przykładzie restartujemy aktora. Tak jak napisałem w poprzednim poście, mamy cztery sposoby reakcji:

  public enum Directive
  {
    Resume,
    Restart,
    Escalate,
    Stop,
  }

W celu zaprezentowania efektu, stwórzmy dwóch aktorów i wyślijmy serię wiadomości:

            var system = ActorSystem.Create("FooHierarchySystem");
            IActorRef userControllerActor =
                system.ActorOf<ApplicationUserControllerActor>("ApplicationUserControllerActor");

            userControllerActor.Tell(new AddUser("Piotr"));
            userControllerActor.Tell(new AddUser("Pawel"));
            var actor1 = system.ActorSelection("/user/ApplicationUserControllerActor/Piotr");
            var actor2 = system.ActorSelection("/user/ApplicationUserControllerActor/Pawel");

            Console.ReadLine();
            actor1.Tell("Sample message I");
            Console.ReadLine();
            actor1.Tell("error");
            Console.ReadLine();
            actor1.Tell("Sample message II");

            Console.ReadLine();

1
Widzimy, że w momencie wystąpienia błędu, aktor został zrestartowany. Ze screenu również można zauważyć, że kolejne wiadomości zostaną przetworzone. Stan wewnętrzny został zrestartowany, ale nie kolejka wiadomości. W celu zademonstrowania, że stan wewnętrzny faktycznie jest wymazywany (ponieważ tworzona jest nowa instancja), dodajmy prywatne pole do klasy:

    public class ApplicationUserActor : UntypedActor
    {
        private readonly string _userName;
        private string _internalState;
      ...
    }

InternalState jest wyświetlany i ustawiany w OnReceive:

        protected override void OnReceive(object message)
        {
            Console.WriteLine("{0}:Internal State: {1}",_userName,_internalState);

            if (message.ToString() == "error")
                throw new ArgumentException();

            _internalState = message.ToString();

            Console.WriteLine("Received by {0}: {1}", _userName, message);
        }

Teraz widzimy, że po wystąpieniu wyjątku, InternalState będzie pusty:
2

Analogicznie, spróbujmy zmienić dyrektywę restart na resume:

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy((exception) =>
            {
                if (exception is ArgumentException)
                    return Directive.Resume;

                return Directive.Escalate;
            });
        }

Po uruchomieniu programu, przekonamy się, że stan wewnętrzny nie jest usuwany:
3

Zmieńmy również strategię na AllForOneStrategy:

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new AllForOneStrategy((exception) =>
            {
                if (exception is ArgumentException)
                    return Directive.Restart;

                return Directive.Escalate;
            });
        }

Efekt będzie taki, że wszystkie węzły podrzędne zostaną zrestartowane:
4

Jeśli w jakimś aktorze nie zdefiniujemy strategii obsługi błędów, wtedy domyślna będzie użyta:

   protected virtual SupervisorStrategy SupervisorStrategy()
    {
      return SupervisorStrategy.DefaultStrategy;
    }

Domyślna strategia to z kolei OneForOneStrategy.
Warto również przyjrzeć się  innym przeciążeniom konstruktora, np.:

    ///
<summary>
    /// Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider`
    ///                 to all children when one fails, as opposed to <see cref="T:Akka.Actor.OneForOneStrategy"/> that applies
    ///                 it only to the child actor that failed.
    /// 
    /// </summary>

    /// <param name="maxNrOfRetries">the number of times a child actor is allowed to be restarted, negative value means no limit,
    ///                 if the limit is exceeded the child actor is stopped.
    ///             </param><param name="withinTimeRange">duration of the time window for maxNrOfRetries, Duration.Inf means no window.</param><param name="localOnlyDecider">mapping from Exception to <see cref="T:Akka.Actor.Directive"/></param>
    public OneForOneStrategy(int? maxNrOfRetries, TimeSpan? withinTimeRange, Func<Exception, Directive> localOnlyDecider)
      : this(maxNrOfRetries.GetValueOrDefault(-1), (int) withinTimeRange.GetValueOrDefault(Timeout.InfiniteTimeSpan).TotalMilliseconds, localOnlyDecider, true)
    {
    }

Widzimy, że oprócz wspomnianego wyrażenia lambda, możemy określić maksymalną liczbę prób oraz przedział czasowy.

AKKA.NET – obsługa błędów

W poprzednich postach o AKKA.NET pisałem o hierarchii aktorów. Stanowi ona bardzo ważny element systemów opartych na aktorach. Zasada jest taka, że uproszczamy problem na podproblmy, aż do momentu, gdy każdy podproblem jest łatwy w implementacji za pomocą pojedynczego aktora.  Dla przypomnienia, stworzyliśmy następującą hierarchię:

controller

Co się stanie, gdy jeden z aktorów wyrzuci wyjątek? AKKA.NET posiada kilka mechanizmów. Przede wszystkim jednak, taki aktor zostanie wstrzymany, możliwe  wraz ze wszystkimi potomkami (w zależności od przyjętej strategii). Następnie do rodzica zależy, co należy zrobić. Możliwe strategie to:

  • Węzeł podrzędny zostanie wznowiony
  • Węzeł podręczny zostanie zatrzymany
  • Węzeł podręczny zostanie zrestartowany
  • Rodzic może nie wiedzieć jak obsłużyć dany błąd. Z tego względu jest możliwość eskalacji problemu do kolejnego rodzica (czyli dziadka względem węzła, który wywołał problem).

Dobór odpowiedniej strategi jest dość trudny i należy wiedzieć czym się charakteryzuje każda z nich.

W przypadku wznowienia działania, stan wewnętrzny aktora jest utrzymany. Ponadto wszystkie dzieci danego węzła również zostaną wznowione.

Dla odmiany restart aktora powoduje usunięcie całego stanu.  Jeśli wiemy, że stan aktora jest błędny (niespójny), wtedy taka strategia jest rozsądna. Analogicznie potomkowie również zostaną zrestartowani.

Przez stan wewnętrzny mam na myśli prywatne pola i zmienne. Kolejka z wiadomościami asynchronicznymi jest przechowywana gdzieś indziej. Jeśli zatem wiadomość “A” wywołała problem i zrestartowaliśmy aktora, wtedy kolejne wiadomości  np. “B”, które były już w kolejce, nie zostaną utracone. Pominiemy wyłącznie wiadomość, która wywołała błąd.

Analogicznie sytuacja wygląda z zatrzymaniem aktora. Jest to skrajna sytuacja, kiedy nie wiemy jak naprawić problem. Wtedy najbezpieczniej jest po prostu zatrzymać węzeł powodujący problem, wraz ze wszystkimi jego potomkami.

Zasadę eskalacji błędu do rodzica, myślę, że można już wywnioskować ponieważ wygląda analogicznie. Węzeł jest wstrzymany tymczasowo w momencie przekazania kontroli do kolejnego rodzica.

Implementacja obsługi błędów w AKKA.NET sprowadza się do tzw.  “Supervision strategy”.

Pierwsza, domyślna strategia to OneForOneStrategy. Oznacza to, że akacja zostanie podjęta wyłącznie na węźle, który spowodował problem. Załóżmy, że mamy rodzica P z dziećmi A, B, C. Jeśli wyjątek został wyrzucony przez “A”, wszelkie akcje (takie jak zatrzymanie, restart) zostaną podjęte na tylko A.

AllForOneStrategy z kolei podejmie akcje dla każdego z rodzeństwa. Oznacza to, że jeśli A wywoła wyjątek to również B,C zostaną np. wstrzymane lub zrestartowane.

Powyższe strategie również definiują jaki wyjątek jaką akcję powinien spowodować (stop, restart, escalate, resume).

W praktyce wygląda to tak, że aktor zwraca swoją strategie, a strategia z kolei przyjmuje kilka parametrów określających możliwe akcje w zależności od wyjątku, liczby powtórzeń itp. W przyszłym wpisie, pokaże jak to wygląda od strony C#.

AKKA .NET – definiowanie ścieżki aktora

W poprzednim poście użyliśmy metody ActorSelection w celu uzyskania referencji do aktora:

var actor1 = system.ActorSelection("/user/ApplicationUserControllerActor/Piotr")

Dzisiaj chciałbym bardziej skupić się na definiowaniu ścieżki do aktora. Pełna ścieżka może zawierać następujące elementy:
– protokół
– nazwa systemu
– adres ip aktora
– seria nazw aktorów opisująca hierarchie np. ApplicationUserControllerActor/actor1/actor2 itp.

Załóżmy, że nasz aktor znajduje się na innym komputerze. Oznacza to, aby wysłać jakieś wiadomości do niego musimy skorzystać z protokołu sieciowego. Nazwa ścieżki będzie wtedy wyglądać:

akka.tcp://FooHierarchySystem@89.124.43.14/user/ApplicationUserControllerActor/Piotr

Jeśli nie korzystamy ze zdalnych aktorów, wtedy moglibyśmy napisać:

akka://FooHierarchySystem/user/ApplicationUserControllerActor/Piotr

W poprzednim poście jednak nie musieliśmy podawać nazwy systemu (FooHierarchySystem). Wynika to z faktu, że akka .net wspiera ścieżki relatywne. ActorSlection wywoływaliśmy bezpośrednio na systemie, zatem nie było potrzeby precyzowania tego w ścieżce. Można to porównać do ścieżek relatywnych występujących w systemie plików.

Załóżmy, że jesteśmy teraz w aktorze ApplicationUserControllerActor i hierarchia jest taka sama jak w poprzednim wpisie:
controller

W celu uzyskania dostępu do jednej z instancji ApplicationUserActor możemy:

// Pierwsze podejście
Context.ActorSelection("akka://FooHierarchySystem/user/ApplicationUserControllerActor/Piotr");
// Drugie podejście
Context.ActorSelection("user/ApplicationUserControllerActor/Piotr");
// Trzecie podejście
Context.ActorSelection("Piotr");

Wszystkie ścieżki wskazują na tego samego aktora, a dwie ostatnie są relatywne. Podobnie z ApplicationUserControllerActor możemy:

Context.ActorSelection("../../ApplicationUserControllerActor").Tell(new AddUser("AnotherUser"));

Tak jak w ścieżkach systemu plików, ../ oznacza wejście o jeden poziom do góry. W powyższym przykładzie ../../ da nam referencję na /user, a potem schodzimy z powrotem na ApplicationUserControllerActor.

AKKA.NET: Hierarchia aktorów

Hierarchia aktorów to kolejny bardzo ważny element w projektowaniu systemów rozproszonych, bazujących na aktorach.

Przede wszystkim pomaga w tworzeniu atomowych aktorów. Dzięki hierarchii, pojedynczy aktor może zawierać na tyle mało logiki, że jesteśmy w stanie wykonać operację wielowątkową bez użycia blokad.

Z poprzedniego artykułu o Reactive Manifesto, pamiętamy, że obsługa błędów (resilent) oraz skalowalność muszą być zagwarantowane w systemach reaktywnych. Dzięki hierarchii aktorów bardzo łatwo możemy izolować awarie pojedynczego węzła od innych. Ponadto, ze względu na relacje między różnymi aktorami, możemy dobrać odpowiednio strategie obsługi błędów.

Hierarchia aktorów stanowi drzewo relacji. Każdy węzeł ma rodzica, który stanowi jego zarządcę. To od niego należy, czy w momencie wystąpienia błędu, wszystkie węzły potomne zostaną anulowane czy tymczasowo zawieszone.

Zasada jest taka więc, że tworzymy coraz to nowe poziomy w drzewie aktorów, aż w do momentu, gdy logika w pojedynczym aktorze jest wystarczająco prosta i odizolowana.

W systemie AKKA.NET każdy węzeł ma zarządce i każdy z nich może być również zarządcą swoich potomków.  W AKKA.NET istnieją również systemowe węzły:

root

Aktor / jest bazowym węzeł dla całego systemu. To od niego zaczyna się hierarchia. Później mamy /user oraz /system. Każdy aktor zdefiniowany przez nas, będzie podlegał /user. Jeśli /user zdecyduje się na anulowanie podrzędnych węzłów, nasi aktorzy zostaną anulowani. Z kolei /system, to inny bazowy węzeł, dla celów czysto systemowych takich jak obsługa błędów czy wykonywanie logów.

Załóżmy, że mamy system składający się z dowolnej liczby użytkowników. Nie ma w tej chwili znaczenia co to dokładnie za system. Wiemy, że jednym z aktorów będzie po prostu “ApplicationUser”.  Jednym z rozwiązań, byłoby stworzenie następującej hierarchii:

h

Powyższa hierarchia, składająca się z pojedynczego aktora nie jest zbyt dobra.  Lepiej wprowadzić koncept ApplicationUserController, który będzie przechowywał użytkowników oraz aktorów. ApplicationUserController będzie zatem zarządzał aktorami ApplicationUser:

controller

W momencie zalogowania się nowego użytkownika, kolejny ApplicationUser jest tworzony przez ApplicationUserController.

Przejdźmy zatem do implementacji. Zaczniemy od ApplicationUser:

  public class ApplicationUserActor : UntypedActor
    {
        private readonly string _userName;

        public ApplicationUserActor(string userName)
        {
            _userName = userName;
        }

        protected override void OnReceive(object message)
        {

            Console.WriteLine("Received by {0}: {1}", _userName, message);
        }

        protected override void PreStart()
        {
            Console.WriteLine("{0}: PreStart",_userName);
            base.PreStart();
        }

        protected override void PostStop()
        {
            Console.WriteLine("{0}: PostStop", _userName);
            base.PostStop();
        }

        protected override void PreRestart(Exception reason, object message)
        {
            Console.WriteLine("{0}: PreRestart", _userName);
            base.PreRestart(reason, message);
        }

        protected override void PostRestart(Exception reason)
        {
            Console.WriteLine("{0}: PostRestart", _userName);
            base.PostRestart(reason);
        }
    }

ApplicationUser w naszym przypadku będzie po prostu wyświetlał przychodzące wiadomości. Ponadto przeciążyłem hook’i opisane w poprzednim poście. Ułatwią one później śledzenie zachowania systemu. Każdy ApplicationUser przechowuje nazwę użytkownika, dzięki której później będziemy w stanie rozróżniać poszczególne instancje aktora.

Kolejny aktor będzie służył do zarządzania ApplicationUser. Często określa się go miastem strażnika (guardian), ponieważ to on jest odpowiedzialny za zarządzanie, jak i obsługę błędów (więcej o tym w następnym wpisie). Kod:

public class ApplicationUserControllerActor : ReceiveActor
    {
        public ApplicationUserControllerActor()
        {
            Receive<AddUser>(msg => AddUser(msg));


        }

        private void AddUser(AddUser msg)
        {
            Console.WriteLine("Requested a new user: {0}", msg.UserName);

            IActorRef newActorRef =
                Context.ActorOf(Props.Create(() => new ApplicationUserActor(msg.UserName)), msg.UserName);

        }

        protected override void PreStart()
        {
            Console.WriteLine("ApplicationUserControllerActor: PreStart");
            base.PreStart();
        }

        protected override void PostStop()
        {
            Console.WriteLine("ApplicationUserControllerActor: PostStop");
            base.PostStop();
        }

        protected override void PreRestart(Exception reason, object message)
        {
            Console.WriteLine("ApplicationUserControllerActor: PreRestart");
            base.PreRestart(reason, message);
        }

        protected override void PostRestart(Exception reason)
        {
            Console.WriteLine("ApplicationUserControllerActor: PostRestart");
            base.PostRestart(reason);
        }
    }

Aktor obsługuje wyłącznie wiadomość AddUser, która wygląda następująco:

    public class AddUser
    {
        public string UserName { get; private set; }

        public AddUser(string userName)
        {
            UserName = userName;
        }
    }

W momencie otrzymania AddUser, dodany jest nowy aktor podrzędny za pomocą:

Context.ActorOf(Props.Create(() => new ApplicationUserActor(msg.UserName)), msg.UserName);

Wywołanie ActorOf w kontekście aktora A, spowoduje utworzenie aktora B, który jest podrzędny względem A. Innymi słowy, ApplicationUserActor podlega ApplicationUserController, który z kolei podlega /user. Jak wiemy z początku wpisu, /user jest systemowym aktorem, który zawsze istnieje. Być może, nie potrzebnie nazywałem swoich aktorów również “User”. W każdym, razie /user jest systemowy i nie ma nic wspólnego z logiką biznesową danej aplikacji. Z kolei ApplicationUser oraz ApplicationUserController zostały stworzone przez nas za pomocą powyższego kodu.

Przejdźmy teraz do testowania. Korzeń systemu tworzymy w znany już sposób:

var system = ActorSystem.Create("FooHierarchySystem");

IActorRef userControllerActor=system.ActorOf<ApplicationUserControllerActor ("ApplicationUserControllerActor");

Następnie wysyłamy dwie wiadomości AddUser, w celu dodania dwóch nowych użytkowników, a co za tym idzie, również dwóch nowych aktorów:

userControllerActor.Tell(new AddUser("Piotr"));
userControllerActor.Tell(new AddUser("Pawel"));

Na ekranie zobaczymy, że najpierw ApplicationUserController zostały stworzony, potem dwóch użytkowników zostało dodanych, a po pewnych czasie hooki OnPreStart dla nowych aktorów zostały wywołane:

1

Następnie, chcemy wysłać wiadomość do nowo utworzonych aktorów:

Console.ReadLine();
Console.WriteLine("Sending a message to ApplicationActor(Piotr)...");
var actor1 = system.ActorSelection("/user/ApplicationUserControllerActor/Piotr");
actor1.Tell("Testing actor 1");

Console.ReadLine();
Console.WriteLine("Sending a message to ApplicationActor(Pawel)...");
var actor2 = system.ActorSelection("/user/ApplicationUserControllerActor/Pawel");
actor2.Tell("Testing actor 2");

Warto zwrócić jak uzyskujemy dostęp do wcześniej już stworzonego aktora:

system.ActorSelection("/user/ApplicationUserControllerActor/Piotr");

Więcej o definiowaniu ścieżek napiszę innym razem. W najprostszej postaci zawierają jednak one serie nazw aktorów. Korzeniem zawsze jest /user. Następnie stworzyliśmy pojedynczą instancję ApplicationUserControllerAcrtor. Ostatnią częścią jest nazwa instancji ApplicationUser. W naszym przypadku, jako nazwę tej instancji podaliśmy nazwę użytkownika. Dla przypomnienia:

                Context.ActorOf(Props.Create(() => new ApplicationUserActor(msg.UserName)), msg.UserName);

Drugi parametr, metody Props.Create to nazwa aktora. Ponadto, jak widzimy, hierarchia aktorów nie pełni roli routing’u wiadomości. Nie musimy wysyłać wiadomości do korzenia systemu. Wystarczy zaznaczyć konkretnego aktora i komunikować się bezpośrednio z nim. Relacje aktorów mają wyłącznie znaczenie dla obsługi błędów (zobaczymy w następnym wpisie), jak i zarządzania czasem życia instancji.

W tej chwili, na ekranie powinniśmy zobaczyć następujące logi:
2

Na koniec, zobaczmy jak aktorzy zachowują się w momencie zamykania systemu:

            Console.ReadLine();
            Console.WriteLine("Requesting the system shutdown.");
            system.Shutdown();
            system.AwaitTermination();

Nie trudno domyślić się, że najpierw zostanie zamknięty ApplicationUserController, a dopiero potem dwie instancje ApplicationUser. Całość kodu do testowania:

 class Program
    {
        static void Main(string[] args)
        {
            var system = ActorSystem.Create("FooHierarchySystem");
            IActorRef userControllerActor =
                system.ActorOf<ApplicationUserControllerActor>("ApplicationUserControllerActor");

            userControllerActor.Tell(new AddUser("Piotr"));
            userControllerActor.Tell(new AddUser("Pawel"));

            Console.ReadLine();
            Console.WriteLine("Sending a message to ApplicationActor(Piotr)...");
            var actor1 = system.ActorSelection("/user/ApplicationUserControllerActor/Piotr");
            actor1.Tell("Testing actor 1");

            Console.ReadLine();
            Console.WriteLine("Sending a message to ApplicationActor(Pawel)...");
            var actor2 = system.ActorSelection("/user/ApplicationUserControllerActor/Pawel");
            actor2.Tell("Testing actor 2");

            Console.ReadLine();
            Console.WriteLine("Requesting the system shutdown.");
            system.Shutdown();
            system.AwaitTermination();

            Console.ReadLine();
        }
    }

AKKA.NET – czas życia aktorów, zdarzenia (hooks)

Dzisiaj zacząłem pisać post o hierarchii aktorów. Jest to bardzo ważny element w celu osiągnięcia skalowalności i dobrej obsługi błędów (np. poprzez izolacje wadliwych aktorów).

W połowie jednak stwierdziłem, że najpierw wypada napisać krótki wpis o zdarzeniach (hooks), jakie możemy zdefiniować w AKKA. Pozwoli nam to potem lepiej zrozumieć przepływ informacji w hierarchiach aktorów.

Każdy aktor,  może znajdować się w następujących etapach:

  • Starting – aktor został dopiero stworzony i nie przetwarza jeszcze wiadomości
  • Receiving – aktor może teraz otrzymywać wiadomości
  • Stopping – zwalnianie zasobów
  • Terminated – aktor nie może już otrzymywać wiadomości, ponadto w tym stanie, nie może zostać już wznowiony.
  • Restarting – aktor aktualnie jest resetowany. Oznacza to, że po restarcie może przejść do “Starting”, a potem do “Receiving”, czyli będzie w stanie ponownie przetwarzać wiadomości.

Z perspektywy kodu, poszczególne stany można obserwować, przeciążając odpowiednie metody:

  • OnStart  (Starting) – metoda wywołana przed rozpoczęciem otrzymywania jakichkolwiek wiadomości
  • PostStop (Stopping) – zwalnianie zasobów
  • PreRestart – przed rozpoczęciem restartu.
  • PostRestart po zakończeniu restartu, ale jeszcze przed ponownym wywołaniem OnStart.

Kod:

 class FooActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            Console.WriteLine("Received: {0}", message);
        }

        protected override void PreStart()
        {
            Console.WriteLine("PreStart");
            base.PreStart();
        }

        protected override void PostStop()
        {
            Console.WriteLine("PostStop");
            base.PostStop();
        }

        protected override void PreRestart(Exception reason, object message)
        {
            Console.WriteLine("PreRestart");
            base.PreRestart(reason, message);
        }

        protected override void PostRestart(Exception reason)
        {
            Console.WriteLine("PostRestart");
            base.PostRestart(reason);
        }
    }

W celu przetestowania możemy:

            var system = ActorSystem.Create("system");
            var actor = system.ActorOf<FooActor>();

            actor.Tell("Hello World!");
            system.Shutdown();
            system.AwaitTermination();

Poszczególne metody, przydadzą się w następnym wpisie, poświęconym hierarchii aktorów oraz relacjami między aktorami. Każdy z aktorów może być zarządcą więc śledzenie czasu życia aktorów jest pomocne w zrozumieniu tych zasad.

Programowanie reaktywne

Zanim będę kontynuował serię o AKKA.NET, warto zapoznać się z podstawami programowania reaktywnego. Pozwoli to później zrozumieć, w jaki sposób AKKA.NET implementuje założenia programowania reaktywnego.
Dzisiaj zatem przedstawienię tzw. “The Reactive Manifesto”, którego pełną treść można znaleźć tutaj. Moim zdaniem jednak, manifest może wydawać się trochę skomplikowany i dlatego zdecydowałem się wyjaśnić to po swojemu.

Zastanówmy się po co nam kolejny “typ” programowania? Co jest złego z naszym starym, klasycznym podejściem klient-serwer? Jak nie trudno domyślić się, wymagania i oczekiwania dla dzisiejszych systemów są inne niż te 10 lat temu. Zamiast pojedynczych serwerów, mamy całe klastry. Przez ogólną dostępność komputerów (PC, Mobile itp.), przetwarzamy dane nie w gigabajtach ale w terabajtach. Model oprogramowania także zmienił się. Większość systemów stanowią usługi, a nie oprogramowanie desktopow’e, pakowane w kartony i sprzedawane w sklepach. Oczekiwania są, aby te usługi działały jak najdłużej bez żadnych usterek. Oczywiście ciężko dostarczyć oprogramowanie, które działa 100% czasu, ale możliwe jest uzyskanie niezawodności np. w 99% czasu. Dostępność różnych urządzeń dostępowych, typu tablet czy telefon komórkowy, powoduje, że użytkownicy nie chcą czekać kilku sekund na odpowiedź od serwera, a mieć rezultat w przeciągu milisekund.

Jak sama nazwa mówi, programowanie reaktywne, cechuje się, że poszczególne komponenty w odpowiedni sposób reagują (react). Na co zatem nasze systemy powinny reagować?

Zdarzenia
Przede wszystkim powinny reagować na zdarzenia. Systemy reaktywne opierają się na zdarzeniach, a nie na zapytaniach typu klient-serwer. W nServiceBus czy AKKA.NET mamy właśnie zdarzenia w postaci asynchronicznych wiadomości. Aktor wysyłając wiadomość do innego aktora, nie blokuje wykonywania procesu. Komponenty komunikujące się za pomocą zdarzeń są powiązane ze sobą w luźny sposób. Jeśli klasa A komunikuję się z klasą B, nie musimy przechowywać żadnej referencji z instancji A do B. Jedyną, luźną zależnością jest wiadomość\zdarzenie.

Dane, obciążenie
Systemy reaktywne muszą również reagować na dane, które przekazywane mogą być z różną intensywnością. Innymi słowy, system powinien być skalowalny. Powinien odpowiednio reagować na małą liczbę zapytań, jak i bardzo dużą. W przypadku niewystarczających zasobów, system powinien zareagować poprzez skalowanie. Jednym z typów skalowania (tzw. scaling-up), jest użycie więcej pamięci czy rdzeni procesora, poprzez np. wielowątkowość. Czasami jednak taka skalowalność jest niewystarczająca i trzeba szukać zasobów na zewnątrz (tzw. scaling out). Zasoby na zewnątrz to oczywiście kolejny serwer dołączony do klastra.

Ogromną rolę w reagowaniu na obciążenie odgrywają wcześniej opisane zdarzenia. Skoro dwie klasy nie są ze sobą mocno powiązane, wtedy nie współdzielą ze sobą żadnego stanu. Co za tym idzie, nic nie stoi na przeszkodzie, aby jedna z klas była wykonywana na kompletnie innym komputerze. Jeśli cały stan zawarty jest w wiadomości, nie ma dla nas różnicy czy przekażemy go w tym samym procesie np. do innego wątku,czy wyślemy przez sieć do innego klastra. Taka właściwość nazywa się “location transparency”, ponieważ raz zaimplementowana logika, może być skalowana bez żadnych zmian w kodzie. AKKA.NET posiada tą właściwość. Implementując jakiś algorytm za pomocą aktorów, można część z nich umieścić na różnych komputerach (tzw. remote actor). Lokalizacja aktorów zatem nie ma znaczenia – mogą znajdować się w tym samym procesie albo na innych komputerach, a i tak zaimplementowany algorytm będzie miał taki sam kod.

Wyjątki i błędy
Systemy reaktywne powinny również reagować w odpowiedni sposób na wszelkie błędy. AKKA.NET posiada szereg strategii obsługi wyjątków o których napiszę w przyszłych postach. System jednak nie powinien przestawać działać w momencie, gdy mało istotny błąd miał miejsce. Dokonuje się tego np. poprzez izolację. Jeśli aktor A ma jakieś błędy, można go odseparować od reszty, aby nie popsuć stanu aplikacji. Innymi słowy, system powinien wrócić do działania w momencie wystąpienia błędu (failure receovery). Jeśli np. nie można połączyć się z jakąś usługą, można spróbować ponownie za kilka sekund, zamiast całkowicie anulować operację.
Aktorzy w AKKA.NET tworzą hierarchie. Jeśli jeden z aktorów ma błędy, jego rodzic decyduje, co z błędem należy zrobić. Można np. powtórzyć operację, zrestartować aktora lub przekazać odpowiedzialność do kolejnego aktora, który zdecyduje co należy zrobić z węzłami podrzędnymi.
Jak widzimy, dobrą obsługę błędów również uzyskujemy za pomocą wspomnianych zdarzeń. Izolacja czy replikacja nie byłoby możliwa bez nich. Za pomocą zdarzeń, można komunikować się między aktorami. Jeśli błąd wystąpił w klastrze A, można spróbować w innym klastrze. Stan nie jest współdzielony, zatem odseparowanie pojedynczego węzła jest łatwe i nie powoduje proliferacji problemu do innych aktorów\węzłów.

Responsywność
Ostatnią, najłatwiejszą w zrozumieniu cechą, jest responsywność. Systemy reaktywne powinny reagować na użytkowników. Jeśli operacja jest czasochłonna, należy powiadomić o tym użytkownika. Znowu uzyskujemy to za pomocą zdarzeń. Zamiast wysłania pojedynczego zapytania i czekania aż operacja wykona się, korzystamy z asynchronicznych wiadomości, które nie blokują wykonywania.
Oczywiście responsywność jest również uzyskana za pomocą skalowalności i poprawnej obsługi błędu. Jeśli jest duża liczba zapytań, dzięki skalowalności system nie powinien być powolny. Obsługa błędów, zagwarantuje, że użytkownik zawsze jest świadom co się dzieje i nie zastanie np. pustej strony, gdy operacja nie powiodła się.

Spójrzmy zatem na słynny diagram, który można znaleźć na stronie “The Reactive Manifesto” (źródło: http://www.reactivemanifesto.org:

“Message-driven” to wspomniane zdarzenia, wiadomości.
“Responsive” to oczywiście responsywność systemu.
“Resilent” to prawidłowa obsługa błędów. Innymi słowy, system powinien być elastyczny (resilent) na błędy i dostosowywać się do sytuacji.
“Elastic” to skalowalność. System powinien być na tyle elastyczny, aby obsługiwać zarówno małą liczbę zapytań jak i bardzo dużą.

Ponadto z oznaczeń na rysunku, możemy zaobserwować:
– Dzięki zdarzeniom uzyskujemy zarówno elastyczność\skalowalność (location transparency), responsywność (nie blokujemy wywołań) oraz Resilent (izolacja błędów, replikacja w innym klastrze).
– Skalowalność (elastic) oraz obsługa błędów (resilence) są ze sobą tak naprawdę powiązane. Gdyby nie skalowalność i location transparency, nie moglibyśmy na przykład wykonać operacji w innym klastrze. Gdyby, nie prawidłowa obsługą błędów, awaria w innych punkcie systemu, zniszczyła by wszystkie węzły, niwelując korzyści ze skalowalności.
– Responsywność nie byłaby możliwa dzięki skalowalności (wydajność), jak i prawidłowej obsługi błędów (nie zostawianie użytkownika bez odpowiedzi w przypadku błędów).

Producent-konsument w C# – BlockingCollection

BlockingCollection jest specjalną kolekcją danych, przygotowaną do implementacji wzorca producent-konsument. Nakład pracy do implementacji tego wzorca jest minimalny z BlockingCollection. Nie musimy martwić się o synchronizację, sekcję krytyczną czy deadlock. Zacznijmy od razu od przykładu.
Producent będzie wyglądać następująco:

       private static void Produce(BlockingCollection<int> buffer)
        {
            for (int i = 0; i < 100; i++)
            {
                Console.WriteLine("Producing {0}", i);
                Thread.Sleep(10);

                buffer.Add(i);
            }

            buffer.CompleteAdding();
        }

Jak widzimy, implementacja producenta to nic innego jak dodawanie danych do kolekcji. Metoda Add jest thread-safe więc nie musimy używać lock. Ponadto robi to bardzo optymalnie, ponieważ nie jest to prosty mechanizm polegający po prostu na wstawieniu lock’a. Zaglądając do implementacji Add, zobaczymy między innymi spinning. BlockingCollection należy to tzw. concurrent collections o których już pisałem na blogu. Oznacza to, że jakiekolwiek operacje są zaimplementowane w ten sposób, aby unikać blokad. Zostało to osiągnięte na poziomie projektu (np. kilka mini-kolekcji w środku dla różnych wątków), jak i używania spinningu, gdy wiadomo, że zbyt długo nie będzie trzeba czekać.

Metoda CompleteAdding kończy produkcję i konsumenci nie będą już dłużej czekać. Musimy ją wywołać na końcu ponieważ w przeciwnym wypadku, konsumenci będą myśleli, że produkcja ciągle trwa i należy wciąż czekać.
Przyjrzyjmy się teraz konsumpcji:

        private static void Consume(BlockingCollection<int> buffer)
        {
            foreach (var i in buffer.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming {0}.", i);
                Thread.Sleep(20);
            }
        }

Kluczem jest metoda GetConsumingEnumerable. W bezpieczny sposób usuwa one dane z bufora. Jeśli w buforze nic nie ma po prostu wątek będzie blokowany. Iterator zakończy dopiero działanie, gdy producent wywoła CompleteAdding. W przeciwnym wypadku, foreach będzie zdejmował dane z kolekcji, lub blokował wywołanie w oczekiwaniu na więcej danych. Jeśli zajrzyjmy do implementacji wewnętrznej, znowu znajdziemy semafory i SpinWait.

Tak naprawdę, do najprostszej implementacji nic więcej już nie potrzebujemy. Całość wygląda zatem następująco:

class Program
    {
        static void Main(string[] args)
        {
            var buffer=new BlockingCollection<int>();

            var producerTask = Task.Run(() => Produce(buffer));
            var consumeTask = Task.Run(() => Consume(buffer));

            Task.WaitAll(producerTask, consumeTask);
        }

        private static void Produce(BlockingCollection<int> buffer)
        {
            for (int i = 0; i < 100; i++)
            {
                Console.WriteLine("Producing {0}", i);
                Thread.Sleep(10);

                buffer.Add(i);
            }

            buffer.CompleteAdding();
        }

        private static void Consume(BlockingCollection<int> buffer)
        {
            foreach (var i in buffer.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming {0}.", i);
                Thread.Sleep(20);
            }
        }
    }

W praktyce jednak trzeba rozważyć kilka innych “drobiazgów”. Co jeśli wyjątek zdarzy się podczas konsumpcji danych? Producent wciąż będzie generował dane, co przecież zwykle nie ma sensu i spowoduje memory leak. Dlatego lepiej napisać obsługę błędów:

       private static void Consume(BlockingCollection<int> buffer)
        {
            try
            {
                foreach (var i in buffer.GetConsumingEnumerable())
                {
                    Console.WriteLine("Consuming {0}.", i);
                    Thread.Sleep(20);
                    throw new Exception();
                }
            }
            catch
            {
                buffer.CompleteAdding();
                throw;
            }
        }

W momencie wystąpienia błędu, wywołujemy CompleteAdding, co spowoduje, że próba dodania nowych danych przez producenta zakończy się wyjątkiem InvalidOperationException i zakończeniem produkcji.
Analogicznie, warto dostać klauzule try-finally w producencie:

        private static void Produce(BlockingCollection<int> buffer)
        {
            try
            {
                for (int i = 0; i < 100; i++)
                {
                    Console.WriteLine("Producing {0}", i);
                    Thread.Sleep(10);

                    buffer.Add(i);
                }
            }
            finally
            {
                buffer.CompleteAdding();
            }

        }

W przypadku producenta, chcemy wywołać CompleteAdding zarówno w przypadku powodzenia (aby konsument już dłużej nie czekał), jak i wyjątku.
CompleteAdding tak naprawdę korzysta z CancellationToken, który jest znany nam z TPL. Prawdopodobnie warto również dodać obsługę wyjątków InvalidOperationException, tak aby dwukrotnie nie wywoływać CompleteAdding.

Inna bardzo ważna obserwacja to przypadek, gdy konsument jest dużo wolniejszy niż producent. Załóżmy, że wyprodukowanie zajmuje jedną sekundę, a konsumpcja 10. W tym przypadku, po długim przetwarzaniu możemy mieć do czynienia z ogromną alokacją pamięci ponieważ producent będzie ciągle dodawał dane, a konsument nie nadąży z usuwaniem ich.

BlockingCollection w bardzo prosty sposób rozwiązuje ten problem poprzez wprowadzenie maksymalnego limitu “porcji” w kolekcji. Wystarczy, w konstruktorze przekazać maksymalną pojemność:

var buffer = new BlockingCollection<int>(boundedCapacity: 10);

Po przekroczeniu limitu, metoda Add nie wyrzuci wyjątku, a po prostu będzie blokowała wywołanie za pomocą wspomnianych wcześniej technik (spinning, locking etc).