Analyse d'énormes fichiers journaux dans Node.js - lus ligne par ligne

126

J'ai besoin de faire une analyse de grands fichiers journaux (5-10 Go) dans Javascript / Node.js (j'utilise Cube).

La ligne de connexion ressemble à quelque chose comme:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".

Nous devons lire chaque ligne, faire une analyse (par exemple, supprimer 5, 7et SUCCESS), puis pomper ces données dans Cube ( https://github.com/square/cube ) en utilisant leur client JS.

Premièrement, quelle est la manière canonique dans Node de lire dans un fichier, ligne par ligne?

Cela semble être une question assez courante en ligne:

Beaucoup de réponses semblent pointer vers un tas de modules tiers:

Cependant, cela semble être une tâche assez basique - il existe sûrement un moyen simple dans stdlib de lire dans un fichier texte, ligne par ligne?

Deuxièmement, je dois ensuite traiter chaque ligne (par exemple, convertir l'horodatage en un objet Date et extraire les champs utiles).

Quelle est la meilleure façon de faire cela, en maximisant le débit? Y a-t-il un moyen qui ne bloque pas la lecture de chaque ligne ou son envoi à Cube?

Troisièmement - je suppose que l'utilisation de séparations de chaînes, et l'équivalent JS de contains (IndexOf! = -1?) Sera beaucoup plus rapide que les expressions rationnelles? Quelqu'un a-t-il eu beaucoup d'expérience dans l'analyse de quantités massives de données texte dans Node.js?

Bravo, Victor

Victorhooi
la source
J'ai construit un analyseur de journal dans le nœud qui prend un tas de chaînes de regex avec des `` captures '' intégrées et des sorties vers JSON. Vous pouvez même appeler des fonctions sur chaque capture si vous souhaitez effectuer un calcul. Il pourrait faire ce que vous voulez: npmjs.org/package/logax
Jess

Réponses:

209

J'ai cherché une solution pour analyser les très gros fichiers (gbs) ligne par ligne en utilisant un flux. Toutes les bibliothèques et exemples tiers ne répondaient pas à mes besoins car ils ne traitaient pas les fichiers ligne par ligne (comme 1, 2, 3, 4 ..) ni ne lisaient le fichier entier en mémoire

La solution suivante peut analyser des fichiers très volumineux, ligne par ligne, en utilisant stream & pipe. Pour les tests, j'ai utilisé un fichier de 2,1 Go avec 17 000 000 enregistrements. L'utilisation de la RAM n'a pas dépassé 60 mb.

Tout d'abord, installez le package de flux d'événements :

npm install event-stream

Ensuite:

var fs = require('fs')
    , es = require('event-stream');

var lineNr = 0;

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        lineNr += 1;

        // process line here and call s.resume() when rdy
        // function below was for logging memory usage
        logMemoryUsage(lineNr);

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(err){
        console.log('Error while reading file.', err);
    })
    .on('end', function(){
        console.log('Read entire file.')
    })
);

entrez la description de l'image ici

S'il vous plaît laissez-moi savoir comment ça se passe!

Gérard
la source
6
FYI, ce code n'est pas synchrone. C'est asynchrone. Si vous insérez console.log(lineNr)après la dernière ligne de votre code, il n'affichera pas le nombre de lignes final car le fichier est lu de manière asynchrone.
jfriend00
4
Merci, c'était la seule solution que j'ai pu trouver qui s'est réellement arrêtée et a repris quand c'était censé le faire. Readline ne l'a pas fait.
Brent
3
Exemple génial, et il s'arrête réellement. De plus, si vous décidez d'arrêter la lecture du fichier tôt, vous pouvez utilisers.end();
zipzit
2
A travaillé comme un charme. Utilisé pour indexer 150 millions de documents dans l'index elasticsearch. readlinemodule est une douleur. Il ne se met pas en pause et provoquait un échec à chaque fois après 40 à 50 millions. Perdu une journée. Merci beaucoup pour la réponse. Celui-ci fonctionne parfaitement
Mandeep Singh
3
event-stream a été compromis: medium.com/intrinsic / ... mais 4+ est apparemment sûr blog.npmjs.org/post/180565383195
John Vandivier
72

