๐ŸŒ Run Vitess VStream & Messaging streams on PlanetScale instances, in Typescript.

andrao/planetscale-stream-ts

planetscale-stream-ts

This package exports two classes for streaming data from a PlanetScale database:

  • PlanetScaleMessagingStream: For reading a Vitess Messaging stream
  • PlanetScaleVStream: For reading a Vitess VStream (change data capture) stream


PlanetScaleMessagingStream

Offers a method, stream(), which returns an async iterable for consuming messages from a Vitess Messaging stream.

Note that messages need to be acknowledged, otherwise they will be redelivered. Use the ack() method to acknowledge messages.

This class uses PlanetScale's psdb gRPC API, which is a slimmed-down version of the Vitess queryservice. It's an alpha API with little documentation; I'm unsure of its purpose, and it has not been publicized in any real way, so use with caution.

See the Vitess documentation for more information on Vitess Messaging, including instructions on how to create a messaging table.

Parameters

Constructor

Parameter            Description
db_config            Database connection config
db_config.host       PlanetScale host
db_config.database   PlanetScale database name
db_config.username   PlanetScale branch username
db_config.password   PlanetScale branch password
table_name           The name of the messaging table from which to stream messages
table_primary_key    The name of the primary key field in the messaging table

Method: stream()

The stream() method uses named parameters:

Parameter          Description
read_duration_ms   (Optional) The duration, in milliseconds, for which the stream will be read. Omit to stream indefinitely.

Method: ack()

The ack() method uses one positional parameter:

Parameter   Description
keys        An array of message primary key values to acknowledge.

Usage

import { PlanetScaleMessagingStream } from 'planetscale-stream-ts';

const messenger = new PlanetScaleMessagingStream({
    db_config: {
        host: 'aws.connect.psdb.cloud',
        database: 'my_db',
        username: 'my_user',
        password: '<secret>',
    },
    table_name: 'my_message',
    table_primary_key: 'id',
});

const stream = messenger.stream({ read_duration_ms: 30 * 1000 });

for await (const { messages, error } of stream) {
    // Log out messages
    console.dir(messages, { depth: null });

    // Log out error
    if (error) console.error(error);

    // Acknowledge messages using primary key values
    const keys = messages.map(r => r.id);
    void messenger.ack(keys);
}
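
The loop above calls ack() without waiting for it to finish and also calls it for empty batches. The following is a minimal sketch of a variant that awaits each acknowledgement and skips empty batches. The MessagingLike and MessageBatch types and the consumeAndAck helper are hypothetical, inferred from the usage above rather than taken from the package, and ack() is assumed to return a promise.

```typescript
// Hypothetical structural types inferred from the usage above; the package's
// real exported types may differ.
interface MessageBatch<Row> {
    messages: Row[];
    error?: unknown;
}

interface MessagingLike<Row> {
    stream(opts: { read_duration_ms?: number | undefined }): AsyncIterable<MessageBatch<Row>>;
    // Assumption: ack() is asynchronous and resolves once the keys are acknowledged.
    ack(keys: Array<string | number>): Promise<unknown>;
}

// Consume a stream, acknowledging each non-empty batch before reading the next.
// Resolves with the number of messages that were acknowledged.
async function consumeAndAck<Row>(
    messenger: MessagingLike<Row>,
    getKey: (row: Row) => string | number,
    read_duration_ms?: number,
): Promise<number> {
    let acked = 0;
    for await (const { messages, error } of messenger.stream({ read_duration_ms })) {
        // Same behaviour as the usage above: log the error, keep reading.
        if (error) console.error(error);

        // Nothing to acknowledge in this batch.
        if (messages.length === 0) continue;

        await messenger.ack(messages.map(getKey));
        acked += messages.length;
    }
    return acked;
}
```

Under those assumptions it could be called as `await consumeAndAck(messenger, r => r.id, 30 * 1000)`. Awaiting the acknowledgement trades some throughput for the guarantee that a failed ack() surfaces as a rejected promise instead of being silently dropped.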

Example

planetscale-stream-ts--messaging.mp4


PlanetScaleVStream

Offers a method, stream(), which returns an async iterable for consuming change events (inserts, updates, and deletes) from a Vitess VStream.

This class uses the psdbconnect gRPC API, which is also used by, for example, the Connect Airbyte adapter. This API is in alpha, so use with caution.

See the Vitess documentation for more information on VStream.

Parameters

Constructor

Note that this db_config includes a use_replica boolean.

Parameter               Description
db_config               Database connection config
db_config.host          PlanetScale host
db_config.database      PlanetScale database name
db_config.username      PlanetScale branch username
db_config.password      PlanetScale branch password
db_config.use_replica   Whether to use the branch replica
table_name              The name of the table from which to stream changes

Method: stream()

The stream() method uses named parameters:

Parameter          Description
starting_cursor    The table cursor from which the stream will be read.
read_duration_ms   (Optional) The duration, in milliseconds, for which the stream will be read. Omit to stream indefinitely.
stop_position      (Optional) The VGtid position at which to stop.

Determining the starting cursor

The TableCursor encodes the keyspace, shard, and VGtid position from which the stream will begin.

Parameter   Description
keyspace    The keyspace from which to stream changes.
shard       The shard from which to stream changes.
position    The VGtid position from which to stream changes.

The position parameter has two special values:

  • undefined: Stream will start from the start of the binlog
    • PlanetScale retains binlog records for 3 days, by default
      • Run SHOW VARIABLES LIKE 'binlog_expire_logs_seconds' to confirm
  • "current": Stream will start from the current moment
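
As a rough illustration of the retention arithmetic above, the sketch below converts a binlog_expire_logs_seconds value into days and estimates how far back a stream with position: undefined could reach. Both function names are hypothetical, and the estimate is approximate, since the server expires whole binlog files rather than individual records.

```typescript
const SECONDS_PER_DAY = 24 * 60 * 60;

// Convert a binlog_expire_logs_seconds value into whole days of retention.
function binlogRetentionDays(binlogExpireLogsSeconds: number): number {
    return Math.floor(binlogExpireLogsSeconds / SECONDS_PER_DAY);
}

// Approximate earliest point in time that is still covered by the binlog,
// measured back from `now`.
function earliestReplayableTime(
    binlogExpireLogsSeconds: number,
    now: Date = new Date(),
): Date {
    return new Date(now.getTime() - binlogExpireLogsSeconds * 1000);
}

// A 3-day retention corresponds to 259200 seconds.
console.log(binlogRetentionDays(259200)); // 3
```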

Keyspace and shard values can be found by querying the database:

  • SHOW KEYSPACES: Lists keyspaces
  • SHOW VITESS_SHARDS: Lists shards in each keyspace, using format {keyspace}/{shard}
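
Since SHOW VITESS_SHARDS reports each shard as {keyspace}/{shard}, the two values have to be split apart before they can be used in a cursor. A minimal sketch of that split follows; the ShardRef type and parseShardName function are hypothetical helpers, not part of the package.

```typescript
// Hypothetical shape holding the two values a cursor needs.
interface ShardRef {
    keyspace: string;
    shard: string;
}

// Split a "{keyspace}/{shard}" string, as listed by SHOW VITESS_SHARDS,
// at the first slash. Throws if either side is missing.
function parseShardName(name: string): ShardRef {
    const slash = name.indexOf('/');
    if (slash <= 0 || slash === name.length - 1) {
        throw new Error(`Expected "{keyspace}/{shard}", got "${name}"`);
    }
    return {
        keyspace: name.slice(0, slash),
        shard: name.slice(slash + 1),
    };
}

// An unsharded keyspace has a single shard named "-".
console.log(parseShardName('my_keyspace/-')); // { keyspace: 'my_keyspace', shard: '-' }
```

The resulting keyspace and shard values are the ones a TableCursor expects.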

Usage

import { PlanetScaleVStream, TableCursor } from 'planetscale-stream-ts';

const vstream = new PlanetScaleVStream({
    db_config: {
        host: 'aws.connect.psdb.cloud',
        database: 'my_db',
        username: 'my_user',
        password: '<secret>',
        use_replica: true,
    },
    table_name: 'my_table',
});

const stream = vstream.stream({
    starting_cursor: new TableCursor({
        keyspace: 'my_keyspace',
        shard: '-',
        position: 'current',
    }),
    read_duration_ms: 30 * 1000,
});

for await (const { cursor, inserts, updates, deletes, error } of stream) {
    // Log out stream cursor position (VGtid)
    console.log('streamed up to:', cursor?.position);

    // Log out changes
    console.dir({ mod: 'INSERTS', data: inserts }, { depth: null });
    console.dir({ mod: 'UPDATES', data: updates }, { depth: null });
    console.dir({ mod: 'DELETES', data: deletes }, { depth: null });

    // Log out error
    if (error) console.error(error);
}
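
Because each batch reports the cursor it has streamed up to, one way to resume after a restart is to remember the last cursor seen and use it as the next starting point. The sketch below shows only the bookkeeping. The CursorLike and ChangeBatch types and the lastCursorOf helper are hypothetical, and it is an assumption (not confirmed by this README) that a yielded cursor can be passed back as starting_cursor.

```typescript
// Hypothetical structural types inferred from the usage above.
interface CursorLike {
    keyspace: string;
    shard: string;
    position?: string;
}

interface ChangeBatch {
    cursor?: CursorLike | null;
    error?: unknown;
}

// Read a stream to its end and resolve with the most recent cursor that
// carried a position, falling back to `start` if none was seen.
async function lastCursorOf(
    stream: AsyncIterable<ChangeBatch>,
    start: CursorLike,
): Promise<CursorLike> {
    let last = start;
    for await (const { cursor, error } of stream) {
        if (error) {
            console.error(error);
            continue; // do not advance past a batch that reported an error
        }
        if (cursor?.position) last = cursor;
    }
    return last;
}
```

The returned cursor could then be persisted (for example, as JSON) and, under the assumption above, used to build the starting_cursor for the next run so that changes are not replayed from the beginning.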

Example

planetscale-stream-ts--vstream.mp4


Using the examples

This repository includes two example scripts, one for each class, in the examples/ folder.

Before running an example, copy the .env.template file to .env and set the correct environment variables. Both of the examples additionally require some configuration values to be set in the files themselves, indicated by the @todo comments.

After running pnpm install, run the examples using the scripts in package.json:

  • pnpm run messaging runs the PlanetScale Messaging example
    • See examples/messaging.ts
  • pnpm run vstream runs the PlanetScale VStream example
    • See examples/vstream.ts

See the Example sections in the documentation above for screencaps of behaviour.


Protocol buffers

Sources

The .proto files in the proto/psdb directory have been copied in from the planetscale/psdb repository. Those in proto/vitess come from Vitess.

Conversion to TypeScript

TypeScript equivalents are generated using the @bufbuild/protoc-gen-es package and the @connectrpc/protoc-gen-connect-es plugin. Code generation is configured in buf.yaml and buf.gen.yaml, and generated code is saved to src/generated.

Run pnpm run generate to regenerate src/generated.

API clients

gRPC API clients for the two PlanetScale APIs are created in the src/clients directory using @connectrpc/connect and @connectrpc/connect-node.