Socket
Socket
Sign inDemoInstall

rsocket-tcp-client

Package Overview
Dependencies
Maintainers
6
Versions
31
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rsocket-tcp-client - npm Package Compare versions

Comparing version 0.0.27 to 0.0.28

26

build/index.js

@@ -15,23 +15,13 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
*
* @flow
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
Object.defineProperty(exports, 'RSocketTcpConnection', {
enumerable: true,
get: function () {
return _RSocketTcpClient.RSocketTcpConnection;
},
});
Object.defineProperty(exports, 'RSocketTlsClient', {
enumerable: true,
get: function () {
return _RSocketTcpClient.RSocketTlsClient;
},
});
exports.default = void 0;
var _RSocketTcpClient = require('./RSocketTcpClient');
var _default = _RSocketTcpClient.RSocketTcpClient;
exports.default = _default;
import {
RSocketTcpClient,
RSocketTcpConnection,
RSocketTlsClient,
} from './RSocketTcpClient';
export default RSocketTcpClient;
export {RSocketTcpConnection, RSocketTlsClient};

@@ -15,31 +15,20 @@ /** Copyright (c) Facebook, Inc. and its affiliates.

*
*
* @flow
*/
'use strict';
Object.defineProperty(exports, '__esModule', {value: true});
exports.RSocketTlsClient = exports.RSocketTcpClient = exports.RSocketTcpConnection = void 0;
var _net = _interopRequireDefault(require('net'));
var _tls = _interopRequireDefault(require('tls'));
var _rsocketFlowable = require('rsocket-flowable');
var _rsocketCore = require('rsocket-core');
import type {ConnectionStatus, DuplexConnection, Frame} from 'rsocket-types';
import type {ISubject, ISubscriber, ISubscription} from 'rsocket-types';
import type {Encoders} from 'rsocket-core';
var _rsocketTypes = require('rsocket-types');
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : {default: obj};
}
function _defineProperty(obj, key, value) {
if (key in obj) {
Object.defineProperty(obj, key, {
value: value,
enumerable: true,
configurable: true,
writable: true,
});
} else {
obj[key] = value;
}
return obj;
}
import net from 'net';
import tls from 'tls';
import {Flowable} from 'rsocket-flowable';
import {
createBuffer,
deserializeFrames,
serializeFrameWithLength,
} from 'rsocket-core';
import {CONNECTION_STATUS} from 'rsocket-types';