Vous pouvez utiliser le readlinepackage intégré , voir la documentation ici . J'utilise stream pour créer un nouveau flux de sortie.

var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: instream,
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    console.log(line);
    //Do your stuff ...
    //Then write to outstream
    rl.write(cubestuff);
});

Le traitement des gros fichiers prendra un certain temps. Dites si cela fonctionne.

user568109
la source
2
Comme écrit, l'avant-dernière ligne échoue car cubestuff n'est pas défini.
Greg
2
En utilisant readline, est-il possible de mettre en pause / reprendre le flux de lecture pour effectuer des actions asynchrones dans la zone «faire des choses»?
jchook
1
@jchook readlineme posait beaucoup de problèmes lorsque j'ai essayé de mettre en pause / reprendre. Cela ne met pas correctement le flux en pause, ce qui crée beaucoup de problèmes si le processus en aval est plus lent
Mandeep Singh
31

J'ai vraiment aimé la réponse @gerard qui mérite en fait d'être la bonne réponse ici. J'ai apporté quelques améliorations:

  • Le code est dans une classe (modulaire)
  • L'analyse est incluse
  • La possibilité de reprendre est donnée à l'extérieur au cas où un travail asynchrone serait enchaîné à la lecture du CSV comme l'insertion dans la base de données ou une requête HTTP
  • Lecture en blocs / tailles de lots que l'utilisateur peut déclarer. Je me suis également occupé de l'encodage dans le flux, au cas où vous auriez des fichiers dans un encodage différent.

Voici le code:

'use strict'

const fs = require('fs'),
    util = require('util'),
    stream = require('stream'),
    es = require('event-stream'),
    parse = require("csv-parse"),
    iconv = require('iconv-lite');

class CSVReader {
  constructor(filename, batchSize, columns) {
    this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
    this.batchSize = batchSize || 1000
    this.lineNumber = 0
    this.data = []
    this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
  }

  read(callback) {
    this.reader
      .pipe(es.split())
      .pipe(es.mapSync(line => {
        ++this.lineNumber

        parse(line, this.parseOptions, (err, d) => {
          this.data.push(d[0])
        })

        if (this.lineNumber % this.batchSize === 0) {
          callback(this.data)
        }
      })
      .on('error', function(){
          console.log('Error while reading file.')
      })
      .on('end', function(){
          console.log('Read entirefile.')
      }))
  }

  continue () {
    this.data = []
    this.reader.resume()
  }
}

module.exports = CSVReader

Donc, en gros, voici comment vous allez l'utiliser:

let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())

J'ai testé cela avec un fichier CSV de 35 Go et cela a fonctionné pour moi et c'est pourquoi j'ai choisi de le construire sur la réponse de @gerard , les commentaires sont les bienvenus.

ambodi
la source
combien de temps cela a pris?
Z. Khullah
Apparemment, cela manque d' pause()appel, n'est-ce pas?
Vanuan
En outre, cela n'appelle pas la fonction de rappel à la fin. Ainsi, si batchSize vaut 100, la taille des fichiers est 150, seuls 100 éléments seront traités. Ai-je tort?
Vanuan
16

J'ai utilisé https://www.npmjs.com/package/line-by-line pour lire plus de 1000000 lignes à partir d'un fichier texte. Dans ce cas, une capacité occupée de RAM était d'environ 50 à 60 mégaoctets.

    const LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

    lr.on('error', function (err) {
         // 'err' contains error object
    });

    lr.on('line', function (line) {
        // pause emitting of lines...
        lr.pause();

        // ...do your asynchronous line processing..
        setTimeout(function () {
            // ...and continue emitting lines.
            lr.resume();
        }, 100);
    });

    lr.on('end', function () {
         // All lines are read, file is closed now.
    });
