Code Review: Asynchroniczne strumienie danych

Operacje na plikach mogą być bardzo czasochłonne. Z tego względu, dobrym zwyczajem jest umieszczenie kodu w osobnym wątku. Często popełnianym błędem jest samodzielne tworzenie wątku:

internal static class Sample
{        
   public static void Main()
   {
       var reader = new FileStream(@"c:\setup\1.txt", FileMode.Open);
       Task.Factory.StartNew(()=>ReadAsync(reader));
   }
   private static void ReadAsync(Stream reader)
   {
       byte[]buffer=new byte[100];
       reader.Read(buffer, 0, 100);
       reader.Close();
   }
}

Pomijam już kwestie obsługi błędów w powyższym kodzie oraz zamykanie strumienia. Skupmy się wyłącznie na wielowątkowości.  Dlaczego powyższy kod jest tak bardzo zły? Podobnym błędem byłoby stworzenie własnej delegaty oraz wywołanie na niej BeginInvoke. Prześledźmy jak wygląda synchroniczne wywołanie operacji na strumieniach w Windows:

  1. Użytkownik wywołuje metodę FileStream.Read.
  2. FileStream to tak naprawdę wrapper na zasoby niezarządzane. Pod spodem kryje się zwykła funkcja Win32 (ReadFile).
  3. ReadFile (Win32) stworzy strukturę IRP (Input\Output Request Package). Zawiera ona informacje o tym co chcemy przeczytać (offset, length itp.).
  4. Następnie Kernel przekazuje IRP do danego urządzenia. Każde urządzenie ma kolejkę IRP. Po jakimś czasie, przekazany IRP zostanie zdjęty z kolejki a dane zostaną przeczytane. W tym momencie, wywołanie synchroniczne zostanie uśpione przez Windows. Jest to bardzo korzystne, ponieważ w momencie umieszczenia IRP, Windows wie, że wątek nie ma nic do roboty i go usypia. Niestety wciąż marnujemy zasoby na call stack, ponieważ w końcu taki wątek musi zostać potem obudzony.

Co jest więc złego w powyższym kodzie? Tworzymy nowy wątek ale tak naprawdę będzie on uśpiony i będzie pochłaniał zasoby. Należy pamiętać, że czasami urządzenie może być zajęte i przeczytanie danych zajmie trochę czasu. Szczególnie gdy mamy do czynienia ze strumieniem sieciowym (NetworkStream) albo danymi na np. DVD. Strumienie są bardzo generycznym pojęciem.

Prawidłowa implementacja asynchronicznych operacji zawsze powinna wykorzystywać dostępne metody typu BeginRead. Przyjrzyjmy się jednak kolejnemu rozwiązaniu, które wciąż jest niepoprawne:

internal static class Sample
{
   private static FileStream _reader;

   public static void Main()
   {
       _reader = new FileStream(@"c:\setup\1.txt", FileMode.Open);
       byte[] buffer = new byte[100];
       _reader.BeginRead(buffer, 0, 100, Callback,buffer);
       Console.ReadKey();
   }

   private static void Callback(IAsyncResult ar)
   {
       int result=_reader.EndRead(ar);
       byte[] buffer = (byte[]) ar.AsyncState;
       _reader.Dispose();
       _reader = null;
   }       
}

Dlaczego wciąż jest to złe? Musimy koniecznie przekazać flagę FileOptions.Asynchronous w momencie gdy jest tworzony strumień:

internal static class Sample
{
   private static FileStream _reader;

   public static void Main()
   {
       _reader = new FileStream(@"c:\setup\1.txt", FileMode.Open,FileOptions.Asynchronous);
       byte[] buffer = new byte[100];
       _reader.BeginRead(buffer, 0, 100, Callback,buffer);
       Console.ReadKey();
   }

   private static void Callback(IAsyncResult ar)
   {
       int result=_reader.EndRead(ar);
       byte[] buffer = (byte[]) ar.AsyncState;
       _reader.Dispose();
       _reader = null;
   }       
}

