Reactive Nats client and RxJS wrapper for ts-nats
$ npm i --save natsx
The main purpose of the library is to manage nats subscription behavior with RxJS operators like take
and timeout
.
You can pass a nats client or use connect() method of NatsX to initalize the wrapper
import { connect, Msg } from 'ts-nats';
import { NatsX } from 'natsx';
const rawClient = await connect({servers: ['nats://demo.nats.io:4222', 'tls://demo.nats.io:4443']});
const client = new NatsX(rawClient);
client.from('greeting').subscribe((msg: Msg) => { ... })
To unsubscribe from nats subscription, you must unsubscribe from the observable with operators or manually.
// that will get 3 greeting messages then unsubscribe from nats subscription
client.from('greeting')
.pipe(
take(3)
)
.subscribe((msg: Msg) => { ... })
To achieve the same ability for nats requests, we are not using the ts-nats request method directly. You must manage your subscription same way of from
method of NatsX
// that will wait a response for 2 seconds then throw timeout error and unsubscribe from reply subject.
client.request('greeter', 'me')
.pipe(
timeout(2000)
)
.subscribe((msg: Msg) => { ... })
Also, you can use other methods like publish
, close
, drain
and listen the connection status with status$
, and other nats events with error$
, subscriptions$
, serverChanged$
, yield$
You are welcome to contribute to this project, just open a PR.
- NatsX is MIT licensed.