Eugène Ilyushin
la source
«ligne par ligne» est plus efficace en mémoire que la réponse sélectionnée. Pour 1 million de lignes dans un csv, la réponse sélectionnée avait mon processus de nœud dans les faibles 800 mégaoctets. En utilisant `` ligne par ligne '', il était systématiquement dans les faibles 700s. Ce module maintient également le code propre et facile à lire. Au total, j'aurai besoin de lire environ 18 millions pour que chaque mb compte!
Néo
c'est dommage que cela utilise sa propre 'ligne' d'événement au lieu du 'morceau' standard, ce qui signifie que vous ne pourrez pas utiliser 'pipe'.
Rene Wooller
Après des heures de test et de recherche, c'est la seule solution qui s'arrête réellement lr.cancel() méthode. Lit les 1000 premières lignes d'un fichier 5Gig en 1 ms. Impressionnant!!!!
Perez Lamed van Niekerk le
6

En plus de lire le gros fichier ligne par ligne, vous pouvez également le lire morceau par morceau. Pour en savoir plus, consultez cet article

var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
    offset += bytesRead;
    var str = chunkBuffer.slice(0, bytesRead).toString();
    var arr = str.split('\n');

    if(bytesRead = chunkSize) {
        // the last item of the arr may be not a full line, leave it to the next chunk
        offset -= arr.pop().length;
    }
    lines.push(arr);
}
console.log(lines);
Kris Roofe
la source
Se pourrait-il que ce qui suit devrait être une comparaison au lieu d'une affectation if(bytesRead = chunkSize):?
Stefan Rein le
4

La documentation Node.js offre un exemple très élégant utilisant le module Readline.

Exemple: lire le flux de fichiers ligne par ligne

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('sample.txt'),
    crlfDelay: Infinity
});

rl.on('line', (line) => {
    console.log(`Line from file: ${line}`);
});

Remarque: nous utilisons l'option crlfDelay pour reconnaître toutes les instances de CR LF ('\ r \ n') comme un seul saut de ligne.

Jaime Gómez
la source
3

J'ai encore eu le même problème. Après avoir comparé plusieurs modules qui semblent avoir cette fonctionnalité, j'ai décidé de le faire moi-même, c'est plus simple que je ne le pensais.

gist: https://gist.github.com/deemstone/8279565

var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... });  //lines{array} start{int} lines[0] No.

Il couvre le dossier ouvert dans une fermeture, que fetchBlock() retourné récupérera un bloc du fichier, finira par diviser en tableau (traitera le segment de la dernière récupération).

J'ai défini la taille du bloc sur 1024 pour chaque opération de lecture. Cela peut avoir des bogues, mais la logique du code est évidente, essayez-la vous-même.

juger
la source
2

node-byline utilise des flux, donc je préférerais celui-là pour vos gros fichiers.

pour vos conversions de date, j'utiliserais moment.js .

pour maximiser votre débit, vous pourriez envisager d'utiliser un cluster logiciel. il y a quelques modules sympas qui enveloppent assez bien le module de cluster natif du nœud. j'aime cluster-master d'isaacs. par exemple, vous pouvez créer un cluster de x workers qui calculent tous un fichier.

pour l'analyse comparative entre les divisions et les expressions régulières, utilisez benchmark.js . je ne l'ai pas testé jusqu'à présent. benchmark.js est disponible en tant que module de nœud

ici et maintenant78
la source
2

Sur la base de cette réponse aux questions, j'ai implémenté une classe que vous pouvez utiliser pour lire un fichier de manière synchrone ligne par ligne avec fs.readSync(). Vous pouvez faire cette "pause" et "reprendre" en utilisant une Qpromesse ( jQuerysemble nécessiter un DOM donc je ne peux pas l'exécuter avec nodejs):

var fs = require('fs');
var Q = require('q');

var lr = new LineReader(filenameToLoad);
lr.open();

var promise;
workOnLine = function () {
    var line = lr.readNextLine();
    promise = complexLineTransformation(line).then(
        function() {console.log('ok');workOnLine();},
        function() {console.log('error');}
    );
}
workOnLine();