Należy zawsze przekazać powyższą flagę ponieważ bez niej, .NET symuluje tylko asynchroniczne zachowanie. W rzeczywistości zostanie stworzony wątek, który będzie marnował zasoby aż do momentu przetworzenia zapytania. Z powyższą flagą, w momencie wysłania IRP, wątek kończy swoje działanie. Następnie, gdy urządzenie przeczyta dane, Kernel\sterownik urządzenia wywoła przekazany callback. Żaden dodatkowy wątek nie musi zostać tworzony – dla strumieni istnieje specjalna infrastruktura zaimplementowana przez Windows i sterownik urządzenia.

13 thoughts on “Code Review: Asynchroniczne strumienie danych”

  1. Task.Factory.StartNew(…) może, ale nie musi tworzyć nowego wątku. Wydaje mi się, że mieszasz pojęcie asynchroniczności i wielowątkowości.

  2. @Bartek:
    Nie wazne, ze moze byc to watek z puli. Wazne, ze jest on po prostu marnowany.

  3. @Bartek:
    A co do roznicy miedzy async and multi-threading.
    Post zostal nazwany asynchroniczne strumienie danych.
    Problem w tym, ze skutek asynchronicznego programowania i wielowatkowego moze wygladac tak samo i czesto popelniane sa bledy. Jednak w przypadku asynchronicznego mozna polegac na zdarzeniach, ktore nie powoduja (tak jak w przypadku Task.Factory.StartNew) stworzenie lub zarezerwowanie watku. Przez watki mozna zasymulowac programowanie async ale zwykle sa lepsze metody jak te przedstawione w poscie.

  4. Bardzo fajny przykład.Jednak czy alternatywnym rozwiązaniem może być:
    private async Task Running()
    {
    await FunkcjaCzytajacaStrumien();
    }
    ?

  5. Mam na myśli to, że asynchroniczne operacje moga byc wykonywane na jednym wątku o ile sprzyjają temu odpowiednie warunki.

  6. @Bartek:
    Zgadza sie. Async jest bardziej jak event-based. Z tym, ze jak tworzysz Task.Factory.New zawsze bedzie tworzony nowy watek lub wykorzystywany juz istniejacy (co tez nie jest czystym async).

  7. @Piotr
    Pod FunkcjaCzytajacaStrumien miałem na myśli cały kod, który jest zawarty u Ciebie w Main

  8. No chyba że zapodasz flagę TaskContinuationOptions.ExecuteSynchronously.
    Z resztą i tak czy siak trzeba to zazwyczaj zapodać by móc jednak coś sensownego potem z tym zrobić(tzn. by wrócić na główny wątek).
    Alternatywą jest wpisanie kolejnego delegata na właściwego Dipatchera, czy Scheduler.

    W sensie…
    Twoje rozwiązanie Piotrze jest jak najbardziej właściwe ale by w życiu codziennym miało zastosowanie to trzeba dodać jakąś formę kontynuacji na głównym wątku. Czy to przez async await I/O. Alb

  9. o przez Reactivowe FromAsyncPattern.
    Czy zwyczajne ContinueWith(… TaskContinuationOptions.ExecuteSynchronously).
    Czy przez dispatchera(kontrolki lub currenta zapisanego do zmiennej)…

  10. No chyba że od razu projektujemy aplikację pod separację wątków, ale to jeszcze nie te czasy…

  11. @Arek Bal:
    Nie bardzo rozumiem w czym problem?Forma kontynuacji?
    Przecież callback zostanie potem wywołany.

  12. Hmm… może jednak “zagupi” jestem. 😀

    Przeczytałem jeszcze raz co napisałeś i w tym wypadku ok… ale..
    czy callback dla wszelkiej maści Async Pattern(Begin/End) będzie zawsze wywołane na wątku na którym poleciało Begin? Coś mnie się wierzyć nie chce. 🙂

Leave a Reply

Your email address will not be published.