Lire le flux deux fois

127

Comment lisez-vous deux fois le même flux d'entrée? Est-il possible de le copier d'une manière ou d'une autre?

J'ai besoin d'obtenir une image sur le Web, de l'enregistrer localement, puis de renvoyer l'image enregistrée. J'ai juste pensé qu'il serait plus rapide d'utiliser le même flux au lieu de démarrer un nouveau flux vers le contenu téléchargé, puis de le relire.

Warpzit
la source
1
Peut-être utiliser marquer et réinitialiser
Vyacheslav Shylkin

Réponses:

114

Vous pouvez utiliser org.apache.commons.io.IOUtils.copypour copier le contenu de InputStream dans un tableau d'octets, puis lire à plusieurs reprises à partir du tableau d'octets à l'aide d'un ByteArrayInputStream. Par exemple:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
org.apache.commons.io.IOUtils.copy(in, baos);
byte[] bytes = baos.toByteArray();

// either
while (needToReadAgain) {
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    yourReadMethodHere(bais);
}

// or
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
while (needToReadAgain) {
    bais.reset();
    yourReadMethodHere(bais);
}
Paul Grime
la source
1
Je pense que c'est la seule solution valable car la marque n'est pas prise en charge pour tous les types.
Warpzit
3
@Paul Grime: IOUtils.toByeArray appelle également la méthode de copie de l'intérieur.
Ankit
4
Comme le dit @Ankit, cette solution n'est pas valable pour moi, car l'entrée est lue en interne et ne peut pas être réutilisée.
Xtreme Biker
30
Je sais que ce commentaire est hors du temps, mais, ici, dans la première option, si vous lisez le flux d'entrée sous forme de tableau d'octets, cela ne signifie-t-il pas que vous chargez toutes les données en mémoire? ce qui pourrait être un gros problème si vous chargez quelque chose comme de gros fichiers?
jaxkodex
2
On pourrait utiliser IOUtils.toByteArray (InputStream) pour obtenir un tableau d'octets en un seul appel.
utile
30

Selon la provenance de l'InputStream, vous ne pourrez peut-être pas le réinitialiser. Vous pouvez vérifier si mark()et reset()sont pris en charge en utilisant markSupported().

Si c'est le cas, vous pouvez appeler reset()InputStream pour revenir au début. Sinon, vous devez à nouveau lire InputStream à partir de la source.

Kevin Parker
la source
1
InputStream ne prend pas en charge 'mark' - vous pouvez appeler mark sur un IS mais il ne fait rien. De même, l'appel de reset sur un IS lèvera une exception.
ayahuasca
4
@ayahuasca sous- InputStreamclasses comme BufferedInputStreamsupporte 'mark'
Dmitry Bogdanovich
10

si votre InputStreamsupport en utilisant la marque, vous pouvez alors mark()votre inputStream et ensuite reset(). si votre InputStremne prend pas en charge la marque, vous pouvez utiliser la classe java.io.BufferedInputStream, vous pouvez donc intégrer votre flux dans un BufferedInputStreamcomme celui-ci

    InputStream bufferdInputStream = new BufferedInputStream(yourInputStream);
    bufferdInputStream.mark(some_value);
    //read your bufferdInputStream 
    bufferdInputStream.reset();
    //read it again
wannas
la source
1
Un flux d'entrée mis en mémoire tampon ne peut que revenir à la taille de la mémoire tampon, donc si la source ne rentre pas, vous ne pouvez pas revenir au début.
L. Blanc
@ L.Blanc désolé mais cela ne semble pas correct. Jetez un oeil à BufferedInputStream.fill(), il y a la section "Grow buffer", où la nouvelle taille de tampon est comparée uniquement à marklimitet MAX_BUFFER_SIZE.
eugene82
8

Vous pouvez encapsuler le flux d'entrée avec PushbackInputStream. PushbackInputStream permet de non lu ( « écriture différée ») octets qui ont été déjà lu, de sorte que vous pouvez faire comme ceci:

