Gestion des erreurs avec les flux node.js

164

Quelle est la bonne façon de gérer les erreurs avec les flux? Je sais déjà qu'il y a un événement «d'erreur» sur lequel vous pouvez écouter, mais je veux en savoir plus sur des situations arbitrairement compliquées.

Pour commencer, que faites-vous lorsque vous voulez faire une simple chaîne de tuyaux:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

Et comment créer correctement l'une de ces transformations pour que les erreurs soient gérées correctement?

Plus de questions connexes:

  • lorsqu'une erreur se produit, qu'arrive-t-il à l'événement «fin»? Ne se fait-il jamais virer? Est-il parfois viré? Cela dépend-il de la transformation / du flux? Quelles sont les normes ici?
  • existe-t-il des mécanismes pour propager les erreurs à travers les tuyaux?
  • les domaines résolvent-ils efficacement ce problème? Les exemples seraient bien.
  • les erreurs qui proviennent d'événements «d'erreur» ont-elles des traces de pile? Quelquefois? Jamais? y a-t-il un moyen d'en obtenir un?
BT
la source
1
Ce n’est pas anodin. Promiseles frameworks le rendent beaucoup plus simple
slezica
27
Malheureusement, les promesses / futurs ne peuvent pas vraiment vous aider avec les flux ...
BT

Réponses:

222

transformer

Les flux de transformation sont à la fois lisibles et inscriptibles, et sont donc de très bons flux «intermédiaires». Pour cette raison, ils sont parfois appelés throughflux. Ils sont similaires à un flux duplex de cette manière, sauf qu'ils fournissent une interface agréable pour manipuler les données plutôt que de simplement les envoyer. Le but d'un flux de transformation est de manipuler les données lorsqu'elles sont acheminées dans le flux. Vous voudrez peut-être faire des appels asynchrones, par exemple, ou dériver quelques champs, remapper certaines choses, etc.


Où vous pourriez mettre un flux de transformation


Pour savoir comment créer un flux de transformation, voir ici et ici . Tout ce que tu dois faire est :

  1. inclure le module de flux
  2. instancier (ou hériter de) la classe Transform
  3. implémenter une _transformméthode qui prend un (chunk, encoding, callback).

Le morceau est vos données. La plupart du temps, vous n'aurez pas à vous soucier de l'encodage si vous travaillez objectMode = true. Le rappel est appelé lorsque vous avez terminé de traiter le bloc. Ce bloc est ensuite poussé vers le flux suivant.

Si vous voulez un bon module d'aide qui vous permettra de faire du stream très facilement, je suggère through2 .

Pour la gestion des erreurs, continuez à lire.

tuyau

Dans une chaîne de tubes, la gestion des erreurs n'est en effet pas triviale. Selon ce fil, .pipe () n'est pas conçu pour transmettre les erreurs. Donc quelque chose comme ...

var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});

... n'écouterait que les erreurs sur le flux c. Si un événement d'erreur était émis a, cela ne serait pas transmis et, en fait, serait déclenché. Pour faire cela correctement:

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

Maintenant, bien que la deuxième méthode soit plus verbeuse, vous pouvez au moins conserver le contexte dans lequel vos erreurs se produisent. C'est généralement une bonne chose.

Une bibliothèque que je trouve utile si vous avez un cas où vous souhaitez uniquement capturer les erreurs à la destination et que vous ne vous souciez pas tellement de l'endroit où cela s'est produit est le flux d'événements .

fin

Lorsqu'un événement d'erreur est déclenché, l'événement de fin ne sera pas déclenché (explicitement). L'émission d'un événement d'erreur mettra fin au flux.

domaines

D'après mon expérience, les domaines fonctionnent très bien la plupart du temps. Si vous avez un événement d'erreur non géré (c'est-à-dire émettant une erreur sur un flux sans écouteur), le serveur peut planter. Maintenant, comme le souligne l'article ci-dessus, vous pouvez envelopper le flux dans un domaine qui devrait correctement détecter toutes les erreurs.

var d = domain.create();
 d.on('error', handleAllErrors);
 d.run(function() {
     fs.createReadStream(tarball)
       .pipe(gzip.Gunzip())
       .pipe(tar.Extract({ path: targetPath }))
       .on('close', cb);
 });
  • l'exemple de code ci-dessus provient de ce post

La beauté des domaines est qu'ils conserveront les traces de la pile. Bien que le flux d'événements fasse également un bon travail dans ce domaine.

Pour plus d'informations, consultez le manuel du flux . Assez en profondeur, mais super utile et donne d'excellents liens vers de nombreux modules utiles.

mshell_lauren
la source
C'est vraiment une bonne information, merci! Pourriez-vous ajouter un peu pourquoi vous souhaitez créer un flux de transformation et pourquoi il est lié à ma question?
BT
Bien sûr - même si je pensais que c'était lié depuis que vous avez posé des questions à ce sujet; )
mshell_lauren
1
Poster à ce sujet par isaccs sur Google Groups- nodejs: groups.google.com/d/msg/nodejs/lJYT9hZxFu0/L59CFbqWGyYJ (pas grokbase)
jpillora
Cette réponse est parfaitement écrite. Je vais étudier la suggestion de domaine - cela semble être le genre de solution que je recherchais.
Point
12
Notez que vous n'avez pas besoin d'envelopper le .on('error')gestionnaire dans une fonction anonyme, c'est a.on('error', function(e){handleError(e)})-à- dire que vous pouvez simplement êtrea.on('error', handleError)
timoxley
28

Si vous utilisez node> = v10.0.0, vous pouvez utiliser stream.pipeline et stream.finished .

Par exemple:

const { pipeline, finished } = require('stream');

pipeline(
  input, 
  transformA, 
  transformB, 
  transformC, 
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
});


finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

Voir ce PR github pour plus de discussion.

Shusson
la source
1
Pourquoi utiliseriez-vous finishedcependant, quand a pipelinedéjà un rappel?
Marcos Pereira
4
Vous souhaiterez peut-être gérer les erreurs différemment entre le pipeline et les flux individuels.
shusson le
25

les domaines sont obsolètes. vous n'en avez pas besoin.

pour cette question, les distinctions entre transform ou inscriptible ne sont pas si importantes.

La réponse de mshell_lauren est excellente, mais comme alternative, vous pouvez également écouter explicitement l'événement d'erreur sur chaque flux que vous pensez être une erreur. et réutilisez la fonction de gestionnaire si vous préférez.

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()

a.on('error', handler)
b.on('error', handler)
c.on('error', handler)

a.pipe(b).pipe(c)

function handler (err) { console.log(err) }

cela empêche la tristement célèbre exception non interceptée si l'un de ces flux déclenche son événement d'erreur

Cardan courbé
la source
3
lol, amusez-vous à gérer 3 événements d'erreur différents et priez pour que celui qui a écrit les 3 bibliothèques de streaming différentes implémente correctement la gestion des erreurs
Alexander Mills
4
@Alex Mills 1) Quel est le problème de la gestion de 3 événements, et pourquoi sont-ils "différents", quand leur type est le même - error, on peut aussi bien alors se contenter du fait que chaque événement est distinct; 2) Quelles bibliothèques de streaming sont écrites ci-dessus, à part la fonctionnalité native de Node.js? et 3) pourquoi est-ce important de savoir comment ils gèrent les événements en interne, alors que cela permet évidemment à quiconque d'attacher des gestionnaires d'erreurs supplémentaires en plus de tout ce qui existe déjà?
amn le
10

Les erreurs de toute la chaîne peuvent être propagées vers le flux le plus à droite à l'aide d'une simple fonction:

function safePipe (readable, transforms) {
    while (transforms.length > 0) {
        var new_readable = transforms.shift();
        readable.on("error", function(e) { new_readable.emit("error", e); });
        readable.pipe(new_readable);
        readable = new_readable;
    }
    return readable;
}

qui peut être utilisé comme:

safePipe(readable, [ transform1, transform2, ... ]);
Gleba
la source
5

.on("error", handler)ne prend en charge que les erreurs de flux, mais si vous utilisez des flux de transformation personnalisés, .on("error", handler)ne détectez pas les erreurs qui se produisent dans la _transformfonction. On peut donc faire quelque chose comme ça pour contrôler le flux des applications: -

thisLe mot-clé en _transformfonction se réfère à Streamlui-même, qui est un EventEmitter. Vous pouvez donc utiliser try catchcomme ci-dessous pour détecter les erreurs et les transmettre plus tard aux gestionnaires d'événements personnalisés.

// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code
  } catch (e) {
    // Now based on the error type, with an if or switch statement
    stream.emit("CTError1", e)
    stream.emit("CTError2", e)
  }
  done()
}

// StreamImplementation.js
someReadStream
  .pipe(CustomTransformStream)
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /*Lets do something else*/ })
  .pipe(someWriteStream)

De cette façon, vous pouvez séparer votre logique et vos gestionnaires d'erreurs. En outre, vous pouvez choisir de ne gérer que certaines erreurs et d'en ignorer d'autres.

UPDATE
Alternative: RXJS Observable

Vikas Gautam
la source
4

Utilisez le package multipipe pour combiner plusieurs flux en un seul flux duplex. Et gérez les erreurs en un seul endroit.

const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC) 


// centralized error handling
stream.on('error', fn)
Sergey Savenko
la source
1

Utilisez le modèle Node.js en créant une mécanique de flux de transformation et en appelant son rappel doneavec un argument afin de propager l'erreur:

var transformStream1 = new stream.Transform(/*{objectMode: true}*/);

transformStream1.prototype._transform = function (chunk, encoding, done) {
  //var stream = this;

  try {
    // Do your transform code
    /* ... */
  } catch (error) {
    // nodejs style for propagating an error
    return done(error);
  }

  // Here, everything went well
  done();
}

// Let's use the transform stream, assuming `someReadStream`
// and `someWriteStream` have been defined before
someReadStream
  .pipe(transformStream1)
  .on('error', function (error) {
    console.error('Error in transformStream1:');
    console.error(error);
    process.exit(-1);
   })
  .pipe(someWriteStream)
  .on('close', function () {
    console.log('OK.');
    process.exit();
  })
  .on('error', function (error) {
    console.error(error);
    process.exit(-1);
   });
Derek
la source
Hmm, alors vous dites que si tous les processeurs de flux étaient construits comme ça, les erreurs se propageraient?
BT le
-2

Try catch ne capturera pas les erreurs qui se sont produites dans le flux, car elles sont levées après que le code appelant soit déjà fermé. vous pouvez vous référer à la documentation:

https://nodejs.org/dist/latest-v10.x/docs/api/errors.html

Mehran
la source
Merci, mais cela ne répond pas du tout à la question.
BT
Me donner un document de 40 pages n'est pas utile. À quoi pensez-vous que je devrais me référer dans cette page géante? Aussi, avez-vous lu ma question? Ma question n'est pas "Est-ce que Try catch fonctionne avec les flux?" Je suis déjà bien conscient que try-catch ne fonctionnera pas avec les erreurs asynchrones, par exemple celles des pipelines de traitement de flux.
BT