Creating a buffered transformation stream in practice

In my current project, I am exploring the use of the latest Node.js streams API to create a stream that buffers a specific amount of data. This buffer should be automatically flushed when the stream is piped to another stream or when it emits `readable` events. The challenge lies in ensuring that the buffer is flushed every time the stream is piped to a new destination, even if it has already been flushed to a previous destination.

Here's an example scenario:

  1. The BufferStream class is built using stream.Transform and maintains a 512KB ring buffer internally.
  2. ReadableStreamA is connected as a source to an instance of BufferStream.
  3. BufferStream continuously writes incoming data from ReadableStreamA to its ring buffer (overwriting old data).
  4. The buffered data in BufferStream is then piped to WritableStreamB.
  5. WritableStreamB receives the entire 512KB buffer and continues to receive ongoing data from ReadableStreamA
    through BufferStream.
  6. The same buffer in BufferStream is also piped to WritableStreamC.
  7. WritableStreamC receives a separate copy of the 512KB buffer, which may differ from what WritableStreamB
    received due to additional data being written to BufferStream.

My question is, can this functionality be achieved with the streams API? So far, my only idea involves creating an object with a custom method that generates a new PassThrough stream for each destination instead of simply piping to and from it.
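The PassThrough idea from the paragraph above could look roughly like the following. This is a minimal sketch under stated assumptions, not a tested production design: the class name ReplaySource, its `limit` argument, and the createOutput() method are made up for illustration, and backpressure from slow destinations is ignored.

```javascript
const { PassThrough, Writable } = require('stream')

// Hypothetical helper: remembers the newest `limit` bytes and gives every
// consumer its own PassThrough, seeded with a copy of those bytes.
class ReplaySource extends Writable {
  constructor (limit) {
    super()
    this.limit = limit
    this.recent = Buffer.alloc(0)
    this.outputs = new Set()
  }

  _write (chunk, encoding, done) {
    // Keep only the newest `limit` bytes (copied so old memory can be freed)
    const joined = Buffer.concat([this.recent, chunk])
    const from = Math.max(0, joined.length - this.limit)
    this.recent = Buffer.from(joined.subarray(from))
    // Forward live data to every output created so far
    for (const out of this.outputs) out.write(chunk)
    done()
  }

  createOutput () {
    const out = new PassThrough()
    // Seed the new output with a private copy of the buffered bytes
    if (this.recent.length > 0) out.write(Buffer.from(this.recent))
    this.outputs.add(out)
    out.once('close', () => this.outputs.delete(out))
    return out
  }
}
```

Under these assumptions, ReadableStreamA would be piped into a ReplaySource, and each destination would be attached with something like `source.createOutput().pipe(WritableStreamB)`.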

As a side note, I have previously accomplished similar tasks using the older "flowing" API by monitoring new subscribers on `data` events. Whenever a new function was added with `.on('data')`, I would call it directly with a duplicate of the ring buffer data.
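For comparison, the listener-based approach can be sketched with the `newListener` event, which a Node.js EventEmitter emits just before a listener is registered. The ReplayEmitter class and its feed() method are hypothetical names used only for this illustration, and the history is an unbounded array rather than a ring buffer.

```javascript
const { EventEmitter } = require('events')

// Hypothetical emitter that replays past chunks to each new 'data' listener
class ReplayEmitter extends EventEmitter {
  constructor () {
    super()
    this.recent = []
    // 'newListener' fires before the listener is actually added
    this.on('newListener', (eventName, listener) => {
      if (eventName !== 'data') return
      // Call the new subscriber directly with a copy of the history
      for (const chunk of this.recent.slice()) listener(chunk)
    })
  }

  // Record a chunk and deliver it to the current subscribers
  feed (chunk) {
    this.recent.push(chunk)
    this.emit('data', chunk)
  }
}
```

A subscriber added with `.on('data', fn)` after several feed() calls first receives the stored chunks in order, then any later ones as they arrive.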

Answer №1

Allow me to offer my perspective on your issue.

To tackle the problem at hand, the approach involves creating a Transform stream. This will enable us to implement custom buffering logic before transmitting the data through the stream's output:

var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  // Buffer.alloc replaces the deprecated `new Buffer('')` constructor
  this.buffer = Buffer.alloc(0)
}

util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // Implement custom buffering logic here
  // For example, add chunk to this.buffer, check buffer size, etc.
  // As a placeholder, this keeps a copy of only the most recent chunk
  this.buffer = Buffer.from(chunk)

  this.push(chunk)
  done()
}

Additionally, we must customize the .pipe() method to receive notifications when the BufferStream is piped into another stream. This customization allows automatic data writing:

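BufferStream.prototype.pipe = function (destination, options) {
  // The inherited pipe() returns the destination stream
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  // Send the buffered data to the newly attached destination
  res.write(this.buffer)
  return res
}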

By doing so, when we execute buffer.pipe(someStream), the pipe operation proceeds as planned and the internal buffer gets written to the output stream. Subsequently, the Transform class manages the process while monitoring backpressure and other relevant aspects.

A functional gist is available for reference. Please note that the provided example does not fully address correct buffering logic implementation (e.g., neglecting internal buffer size considerations), but such adjustments can be easily made.
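For the buffer-size handling that the example leaves out, one option is a fixed-size ring buffer that _transform writes into and the pipe() override reads a snapshot from. The sketch below is an assumption about how that could be done, not the code from the gist; RingBuffer, write() and snapshot() are illustrative names, and write() expects a Buffer.

```javascript
// Hypothetical fixed-size ring buffer that always holds the newest bytes
class RingBuffer {
  constructor (size) {
    this.data = Buffer.alloc(size)
    this.size = size
    this.start = 0  // index of the oldest stored byte
    this.length = 0 // number of bytes currently stored
  }

  write (chunk) {
    // Only the last `size` bytes of an oversized chunk can survive
    if (chunk.length > this.size) {
      chunk = chunk.subarray(chunk.length - this.size)
    }
    let pos = (this.start + this.length) % this.size
    for (const byte of chunk) {
      this.data[pos] = byte
      pos = (pos + 1) % this.size
    }
    const total = this.length + chunk.length
    if (total > this.size) {
      // Old bytes were overwritten, so move the start forward
      this.start = (this.start + total - this.size) % this.size
      this.length = this.size
    } else {
      this.length = total
    }
  }

  // Return a contiguous copy of the stored bytes, oldest first
  snapshot () {
    const out = Buffer.alloc(this.length)
    const firstPart = Math.min(this.length, this.size - this.start)
    this.data.copy(out, 0, this.start, this.start + firstPart)
    if (firstPart < this.length) {
      this.data.copy(out, firstPart, 0, this.length - firstPart)
    }
    return out
  }
}
```

With something like this in place, _transform could call `ring.write(chunk)` and the pipe() override could write `ring.snapshot()`, with `new RingBuffer(512 * 1024)` matching the size mentioned in the question.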

Answer №2

Although Paul's response is on the right track, it may not fully meet all of the necessary criteria. It seems like what is needed is for the transform stream to flush the buffer whenever the pipe() method is invoked. This would clear out any accumulated data between the creation of the transform stream and its connection to the current writable stream.

A revised solution could look something like this:

var util = require('util');
var stream = require('stream');

var BufferStream = function () {
    stream.Transform.apply(this, arguments);
    this.buffer = [];
};

util.inherits(BufferStream, stream.Transform);

BufferStream.prototype._transform = function (chunk, encoding, done) {
    // Forward the chunk and keep a copy for destinations attached later
    this.push(chunk ? String(chunk) : null);
    this.buffer.push(chunk ? String(chunk) : null);

    done();
};

BufferStream.prototype.pipe = function (destination, options) {
    var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
    // Replay everything buffered so far into the new destination
    this.buffer.forEach(function (b) {
        res.write(String(b));
    });
    return res;
};

return new BufferStream();

It should be noted that:

BufferStream.super_.prototype.pipe.apply(this, arguments);

is essentially the same as:

stream.Transform.prototype.pipe.apply(this, arguments);

To improve efficiency, consider implementing some flags that can be toggled when pipe/unpipe operations are executed.
