How to control memory consumption when processing network requests in Node.js

Hello everybody! I’m Viktor Kugai, head of the special projects development team at Tinkoff. We create data-driven gamification projects to introduce users to a company’s ecosystem and increase brand awareness.

I will tell you how to implement batch processing of hundreds of gigabytes of data on machines with a hard memory limit using Node.js Streams and the Back Pressure mechanism of the TCP protocol.

How we transfer large amounts of data over the network

Special projects help users engage with the company’s ecosystem in a playful way. For example, a user transfers X rubles per month to a charity fund or performs some other action in the Tinkoff application and receives extra bonuses.

For long-term special projects, streaming data processing using Apache Kafka is perfect. For example, to implement game mechanics, our team processes more than 150,000,000 events per day with a total volume of more than 250 GB.

For short-term special projects – with a life span of several months – creating a new streaming integration carries risks. For example, setting up the integration may require more resources and time than expected, leaving no time to prepare a reliable integration before the project launches.

In this case, a batch data processing workflow comes to the rescue:

  1. The Helicopter data-sharing platform initiates data aggregation in the Data Warehouse on a schedule.

  2. The process sends a CSV file with the aggregated content as multipart/form-data over HTTP.

  3. The Node.js service validates the input data and stores it in the special project’s PostgreSQL database.

We use multipart/form-data for transferring data over the network, since many programming languages support it and the implementation does not depend on the language.

For example, JavaScript uses the FormData object to send multipart data. In the browser, you can send it over the network using XMLHttpRequest or the Fetch API, and in Node.js using the http/https modules (used by the axios library) or net (used by the undici library).

const body = new FormData();
body.append('file', file);
fetch(url, {method: 'POST', body});

In Python, support for multipart/form-data is built into the requests library and no additional transformations are required, so multipart/form-data is convenient for writing data transfer scripts.

import requests

requests.post(
  url,
  files={'file': file}
)

It’s easy to send multipart data, but what happens if you ignore flow control when processing it? Memory consumption will exceed the limit, and the application will crash with an OOMKilled error:

$ docker inspect --format="{{json .State}}" <container>

{
    "Status": "exited",
    "Running": false,
    "Paused": false,
    "Restarting": false,
    "OOMKilled": true,
    "Dead": false,
    "Pid": 0,
    "ExitCode": 137,
    "Error": "",
    "StartedAt": "2024-03-11T15:30:38.222462386Z",
    "FinishedAt": "2024-03-11T15:31:24.086041716Z"
}

Flow control when processing the multipart content of network requests in Node.js

An example of manual flow control by subscribing to the data event:

const HIGH_WATER_MARK = 1_048_576; // ~1MB

let buffer = Buffer.from([]);

readable.on('data', (data) => {
  buffer = Buffer.concat([buffer, data]);
  
  if (buffer.byteLength > HIGH_WATER_MARK) {
    readable.pause();
    // ...
  // === Process the data ===
    // ...
    buffer = Buffer.from([]);
    readable.resume();
  }
});

Node.js v10 added support for asynchronous data processing using the for await … of loop:

for await (const data of readable) {
  // ...
  // === Process the data ===
  // ...
}

The high-level data processing algorithm using the for await … of loop:

  1. Data packets accumulate in the buffer.

  2. The total size of data in the buffer reaches the high watermark limit.

  3. Data consumption is stopped.

  4. All unread packets are merged.

  5. Data enters the for await … of loop.

  6. The data is processed in the body of the loop.

  7. The buffer is freed.

  8. Data consumption continues.

In the case of asynchronous iterators, data first accumulates in the buffer up to the highWaterMark threshold. Then consumption of data from the stream stops, and it resumes only after processing is complete. Manual flow control with readable.pause() and readable.resume() is ignored.

In the case of subscribing to the data event, a chunk counts as processed the moment the subscriber function is called, so the useful work runs concurrently with data consumption, which can lead to memory overflow. To keep memory under control, you have to pause and resume data consumption manually with readable.pause() and readable.resume().

The highWaterMark parameter configures the buffer limit for all stream types in Node.js. Duplex and Transform streams have two independent buffers. The highWaterMark value is not a strict memory limit but an advisory threshold: the actual memory used by the buffer may significantly exceed it.

The highWaterMark parameter can be configured in two units: bytes and characters.

By default the parameter is measured in bytes; when a stream encoding is explicitly set with stream.setEncoding(…), it is measured in characters. One character usually occupies one byte (latin1, ascii) but can occupy several bytes, so configuring highWaterMark in characters is not equivalent to configuring it in bytes.

This setEncoding behavior is easy to miss in the Node.js documentation.

Differences in processing the multipart content of network requests in Node.js

When subscribing to the data event, the handler is called every time the next TCP packet is read. The highWaterMark parameter does not affect the size of a TCP packet, so a data event subscription is not very convenient. First, you have to control the flow manually with readable.pause() and readable.resume(). Second, the data is processed too often – for example, in packets of up to 64 kilobytes, the maximum packet size in the IPv4 protocol.

Data processing with asynchronous iterators buffers TCP packets up to the highWaterMark threshold. The packets are then combined into a single chunk and passed to the body of the loop. The size of the chunk in the loop depends on the highWaterMark value, so you can control chunk sizes more flexibly. It is important to remember that highWaterMark is not a hard limit, only a threshold value that can be exceeded.

Back Pressure mechanism when handling network requests in Node.js

Node.js relies on the libuv library to manage network connections, and libuv regulates stream throughput using the window mechanism of the TCP protocol.

As soon as the data size in the buffer exceeds the highWaterMark limit, the libuv library reduces the size of the TCP window to zero and thereby signals that sending data should be suspended.

TCP session parameters that are key to flow control:

  • Max Segment Size – the maximum segment size for transmission over the network. It ensures that the sender and receiver do not send each other packets that are too large.

  • Window Scale – a coefficient for calculating the actual window size. It is needed to scale the window beyond the 16-bit Window field.

  • Window – the window size. It tells the sender how much data the receiver can currently accept.

The high-level flow control algorithm using windows in the TCP protocol looks like this:

  1. The client and server exchange parameters to establish a TCP connection.

  2. The client transmits packets to the server no larger than the Max Segment Size.

  3. The server buffers the packets, confirms their receipt, and reports that the window has been reduced.

  4. The client stops sending packets when the window becomes insufficient to send the next packet.

  5. The server processes the packets from the buffer and informs the client about increasing the window.

  6. The client continues sending data.

If the client ignores the window size and continues to send packets, the extra packets will be dropped.

Sending streaming data in practice

Consider an example of sending streaming data to a server with memory consumption under control, using the tcpdump utility to log TCP packets. We will use only the built-in node:stream and node:http modules to exclude the influence of frameworks on the Readable stream.

1. We implement the outgoing data stream. We create a Readable stream that generates data in 5 MB chunks and send it using the request function from the node:http module.

/* client.js */

const {request} = require('node:http');
const {Readable} = require('node:stream');

const STREAM_SIZE_IN_BYTES = 104_857_600; // ~100MB
const STREAM_CHUNK_SIZE_IN_BYTES = 5_242_880; // ~5MB

main().catch((error) => {
  console.log(error);
  process.exit(1);
});

async function main() {
  await new Promise((resolve) => {
    let counter = 0;

    const readableStream = new Readable({
      read() {
        if (counter < STREAM_SIZE_IN_BYTES) {
          this.push(Buffer.alloc(STREAM_CHUNK_SIZE_IN_BYTES));
          counter += STREAM_CHUNK_SIZE_IN_BYTES;
        } else {
          this.push(null);
        }
      },
    });

    const options = {
      path: '/',
      method: 'POST',
      protocol: 'http:',
      hostname: 'localhost',
      port: '3000',
    };

    const req = request(options, (res) => {
      res.resume(); // drain the response so that the 'end' event fires

      res.on('end', () => {
        resolve(null);
      });
    });

    readableStream.pipe(req);
  });
}

2. We implement the processing of the incoming stream. We set highWaterMark to 1 MB to verify that flow control works. Each iteration of the loop waits for a random delay to simulate heavy work in the loop body.

/* server.js */

const {createServer} = require('node:http');
const {PassThrough} = require('node:stream');

const HIGH_WATER_MARK = 1_048_576; // ~1MB

const MAX_DELAY_IN_MS = 5_000;
const MIN_DELAY_IN_MS = 3_000;

main().catch((error) => {
  console.log(error);
});

async function main() {
  await new Promise((resolve) => {
    const server = createServer(async (req) => {
      const passThrough = new PassThrough({
        highWaterMark: HIGH_WATER_MARK,
      });

      req.pipe(passThrough);

      for await (const chunk of passThrough) {
        console.log(`Processed ${chunk.byteLength} bytes`);

        const randomTimeoutMs =
          Math.random() * (MAX_DELAY_IN_MS - MIN_DELAY_IN_MS) + MIN_DELAY_IN_MS;

        await new Promise((resolve) => setTimeout(resolve, randomTimeoutMs));
      }
    });

    server.listen(3000, () => {
      console.log('Server started successfully');
      resolve(null);
    });
  });
}

3. We build the Docker image and configure resources in the Docker Compose file to limit memory. We set a memory limit of 50 MB, of which ~20 MB goes to the Node.js runtime itself.

# Dockerfile
FROM node:18.16-alpine3.18

COPY build ./build

EXPOSE 3000

CMD ["node", "./build/server.js"]

# docker-compose.yml
version: '3'
name: node-streams
services:
  server:
    build:
      dockerfile: ./Dockerfile
    ports:
      - 3000:3000
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 50M
        reservations:
          cpus: '0.5'
          memory: 50M

4. We start the container and send data. We can see that the server processes chunks of about 1 MB. This happens because highWaterMark is not a hard limit, only a threshold value.

node-streams-server-1  | Server started successfully
node-streams-server-1  | Processed 32671 bytes
node-streams-server-1  | Processed 1081344 bytes
node-streams-server-1  | Processed 1114112 bytes
node-streams-server-1  | Processed 1114112 bytes
node-streams-server-1  | Processed 1114112 bytes

5. We run the tcpdump utility to trace the flow control process in detail at the TCP protocol level. The -i parameter specifies the loopback interface, and port filters traffic by port.

$ sudo tcpdump -i lo0 port 3000

6. We run the example again. First, the parties exchange parameters to establish a TCP session: the Max Segment Size (mss) is 16,344 bytes and the Window Scale factor (wscale) is 6.

15:47:10.004647 IP localhost.54892 > localhost.hbci: Flags [S], seq 2664759488, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 792746651 ecr 0,sackOK,eol], length 0
15:47:10.005743 IP localhost.hbci > localhost.54892: Flags [S.], seq 4116912290, ack 2664759489, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 840270532 ecr 792746651,sackOK,eol], length 0

7. The client sends data in packets of 16,332 bytes, and the receiver acknowledges them. The sequence number (seq) indicates the data range, for example 1:16333 and then 16333:32665. The receiver acknowledges received packets, for example ack 16333. Meanwhile, the window size keeps shrinking.

15:47:10.006046 IP localhost.54892 > localhost.hbci: Flags [.], seq 1:16333, ack 1, win 6379, options [nop,nop,TS val 792746653 ecr 840270534], length 16332
15:47:10.006047 IP localhost.54892 > localhost.hbci: Flags [.], seq 16333:32665, ack 1, win 6379, options [nop,nop,TS val 792746653 ecr 840270534], length 16332
15:47:10.006087 IP localhost.hbci > localhost.54892: Flags [.], ack 16333, win 6124, options [nop,nop,TS val 840270534 ecr 792746653], length 0
15:47:10.006099 IP localhost.hbci > localhost.54892: Flags [.], ack 32665, win 5869, options [nop,nop,TS val 840270534 ecr 792746653], length 0

8. This continues until the window becomes too small to send the next packet (win 255). The sender stops transmitting and waits for the window to recover. The win value is not a size in bytes – below I show how to calculate the actual window size.

15:47:10.031553 IP localhost.hbci > localhost.54892: Flags [.], ack 1159573, win 510, options [nop,nop,TS val 840270559 ecr 792746678], length 0
15:47:10.031567 IP localhost.hbci > localhost.54892: Flags [.], ack 1175905, win 255, options [nop,nop,TS val 840270559 ecr 792746678], length 0

9. After three seconds, an acknowledgment arrives showing that the window size has recovered and sending can continue. Transmission resumes.

15:47:10.031567 IP localhost.hbci > localhost.54892: Flags [.], ack 1175905, win 255, options [nop,nop,TS val 840270559 ecr 792746678], length 0
15:47:13.033733 IP localhost.hbci > localhost.54892: Flags [.], ack 1175905, win 4338, options [nop,nop,TS val 840270561 ecr 792746678], length 0
15:47:13.033850 IP localhost.54892 > localhost.hbci: Flags [.], seq 1175905:1192237, ack 1, win 6379, options [nop,nop,TS val 792746680 ecr 840270561], length 16332
15:47:13.033853 IP localhost.54892 > localhost.hbci: Flags [.], seq 1192237:1208569, ack 1, win 6379, options [nop,nop,TS val 792746680 ecr 840270561], length 16332

To get the actual window size, you need to apply a formula to the win value. When the TCP protocol was created, its designers decided that a window larger than 65,535 bytes would not be needed, so a 16-bit Window field was allocated in the segment header; its maximum value is 2^16 − 1 = 65,535.

Later it became apparent that this window size was no longer sufficient, so the Window Scale option was added to the TCP standard in a way that preserved backward compatibility. When establishing a connection, the hosts exchange Window Scale parameters and compute the actual window size as Window × 2^WindowScale. This raised the maximum window size from 65,535 bytes (~64 KB) to ~1 GB without changing the size of the Window field.
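Applying the formula to the win values from the tcpdump capture above (the hosts negotiated wscale 6 during connection setup):

```javascript
// Worked example: decoding window sizes from the tcpdump capture above.
// With wscale 6, actual window = win * 2^6 = win * 64.
const windowScale = 6;
const actualWindow = (win) => win * 2 ** windowScale;

console.log(actualWindow(6379)); // 408256 bytes (~400 KB) - plenty of room
console.log(actualWindow(255));  // 16320 bytes - smaller than one
                                 // 16332-byte segment, so the sender pauses
console.log(actualWindow(4338)); // 277632 bytes - the window has recovered
```

Note that 16,320 bytes is just below the 16,332-byte segment size the client was sending, which is exactly why transmission stalls at win 255.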

Putting it all together

In Node.js, when processing large amounts of data transmitted over the network, you can and should control memory consumption. To do this, use the for await … of loop when processing a Readable stream. You can also subscribe to the data event, but then you have to pause the stream manually with readable.pause() and readable.resume().

Under the hood, Node.js uses the Back Pressure mechanism, which is based on the window mechanism of the TCP protocol; the libuv library manages the state of the TCP connection. The window size depends on the free space in the buffer: if the amount of data in the buffer exceeds highWaterMark, data consumption stops and resumes once the buffer is drained.

At the beginning, I described a batch integration that transfers multipart data over HTTP. Thanks to the simple and convenient flow control mechanism in Node.js, our team reliably processes hundreds of gigabytes of data on machines with a 256 MB memory limit. The main thing is to use the for await … of loop for data processing and to configure highWaterMark correctly for the allocated resources.

If you have improved the reliability of data processing in Node.js – share your practices in the comments!
