The NATS client for NodeJS is very slow

Samaritain Sim'S

I performed a latency test with the NATS benchmark tool, using two types of NATS clients.

A) The native NATS client (NATS CLI), which can be downloaded and installed from https://github.com/nats-io/natscli/releases

Procedure

Producer command : nats bench foo --pub 1 --request --msgs 100000 --size 3kb

Consumer command : nats bench foo --sub 1 --reply

Version : 0.0.35

Results : ~6600 Msg/s

Latency : 1/(2*6600) = 0.075 ms

NATS CLI Latency Test Result

B) Then I used the NATS client for NodeJS, writing a script inspired by what's provided in the NATS NodeJS Github repository, to do the same benchmark as with the native NATS client.

bench.js

#!/usr/bin/env node

const parse = require("minimist");
const {Nuid, connect, Bench, Metric} = require('nats')

// Fallback values handed to minimist for options missing from the command
// line. The default subject is whatever `new Nuid().next()` returns when the
// script starts.
const defaults = {
  s: "127.0.0.1:4222",
  c: 100000,
  p: 128,
  subject: new Nuid().next(),
  i: 1,
  json: false,
  csv: false,
  csvheader: false,
  pending: 1024 * 32,
};

// Everything after `node bench.js` on the command line.
const cliArgs = process.argv.slice(2);

// Short flag -> long option names.
const aliasTable = {
  s: ["server"],
  c: ["count"],
  d: ["debug"],
  p: ["payload"],
  i: ["iterations"],
};

// Options minimist must keep as strings, and options it must treat as flags.
const stringOptions = ["subject"];
const booleanOptions = ["asyncRequests", "callbacks", "json", "csv", "csvheader"];

const argv = parse(cliArgs, {
  alias: aliasTable,
  default: defaults,
  string: stringOptions,
  boolean: booleanOptions,
});

// Show the usage text and stop when help is requested or when none of the
// --sub / --pub / --req modes was selected.
if (argv.h || argv.help || (!argv.sub && !argv.pub && !argv.req)) {
  console.log(
    "usage: bench.js [--json] [--callbacks] [--csv] [--csvheader] [--pub] [--sub] [--req (--asyncRequests)] [--count <#messages>=100000] [--payload <#bytes>=128] [--iterations <#loop>=1] [--server server] [--subject <subj>]\n",
  );
  process.exit(0);
}

// Numeric options are parsed explicitly as base-10 integers.
const server = argv.server;
const count = Number.parseInt(argv.count, 10);
const bytes = Number.parseInt(argv.payload, 10);
const iters = Number.parseInt(argv.iterations, 10);
// NOTE(review): the parser options in this script declare no `pendingLimit`
// option or default (only `pending`), so this is NaN unless --pendingLimit is
// passed explicitly, and nothing visible in this script reads `pl`.
const pl = Number.parseInt(argv.pendingLimit, 10) * 1024;
// Collects the metrics produced by every benchmark iteration.
const metrics = [];

// Runs the benchmark `iters` times, each time on a fresh connection, collecting
// the metrics returned by every run; afterwards prints them as aggregated
// text, JSON, or CSV depending on the command-line flags.
(async () => {
  for (let i = 0; i < iters; i++) {
    const nc = await connect(
      { servers: server, debug: argv.debug, pending: argv.pending },
    );
    try {
      const opts = {
        msgs: count,
        size: bytes,
        asyncRequests: argv.asyncRequests,
        callbacks: argv.callbacks,
        pub: argv.pub,
        sub: argv.sub,
        req: argv.req,
        rep: argv.rep,
        subject: argv.subject,
      };

      const bench = new Bench(nc, opts);
      const m = await bench.run();
      metrics.push(...m);
    } finally {
      // Close the connection even when the run rejects, so a failed
      // iteration does not leave it open.
      await nc.close();
    }
  }
})().then(() => {
  // Folds one metric `m` into the accumulator `a`: sums bytes, duration and
  // message counts, copies the descriptive fields, and tracks the shortest
  // and longest single-run duration.
  const reducer = (a, m) => {
    if (a) {
      a.name = m.name;
      a.payload = m.payload;
      a.bytes += m.bytes;
      a.duration += m.duration;
      a.msgs += m.msgs;
      a.lang = m.lang;
      a.version = m.version;
      a.async = m.async;

      a.max = Math.max(a.max === undefined ? 0 : a.max, m.duration);
      // Compare against the running minimum (not the maximum), otherwise the
      // minimum is lost as soon as a longer run is folded in.
      a.min = Math.min(a.min === undefined ? m.duration : a.min, m.duration);
    }
    return a;
  };

  if (!argv.json && !argv.csv) {
    // One aggregated line per metric kind, in this fixed order; kinds that
    // recorded no messages are skipped.
    for (const name of ["pubsub", "pub", "sub", "req", "rep"]) {
      const total = metrics
        .filter((m) => m.name === name)
        .reduce(reducer, new Metric(name, 0));
      if (total && total.msgs) {
        console.log(total.toString());
      }
    }
  } else if (argv.json) {
    console.log(JSON.stringify(metrics, null, 2));
  } else if (argv.csv) {
    const lines = metrics.map((m) => {
      return m.toCsv();
    });
    if (argv.csvheader) {
      lines.unshift(Metric.header());
    }
    // Lines are joined without a separator, exactly as produced by
    // toCsv()/header().
    console.log(lines.join(""));
  }
});

