Google Cloud PubSub
A type-safe Google Cloud Pub/Sub integration for NestJS. The module validates topic/subscription configuration, keeps schemas in sync with remote revisions, and generates a fully typed publisher/subscriber API through initializeKit.
Installation
bash
npm install @golevelup/nestjs-google-cloud-pubsub
npm install @google-cloud/pubsub avsc @protobuf-ts/runtime

Quick Start
Define your topics/subscriptions once and call initializeKit. It produces a typed publisher class, a subscribe decorator, and a payload map inferred from your Avro/Proto schemas.
// google-cloud-pubsub.configuration.ts
ts
import {
GoogleCloudPubsubModule,
PubsubTopicConfiguration,
} from '@golevelup/nestjs-google-cloud-pubsub';
import { SchemaTypes, Encodings } from '@google-cloud/pubsub';
import { MessageType } from '@protobuf-ts/runtime';
import { PaymentProcessedProtocolBufferSchema } from './generated-proto/payment-processed-schema';
/**
 * Declarative list of Pub/Sub topics, each with its schema and subscriptions.
 *
 * `as const` keeps the literal names and schema shapes so that `typeof topics`
 * can be passed to `initializeKit` as a type argument; `satisfies` checks the
 * literal against `PubsubTopicConfiguration` without widening it.
 */
export const topics = [
// Topic whose schema is an Avro record declared inline.
{
name: 'order.created',
schema: {
definition: {
fields: [
{ name: 'field1', type: 'string' },
{ name: 'field2', type: 'int' },
{ name: 'field3', type: 'boolean' },
{ name: 'field4', type: 'double' },
// Field whose type is itself a record (`Nested1`).
{
name: 'field5',
type: {
fields: [{ name: 'nestedField1', type: 'string' }],
name: 'Nested1',
type: 'record',
},
},
// Union type: the value is either a double or null.
{ name: 'field6', type: ['double', 'null'] },
],
name: 'order.created.schema',
type: 'record',
},
encoding: Encodings.Binary,
name: 'order.created.schema',
type: SchemaTypes.Avro,
},
subscriptions: [
{ name: 'order.created.subscription.order-processor-service' },
{ name: 'order.created.subscription.analytic-service' },
],
},
// Topic whose schema is a Protocol Buffer message; the definition is the
// message type imported from the generated-proto directory.
{
name: 'payment.processed',
schema: {
definition:
PaymentProcessedProtocolBufferSchema as MessageType<PaymentProcessedProtocolBufferSchema>,
encoding: Encodings.Binary,
name: 'payment.processed.schema',
// NOTE(review): absolute, machine-specific path — presumably should point
// at the .proto file inside your own project; confirm before copying.
protoPath:
'/Users/Desktop/google-cloud-pubsub/proto/payment-processed.proto',
type: SchemaTypes.ProtocolBuffer,
},
subscriptions: [
{ name: 'payment.processed.payment-processor-service' },
{ name: 'payment.processed.analytic-service' },
],
},
] as const satisfies readonly PubsubTopicConfiguration[];
// Build the typed kit from the topic configuration. It yields a payload map
// (used here only through `typeof`), an abstract publisher base class, and a
// subscribe decorator.
const {
_GoogleCloudPubsubPayloadsMap,
GoogleCloudPubsubAbstractPublisher,
GoogleCloudPubsubSubscribe,
} = GoogleCloudPubsubModule.initializeKit<typeof topics>();
/** Payload map type derived from the kit's `_GoogleCloudPubsubPayloadsMap` value. */
export type GoogleCloudPubsubPayloadsMap = typeof _GoogleCloudPubsubPayloadsMap;
/** Concrete publisher class; this is the class handed to `registerAsync` and injected into services. */
export class GoogleCloudPubsubPublisher extends GoogleCloudPubsubAbstractPublisher<GoogleCloudPubsubPayloadsMap> {}
// Re-export the decorator so handlers can import it from this configuration file.
export { GoogleCloudPubsubSubscribe };

// app.module.ts
ts
import { Module } from '@nestjs/common';
import { GoogleCloudPubsubModule } from '@golevelup/nestjs-google-cloud-pubsub';
import {
GoogleCloudPubsubPublisher,
topics,
} from './google-cloud-pubsub.configuration';
/**
 * Application module that registers the Pub/Sub module with the client
 * options, the topic configuration, and the concrete publisher class.
 */
@Module({
imports: [
GoogleCloudPubsubModule.registerAsync({
useFactory: () => ({
// Key file path taken from the GOOGLE_APPLICATION_CREDENTIALS environment variable.
client: { keyFilename: process.env.GOOGLE_APPLICATION_CREDENTIALS },
topics,
}),
publisher: GoogleCloudPubsubPublisher,
}),
],
})
export class AppModule {}

// app.service.ts
ts
import { Injectable } from '@nestjs/common';
import { GoogleCloudPubsubMessage } from '@golevelup/nestjs-google-cloud-pubsub';
import {
GoogleCloudPubsubPayloadsMap,
GoogleCloudPubsubPublisher,
GoogleCloudPubsubSubscribe,
} from './google-cloud-pubsub.configuration';
/**
 * Example service that publishes to the `order.created` topic and handles
 * messages from one of its subscriptions.
 */
@Injectable()
export class AppService {
constructor(
private readonly googleCloudPubsubPublisher: GoogleCloudPubsubPublisher,
) {}
/**
 * Publishes a sample message to `order.created`. The `data` object supplies
 * `field1` through `field6`, the fields declared in that topic's Avro schema.
 */
public async publishOrderCreated() {
await this.googleCloudPubsubPublisher.publish('order.created', {
data: {
field1: 'field1',
field2: 0,
field3: true,
field4: 10,
field5: { nestedField1: 'nestedField1' },
// `field6` is declared as ['double', 'null'], so null is accepted.
field6: null,
},
});
}
/**
 * Handler registered through the subscribe decorator for topic
 * `order.created` and subscription
 * `order.created.subscription.analytic-service`; logs the message data.
 *
 * @param payload Message typed from the `order.created` entry of the payload map.
 */
@GoogleCloudPubsubSubscribe(
'order.created',
'order.created.subscription.analytic-service',
)
public async onOrderCreated(
payload: GoogleCloudPubsubMessage<
GoogleCloudPubsubPayloadsMap['order.created']
>,
) {
console.log({ data: payload.data });
}
}

Future features
- Custom error hooks / metric emitters.
- Push subscription support.
- Split optional dependencies so Avro (`avsc`) and Protocol Buffer runtimes are only required when those schema types are used.
- `Publisher.ready()` helper that awaits PubsubClient initialization.
- Manual ack/nack.
- Iterate over schema revisions page by page instead of fetching them all at once.
- Support for custom Avro serialization options (e.g. `wrapUnions`, `logicalTypes`) with adaptive type inference.