Creating a buffered transformation stream in practice

In my current project, I am exploring the use of the latest Node.js streams API to create a stream that buffers a specific amount of data. This buffer should be automatically flushed when the stream is piped to another stream or when it emits `readable` events. The challenge lies in ensuring that the buffer is flushed every time the stream is piped to a new destination, even if it has already been flushed to a previous destination.

Here's an example scenario:

  1. The BufferStream class is built using stream.Transform and maintains a 512KB ring buffer internally.
  2. ReadableStreamA is connected as a source to an instance of BufferStream.
  3. BufferStream continuously writes incoming data from ReadableStreamA to its ring buffer (overwriting old data).
  4. The buffered data in BufferStream is then piped to WritableStreamB.
  5. WritableStreamB receives the entire 512KB buffer and continues to receive ongoing data from ReadableStreamA
    through BufferStream.
  6. The same buffer in BufferStream is also piped to WritableStreamC.
  7. WritableStreamC receives a separate copy of the 512KB buffer, which may differ from what WritableStreamB
    received due to additional data being written to BufferStream.

My question is, can this functionality be achieved with the streams API? So far, my only idea involves creating an object with a custom method that generates a new PassThrough stream for each destination instead of simply piping to and from it.
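
For illustration, the PassThrough-per-destination idea might look roughly like the sketch below. ReplaySource, createReader and the `recent` field are hypothetical names invented for this example, and the sketch deliberately ignores backpressure from slow destinations:

```javascript
const { PassThrough, Writable } = require('stream')

// Hypothetical helper: keeps the newest `limit` bytes and hands every new
// destination its own PassThrough, primed with a snapshot of those bytes.
class ReplaySource extends Writable {
  constructor (limit) {
    super()
    this.limit = limit
    this.recent = Buffer.alloc(0)
    this.readers = new Set()
  }

  _write (chunk, encoding, done) {
    // Keep only the newest `limit` bytes.
    const joined = Buffer.concat([this.recent, chunk])
    this.recent = joined.subarray(Math.max(0, joined.length - this.limit))
    // Fan the live chunk out to every reader (backpressure is ignored here).
    for (const reader of this.readers) reader.write(chunk)
    done()
  }

  createReader () {
    const reader = new PassThrough()
    // Write a copy so later incoming data cannot alter what this reader got.
    if (this.recent.length > 0) reader.write(Buffer.from(this.recent))
    this.readers.add(reader)
    reader.on('close', () => this.readers.delete(reader))
    return reader
  }
}
```

With something like this, ReadableStreamA would be piped into the ReplaySource, and each destination would be attached with source.createReader().pipe(destination) rather than by piping the source itself.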

As a side note, I have previously accomplished similar tasks using the older "flowing" API by monitoring new subscribers on `data` events. Whenever a new function was added with `.on('data')`, I would call it directly with a duplicate of the ring buffer data.
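
A minimal sketch of that older approach, assuming a plain EventEmitter stands in for the flowing-mode source. createReplayingEmitter and feed are made-up names, and the replay relies on the standard 'newListener' event:

```javascript
const { EventEmitter } = require('events')

// Hypothetical helper, not the original code: an emitter that remembers
// the newest `limit` bytes and replays them to late 'data' subscribers.
function createReplayingEmitter (limit) {
  const emitter = new EventEmitter()
  let recent = Buffer.alloc(0)

  // 'newListener' fires just before a listener is registered.
  emitter.on('newListener', (eventName, listener) => {
    if (eventName === 'data' && recent.length > 0) {
      listener(Buffer.from(recent)) // hand over a copy of the buffered bytes
    }
  })

  // feed() is a made-up name for "new data arrived from the source".
  emitter.feed = function (chunk) {
    const joined = Buffer.concat([recent, chunk])
    recent = joined.subarray(Math.max(0, joined.length - limit))
    emitter.emit('data', chunk)
  }

  return emitter
}
```

Because the replay happens inside the 'newListener' handler, a new subscriber sees the buffered bytes before any live chunk.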

Answer №1

Allow me to offer my perspective on your issue.

To tackle the problem at hand, the approach involves creating a Transform stream. This will enable us to implement custom buffering logic before transmitting the data through the stream's output:

var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  // new Buffer() is deprecated; Buffer.alloc(0) creates an empty buffer
  this.buffer = Buffer.alloc(0)
}

util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // Implement custom buffering logic here
  // For example, add chunk to this.buffer, check buffer size, etc.
  // As written, only a copy of the most recent chunk is kept.
  this.buffer = Buffer.from(chunk)

  // Pass the chunk through unchanged
  this.push(chunk)
  done()
}

Additionally, we must customize the .pipe() method to receive notifications when the BufferStream is piped into another stream. This customization allows automatic data writing:

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}

By doing so, when we execute buffer.pipe(someStream), the pipe operation proceeds as planned and the internal buffer gets written to the output stream. Subsequently, the Transform class manages the process while monitoring backpressure and other relevant aspects.

A functional gist is available for reference. Please note that the provided example does not fully address correct buffering logic implementation (e.g., neglecting internal buffer size considerations), but such adjustments can be easily made.
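
As a rough idea of what the missing size handling could look like, here is a sketch that caps the internal buffer at the 512KB mentioned in the question. appendCapped and CappedBufferStream are names made up for this example, and Buffer.concat is used for brevity where a real ring buffer would avoid copying on every chunk:

```javascript
const { Transform } = require('stream')

const LIMIT = 512 * 1024 // 512KB, the size named in the question

// Hypothetical helper: append `chunk` to `recent`, then keep only the
// newest `limit` bytes.
function appendCapped (recent, chunk, limit) {
  const joined = Buffer.concat([recent, chunk])
  return joined.length > limit ? joined.subarray(joined.length - limit) : joined
}

class CappedBufferStream extends Transform {
  constructor (options) {
    super(options)
    this.recent = Buffer.alloc(0)
  }

  _transform (chunk, encoding, done) {
    // Remember the newest bytes, then pass the chunk through unchanged.
    this.recent = appendCapped(this.recent, chunk, LIMIT)
    done(null, chunk)
  }
}
```

The pipe() override shown above could then write a copy of this.recent to each new destination.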

Answer №2

Although Paul's response is on the right track, it may not fully meet all of the necessary criteria. It seems like what is needed is for the transform stream to flush the buffer whenever the pipe() method is invoked. This would clear out any accumulated data between the creation of the transform stream and its connection to the current writable stream.

A revised solution could look something like this:

var BufferStream = function () {
  stream.Transform.apply(this, arguments);
  this.buffer = [];
};

util.inherits(BufferStream, stream.Transform);

BufferStream.prototype._transform = function (chunk, encoding, done) {
  this.push(chunk ? String(chunk) : null);
  this.buffer.push(chunk ? String(chunk) : null);

  done();
};

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
  this.buffer.forEach(function (b) {
    res.write(String(b));
  });
  return res;
};

return new BufferStream();

It should be noted that:

BufferStream.super_.prototype.pipe.apply(this, arguments);

is essentially the same as:

stream.Transform.prototype.pipe.apply(this, arguments);

To improve efficiency, consider implementing some flags that can be toggled when pipe/unpipe operations are executed.
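
One possible reading of that suggestion, as a sketch only: track the current destinations by overriding both pipe() and unpipe(), so other code can check whether anything is attached. TrackedStream, destinations and isPiped are hypothetical names introduced here:

```javascript
const { Transform } = require('stream')

// Hypothetical sketch: remembers which destinations are currently attached.
class TrackedStream extends Transform {
  constructor (options) {
    super(options)
    this.destinations = new Set()
  }

  _transform (chunk, encoding, done) {
    done(null, chunk) // pass data through unchanged
  }

  pipe (destination, options) {
    this.destinations.add(destination)
    return super.pipe(destination, options)
  }

  unpipe (destination) {
    // Calling unpipe() without an argument detaches every destination.
    if (destination) this.destinations.delete(destination)
    else this.destinations.clear()
    return super.unpipe(destination)
  }

  get isPiped () {
    return this.destinations.size > 0
  }
}
```

A flag like isPiped could be consulted in _transform to decide, for example, whether incoming data still needs to be retained for replay.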