Procedure

Command: node bench.js --subject test --req --payload 3500 --count 100000

NodeJS Client Version: 2.1.14

Results : ~330 Msg/s

Latency : 1/(2*330) = 1.5 ms

NATS NodeJS Latency Test Result

C) In addition, I tried to create two clients, a publisher which sends messages and a subscriber which consumes them, to measure the transport time of the messages in question, without going through the benchmark tool. The results are of the same order as the latency time calculated by the benchmark tool, bench.js.

pub.js

#!/usr/bin/env node

const parse = require("minimist");
const { connect, StringCodec, headers, credsAuthenticator } = require("nats");
const { delay } = require("./utils.js");
const fs = require("fs");

// Everything after `node pub.js` on the command line.
const cliArgs = process.argv.slice(2);

// minimist configuration for the publisher: short-flag aliases (note that
// `h` maps to `headers`), defaults for server/count/interval, and the options
// that are kept as strings.
const parserOptions = {
  alias: {
    s: ["server"],
    c: ["count"],
    i: ["interval"],
    f: ["creds"],
    h: ["headers"],
  },
  default: {
    s: "127.0.0.1:4222",
    c: 1,
    i: 0,
  },
  boolean: true,
  string: ["server", "count", "interval", "headers", "creds"],
};

const argv = parse(cliArgs, parserOptions);

// Connection options; extended below when --debug or --creds is given.
const opts = { servers: argv.s };

// First positional argument is the subject. Keep it null when absent:
// String(undefined) would yield the truthy text "undefined" and the usage
// check below would never trigger.
const subject = argv._.length > 0 ? String(argv._[0]) : null;

// Optional second positional argument is the message body; an explicit 0 is
// kept rather than being replaced by the empty string.
const payload = argv._[1] !== undefined ? argv._[1] : "";

// `count` is declared as a string option, so a command-line value arrives as
// text; parse it before comparing with -1 (publish "forever"). Anything
// unparseable or zero falls back to a single message.
const requestedCount = Number.parseInt(argv.c, 10);
const count = requestedCount === -1
  ? Number.MAX_SAFE_INTEGER
  : (requestedCount || 1);
const interval = argv.i || 0;

// Only --help requests the usage text: `h` is an alias of `headers` in the
// parser options, so testing argv.h would print the usage and exit whenever
// headers are supplied.
if (argv.help || !subject) {
  console.log(
    "Usage: nats-pub [--creds=/path/file.creds] [-s server] [-c <count>=1] [-i <interval>=0] [-h='k=v;k2=v2'] subject [msg]",
  );
  console.log("to publish forever, specify -c=-1 or --count=-1");
  process.exit(1);
}

if (argv.debug) {
  opts.debug = true;
}

// Read the credentials file and hand its contents to the authenticator.
if (argv.creds) {
  const data = fs.readFileSync(argv.creds);
  opts.authenticator = credsAuthenticator(data);
}

