# nice-grpc

A Node.js gRPC library that is nice to you. Built on top of `grpc-js`.
## Features

- Written in TypeScript for TypeScript.
- Modern API that uses Promises and Async Iterables for streaming.
- Canceling client and server calls using `AbortSignal`.
- Client and server middleware via a concise API that uses Async Generators.
## Installation

```sh
npm install nice-grpc google-protobuf
npm install --save-dev @types/google-protobuf
```
## Usage

### Compiling Protobuf files

This works the same way as it does for `grpc-js`.

Install the necessary tools:

```sh
npm install --save-dev grpc-tools grpc_tools_node_protoc_ts
```

Given a Protobuf file `./proto/example.proto`, generate JS code and TypeScript
definitions into the directory `./compiled_proto`:

```sh
./node_modules/.bin/grpc_tools_node_protoc \
  --plugin=protoc-gen-ts=./node_modules/.bin/protoc-gen-ts \
  --plugin=protoc-gen-grpc=./node_modules/.bin/grpc_tools_node_protoc_plugin \
  --js_out=import_style=commonjs,binary:./compiled_proto \
  --ts_out=grpc_js:./compiled_proto \
  --grpc_out=grpc_js:./compiled_proto \
  ./proto/example.proto
```

Alternative methods include Buf and Prototool.
### Server

Consider the following Protobuf definition:

```proto
syntax = "proto3";

package nice_grpc.example;

service ExampleService {
  rpc ExampleUnaryMethod(ExampleRequest) returns (ExampleResponse) {};
}

message ExampleRequest {
  // ...
}

message ExampleResponse {
  // ...
}
```
After compiling the Protobuf file, we can write the service implementation:

```ts
import {ServiceImplementation} from 'nice-grpc';
import {IExampleService} from './compiled_proto/example_grpc_pb';
import {ExampleRequest, ExampleResponse} from './compiled_proto/example_pb';

const exampleServiceImpl: ServiceImplementation<IExampleService> = {
  async exampleUnaryMethod(request: ExampleRequest): Promise<ExampleResponse> {
    return new ExampleResponse();
  },
};
```
Alternatively, you can use classes:

```ts
class ExampleServiceImpl implements ServiceImplementation<IExampleService> {
  async exampleUnaryMethod(request: ExampleRequest): Promise<ExampleResponse> {
    return new ExampleResponse();
  }
}
```
Now we can create and start a server that exposes our service:

```ts
import {createServer} from 'nice-grpc';
import {ExampleService} from './compiled_proto/example_grpc_pb';

const server = createServer();

server.add(ExampleService, exampleServiceImpl);

await server.listen('0.0.0.0:8080');
```
When we need to stop, gracefully shut down the server:

```ts
await server.shutdown();
```
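In a long-running process, shutdown is typically triggered by a process signal. A minimal sketch of that wiring, assuming only that the server exposes `shutdown(): Promise<void>` as shown above (the `shutdownOnSignals` helper is illustrative, not a nice-grpc API):

```typescript
// Illustrative helper: call server.shutdown() on SIGINT/SIGTERM so
// in-flight calls can finish before the process exits.
function shutdownOnSignals(
  server: {shutdown(): Promise<void>},
  signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM'],
): void {
  for (const signal of signals) {
    // `once`, so a repeated signal falls through to default handling
    process.once(signal, () => {
      void server.shutdown();
    });
  }
}
```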
#### Errors

To report an error to a client, use `ServerError`. Any thrown error other than
`ServerError` will result in the client receiving an error with status code
`UNKNOWN`. Use server middleware for custom handling of uncaught errors.
```ts
import {status} from '@grpc/grpc-js';
import {ServerError} from 'nice-grpc';

const exampleServiceImpl: ServiceImplementation<IExampleService> = {
  async exampleUnaryMethod(request: ExampleRequest): Promise<ExampleResponse> {
    throw new ServerError(status.NOT_FOUND, 'Requested data does not exist');
  },
};
```
#### Metadata

A server receives client metadata along with the request, and can send response
metadata in the header and trailer.

```ts
const exampleServiceImpl: ServiceImplementation<IExampleService> = {
  async exampleUnaryMethod(
    request: ExampleRequest,
    context: CallContext,
  ): Promise<ExampleResponse> {
    // read client metadata
    const someValue = context.metadata.get('some-key')[0] as string | undefined;

    // set response header metadata
    context.header.set('some-key', 'some-value');

    // set response trailer metadata
    context.trailer.set('some-key', 'some-value');

    return new ExampleResponse();
  },
};
```
#### Cancelling calls

A server receives an `AbortSignal` that gets aborted once the call is cancelled
by the client or due to a deadline. You can use it to cancel any inner requests.

```ts
import fetch from 'node-fetch';

const exampleServiceImpl: ServiceImplementation<IExampleService> = {
  async exampleUnaryMethod(
    request: ExampleRequest,
    context: CallContext,
  ): Promise<ExampleResponse> {
    // the inner HTTP request is aborted when the gRPC call is cancelled
    const response = await fetch('http://example.com', {
      signal: context.signal,
    });

    // ...

    return new ExampleResponse();
  },
};
```
#### Server streaming

Consider the following Protobuf definition:

```proto
service ExampleService {
  rpc ExampleStreamingMethod(ExampleRequest)
      returns (stream ExampleResponse) {};
}
```

The service implementation defines this method as an Async Generator:

```ts
import {delay} from 'abort-controller-x';

const exampleServiceImpl: ServiceImplementation<IExampleService> = {
  async *exampleStreamingMethod(
    request: ExampleRequest,
    context: CallContext,
  ): AsyncIterable<ExampleResponse> {
    for (let i = 0; i < 10; i++) {
      // wait for 1 second, aborting early if the call is cancelled
      await delay(context.signal, 1000);

      yield new ExampleResponse();
    }
  },
};
```
##### Example: IxJS

```ts
import {range} from 'ix/asynciterable';
import {withAbort, map} from 'ix/asynciterable/operators';

const exampleServiceImpl: ServiceImplementation<IExampleService> = {
  async *exampleStreamingMethod(
    request: ExampleRequest,
    context: CallContext,
  ): AsyncIterable<ExampleResponse> {
    yield* range(0, 10).pipe(
      withAbort(context.signal),
      map(() => new ExampleResponse()),
    );
  },
};
```
##### Example: Observables

```ts
import {Observable} from 'rxjs';
import {from} from 'ix/asynciterable';
import {withAbort} from 'ix/asynciterable/operators';

const exampleServiceImpl: ServiceImplementation<IExampleService> = {
  async *exampleStreamingMethod(
    request: ExampleRequest,
    context: CallContext,
  ): AsyncIterable<ExampleResponse> {
    const observable: Observable<ExampleResponse> = getResponses(); // hypothetical source of responses

    yield* from(observable).pipe(withAbort(context.signal));
  },
};
```
#### Client streaming

Given a client streaming method:

```proto
service ExampleService {
  rpc ExampleClientStreamingMethod(stream ExampleRequest)
      returns (ExampleResponse) {};
}
```

The service implementation method receives the request as an Async Iterable:

```ts
const exampleServiceImpl: ServiceImplementation<IExampleService> = {
  async exampleClientStreamingMethod(
    request: AsyncIterable<ExampleRequest>,
  ): Promise<ExampleResponse> {
    for await (const item of request) {
      // ...
    }

    return new ExampleResponse();
  },
};
```
#### Middleware

Server middleware intercepts incoming calls, allowing you to:

- Execute any logic before and after implementation methods
- Look into the request, request metadata and response
- Interrupt a call before it reaches the implementation by throwing a
  `ServerError`
- Catch implementation errors and return friendly `ServerError`s to a client
- Augment call context
- Modify response header and trailer metadata

Server middleware is defined as an Async Generator. The most basic no-op
middleware looks like this:

```ts
import {ServerMiddlewareCall, CallContext} from 'nice-grpc';

async function* middleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response>,
  context: CallContext,
) {
  return yield* call.next(call.request, context);
}
```
For unary and client streaming methods, the `call.next` generator yields no
items and returns a single response; for server streaming and bidirectional
streaming methods, it yields each response and returns void. By writing
`return yield*`, we cover both cases. To handle these cases separately, we can
write the middleware as follows:

```ts
async function* middleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response>,
  context: CallContext,
) {
  if (!call.responseStream) {
    const response = yield* call.next(call.request, context);

    return response;
  } else {
    for await (const response of call.next(call.request, context)) {
      yield response;
    }

    return;
  }
}
```
To attach a middleware to a server, use the `server.use` method. Note that
`server.use` returns a new server instance.

```ts
const server = createServer().use(middleware1).use(middleware2);
```

A middleware that is attached first will be invoked first.
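That ordering can be illustrated without any gRPC machinery. In this hedged, library-free sketch (`compose` and `makeMiddleware` are illustrative helpers, not nice-grpc APIs), each middleware wraps the next handler; the first-attached middleware ends up outermost, so it runs first on the way in and last on the way out:

```typescript
// Library-free sketch of middleware invocation order.
type Handler = (log: string[]) => void;
type Middleware = (next: Handler) => Handler;

// The first middleware in the list wraps all the others.
function compose(middlewares: Middleware[], core: Handler): Handler {
  return middlewares.reduceRight((next, mw) => mw(next), core);
}

// A middleware that logs before and after delegating to `next`.
const makeMiddleware =
  (name: string): Middleware =>
  next =>
  log => {
    log.push(`${name} before`);
    next(log);
    log.push(`${name} after`);
  };
```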
You can also attach middleware per-service:

```ts
const server = createServer().use(middlewareA);

server.with(middlewareB).add(Service1, service1Impl);
server.with(middlewareC).add(Service2, service2Impl);
```

In the above example, `Service1` gets `middlewareA` and `middlewareB`, and
`Service2` gets `middlewareA` and `middlewareC`.
##### Example: Logging

Log all calls:

```ts
async function* loggingMiddleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response>,
  context: CallContext,
) {
  const {path} = call.definition;

  console.log('Server call', path, 'start');

  try {
    const result = yield* call.next(call.request, context);

    console.log('Server call', path, 'end: OK');

    return result;
  } catch (error) {
    if (error instanceof ServerError) {
      console.log('Server call', path, `end: ${status[error.code]}`);
    } else {
      console.log('Server call', path, `error: ${error?.stack}`);
    }

    throw error;
  }
}
```
##### Example: Error handling

Catch unknown errors and wrap them into `ServerError`s with friendly messages:

```ts
async function* errorHandlingMiddleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response>,
  context: CallContext,
) {
  try {
    return yield* call.next(call.request, context);
  } catch (error: unknown) {
    if (error instanceof ServerError) {
      throw error;
    }

    let details = 'Unknown server error occurred';

    if (process.env.NODE_ENV === 'development') {
      details += `: ${error instanceof Error ? error.stack : error}`;
    }

    throw new ServerError(status.UNKNOWN, details);
  }
}
```
##### Example: Authentication

Validate a JSON Web Token (JWT) from request metadata and put its claims into
the `CallContext`:

```ts
import createRemoteJWKSet from 'jose/jwks/remote';
import jwtVerify, {JWTPayload} from 'jose/jwt/verify';
import {JOSEError} from 'jose/util/errors';

const jwks = createRemoteJWKSet(
  new URL('https://example.com/.well-known/jwks.json'),
);

type AuthCallContextExt = {
  auth: JWTPayload;
};

async function* authMiddleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response, AuthCallContextExt>,
  context: CallContext,
) {
  const authorization = context.metadata.get('Authorization')[0];

  if (authorization == null) {
    throw new ServerError(
      status.UNAUTHENTICATED,
      'Missing Authorization metadata',
    );
  }

  const parts = authorization.toString().split(' ');

  if (parts.length !== 2 || parts[0] !== 'Bearer') {
    throw new ServerError(
      status.UNAUTHENTICATED,
      'Invalid Authorization metadata format. Expected "Bearer <token>"',
    );
  }

  const token = parts[1];

  const {payload} = await jwtVerify(token, jwks).catch(error => {
    if (error instanceof JOSEError) {
      throw new ServerError(status.UNAUTHENTICATED, error.message);
    } else {
      throw error;
    }
  });

  return yield* call.next(call.request, {
    ...context,
    auth: payload,
  });
}
```
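The Bearer-token parsing step above can be isolated into a small, dependency-free helper (`parseBearerToken` is an illustrative name, not a nice-grpc or jose API):

```typescript
// Illustrative helper: extract the token from a "Bearer <token>"
// Authorization value, or return null if the format is wrong.
function parseBearerToken(authorization: string): string | null {
  const parts = authorization.split(' ');

  if (parts.length !== 2 || parts[0] !== 'Bearer') {
    return null;
  }

  return parts[1];
}
```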
The service implementation can then access JWT claims via the call context:

```ts
const exampleServiceImpl: ServiceImplementation<
  IExampleService,
  AuthCallContextExt
> = {
  async exampleUnaryMethod(
    request: ExampleRequest,
    context: CallContext & AuthCallContextExt,
  ): Promise<ExampleResponse> {
    const userId = context.auth.sub;

    // ...

    return new ExampleResponse();
  },
};
```
### Client

Consider the following Protobuf definition:

```proto
syntax = "proto3";

package nice_grpc.example;

service ExampleService {
  rpc ExampleUnaryMethod(ExampleRequest) returns (ExampleResponse) {};
}

message ExampleRequest {
  // ...
}

message ExampleResponse {
  // ...
}
```
After compiling the Protobuf file, we can create the client:

```ts
import {createChannel, createClient} from 'nice-grpc';
import {ExampleService} from './compiled_proto/example_grpc_pb';

const channel = createChannel('localhost:8080');

const client = createClient(ExampleService, channel);
```

When creating a client, you can specify default call options for all methods, or
per-method. See Example: Timeouts below.
Call the method:

```ts
import {ExampleRequest, ExampleResponse} from './compiled_proto/example_pb';

const response: ExampleResponse = await client.exampleUnaryMethod(
  new ExampleRequest(),
);
```

Once we're done with the client, close the channel:

```ts
channel.close();
```
#### Channels

By default, a channel uses an insecure connection. The following are equivalent:

```ts
import {ChannelCredentials} from '@grpc/grpc-js';
import {createChannel} from 'nice-grpc';

createChannel('example.com:8080');
createChannel('http://example.com:8080');
createChannel('example.com:8080', ChannelCredentials.createInsecure());
```

To connect over TLS, use one of the following:

```ts
createChannel('https://example.com:8080');
createChannel('example.com:8080', ChannelCredentials.createSsl());
```
#### Metadata

The client can send request metadata and receive response headers and trailers:

```ts
import {Metadata} from '@grpc/grpc-js';

const metadata = new Metadata();
metadata.set('key', 'value');

const response = await client.exampleUnaryMethod(new ExampleRequest(), {
  metadata,
  onHeader(header: Metadata) {
    // ...
  },
  onTrailer(trailer: Metadata) {
    // ...
  },
});
```
#### Errors

Client calls may throw gRPC errors represented as `ClientError`, which contain
the status code and description.

```ts
import {status} from '@grpc/grpc-js';
import {ClientError} from 'nice-grpc';

let response: ExampleResponse | null;

try {
  response = await client.exampleUnaryMethod(new ExampleRequest());
} catch (error: unknown) {
  if (error instanceof ClientError && error.code === status.NOT_FOUND) {
    response = null;
  } else {
    throw error;
  }
}
```
#### Cancelling calls

A client call can be cancelled using an `AbortSignal`.

```ts
import AbortController from 'node-abort-controller';
import {isAbortError} from 'abort-controller-x';

const abortController = new AbortController();

client
  .exampleUnaryMethod(new ExampleRequest(), {
    signal: abortController.signal,
  })
  .catch(error => {
    if (isAbortError(error)) {
      // aborted
    } else {
      throw error;
    }
  });

abortController.abort();
```
#### Deadlines

You can specify a deadline for a client call using a `Date` object:

```ts
import {status} from '@grpc/grpc-js';
import {ClientError} from 'nice-grpc';
import {addSeconds} from 'date-fns';

try {
  const response = await client.exampleUnaryMethod(new ExampleRequest(), {
    deadline: addSeconds(new Date(), 15),
  });
} catch (error: unknown) {
  if (error instanceof ClientError && error.code === status.DEADLINE_EXCEEDED) {
    // deadline exceeded
  } else {
    throw error;
  }
}
```
#### Server streaming

Consider the following Protobuf definition:

```proto
service ExampleService {
  rpc ExampleStreamingMethod(ExampleRequest)
      returns (stream ExampleResponse) {};
}
```

The client method returns an Async Iterable:

```ts
for await (const response of client.exampleStreamingMethod(
  new ExampleRequest(),
)) {
  // ...
}
```
#### Client streaming

Given a client streaming method:

```proto
service ExampleService {
  rpc ExampleClientStreamingMethod(stream ExampleRequest)
      returns (ExampleResponse) {};
}
```

The client method expects an Async Iterable as its first argument:

```ts
async function* createRequest(): AsyncIterable<ExampleRequest> {
  for (let i = 0; i < 10; i++) {
    yield new ExampleRequest();
  }
}

const response = await client.exampleClientStreamingMethod(createRequest());
```
#### Middleware

Client middleware intercepts outgoing calls, allowing you to:

- Execute any logic before and after reaching the server
- Modify request metadata
- Look into the request, response and response metadata
- Send the call multiple times for retries or hedging
- Augment the call options type with its own configuration

Client middleware is defined as an Async Generator and is very similar to
server middleware. Key differences:

- Middleware invocation order is reversed: middleware that is attached first
  will be invoked last.
- There's no such thing as `CallContext` for client middleware; instead,
  `CallOptions` are passed through the chain and can be accessed or altered by
  a middleware.
To create a client with middleware, use a client factory:

```ts
import {createClientFactory} from 'nice-grpc';

const client = createClientFactory()
  .use(middleware1)
  .use(middleware2)
  .create(ExampleService, channel);
```

A middleware that is attached first will be invoked last.

You can reuse a single factory to create multiple clients:

```ts
const clientFactory = createClientFactory().use(middleware);

const client1 = clientFactory.create(Service1, channel1);
const client2 = clientFactory.create(Service2, channel2);
```
You can also attach middleware per-client:

```ts
const clientFactory = createClientFactory().use(middlewareA);

const client1 = clientFactory.use(middlewareB).create(Service1, channel1);
const client2 = clientFactory.use(middlewareC).create(Service2, channel2);
```

In the above example, the `Service1` client gets `middlewareA` and
`middlewareB`, and the `Service2` client gets `middlewareA` and `middlewareC`.
##### Example: Logging

Log all calls:

```ts
import {ClientMiddlewareCall, CallOptions, ClientError} from 'nice-grpc';

async function* loggingMiddleware<Request, Response>(
  call: ClientMiddlewareCall<Request, Response>,
  options: CallOptions,
) {
  const {path} = call.definition;

  console.log('Client call', path, 'start');

  try {
    const result = yield* call.next(call.request, options);

    console.log('Client call', path, 'end: OK');

    return result;
  } catch (error) {
    if (error instanceof ClientError) {
      console.log('Client call', path, `end: ${status[error.code]}`);
    } else {
      console.log('Client call', path, `error: ${error?.stack}`);
    }

    throw error;
  }
}
```
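The feature list above also mentions sending a call multiple times for retries. The core loop such a middleware could build on can be sketched without any gRPC machinery; in a real middleware you would run this loop around `call.next` for non-streaming calls only, since streams cannot be safely replayed. The `retry` helper, its parameters, and the choice of three attempts are illustrative assumptions, not nice-grpc APIs:

```typescript
// Illustrative retry loop: run `operation`, retrying while
// `isRetryable` classifies the thrown error as transient, up to
// `maxAttempts` attempts in total.
async function retry<T>(
  operation: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxAttempts = 3,
): Promise<T> {
  let lastError: unknown;

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      if (!isRetryable(error)) {
        throw error;
      }

      lastError = error;
    }
  }

  throw lastError;
}
```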
##### Example: Timeouts

Support specifying timeouts for unary calls instead of absolute deadlines:

```ts
import ms = require('ms');
import {ClientMiddlewareCall, CallOptions} from 'nice-grpc';

type TimeoutCallOptionsExt = {
  /**
   * Timeout in a format understood by the `ms` package, e.g. '30s'.
   */
  timeout?: string;
};

async function* timeoutMiddleware<Request, Response>(
  call: ClientMiddlewareCall<Request, Response>,
  options: CallOptions & TimeoutCallOptionsExt,
) {
  const {timeout, ...nextOptions} = options;

  if (timeout != null && !call.requestStream && !call.responseStream) {
    nextOptions.deadline ??= new Date(Date.now() + ms(timeout));
  }

  return yield* call.next(call.request, nextOptions);
}
```
When creating a client, you can specify default call options for all methods, or
per-method:

```ts
const client = createClientFactory()
  .use(timeoutMiddleware)
  .create(ExampleService, channel, {
    // defaults for all methods
    '*': {
      timeout: '1m',
    },
    // override for a particular method
    exampleUnaryMethod: {
      timeout: '30s',
    },
  });
```

Specify call options per-call:

```ts
await client.exampleUnaryMethod(new ExampleRequest(), {
  timeout: '15s',
});
```