public class StreamTest {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's wrap it with PushBackInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new PushbackInputStream(originalStream, 10); // 10 means that maximnum 10 characters can be "written back" to the stream

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    ((PushbackInputStream) wrappedStream).unread(readBytes, 0, readBytes.length);

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3


  }

  private static byte[] getBytes(InputStream is, int howManyBytes) throws IOException {
    System.out.print("Reading stream: ");

    byte[] buf = new byte[howManyBytes];

    int next = 0;
    for (int i = 0; i < howManyBytes; i++) {
      next = is.read();
      if (next > 0) {
        buf[i] = (byte) next;
      }
    }
    return buf;
  }

  private static void printBytes(byte[] buffer) throws IOException {
    System.out.print("Reading stream: ");

    for (int i = 0; i < buffer.length; i++) {
      System.out.print(buffer[i] + " ");
    }
    System.out.println();
  }


}

Veuillez noter que PushbackInputStream stocke le tampon interne d'octets, donc il crée vraiment un tampon en mémoire qui contient les octets "réécrits".

Connaissant cette approche, nous pouvons aller plus loin et la combiner avec FilterInputStream. FilterInputStream stocke le flux d'entrée d'origine en tant que délégué. Cela permet de créer une nouvelle définition de classe qui permet de " non lire " automatiquement les données originales. La définition de cette classe est la suivante:

public class TryReadInputStream extends FilterInputStream {
  private final int maxPushbackBufferSize;

  /**
  * Creates a <code>FilterInputStream</code>
  * by assigning the  argument <code>in</code>
  * to the field <code>this.in</code> so as
  * to remember it for later use.
  *
  * @param in the underlying input stream, or <code>null</code> if
  *           this instance is to be created without an underlying stream.
  */
  public TryReadInputStream(InputStream in, int maxPushbackBufferSize) {
    super(new PushbackInputStream(in, maxPushbackBufferSize));
    this.maxPushbackBufferSize = maxPushbackBufferSize;
  }

  /**
   * Reads from input stream the <code>length</code> of bytes to given buffer. The read bytes are still avilable
   * in the stream
   *
   * @param buffer the destination buffer to which read the data
   * @param offset  the start offset in the destination <code>buffer</code>
   * @aram length how many bytes to read from the stream to buff. Length needs to be less than
   *        <code>maxPushbackBufferSize</code> or IOException will be thrown
   *
   * @return number of bytes read
   * @throws java.io.IOException in case length is
   */
  public int tryRead(byte[] buffer, int offset, int length) throws IOException {
    validateMaxLength(length);

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int bytesRead = 0;

    int nextByte = 0;

    for (int i = 0; (i < length) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        buffer[offset + bytesRead++] = (byte) nextByte;
      }
    }

    if (bytesRead > 0) {
      ((PushbackInputStream) in).unread(buffer, offset, bytesRead);
    }

    return bytesRead;

  }

  public byte[] tryRead(int maxBytesToRead) throws IOException {
    validateMaxLength(maxBytesToRead);

    ByteArrayOutputStream baos = new ByteArrayOutputStream(); // as ByteArrayOutputStream to dynamically allocate internal bytes array instead of allocating possibly large buffer (if maxBytesToRead is large)

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int nextByte = 0;

    for (int i = 0; (i < maxBytesToRead) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        baos.write((byte) nextByte);
      }
    }

    byte[] buffer = baos.toByteArray();

    if (buffer.length > 0) {
      ((PushbackInputStream) in).unread(buffer, 0, buffer.length);
    }

    return buffer;

  }

  private void validateMaxLength(int length) throws IOException {
    if (length > maxPushbackBufferSize) {
      throw new IOException(
        "Trying to read more bytes than maxBytesToRead. Max bytes: " + maxPushbackBufferSize + ". Trying to read: " +
        length);
    }
  }

}