// Connects, publishes `count` messages on `subject` (each stamped with the
// current time in a "time" header), then flushes and closes the connection.
(async () => {
  let nc;
  try {
    nc = await connect(opts);
  } catch (err) {
    console.log(`error connecting to nats: ${err.message}`);
    return;
  }
  console.info(`connected ${nc.getServer()}`);

  // Report an error if the connection ends with one.
  nc.closed()
    .then((err) => {
      if (err) {
        console.error(`closed with an error: ${err.message}`);
      }
    });

  // User-supplied headers arrive as "k=v;k2=v2".
  const hdrs = headers();
  if (argv.headers) {
    for (const entry of argv.headers.split(";")) {
      if (entry === "") {
        // Tolerate a trailing or doubled ";".
        continue;
      }
      // Split on the first "=" only, so values may themselves contain "=";
      // an entry without "=" gets an empty value instead of undefined.
      const sep = entry.indexOf("=");
      const k = sep === -1 ? entry : entry.slice(0, sep);
      const v = sep === -1 ? "" : entry.slice(sep + 1);
      hdrs.append(k, v);
    }
  }
  hdrs.append("time", "0");

  const sc = StringCodec();
  // The payload is the same for every message, so encode it once.
  const data = sc.encode(String(payload));

  for (let i = 1; i <= count; i++) {
    // Stamp the send time (ms since epoch) right before publishing.
    hdrs.set("time", Date.now().toString());
    nc.publish(subject, data, { headers: hdrs });
    console.log(`[${i}] ${subject}: ${payload}`);
    if (interval) {
      await delay(interval);
    }
  }
  await nc.flush();
  await nc.close();
})();

sub.js

#!/usr/bin/env node

const parse = require("minimist");
const { connect, StringCodec, credsAuthenticator } = require('nats');
const fs = require("fs");

// Everything after `node sub.js` on the command line.
const cliArgs = process.argv.slice(2);

// minimist configuration for the subscriber: short-flag aliases, defaults for
// server and queue, `debug` as a flag, and the options kept as strings.
const parserOptions = {
  alias: {
    d: ["debug"],
    s: ["server"],
    q: ["queue"],
    f: ["creds"],
  },
  default: {
    s: "127.0.0.1:4222",
    q: "",
  },
  boolean: ["debug"],
  string: ["server", "queue", "creds"],
};

const argv = parse(cliArgs, parserOptions);

/**
 * Prints the command-line synopsis of the subscriber script via console.log.
 */
function usage() {
  const synopsis =
    "Usage: nats-sub [-s server] [--creds=/path/file.creds] [-q queue] [-h] subject";
  console.log(synopsis);
}

// Connection options; extended below when -d/--debug or --creds is given.
const opts = { servers: argv.s };

// The first positional argument is the subject to listen on (null if absent).
const firstPositional = argv._[0];
const subject = firstPositional ? String(firstPositional) : null;

// Print the usage text and exit when help is requested or no subject is given.
const helpRequested = argv.h || argv.help;
if (helpRequested || !subject) {
  usage();
  process.exit(1);
}

if (argv.d) {
  opts.debug = true;
}

// Feed the contents of the credentials file to the authenticator.
if (argv.creds) {
  opts.authenticator = credsAuthenticator(fs.readFileSync(argv.creds));
}


// Connects, subscribes to `subject` (optionally in a queue group) and logs
// every received message together with the elapsed time since the timestamp
// carried in its "time" header.
(async () => {
  let nc;
  try {
    nc = await connect(opts);
  } catch (err) {
    console.log(`error connecting to nats: ${err.message}`);
    return;
  }

  console.info(`connected ${nc.getServer()}`);
  // Report an error if the connection ends with one.
  nc.closed()
    .then((err) => {
      if (err) {
        console.error(`closed with an error: ${err.message}`);
      }
    });

  const sc = StringCodec();
  const sub = nc.subscribe(subject, { queue: argv.q });
  console.info(`${argv.q !== "" ? "queue " : ""}listening to ${subject}`);
  for await (const m of sub) {
    // Take the receive timestamp first, before any other per-message work.
    const receivedTime = Date.now();
    let transportTime = null;
    if (m.headers) {
      for (const [key, value] of m.headers) {
        if (key === "time") {
          transportTime = receivedTime - Number.parseInt(value.toString(), 10);
          break;
        }
      }
    }
    // A transport time of 0 ms is a legitimate measurement; only a missing or
    // unparseable "time" header is reported as -1.
    const shownTime = transportTime === null || Number.isNaN(transportTime)
      ? -1
      : transportTime;
    console.log(`[${sub.getProcessed()}]: ${m.subject}: ${sc.decode(m.data)} after ${shownTime} ms`);
  }
})();

Procedure

Command pub : node pub.js -c 50 -i 10 test --debug

