Wprowadzenie do współbieżnych kolekcji danych na przykładzie ConcurrentBag

Dziś po długim wprowadzeniu teoretycznym, mającym na celu wyjaśnienie “zaawansowanych” mechanizmów synchronizacji czas przyszedł na pokazanie pierwszej struktury danych. Przed pojawieniem się asynchronicznych kolekcji, najczęściej korzystało się z prostego lock’a jak:

lock(_Sync)
{
    _list.Add(newElement);
}

Rozwiązanie mało wygodne i przede wszystkim niewydajne. Nowe kolekcje zawierają mechanizmy synchronizacji omówione w poprzednich postach takie jak:

  1. SpinLock
  2. SpinWait
  3. SemaphoreSlim
  4. CountdownEvent

Jeśli nie są one znane, zachęcam do przeczytania najpierw poprzednich postów. Jak widać z powyższej listy, synchronizacja kolekcji polega głównie na użyciu mechanizmu Spinning, który dla szybkich operacji jest dużo bardziej wydajny,  ponieważ nie wymaga zmiany kontekstu. W przypadku długotrwałych operacji, mechanizmy synchronizacji są na tyle sprytne, że potrafią przejść ze stanu spinning do waiting. Ponadto pewne struktury danych takie jak ConcurrentQueue(T) albo ConcurrentStack(T) kompletnie nie korzystają z lock’ow – wszystko wykonują za pomocą operacji atomowych Interlocked.

Przejdźmy teraz do pierwszej kolekcji – ConcurrentBag. ConcurrentBag służy do przechowywania dowolnych obiektów i nadaje się do scenariuszy gdzie kolejność przechowywania nie ma znaczenia. W przeciwieństwie do zbiorów danych, możliwe jest przechowywanie kilku takich samych obiektów.  Struktura stanowi więc “torbę” na elementy, która zawiera nieuporządkowane elementy. Wewnętrzna implementacja jest zoptymalizowania pod kątem wielowątkowości. Istnieje wewnętrzna lista ThreadLocalList, która przechowuje oddzielne dane dla każdego wątku. Dzięki temu w większości przypadków nie musimy używać locków czy spinning, ponieważ każdy z wątków operuje na oddzielnej liście. W momencie gdy wątek A, nie maj już elementów na swojej liście wtedy przechodzi do listy innego wątku i tym samym będzie musiała nastąpić synchronizacja (za pomocą spinning a potem jeśli zajdzie potrzeba to zostanie założona zwykła blokada). Z tego względu ConcurrentBag zawiera nieuporządkowane elementy – w końcu nie mamy jednej listy a wszystkie elementy są wewnętrznie rozdzielane na podlisty przyporządkowane do każdego wątku. Zgodnie z MSDN, kolekcja jest optymalizowana dla przypadków gdzie ten sam wątek produkuje i konsumuje te same elementy – dlatego koncepcja podlist jest bardzo wydajna. Zaznaczyłem, że najkorzystniejsza sytuacja jest gdy każdy wątek operuje na własnej liście ThreadLocalList.  W przypadku gdy już wyczerpał wszystkie elementy znajdujące się na niej, wtedy sytuacja się komplikuje. Mimo to, Microsoft wprowadził optymalizacje jeśli w danej liście jest więcej niż 3 elementy. Wtedy pierwszy wątek czyta elementy na początku listy a drugi od końca. Wniosek z tego jest taki, że synchronizacja ma miejsce tylko wtedy gdy jakaś lista ma mniej niż 3 elementów i dwa wątki muszą na tej podliście operować!

Przykład:

ConcurrentBag<int> numbers = new ConcurrentBag<int>();
numbers.Add(5);
numbers.Add(1);

int number1;
numbers.TryTake(out number1);

int number2;
numbers.TryTake(out number2);

Console.WriteLine(number1);
Console.WriteLine(number2);

Prosty przykład pokazuje dwie podstawowe metody: Add do dodawania nowych elementów oraz TryTake do ich zwracania. TryTake zwraca wartość bool określającą czy element udało się zwrócić z lity. Z kolei za pomocą argumentu OUT zwraca obiekt z pojemnika – w naszym przypadku będzie to liczba całkowita. Gdybyśmy wywołali TryTake trzeci raz wtedy zwróciła by ona wartość false a number przyjąłby default ( T ).

TryTake usuwa element z listy. Istnieje metoda TryPeek, która tylko zwraca element, bez jego usunięcia (podgląda go):

ConcurrentBag<int> numbers = new ConcurrentBag<int>();
numbers.Add(5);
numbers.Add(1);            

int number1;
numbers.TryPeek(out number1);

int number2;
numbers.TryPeek(out number2);

Console.WriteLine(number1);
Console.WriteLine(number2);

Kod zwróci dwa razy tą samą wartość  1 ponieważ w przeciwieństwie do TryTake, TryPeek nie modyfikuje listy.

Add, TryTake oraz TryPeek to najważniejsze metody ConcurrentBag. Oprócz tego istnieją dwie przydatne właściwości:

  1. IsEmpty
  2. Count

Umożliwiają sprawdzenie czy lista jest pusta oraz zwrócenie liczby aktualnie przechowanych elementów. Należy jednak uważać na Count np.:

if(bag.Count == 0)
{

}

Powyższy kod jest niebezpieczny.  Istnieje ryzyko, że pojemnik zostanie zmodyfikowany przez inny wątek natychmiast po zwróceniu liczby elementów (przed porównaniem z zero). Z tego względu wprowadzono właściwość IsEmpty, która zwraca liczbę elementów a następnie porównuje ją z wartością zero za pomocą thread-safe operacji.Mimo to, nie mamy gwarancji, że po zwróceniu IsEmpty coś nie będzie dodane tym samym czyniąc naszą logikę błędną.

ConcurrentBag implementuje interfejs IEnumerable , zatem można użyć pętli foreach:

ConcurrentBag<int> numbers = new ConcurrentBag<int>();
numbers.Add(5);
numbers.Add(1);

foreach (var number in numbers)
{
    Console.WriteLine(number);
}

Ponadto nie musimy się martwić czy numbers zostało zmodyfikowane przez inny wątek. Operacja jest w pełni thread-safe. Jeśli byłaby to kolekcja List wtedy modyfikacja kolekcji spowodowałby wyjątek. Należy zaznaczyć, że elementy dodane w trakcie wykonywania foreach nie będą widziane. Innymi słowy, w momencie rozpoczęcia foreach, wykonywana jest kopia elementów. Dzięki temu mamy gwarancję, że nic z tymi elementami w trakcie nie stanie się, ale niestety nowych elementów nie będziemy widzieć – wyłącznie te, która istniały już w momencie wykonywania wspomnianej kopii.

One thought on “Wprowadzenie do współbieżnych kolekcji danych na przykładzie ConcurrentBag”

Leave a Reply

Your email address will not be published.