Cette classe a deux méthodes. Un pour lire dans le tampon existant (la définition est analogue à l'appel public int read(byte b[], int off, int len)de la classe InputStream). Deuxième qui renvoie un nouveau tampon (cela peut être plus efficace si la taille du tampon à lire est inconnue).

Voyons maintenant notre classe en action:

public class StreamTest2 {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's use our TryReadInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new TryReadInputStream(originalStream, 10);

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // NOTE: no manual call to "unread"(!) because TryReadInputStream handles this internally
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3);
    printBytes(readBytes); // prints 1 2 3

    // we can also call normal read which will actually read the bytes without "writing them back"
    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 4 5 6

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // now we can try read next bytes
    printBytes(readBytes); // prints 7 8 9

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 7 8 9


  }



}
marcheurs
la source
5

Si vous utilisez une implémentation de InputStream, vous pouvez vérifier le résultat de InputStream#markSupported()cela pour vous dire si vous pouvez ou non utiliser la méthode mark()/reset() .

Si vous pouvez marquer le flux lorsque vous lisez, appelez reset() pour revenir en arrière pour commencer.

Si vous ne pouvez pas, vous devrez ouvrir à nouveau un flux.

Une autre solution serait de convertir InputStream en tableau d'octets, puis d'itérer sur le tableau autant de fois que nécessaire. Vous pouvez trouver plusieurs solutions dans cet article Convertir InputStream en tableau d'octets en Java utilisant ou non des tierces. Attention, si le contenu lu est trop volumineux, vous risquez de rencontrer des problèmes de mémoire.

Enfin, si vous avez besoin de lire l'image, utilisez:

BufferedImage image = ImageIO.read(new URL("http://www.example.com/images/toto.jpg"));

L'utilisation ImageIO#read(java.net.URL)vous permet également d'utiliser le cache.

alain.janinm
la source
1
un mot d'avertissement lors de l'utilisation ImageIO#read(java.net.URL): certains serveurs Web et CDN peuvent rejeter les appels simples (c'est-à-dire sans agent utilisateur qui fait croire au serveur que l'appel provient d'un navigateur Web) effectués par ImageIO#read. Dans ce cas, l'utilisation de la URLConnection.openConnection()configuration de l'agent utilisateur sur cette connexion + l'utilisation de `ImageIO.read (InputStream) fera, la plupart du temps, l'affaire.
Clint Eastwood
InputStreamn'est pas une interface
Brice
3

Que diriez-vous:

if (stream.markSupported() == false) {

        // lets replace the stream object
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        IOUtils.copy(stream, baos);
        stream.close();
        stream = new ByteArrayInputStream(baos.toByteArray());
        // now the stream should support 'mark' and 'reset'

    }
Anshuman Chatterjee
la source
5
C'est une idée terrible. Vous mettez tout le contenu du flux en mémoire comme ça.
Niels Doucet
3

Pour fractionner un InputStreamen deux, en évitant de charger toutes les données en mémoire , puis de les traiter indépendamment:

  1. Créez-en quelques-uns OutputStream, précisément:PipedOutputStream
  2. Connectez chaque PipedOutputStream avec un PipedInputStream, ce PipedInputStreamsont les fichiers renvoyés InputStream.
  3. Connectez le Source InputStream avec juste créé OutputStream. Donc, tout le lire à partir du sourcing InputStreamserait écrit dans les deux OutputStream. Il n'est pas nécessaire de l'implémenter, car cela est déjà fait dans TeeInputStream(commons.io).
  4. Dans un thread séparé, lisez l'intégralité de la source inputStream et implicitement, les données d'entrée sont transférées vers les inputStreams cibles.

    public static final List<InputStream> splitInputStream(InputStream input) 
        throws IOException 
    { 
        Objects.requireNonNull(input);      
    
        PipedOutputStream pipedOut01 = new PipedOutputStream();
        PipedOutputStream pipedOut02 = new PipedOutputStream();
    
        List<InputStream> inputStreamList = new ArrayList<>();
        inputStreamList.add(new PipedInputStream(pipedOut01));
        inputStreamList.add(new PipedInputStream(pipedOut02));
    
        TeeOutputStream tout = new TeeOutputStream(pipedOut01, pipedOut02);
    
        TeeInputStream tin = new TeeInputStream(input, tout, true);
    
        Executors.newSingleThreadExecutor().submit(tin::readAllBytes);  
    
        return Collections.unmodifiableList(inputStreamList);
    }