Command sub : node sub.js --debug test

Version : 2.1.14

Transport time : ~2.5ms

Transport Time NODEJS

My question is: why is there such a glaring difference between the NATS CLI and NodeJS clients? Is it due to the language, given that NATS is natively written in Go?

Alberto Ricart

The short answer is the Go client (one used in nats tool) will be much faster (client can use different threads to send/receive and process network traffic)

In node/and other JavaScript runtimes while the client is doing something, it cannot do something else. This means that if it is busy doing i/o, it cannot process subscription messages, etc.

The i/o of a test involving a publisher can be affected by the size of the outbound buffer. By default this is 32K in the JavaScript clients, which is a good working size (I believe there's an option to change it in the test; if not, I will update the bench). When publishing in a loop, once the outbound buffer is filled, it is sent to the network - typically the client will process and do nothing, and it flushes on the next event loop. In the case of a test loop, it fills it, and must send, and then continues. So the size of this buffer greatly affects the network i/o.

In your screenshots, you are using debug. Also never put the server or client in debug modes when performing any performance tests.

Request Test

There are a few things you should try:

  • For requests you are not using the --asyncRequests option. So requests are being processed serially - second request will not be sent until the first one returns, clearly the RTT is going to be terrible without this option.
  • The subscriber in a test when running async iterators for loops won't be as performant. For tests, you should use the --callbacks option which will eliminate iterators in the processing, for normal client you should possibly just use the iterators.
  • The --req bench is suboptimal, because it is doing 4 network operations per request (since it sets up the subscriber on the same connection - i.e., the client sends a request to the server, from the server it goes to the subscriber, the subscriber responds back and the message goes to the server, and the server returns the response to the client). If you divide by 2, that is a rough approximation of the value.
  • The --count by default is 1M messages, which in the case of the request which creates a promise per request, you'll quickly run into slow consumer or other issues once the number of promises is exhausted. I would keep this 10K or less for the request test.
node examples/bench.js --subject a --req --payload 3500 --count 10000
req 791 msgs/sec - [25.30 secs] ~ 2.64 MB/sec 25295/25295

node examples/bench.js --subject a --req --payload 3500 --count 10000 --asyncRequests
req 36,430 msgs/sec - [0.55 secs] ~ 243.20 MB/sec 549/549

node examples/bench.js --subject a --req --payload 3500 --count 10000 --asyncRequests --callbacks
req 38,462 msgs/sec - [0.52 secs] ~ 256.76 MB/sec 520/520

Pub test

Raw blast performance of the node client:

node examples/bench.js --subject a --pub --payload 0
pub 917,431 msgs/sec - [0.11 secs] ~ 0.00 B/sec 109/109

# If you actually add a payload it will take longer (after all you are moving 3.5gb):

node examples/bench.js --subject a --pub --payload 3500
pub 290,698 msgs/sec - [0.34 secs] ~ 970.31 MB/sec 344/344

Sub test

node examples/bench.js --subject a --sub
sub 294,118 msgs/sec - [0.34 secs] ~ 981.72 MB/sec 340/340

# and in a different terminal - this will possibly 
nats bench --pub 1 --msgs 1000000 --size 3500 -s localhost:4222 a
11:48:16 Starting Core NATS pub/sub benchmark [subject=a, multisubject=false, multisubjectmax=0, msgs=1,000,000, msgsize=3.4 KiB, pubs=1, subs=0, pubsleep=0s, subsleep=0s]
11:48:16 Starting publisher, publishing 1,000,000 messages
Finished      1s

Pub stats: 608,014 msgs/sec ~ 1.98 GB/sec

Specifying the --callbacks option will increase the performance. Also, the publisher implementation will limit how fast the sub will perform, so:

node examples/bench.js --subject a --sub --callbacks
sub 370,370 msgs/sec - [0.27 secs] ~ 1.21 GB/sec 270/270

[repeat the nats bench above]

If doing both, the numbers will be a bit different, mostly because of JavaScript either publishing or processing

node examples/bench.js --subject a --sub --callbacks --pub
pubsub 930,233 msgs/sec - [0.21 secs] ~ 113.55 MB/sec 215/215
pub 462,963 msgs/sec - [0.22 secs] ~ 56.51 MB/sec 216/216
sub 952,381 msgs/sec - [0.10 secs] ~ 116.26 MB/sec 105/105

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related