@@ -49,30 +38,13 @@ /**

*/
class RSocketTcpConnection {
constructor(socket, encoders) {
_defineProperty(
this,
'_handleError',
export class RSocketTcpConnection implements DuplexConnection {
_buffer: Buffer;
_encoders: ?Encoders<*>;
_receivers: Set<ISubscriber<Frame>>;
_senders: Set<ISubscription>;
_socket: ?net$Socket;
_status: ConnectionStatus;
_statusSubscribers: Set<ISubject<ConnectionStatus>>;
(error) => {
error =
error || new Error('RSocketTcpClient: Socket closed unexpectedly.');
this._close(error);
}
);
_defineProperty(
this,
'_handleData',
(chunk) => {
try {
const frames = this._readFrames(chunk);
frames.forEach((frame) => {
this._receivers.forEach((subscriber) => subscriber.onNext(frame));
});
} catch (error) {
this._handleError(error);
}
}
);
this._buffer = (0, _rsocketCore.createBuffer)(0);
constructor(socket: ?net$Socket, encoders: ?Encoders<*>) {
this._buffer = createBuffer(0);
this._encoders = encoders;

@@ -82,17 +54,21 @@ this._receivers = new Set();

this._statusSubscribers = new Set();
if (socket) {
this.setupSocket(socket);
this._status = _rsocketTypes.CONNECTION_STATUS.CONNECTED;
this._status = CONNECTION_STATUS.CONNECTED;
} else {
this._socket = null;
this._status = _rsocketTypes.CONNECTION_STATUS.NOT_CONNECTED;
this._status = CONNECTION_STATUS.NOT_CONNECTED;
}
}
close() {
close(): void {
this._close();
}
connect() {
connect(): void {
throw new Error('not supported');
}
setupSocket(socket) {
setupSocket(socket: net$Socket) {
this._socket = socket;

@@ -104,4 +80,5 @@ socket.on('close', this._handleError);

}
connectionStatus() {
return new _rsocketFlowable.Flowable((subscriber) => {
connectionStatus(): Flowable<ConnectionStatus> {
return new Flowable(subscriber => {
subscriber.onSubscribe({

@@ -118,4 +95,5 @@ cancel: () => {

}
receive() {
return new _rsocketFlowable.Flowable((subject) => {
receive(): Flowable<Frame> {
return new Flowable(subject => {
subject.onSubscribe({

@@ -131,6 +109,8 @@ cancel: () => {

}
sendOne(frame) {
sendOne(frame: Frame): void {
this._writeFrame(frame);
}
send(frames) {
send(frames: Flowable<Frame>): void {
let subscription;

@@ -141,8 +121,8 @@ frames.subscribe({

},
onError: (error) => {
onError: error => {
subscription && this._senders.delete(subscription);
this._handleError(error);
},
onNext: (frame) => this._writeFrame(frame),
onSubscribe: (_subscription) => {
onNext: frame => this._writeFrame(frame),
onSubscribe: _subscription => {
subscription = _subscription;

@@ -154,10 +134,13 @@ this._senders.add(subscription);

}
getConnectionState() {
getConnectionState(): ConnectionStatus {
return this._status;
}
setConnectionStatus(status) {
setConnectionStatus(status: ConnectionStatus): void {
this._status = status;
this._statusSubscribers.forEach((subscriber) => subscriber.onNext(status));
this._statusSubscribers.forEach(subscriber => subscriber.onNext(status));
}
_close(error) {
_close(error?: Error) {
if (this._status.kind === 'CLOSED' || this._status.kind === 'ERROR') {

@@ -167,7 +150,5 @@ // already closed

}
const status = error
? {error, kind: 'ERROR'}
: _rsocketTypes.CONNECTION_STATUS.CLOSED;
const status = error ? {error, kind: 'ERROR'} : CONNECTION_STATUS.CLOSED;
this.setConnectionStatus(status);
this._receivers.forEach((subscriber) => {
this._receivers.forEach(subscriber => {
if (error) {

@@ -180,3 +161,3 @@ subscriber.onError(error);

this._receivers.clear();
this._senders.forEach((subscription) => subscription.cancel());
this._senders.forEach(subscription => subscription.cancel());
this._senders.clear();

@@ -190,10 +171,24 @@ const socket = this._socket;

}
_readFrames(chunk) {
_handleError = (error?: ?Error): void => {
error = error || new Error('RSocketTcpClient: Socket closed unexpectedly.');
this._close(error);
};
_handleData = (chunk: Buffer): void => {
try {
const frames = this._readFrames(chunk);
frames.forEach(frame => {
this._receivers.forEach(subscriber => subscriber.onNext(frame));
});
} catch (error) {
this._handleError(error);
}
};
_readFrames(chunk: Buffer): Array<Frame> {
// Combine partial frame data from previous chunks with the next chunk,
// then extract any complete frames plus any remaining data.
const buffer = Buffer.concat([this._buffer, chunk]);
const [frames, remaining] = (0, _rsocketCore.deserializeFrames)(
buffer,
this._encoders
);
const [frames, remaining] = deserializeFrames(buffer, this._encoders);
this._buffer = remaining;

@@ -203,8 +198,5 @@ return frames;

_writeFrame(frame) {
_writeFrame(frame: Frame): void {
try {
const buffer = (0, _rsocketCore.serializeFrameWithLength)(
frame,
this._encoders
);
const buffer = serializeFrameWithLength(frame, this._encoders);
if (!this._socket) {

@@ -222,27 +214,27 @@ throw new Error('RSocketTcpClient: Cannot send frame, not connected.');

* A TCP transport client for use in node environments.
*/ exports.RSocketTcpConnection = RSocketTcpConnection;
class RSocketTcpClient extends RSocketTcpConnection {
constructor(options, encoders) {
*/
export class RSocketTcpClient extends RSocketTcpConnection {
_options: net$connectOptions;
constructor(options: net$connectOptions, encoders: ?Encoders<*>) {
super(null, encoders);
_defineProperty(
this,
'_handleOpened',
() => {
this.setConnectionStatus(_rsocketTypes.CONNECTION_STATUS.CONNECTED);
}
);
this._options = options;
}
connect() {
connect(): void {
if (this.getConnectionState().kind !== 'NOT_CONNECTED') {
throw new Error(
'RSocketTcpClient: Cannot connect(), a connection is already established.'
'RSocketTcpClient: Cannot connect(), a connection is already established.',
);
}
this.setConnectionStatus(_rsocketTypes.CONNECTION_STATUS.CONNECTING);
const socket = _net.default.connect(this._options);
this.setConnectionStatus(CONNECTION_STATUS.CONNECTING);
const socket = net.connect(this._options);
this.setupSocket(socket);
socket.on('connect', this._handleOpened);
}
_handleOpened = (): void => {
this.setConnectionStatus(CONNECTION_STATUS.CONNECTED);
};
}

@@ -252,28 +244,27 @@

* A TLS transport client for use in node environments.
*/ exports.RSocketTcpClient = RSocketTcpClient;
class RSocketTlsClient extends RSocketTcpConnection {
constructor(options, encoders) {
*/
export class RSocketTlsClient extends RSocketTcpConnection {
_options: tls$connectOptions;
constructor(options: tls$connectOptions, encoders: ?Encoders<*>) {
super(null, encoders);
_defineProperty(
this,
'_handleOpened',
() => {
this.setConnectionStatus(_rsocketTypes.CONNECTION_STATUS.CONNECTED);
}
);
this._options = options;
}
connect() {
connect(): void {
if (this.getConnectionState().kind !== 'NOT_CONNECTED') {
throw new Error(
'RSocketTcpClient: Cannot connect(), a connection is already established.'
'RSocketTcpClient: Cannot connect(), a connection is already established.',
);
}
this.setConnectionStatus(_rsocketTypes.CONNECTION_STATUS.CONNECTING);
const socket = _tls.default.connect(this._options);
this.setConnectionStatus(CONNECTION_STATUS.CONNECTING);
const socket = tls.connect(this._options);
this.setupSocket(socket);
socket.on('connect', this._handleOpened);
}
_handleOpened = (): void => {
this.setConnectionStatus(CONNECTION_STATUS.CONNECTED);
};
}
exports.RSocketTlsClient = RSocketTlsClient;
{
"name": "rsocket-tcp-client",
"description": "RSocket TCP client for use in Node.js environments",
"version": "0.0.27",
"version": "0.0.28",
"repository": {

@@ -12,7 +12,7 @@ "type": "git",

"dependencies": {
"rsocket-core": "^0.0.27",
"rsocket-flowable": "^0.0.27",
"rsocket-types": "^0.0.27"
"rsocket-core": "^0.0.28",
"rsocket-flowable": "^0.0.28",
"rsocket-types": "^0.0.28"
},
"gitHead": "23da9b0e9377ba52d3294cf4763cbbf3de7ba7b1"
"gitHead": "1dd3eb28183991d663392d87877225bf862946e2"
}

@@ -0,0 +0,0 @@ # rsocket-tcp-client

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc