Wzorzec Saga, nServiceBus- część V

Dzisiaj o implementacji wzorca saga w nServiceBus. Korzystamy z niego, jeśli mamy kilka procesów biznesowych, które współtworzą jeden rozbudowany workflow. Załóżmy, że w celu złożenia zamówienia, należy wykonać kilka operacji takich jak weryfikacja danych, zapisanie danych w bazie, wysłanie emaila itp. Jeśli operacje są niezależne od siebie, wtedy wystarczy wysłać odpowiednie wiadomości do odizolowanych od siebie handlerów, tak jak to miało miejsce w poprzednich wpisach.

Co jeśli pewien stan jest współdzielony? Wtedy wspomniane podejście nie zadziałała. Rozwiązaniem jest wzorzec sagi. Można ją porównać do transakcji z tym, że nie ma tutaj koncepcji ACID. Jedynie co mamy to stan, który będzie współdzielony przez wszystkie etapy sagi. Z punktu widzenia nServiceBus, jest to klasa, która obsługuje kilka wiadomości naraz. Będziemy mieli zatem kilka implementacji metody Handle, dla różnych wiadomości. Oprócz tego, stan jest współdzielony między wszystkie handlery i zapisywany odpowiednio w bazie danych przez nServiceBus. W programowaniu rozproszonym nie wiemy, ile czasu ten stan będzie musiał być przechowywany w pamięci. Wywoływanie zewnętrznych serwisów może potrwać od sekundy po kilka godzin lub dni. Jak wspomniałem wcześniej, główna koncepcja nServiceBus i kolejek to  zagwarantowanie, że każda wiadomość zostanie dostarczona, nawet w przypadku tymczasowej awarii sieci. Z tego względu, niezbędne jest zapisanie stanu w bazie danych, a nie tylko do pamięci podręcznej.

Myślę, że przykład wyjaśni najlepiej powyższy wzorzec, który wciąż nie jest bardzo znany. Zacznijmy od deklaracji stanu, który będzie współdzielony przez wszystkie etapy sagi:

public class MySagaData : ContainSagaData
{
   [Unique]
   public Guid SagaId { get; set; }

   public string CustomField1 { get; set; }
   public string CustomField2 { get; set; }
   public string CustomField3 { get; set; }
}

W nServiceBus każdy stan musi dziedziczyć po ContainSagaData. Saga również powinna mieć identyfikator. W końcu chcemy współdzielić stan w ramach konkretnej sagi. Innymi słowy,  gdy rozpoczniemy dwie niezależne od siebie sagi, powinny one mieć osobne stany.

Samą sagę definiujemy, poprzez dziedziczenie po klasie Saga:

class SagaExample : Saga<MySagaData>
{
   protected override void ConfigureHowToFindSaga(SagaPropertyMapper<MySagaData> mapper)
   {
       throw new NotImplementedException();
   }
}

W ConfigureHowToFindSaga definiuje się korelacje między wiadomościami a sagą. Następnie musimy zdefiniować co rozpocznie daną sagę. W nServiceBus będzie to oczywiście wiadomość np.:

public class AddPerson:IMessage
{
   public Guid PersonId { get; set; }
}

Za pomocą interfejsu IAmStartedByMessages, informujemy, że dana wiadomość (AddPerson) zawsze rozpocznie nową sagę:

class SagaExample : Saga<MySagaData>, IAmStartedByMessages<AddPerson>
{
   protected override void ConfigureHowToFindSaga(SagaPropertyMapper<MySagaData> mapper)
   {
       throw new NotImplementedException();
   }

   public void Handle(AddPerson message)
   {
       throw new NotImplementedException();
   }
}

Kolejnym etapem jest zdefiniowanie kilku wiadomości, które będą współtworzyć sagę. W naszym przypadku:

public class FirstNameMessage : IMessage
{
   public Guid PersonId { get; set; }
   public string FirstName { get; set; }
}

public class LastNameMessage : IMessage
{
   public Guid PersonId { get; set; }
   public string LastName { get; set; }
}

Saga zatem aktualnie wygląda następująco:

class SagaExample : Saga<MySagaData>, IAmStartedByMessages<AddPerson>, IHandleMessages<FirstNameMessage>,IHandleMessages<LastNameMessage>
{
   protected override void ConfigureHowToFindSaga(SagaPropertyMapper<MySagaData> mapper)
   {
       throw new NotImplementedException();
   }

   public void Handle(AddPerson message)
   {
       throw new NotImplementedException();
   }

   public void Handle(FirstNameMessage message)
   {
       throw new NotImplementedException();
   }

   public void Handle(LastNameMessage message)
   {
       throw new NotImplementedException();
   }
}

Innymi słowy, saga składa się ze stanu, wiadomości rozpoczynającej proces (sagę) oraz wiadomości składających się na nią. Musimy teraz skonfigurować korelację między poszczególnymi wiadomościami a współdzielonym stanem. W naszym przypadku PersonID będzie określał sagę.

We wspomnianej metodzie ConfigureHowToFindSaga, definiujemy mapowanie:

protected override void ConfigureHowToFindSaga(SagaPropertyMapper<MySagaData> mapper)
{
  mapper.ConfigureMapping<FirstNameMessage>(s => s.PersonId).ToSaga(m => m.SagaId);
  mapper.ConfigureMapping<LastNameMessage>(s => s.PersonId).ToSaga(m => m.SagaId);
}

Następnie, w obsłudze wiadomości, która rozpoczyna sagę inicjalizujemy stan:

public void Handle(AddPerson message)
{
  Data.SagaId = message.PersonId;
  Data.CustomField1 = "any value";
  Data.CustomField2 = "any value 1";
  Data.CustomField3 = "any value 2";
}

Najważniejsze pole to SagaID. Reszta jest opcjonalna i może zostać wypełniona na każdym etapie sagi. Zaimplementujmy pozostałe wiadomości w następujący sposób:

class SagaExample : Saga<MySagaData>, IAmStartedByMessages<AddPerson>, IHandleMessages<FirstNameMessage>, IHandleMessages<LastNameMessage>
{
   protected override void ConfigureHowToFindSaga(SagaPropertyMapper<MySagaData> mapper)
   {
       mapper.ConfigureMapping<FirstNameMessage>(s => s.PersonId).ToSaga(m => m.SagaId);
       mapper.ConfigureMapping<LastNameMessage>(s => s.PersonId).ToSaga(m => m.SagaId);
   }

   public void Handle(AddPerson message)
   {
       Data.SagaId = message.PersonId;
       Data.CustomField1 = "any value";
       Data.CustomField2 = "any value 1";
       Data.CustomField3 = "any value 2";

       Console.WriteLine("Handling AddPerson");
       PrintState();
   }

   public void Handle(FirstNameMessage message)
   {
       Data.CustomField2 = message.FirstName;

       Console.WriteLine("Handling FirstNameMessage");
       PrintState();       
   }

   public void Handle(LastNameMessage message)
   {
       Data.CustomField3 = message.LastName;

       Console.WriteLine("Handling LastNameMessage");
       PrintState();
   }

   private void PrintState()
   {
       Console.WriteLine(Data.SagaId);
       Console.WriteLine(Data.CustomField1);
       Console.WriteLine(Data.CustomField2);
       Console.WriteLine(Data.CustomField3);
       Console.WriteLine();
   }
}

Spróbujmy przetestować naszą sagę wysyłając serie następujących wiadomości:

var busConfiguration = new BusConfiguration();
busConfiguration.UsePersistence<RavenDBPersistence>();
ISendOnlyBus bus = Bus.CreateSendOnly(busConfiguration);

Guid personId = Guid.NewGuid();
bus.Send(new AddPerson() {PersonId = personId});
bus.Send(new FirstNameMessage() { PersonId = personId,FirstName = "Piotr"});
bus.Send(new LastNameMessage() { PersonId = personId, LastName = "Zielinski" });

Prześledźmy co po kolei ma miejsce:

image

Najpierw wysyłamy AddPerson, co inicjuje nową sagę. Następnie odbierane są FirstNameMessage oraz LastNameMessage. Widzimy wyraźnie, że stan jest przechowywany pomiędzy kolejnymi wiadomościami.

Spróbujmy teraz wysłać FirstNameMessage oraz LastNameMessage z identyfikatorem sagi, która nie istnieje tzn.:

bus.Send(new AddPerson() {PersonId = Guid.NewGuid()});            
Thread.Sleep(5000);
bus.Send(new FirstNameMessage() { PersonId = Guid.NewGuid(),FirstName = "Piotr"});
Thread.Sleep(5000);
bus.Send(new LastNameMessage() { PersonId = Guid.NewGuid(), LastName = "Zielinski" });

Za każdym razem generujemy nowy identyfikator. Z tego względu tylko AddPerson zostanie obsłużone bo ta wiadomość rozpoczyna nową sagę. Następne wiadomości zakończą się błędem ponieważ posiadają ID nieistniejącej sagi:

image

Zróbmy jeszcze jeden eksperyment. Co się stanie jeśli wyślemy trzy razy tą samą wiadomość?

Guid personId = Guid.NewGuid();

bus.Send(new AddPerson() { PersonId = personId });
bus.Send(new FirstNameMessage() { PersonId = personId, FirstName = "Piotr1" });
bus.Send(new FirstNameMessage() { PersonId = personId, FirstName = "Piotr2" });
bus.Send(new FirstNameMessage() { PersonId = personId, FirstName = "Piotr3" });

Wynik:

image

Jak widzimy, wszystkie wiadomości zostały prawidłowo obsłużone. Wynika to z tego, że nie zamknęliśmy sagi. Można tego dokonać za pomocą metody MarkAsCompleted. Wysłanie kilkukrotne FirstNameMessage poskutkuje teraz:

image

Jak widzimy, tylko raz wiadomość zostanie obsłużona. Kolejne wywołania prowadzą do błędu, ponieważ po pierwszym odebraniu FirstNameMessage, saga jest zakończona. W moich postach korzystam z RavenDB i zaglądając do bazy, przekonamy się, że stan sagi faktycznie jest tam przechowywany:

image

Dla przypomnienia, bazę konfigurowaliśmy (patrz poprzednie wpisy) w EndPointConfiguration:

public class EndpointConfig : IConfigureThisEndpoint, AsA_Server
{
   public void Customize(BusConfiguration configuration)
   {
      configuration.UsePersistence<RavenDBPersistence>();
   }
}

Zaglądając do bazowej klasy ContainSagaData zobaczymy:

public abstract class ContainSagaData : IContainSagaData
  {
    /// <summary>
    /// The saga id
    /// 
    /// </summary>
    public virtual Guid Id { get; set; }
    /// <summary>
    /// The address io the endpoint that started the saga
    /// 
    /// </summary>
    public virtual string Originator { get; set; }
    /// <summary>
    /// The id of the message that started the saga
    /// 
    /// </summary>
    public virtual string OriginalMessageId { get; set; }
  }

Widzimy, że dzięki nServiceBus wiemy kto rozpoczął sagę za pomocą Originator oraz OriginalMessageId.  W naszym przypadku jest to klient, ale zwykle jest to jakiś endpoint. Z tego względu, możemy odpowiedzieć bezpośrednio mu za pomocą:

 ReplyToOriginator(new AnyMessage { FirstName = Data.CustomField2 });

Mamy również do dyspozycji timeout. Jeśli wiemy, że obsługa sagi nie ma sensu, gdy przetwarzanie zajęło dłużej niż określony czas, wtedy możemy zaimplementować interfejs IHandleTimeouts:

 class SagaExample : Saga<MySagaData>, IAmStartedByMessages<AddPerson>, 
        IHandleMessages<FirstNameMessage>, 
        IHandleMessages<LastNameMessage>,
        IHandleTimeouts<MyCustomTimeout>
    {
    // ...
    }

Kolejnym krokiem jest wywołanie RequestTimeout z określonym czasem:

public void Handle(AddPerson message)
{
  Data.SagaId = message.PersonId;
  RequestTimeout<MyCustomTimeout>(TimeSpan.FromHours(1));
}

W tym przypadku jest to 1h. Po tym czasie, zostanie wywołana metoda Timeout (część interfejsu IHandleTimeouts):

public void Timeout(MyCustomTimeout state)
{
    ReplyToOriginator(new ErrorMessage(){PersonId=Data.SagaId});
}

W powyższym przykładzie,  wywołuję ReplyToOrignator co ma sens, ponieważ chcemy poinformować handler, który rozpoczął sagę o timeout.

MyCustomTimeout to zwykła struktura danych. Zaglądając do sygnatur RequestTimeout przekonamy się, jak jest ona wypełniana:

protected void RequestTimeout<TTimeoutMessageType>(TimeSpan within, Action<TTimeoutMessageType> messageConstructor) where TTimeoutMessageType : new();

5 thoughts on “Wzorzec Saga, nServiceBus- część V”

  1. Dzięki za artykuł, bardzo konkretny.
    Wzorzec sagi jest raczej mało popularny, a w sieci nie ma wielu przykładów dlatego duży plus.

    Mam jednak pytanie dotyczące tej implementacji,
    stworzyłem bliźniacze rozwiązanie i niestety nie działa,
    mam problem z wywoływaniem metody Handle dla FirstNameMessage i LastNameMessage.
    AddPerson wywołuje się poprawnie, jednak pozostała handlery nie są wywoływane, mimo poprawnej deklaracji klasy.

    W logu dostaje informacje “Could not find a saga ‘SagaExample’ for the message ‘FirstNameMessage'”.

    Spotkał się ktoś z podobnym problemem?

  2. Napewno masz ta konfiguracje:

    mapper.ConfigureMapping(s => s.PersonId).ToSaga(m => m.SagaId);
    mapper.ConfigureMapping(s => s.PersonId).ToSaga(m => m.SagaId);

    I czy na pewno PersonID jest taki sam dla AddPerson oraz FirstNameMessaage oraz LastNameMessage?

  3. Kombinowałem z tym dobre 2 godziny i nic,
    a po napisaniu poprzedniego komentarza w 2 minuty rozwiązałem problem.

    Zmienyłem typ właściwości SagaId ze string na Guid
    i wszytkko działa.

Leave a Reply

Your email address will not be published.