Soyez conscient de fermer les inputStreams après avoir été consommés et fermez le thread qui s'exécute: TeeInputStream.readAllBytes()

Au cas où vous auriez besoin de le diviser en plusieursInputStream , au lieu de seulement deux. Remplacez dans le fragment de code précédent la classe TeeOutputStreamde votre propre implémentation, qui encapsulerait a List<OutputStream>et remplacerait l' OutputStreaminterface:

public final class TeeListOutputStream extends OutputStream {
    private final List<? extends OutputStream> branchList;

    public TeeListOutputStream(final List<? extends OutputStream> branchList) {
        Objects.requireNonNull(branchList);
        this.branchList = branchList;
    }

    @Override
    public synchronized void write(final int b) throws IOException {
        for (OutputStream branch : branchList) {
            branch.write(b);
        }
    }

    @Override
    public void flush() throws IOException {
        for (OutputStream branch : branchList) {
            branch.flush();
        }
    }

    @Override
    public void close() throws IOException {
        for (OutputStream branch : branchList) {
            branch.close();
        }
    }
}
Zeugor
la source
Pouvez-vous expliquer un peu plus l'étape 4? Pourquoi devons-nous déclencher la lecture manuellement? Pourquoi la lecture de pipedInputStream ne déclenche PAS la lecture de la source inputStream? Et pourquoi faisons-nous cet appel de manière asyncronale?
Дмитрий Кулешов
2

Convertissez le flux d'entrée en octets, puis passez-le à la fonction savefile où vous assemblez le même flux d'entrée. Également dans la fonction d'origine, utilisez des octets à utiliser pour d'autres tâches

Maneesh
la source
5
Je dis mauvaise idée sur celui-ci, le tableau résultant pourrait être énorme et privera l'appareil de mémoire.
Kevin Parker
0

Dans le cas où quelqu'un s'exécute dans une application Spring Boot et que vous souhaitez lire le corps de la réponse d'un RestTemplate (c'est pourquoi je veux lire un flux deux fois), il existe un moyen propre (plus) de le faire.

Tout d'abord, vous devez utiliser Spring StreamUtilspour copier le flux dans une chaîne:

String text = StreamUtils.copyToString(response.getBody(), Charset.defaultCharset()))

Mais ce n'est pas tout. Vous devez également utiliser une fabrique de requêtes qui peut mettre en mémoire tampon le flux pour vous, comme ceci:

ClientHttpRequestFactory factory = new BufferingClientHttpRequestFactory(new SimpleClientHttpRequestFactory());
RestTemplate restTemplate = new RestTemplate(factory);

Ou, si vous utilisez le bean d'usine, alors (c'est Kotlin mais quand même):

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun createRestTemplate(): RestTemplate = RestTemplateBuilder()
  .requestFactory { BufferingClientHttpRequestFactory(SimpleClientHttpRequestFactory()) }
  .additionalInterceptors(loggingInterceptor)
  .build()

Source: https://objectpartners.com/2018/03/01/log-your-resttemplate-request-and-response-without-destroying-the-body/

milosmns
la source
0

Si vous utilisez RestTemplate pour passer des appels http, ajoutez simplement un intercepteur. Le corps de la réponse est mis en cache par l'implémentation de ClientHttpResponse. Le flux d'entrée peut maintenant être récupéré à partir de respose autant de fois que nécessaire

ClientHttpRequestInterceptor interceptor =  new ClientHttpRequestInterceptor() {

            @Override
            public ClientHttpResponse intercept(HttpRequest request, byte[] body,
                    ClientHttpRequestExecution execution) throws IOException {
                ClientHttpResponse  response = execution.execute(request, body);

                  // additional work before returning response
                  return response 
            }
        };

    // Add the interceptor to RestTemplate Instance 

         restTemplate.getInterceptors().add(interceptor); 
Noman Khan
la source