Diriger un flux vers s3.upload ()

89

J'utilise actuellement un plugin node.js appelé s3-upload-stream pour diffuser de très gros fichiers sur Amazon S3. Il utilise l'API en plusieurs parties et pour la plupart, cela fonctionne très bien.

Cependant, ce module montre son âge et j'ai déjà dû y apporter des modifications (l'auteur l'a également déconseillé). Aujourd'hui, j'ai rencontré un autre problème avec Amazon, et j'aimerais vraiment suivre la recommandation de l'auteur et commencer à utiliser le aws-sdk officiel pour effectuer mes téléchargements.

MAIS.

Le SDK officiel ne semble pas prendre en charge le piping to s3.upload(). La nature de s3.upload est que vous devez transmettre le flux lisible comme argument au constructeur S3.

J'ai environ 120 modules de code utilisateur qui effectuent divers traitements de fichiers, et ils sont indépendants de la destination finale de leur sortie. Le moteur leur remet un flux de sortie inscriptible canalisable, et ils y canalisent. Je ne peux pas AWS.S3leur donner un objet et leur demander de l'appeler upload()sans ajouter de code à tous les modules. La raison pour laquelle j'ai utilisé s3-upload-streamétait parce qu'il supportait la tuyauterie.

Existe-t-il un moyen de faire de aws-sdk s3.upload()quelque chose vers lequel je peux diriger le flux?

womp
la source

Réponses:

132

Enveloppez la upload()fonction S3 avec le stream.PassThrough()flux node.js.

Voici un exemple:

inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}
Casey Benko
la source
2
Génial, cela a résolu mon très laid hack = -) Pouvez-vous expliquer ce que fait réellement stream.PassThrough ()?
mraxus
6
Votre flux PassThrough se ferme-t-il lorsque vous faites cela? Je passe un sacré moment à propager la clôture dans s3.upload pour atteindre mon flux PassThrough.
four43
7
la taille du fichier téléchargé est de 0 octet. Si je dirige les mêmes données du flux source vers le système de fichiers, tout fonctionne bien. Une idée?
Radar155
3
Un flux passthrough prendra des octets écrits et les produira. Cela vous permet de renvoyer un flux accessible en écriture que aws-sdk lira lorsque vous y écrivez. Je retournerais également l'objet de réponse de s3.upload () car sinon, vous ne pouvez pas vous assurer que le téléchargement se termine.
reconbot
1
d'où le s3paramètre à l'intérieur du tuyau et streamviennent-ils?
Blackjack le
94

Réponse un peu tardive, cela pourrait aider quelqu'un d'autre, espérons-le. Vous pouvez renvoyer à la fois le flux accessible en écriture et la promesse, afin de pouvoir obtenir des données de réponse une fois le téléchargement terminé.

const AWS = require('aws-sdk');
const stream = require('stream');

const uploadStream = ({ Bucket, Key }) => {
  const s3 = new AWS.S3();
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

Et vous pouvez utiliser la fonction comme suit:

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');

const pipeline = readStream.pipe(writeStream);

Vous pouvez maintenant vérifier la promesse:

promise.then(() => {
  console.log('upload completed successfully');
}).catch((err) => {
  console.log('upload failed.', err.message);
});

Ou comme stream.pipe()retourne stream.Writable, la destination (variable writeStream ci-dessus), permettant une chaîne de tubes, nous pouvons également utiliser ses événements:

 pipeline.on('close', () => {
   console.log('upload successful');
 });
 pipeline.on('error', (err) => {
   console.log('upload failed', err.message)
 });
Ahmet Cetin
la source
Cela a l'air génial, mais de mon côté,
j'obtiens
vient de répondre à votre question. J'espère que cela aide.
Ahmet Cetin le
48

Dans la réponse acceptée, la fonction se termine avant que le téléchargement ne soit terminé et, par conséquent, elle est incorrecte. Le code ci-dessous conduit correctement à partir d'un flux lisible.

Télécharger la référence

async function uploadReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  return s3.upload(params).promise();
}

async function upload() {
  const readable = getSomeReadableStream();
  const results = await uploadReadableStream(readable);
  console.log('upload complete', results);
}

Vous pouvez également aller plus loin et afficher les informations de progression en utilisant ManagedUploadcomme tel:

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
  console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});

Référence ManagedUpload

Une liste des événements disponibles

tsuz
la source
1
aws-sdk propose désormais des promesses intégrées à la version 2.3.0+, vous n'avez donc plus à les lever. s3.upload (params) .promise (). then (données => données) .catch (erreur => erreur);
DBrown
1
@DBrown Merci pour le pointeur! J'ai mis à jour la réponse en conséquence.
tsuz
1
@tsuz, essayer de mettre en œuvre votre solution me donne une erreur:, TypeError: dest.on is not a functionune idée pourquoi?
FireBrand
Qu'est-ce que c'est dest.on? Pouvez-vous donner un exemple? @FireBrand
tsuz
9
Cela dit que la réponse acceptée est incomplète mais qu'elle ne fonctionne pas avec le piping vers s3.upload comme indiqué dans le post mis à jour de @ Womp. Ce serait très utile si cette réponse était mise à jour pour prendre la sortie canalisée de quelque chose d'autre!
MattW
6

Aucune des réponses n'a fonctionné pour moi parce que je voulais:

  • Pipe dans s3.upload()
  • Transférer le résultat de s3.upload()dans un autre flux

La réponse acceptée ne fait pas ce dernier. Les autres s'appuient sur l'API de promesse, qui est lourde à travailler avec des tuyaux de flux.

Ceci est ma modification de la réponse acceptée.

const s3 = new S3();

function writeToS3({Key, Bucket}) {
  const Body = new stream.PassThrough();

  s3.upload({
    Body,
    Key,
    Bucket: process.env.adpBucket
  })
   .on('httpUploadProgress', progress => {
       console.log('progress', progress);
   })
   .send((err, data) => {
     if (err) {
       Body.destroy(err);
     } else {
       console.log(`File uploaded and available at ${data.Location}`);
       Body.destroy();
     }
  });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});

pipeline.on('close', () => {
  // upload finished, do something else
})
pipeline.on('error', () => {
  // upload wasn't successful. Handle it
})

cortopie
la source
Cela a l'air génial, mais de mon côté,
j'obtiens
5

Solution de type Script:
cet exemple utilise:

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";

Et fonction asynchrone:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { 

         const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
            const passT = new stream.PassThrough();
            return {
              writeStream: passT,
              promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
            };
          };
        const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
        fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
        let output = true;
        await promise.catch((reason)=> { output = false; console.log(reason);});
        return output;
}

Appelez cette méthode quelque part comme:

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
dzole vladimirov
la source
4

La chose à noter ici dans la réponse la plus acceptée ci-dessus est que: Vous devez renvoyer la passe dans la fonction si vous utilisez un tube comme,

fs.createReadStream(<filePath>).pipe(anyUploadFunction())

function anyUploadFunction () { 
 let pass = new stream.PassThrough();
 return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}

Sinon, il passera silencieusement au suivant sans lancer d'erreur ou générera une erreur TypeError: dest.on is not a functionselon la façon dont vous avez écrit la fonction

varun bhaya
la source
3

Si cela aide quelqu'un, j'ai pu diffuser du client vers s3 avec succès:

https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a

Le code côté serveur suppose qu'il reqs'agit d'un objet de flux, dans mon cas, il a été envoyé par le client avec des informations de fichier définies dans les en-têtes.

const fileUploadStream = (req, res) => {
  //get "body" args from header
  const { id, fn } = JSON.parse(req.get('body'));
  const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
  const params = {
    Key,
    Bucket: bucketName, //set somewhere
    Body: req, //req is a stream
  };
  s3.upload(params, (err, data) => {
    if (err) {
      res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
    } else {
      res.send(Key);
    }
  });
};

Oui, cela brise les conventions, mais si vous regardez l'essentiel, c'est beaucoup plus propre que tout ce que j'ai trouvé en utilisant multer, busboy, etc.

+1 pour le pragmatisme et merci à @SalehenRahman pour son aide.

mattdlockyer
la source
multer, busboy gère les téléchargements multipart / form-data. req en tant que flux fonctionne lorsque le client envoie un tampon en tant que corps à partir de XMLHttpRequest.
André Werlang
Pour clarifier, le téléchargement est effectué depuis le back-end, pas le client, n'est-ce pas?
numX
Oui, c'est "piping" le flux, sur le backend, mais cela vient d'un frontend
mattdlockyer
3

Pour ceux qui se plaignent que lorsqu'ils utilisent la fonction de téléchargement de l'api s3 et qu'un fichier de zéro octet se termine sur s3 (@ Radar155 et @gabo) - j'ai également eu ce problème.

Créez un deuxième flux PassThrough et dirigez simplement toutes les données du premier vers le second et passez la référence à ce second vers s3. Vous pouvez le faire de différentes manières - peut-être une manière sale est d'écouter l'événement "data" sur le premier flux, puis d'écrire ces mêmes données dans le second flux - de même pour l'événement "end" - il suffit d'appeler la fonction de fin sur le deuxième flux. Je ne sais pas s'il s'agit d'un bogue dans l'API aws, la version de node ou un autre problème - mais cela a contourné le problème pour moi.

Voici à quoi cela pourrait ressembler:

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();

var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
    destStream.write(chunk);
});

srcStream.on('end', function () {
    dataStream.end();
});
Tim
la source
Cela a également fonctionné pour moi. La fonction de téléchargement S3 ne faisait que «mourir» silencieusement chaque fois qu'un téléchargement en plusieurs parties était utilisé, mais lorsque vous utilisiez votre solution, cela fonctionnait bien (!). Merci! :)
jhdrn
Pouvez-vous nous expliquer pourquoi le deuxième flux est nécessaire?
noob7
1

Suite aux autres réponses et en utilisant le dernier AWS SDK pour Node.js, il existe une solution beaucoup plus propre et plus simple puisque la fonction s3 upload () accepte un flux, en utilisant la syntaxe d'attente et la promesse de S3:

var model = await s3Client.upload({
    Bucket : bucket,
    Key : key,
    ContentType : yourContentType,
    Body : fs.createReadStream(path-to-file)
}).promise();
emich
la source
0

J'utilise KnexJS et j'ai eu un problème avec leur API de streaming. Je l'ai finalement réparé, j'espère que ce qui suit aidera quelqu'un.

const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();

knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());

const uploadResult = await s3
  .upload({
    Bucket: 'my-bucket',
    Key: 'stream-test.txt',
    Body: passThroughStream
  })
  .promise();
TestWell
la source
-3

Si vous connaissez la taille du flux, vous pouvez utiliser minio-js pour télécharger le flux comme ceci:

  s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
    if (e) {
      return console.log(e)
    }
    console.log("Successfully uploaded the stream")
  })
Krishna Srinivas
la source