complexLineTransformation = function (line) {
    var deferred = Q.defer();
    // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
    return deferred.promise;
}

function LineReader (filename) {      
  this.moreLinesAvailable = true;
  this.fd = undefined;
  this.bufferSize = 1024*1024;
  this.buffer = new Buffer(this.bufferSize);
  this.leftOver = '';

  this.read = undefined;
  this.idxStart = undefined;
  this.idx = undefined;

  this.lineNumber = 0;

  this._bundleOfLines = [];

  this.open = function() {
    this.fd = fs.openSync(filename, 'r');
  };

  this.readNextLine = function () {
    if (this._bundleOfLines.length === 0) {
      this._readNextBundleOfLines();
    }
    this.lineNumber++;
    var lineToReturn = this._bundleOfLines[0];
    this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
    return lineToReturn;
  };

  this.getLineNumber = function() {
    return this.lineNumber;
  };

  this._readNextBundleOfLines = function() {
    var line = "";
    while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
      this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
      this.idxStart = 0
      while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
        line = this.leftOver.substring(this.idxStart, this.idx);
        this._bundleOfLines.push(line);        
        this.idxStart = this.idx + 1;
      }
      this.leftOver = this.leftOver.substring(this.idxStart);
      if (line !== "") {
        break;
      }
    }
  }; 
}
Benvorth
la source
0
import * as csv from 'fast-csv';
import * as fs from 'fs';
interface Row {
  [s: string]: string;
}
type RowCallBack = (data: Row, index: number) => object;
export class CSVReader {
  protected file: string;
  protected csvOptions = {
    delimiter: ',',
    headers: true,
    ignoreEmpty: true,
    trim: true
  };
  constructor(file: string, csvOptions = {}) {
    if (!fs.existsSync(file)) {
      throw new Error(`File ${file} not found.`);
    }
    this.file = file;
    this.csvOptions = Object.assign({}, this.csvOptions, csvOptions);
  }
  public read(callback: RowCallBack): Promise < Array < object >> {
    return new Promise < Array < object >> (resolve => {
      const readStream = fs.createReadStream(this.file);
      const results: Array < any > = [];
      let index = 0;
      const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) => {
        index++;
        results.push(await callback(data, index));
      }).on('error', (err: Error) => {
        console.error(err.message);
        throw err;
      }).on('end', () => {
        resolve(results);
      });
      readStream.pipe(csvStream);
    });
  }
}
import { CSVReader } from '../src/helpers/CSVReader';
(async () => {
  const reader = new CSVReader('./database/migrations/csv/users.csv');
  const users = await reader.read(async data => {
    return {
      username: data.username,
      name: data.name,
      email: data.email,
      cellPhone: data.cell_phone,
      homePhone: data.home_phone,
      roleId: data.role_id,
      description: data.description,
      state: data.state,
    };
  });
  console.log(users);
})();
Raza
la source
-1

J'ai créé un module de nœud pour lire du texte ou JSON de gros fichiers de manière asynchrone. Testé sur de gros fichiers.

var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');

module.exports = FileReader;

function FileReader(){

}

FileReader.prototype.read = function(pathToFile, callback){
    var returnTxt = '';
    var s = fs.createReadStream(pathToFile)
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        //console.log('reading line: '+line);
        returnTxt += line;        

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(){
        console.log('Error while reading file.');
    })
    .on('end', function(){
        console.log('Read entire file.');
        callback(returnTxt);
    })
);
};

FileReader.prototype.readJSON = function(pathToFile, callback){
    try{
        this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
    }
    catch(err){
        throw new Error('json file is not valid! '+err.stack);
    }
};

Enregistrez simplement le fichier sous le nom file-reader.js et utilisez-le comme ceci:

var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/});
Eyal Zoref
la source
7
On dirait que vous avez copié la réponse de Gérard. Vous devriez donner à Gérard le mérite de la partie que vous avez copiée.
Paul Lynch