Stream's in Dart Part 3 StreamController

Photo by Paweł Czerwiński on Unsplash
Im zweiten Post zum Thema Stream's hatten wir ja ein Diagramm, welches die "Welt der Stream's" in Dart dargestellt hat.
Wir haben ausserdem gesehen, das wir mit async, async* und await Streams programmieren können, ohne die Library Funktionen direkt benutzen zu müssen.

Streams und Future's in Dart sind nahe beieinander liegende Themen. Man muss aufpassen, das keine Verwirrung entsteht. Stellen wir einen Vergleich an:

Sie sind ähnlich da.. :
Beide arbeiten asynchron (Stream's meistens, nicht immer).
Beide tragen einen potenziellen Wert.

Sie sind sich nicht ähnlich da..:
Ein Stream eine Kombination von Future's ist
Ein Future nur immer eine Antwort hat, ein Stream hat in der Regel eine nicht begrenzte Anzahl von Antworten.




StreamController


Beginnen wir mit der Funktion erschaffeStream(...).Der komplette Code am Schluss, hier zuerst einen Part nach dem anderen:


  StreamController<num> meinController = new StreamController<num>(

       onListen: () => print("Wir hören zu"),
       onPause: () => print("Eine Pause muss manchmal auch sein"),
       onResume: () => print("Pause vorbei.Weiter.."),
       onCancel: () => print("Keine Lust mehr. Schluss"),
       sync: false

  );

Der Code oben ist nicht schwer zu verstehen. onListen, onPause, onResume,onCancel erhalten von uns Callback Funktionen, die dann ausgeführt werden, wenn das entsprechende Ereignis eingetretten ist.



 num i = 0;
  Future.doWhile((){
    meinController.add(i++);
    if(i==5){
      meinController.addError("Fehler bei dem $i Element");
    }
    if(i==7){
      meinController.close();
      return false;
    }
    return true;
  });

In diesem Code oben sehen wir einen interessanten Einsatz von Future. Mit Future.doWhile wird diese Schleife solange ausgeführt, bis false retourniert wird. In unserem Fall bricht der Code bei dem 7 Element ab.
Bei 5 wird ein Fehler generiert, aber der Code wird fortgesetzt. und retourniert true.



import 'dart:async';

Stream<num> erschaffeStream(){

  StreamController<num> meinController = new StreamController<num>(

       onListen: () => print("Wir hören zu"),
       onPause: () => print("Eine Pause muss manchmal auch sein"),
       onResume: () => print("Pause vorbei.Weiter.."),
       onCancel: () => print("Keine Lust mehr. Schluss"),
       sync: false

  );
  num i = 0;
  Future.doWhile((){
    meinController.add(i++);
    if(i==5){
      meinController.addError("Fehler bei dem $i Element");
    }
    if(i==7){
      meinController.close();
      return false;
    }
    return true;
  });
  return meinController.stream;
}

Oben der gesamte Code für erschaffeStream(...). Es bleibt nur zu ergänzen, das am Schluss meinController.stream an die aufrufende Ebene zurückgegeben wird.

Zuerst wird also ein StreamController Objekt geschaffen. Mit dessen Hilfe, meinController.add, senden wir Daten an den Strom, generieren bei dem 5 Element einen Fehler.
Bei dem 7 Element schließen wir den Strom mittels meinController.close.



StreamSubscription<num> erschaffeAbbo(Stream<num> meinStream){
 StreamSubscription abboNent;
 abboNent = meinStream.listen((num daten){
  print('onData: $daten');
  if(daten==3){
    abboNent.pause(new Future.delayed(Duration(milliseconds: 500),()=>'ok'));
  }
 },
 onError: (error)=>print("onError: $error"),
 onDone: ()=>print("Fertig")
 );
return abboNent;
}

Oben sehen wir die Funktion erschaffeAbbo(...) . Mit meinSream.listen(...) hören wir nun auf den Stream. Die eintreffenden Daten werden in die Konsole gedruckt.
Nach dem das dritte Element eingetroffen ist, wird eine Pause generiert. Dafür wird wiederum ein Future benutzt. onError druckt die Fehlermeldung, die wir im StreamController hinterlegt haben in die Konsole. Zum Schluss wird onDone verwendet, um eine Zeichenkette in die Konsole zu drucken.
Man beachte, das in der Zeile abboNent.pause am Schluss eine anonyme Funktion steht: ()=>'ok' steht. Sie dient hier nur als Platzhalter. Future.delayed hat einen Rückgabewert, dieser wird in diesem Code aber nicht verwendet.

Zum Abschluss hier der gesamte Code:


import 'dart:async';

Stream<num> erschaffeStream(){

  StreamController<num> meinController = new StreamController<num>(

       onListen: () => print("Wir hören zu"),
       onPause: () => print("Eine Pause muss manchmal auch sein"),
       onResume: () => print("Pause vorbei.Weiter.."),
       onCancel: () => print("Keine Lust mehr. Schluss"),
       sync: false

  );
  num i = 0;
  Future.doWhile((){
    meinController.add(i++);
    if(i==5){
      meinController.addError("Fehler bei dem $i Element");
    }
    if(i==7){
      meinController.close();
      return false;
    }
    return true;
  });
  return meinController.stream;
}

StreamSubscription<num> erschaffeAbbo(Stream<num> meinStream){
 StreamSubscription abboNent;
 abboNent = meinStream.listen((num daten){
  print('onData: $daten');
  if(daten==3){
    abboNent.pause(new Future.delayed(Duration(milliseconds: 500),()=>'ok'));
    abboNent.resume();
  }
 },
 onError: (error)=>print("onError: $error"),
 onDone: ()=>print("Fertig")
 );
return abboNent;
}

void main(){
  Stream<num> meinStream = erschaffeStream();
  StreamSubscription<num> streamAbbo = erschaffeAbbo(meinStream);
}

Die main(...) Funktion erzeugt zuerst den Stream und anschliessend ein StreamSubscription Objekt.
Die eigentliche Arbeit erledigen die Funktionen. Das Resultat in der Konsole :


Terminal Output
Terminal Output

Das war es für diesen Post. Bis bald !

Kommentare

Beliebte Posts aus diesem Blog

Flutter -- ohne Dart geht es nicht 2 -- einfache Variablen Typen

Material Design in Flutter Teil 2

Dart Final Const