Producent-konsument w C# – BlockingCollection

BlockingCollection jest specjalną kolekcją danych, przygotowaną do implementacji wzorca producent-konsument. Nakład pracy do implementacji tego wzorca jest minimalny z BlockingCollection. Nie musimy martwić się o synchronizację, sekcję krytyczną czy deadlock. Zacznijmy od razu od przykładu.
Producent będzie wyglądać następująco:

       private static void Produce(BlockingCollection<int> buffer)
        {
            for (int i = 0; i < 100; i++)
            {
                Console.WriteLine("Producing {0}", i);
                Thread.Sleep(10);

                buffer.Add(i);
            }

            buffer.CompleteAdding();
        }

Jak widzimy, implementacja producenta to nic innego jak dodawanie danych do kolekcji. Metoda Add jest thread-safe więc nie musimy używać lock. Ponadto robi to bardzo optymalnie, ponieważ nie jest to prosty mechanizm polegający po prostu na wstawieniu lock’a. Zaglądając do implementacji Add, zobaczymy między innymi spinning. BlockingCollection należy to tzw. concurrent collections o których już pisałem na blogu. Oznacza to, że jakiekolwiek operacje są zaimplementowane w ten sposób, aby unikać blokad. Zostało to osiągnięte na poziomie projektu (np. kilka mini-kolekcji w środku dla różnych wątków), jak i używania spinningu, gdy wiadomo, że zbyt długo nie będzie trzeba czekać.

Metoda CompleteAdding kończy produkcję i konsumenci nie będą już dłużej czekać. Musimy ją wywołać na końcu ponieważ w przeciwnym wypadku, konsumenci będą myśleli, że produkcja ciągle trwa i należy wciąż czekać.
Przyjrzyjmy się teraz konsumpcji:

        private static void Consume(BlockingCollection<int> buffer)
        {
            foreach (var i in buffer.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming {0}.", i);
                Thread.Sleep(20);
            }
        }

Kluczem jest metoda GetConsumingEnumerable. W bezpieczny sposób usuwa one dane z bufora. Jeśli w buforze nic nie ma po prostu wątek będzie blokowany. Iterator zakończy dopiero działanie, gdy producent wywoła CompleteAdding. W przeciwnym wypadku, foreach będzie zdejmował dane z kolekcji, lub blokował wywołanie w oczekiwaniu na więcej danych. Jeśli zajrzyjmy do implementacji wewnętrznej, znowu znajdziemy semafory i SpinWait.

Tak naprawdę, do najprostszej implementacji nic więcej już nie potrzebujemy. Całość wygląda zatem następująco:

class Program
    {
        static void Main(string[] args)
        {
            var buffer=new BlockingCollection<int>();

            var producerTask = Task.Run(() => Produce(buffer));
            var consumeTask = Task.Run(() => Consume(buffer));

            Task.WaitAll(producerTask, consumeTask);
        }

        private static void Produce(BlockingCollection<int> buffer)
        {
            for (int i = 0; i < 100; i++)
            {
                Console.WriteLine("Producing {0}", i);
                Thread.Sleep(10);

                buffer.Add(i);
            }

            buffer.CompleteAdding();
        }

        private static void Consume(BlockingCollection<int> buffer)
        {
            foreach (var i in buffer.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming {0}.", i);
                Thread.Sleep(20);
            }
        }
    }

W praktyce jednak trzeba rozważyć kilka innych “drobiazgów”. Co jeśli wyjątek zdarzy się podczas konsumpcji danych? Producent wciąż będzie generował dane, co przecież zwykle nie ma sensu i spowoduje memory leak. Dlatego lepiej napisać obsługę błędów:

       private static void Consume(BlockingCollection<int> buffer)
        {
            try
            {
                foreach (var i in buffer.GetConsumingEnumerable())
                {
                    Console.WriteLine("Consuming {0}.", i);
                    Thread.Sleep(20);
                    throw new Exception();
                }
            }
            catch
            {
                buffer.CompleteAdding();
                throw;
            }
        }

W momencie wystąpienia błędu, wywołujemy CompleteAdding, co spowoduje, że próba dodania nowych danych przez producenta zakończy się wyjątkiem InvalidOperationException i zakończeniem produkcji.
Analogicznie, warto dostać klauzule try-finally w producencie:

        private static void Produce(BlockingCollection<int> buffer)
        {
            try
            {
                for (int i = 0; i < 100; i++)
                {
                    Console.WriteLine("Producing {0}", i);
                    Thread.Sleep(10);

                    buffer.Add(i);
                }
            }
            finally
            {
                buffer.CompleteAdding();
            }

        }

W przypadku producenta, chcemy wywołać CompleteAdding zarówno w przypadku powodzenia (aby konsument już dłużej nie czekał), jak i wyjątku.
CompleteAdding tak naprawdę korzysta z CancellationToken, który jest znany nam z TPL. Prawdopodobnie warto również dodać obsługę wyjątków InvalidOperationException, tak aby dwukrotnie nie wywoływać CompleteAdding.

Inna bardzo ważna obserwacja to przypadek, gdy konsument jest dużo wolniejszy niż producent. Załóżmy, że wyprodukowanie zajmuje jedną sekundę, a konsumpcja 10. W tym przypadku, po długim przetwarzaniu możemy mieć do czynienia z ogromną alokacją pamięci ponieważ producent będzie ciągle dodawał dane, a konsument nie nadąży z usuwaniem ich.

BlockingCollection w bardzo prosty sposób rozwiązuje ten problem poprzez wprowadzenie maksymalnego limitu “porcji” w kolekcji. Wystarczy, w konstruktorze przekazać maksymalną pojemność:

var buffer = new BlockingCollection<int>(boundedCapacity: 10);

Po przekroczeniu limitu, metoda Add nie wyrzuci wyjątku, a po prostu będzie blokowała wywołanie za pomocą wspomnianych wcześniej technik (spinning, locking etc).

5 thoughts on “Producent-konsument w C# – BlockingCollection”

  1. Wygląda to jak typowy stos tylko odporny na wielowątkowość.

    PS. Można prosić o zmianę tematu WordPress-a z większymi fontami, bo na dzisiejszych desktopach wszystko jest w skali mikro 🙂

Leave a Reply

Your email address will not be published.