From e96928e79fad6724ea4b3924ba42cbe70a68f555 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Tue, 3 Dec 2024 09:03:22 -0700 Subject: [PATCH] Move non-protocol options like table & where to the params sub-key (#2081) Fix https://github.com/electric-sql/electric/issues/2079 Electric's TypeScript client is currently tightly coupled to PostgreSQL-specific options in its `ShapeStreamOptions` interface. As Electric plans to support multiple data sources in the future, we need to separate protocol-level options from source-specific options. ## Changes 1. Created a new `PostgresParams` type to define PostgreSQL-specific parameters: - `table`: The root table for the shape - `where`: Where clauses for the shape - `columns`: Columns to include in the shape - `replica`: Whether to send full or partial row updates 2. Moved PostgreSQL-specific options from the top-level `ShapeStreamOptions` interface to the `params` sub-key 3. Updated `ParamsRecord` type to include PostgreSQL parameters 4. Updated the `ShapeStream` class to handle parameters from the `params` object 5. Updated documentation to reflect the changes ## Migration Example Before: ```typescript const stream = new ShapeStream({ url: 'http://localhost:3000/v1/shape', table: 'users', where: 'id > 100', columns: ['id', 'name'], replica: 'full' }) ``` After: ```typescript const stream = new ShapeStream({ url: 'http://localhost:3000/v1/shape', params: { table: 'users', where: 'id > 100', columns: ['id', 'name'], replica: 'full' } }) ``` --- .changeset/tiny-socks-drive.md | 49 ++++++ examples/basic-example/src/Example.tsx | 4 +- .../linearlite/src/pages/Issue/Comments.tsx | 4 +- examples/linearlite/src/shapes.ts | 2 +- examples/nextjs-example/app/page.tsx | 11 +- examples/proxy-auth/app/page.tsx | 8 +- examples/redis-sync/src/index.ts | 4 +- examples/todo-app/src/routes/index.tsx | 4 +- packages/react-hooks/README.md | 4 +- packages/react-hooks/src/react-hooks.tsx | 21 ++- .../react-hooks/test/react-hooks.test-d.ts | 12 +- .../react-hooks/test/react-hooks.test.tsx | 80 ++++++++-- packages/typescript-client/README.md | 14 +- packages/typescript-client/src/client.ts | 137 ++++++++++------- packages/typescript-client/src/shape.ts | 7 +- .../typescript-client/test/client.test-d.ts | 16 +- .../typescript-client/test/client.test.ts | 74 ++++++--- .../test/integration.test.ts | 92 +++++++---- .../typescript-client/test/stream.test.ts | 12 +- website/docs/api/clients/typescript.md | 145 ++++++++++-------- website/docs/integrations/react.md | 50 +++++- 21 files changed, 535 insertions(+), 215 deletions(-) create mode 100644 .changeset/tiny-socks-drive.md diff --git a/.changeset/tiny-socks-drive.md b/.changeset/tiny-socks-drive.md new file mode 100644 index 0000000000..1b477758bf --- /dev/null +++ b/.changeset/tiny-socks-drive.md @@ -0,0 +1,49 @@ +--- +"@electric-sql/client": minor +"@electric-sql/react": minor +--- + +[BREAKING]: Move non-protocol options like table & where to the params sub-key + +## Context + +Electric's TypeScript client is currently tightly coupled to PostgreSQL-specific options in its `ShapeStreamOptions` interface. As Electric plans to support multiple data sources in the future, we need to separate protocol-level options from source-specific options. + +## Changes + +1. Created a new `PostgresParams` type to define PostgreSQL-specific parameters: + - `table`: The root table for the shape + - `where`: Where clauses for the shape + - `columns`: Columns to include in the shape + - `replica`: Whether to send full or partial row updates + +2. Moved PostgreSQL-specific options from the top-level `ShapeStreamOptions` interface to the `params` sub-key +3. Updated `ParamsRecord` type to include PostgreSQL parameters +4. Updated the `ShapeStream` class to handle parameters from the `params` object +5. Updated documentation to reflect the changes + +## Migration Example + +Before: +```typescript +const stream = new ShapeStream({ + url: 'http://localhost:3000/v1/shape', + table: 'users', + where: 'id > 100', + columns: ['id', 'name'], + replica: 'full' +}) +``` + +After: +```typescript +const stream = new ShapeStream({ + url: 'http://localhost:3000/v1/shape', + params: { + table: 'users', + where: 'id > 100', + columns: ['id', 'name'], + replica: 'full' + } +}) +``` diff --git a/examples/basic-example/src/Example.tsx b/examples/basic-example/src/Example.tsx index 797fc72e6f..eb08a18cc6 100644 --- a/examples/basic-example/src/Example.tsx +++ b/examples/basic-example/src/Example.tsx @@ -8,7 +8,9 @@ const baseUrl = import.meta.env.ELECTRIC_URL ?? `http://localhost:3000` export const Example = () => { const { data: items } = useShape({ url: `${baseUrl}/v1/shape`, - table: `items`, + params: { + table: `items`, + }, }) return ( diff --git a/examples/linearlite/src/pages/Issue/Comments.tsx b/examples/linearlite/src/pages/Issue/Comments.tsx index 26fa41d7a5..6d07cfbe3d 100644 --- a/examples/linearlite/src/pages/Issue/Comments.tsx +++ b/examples/linearlite/src/pages/Issue/Comments.tsx @@ -18,10 +18,10 @@ function Comments(commentProps: CommentsProps) { const [newCommentBody, setNewCommentBody] = useState(``) const allComments = useShape({ url: `${baseUrl}/v1/shape`, - table: `comment`, - databaseId, params: { token, + database_id: databaseId, + table: `comment`, }, })! as Comment[] diff --git a/examples/linearlite/src/shapes.ts b/examples/linearlite/src/shapes.ts index 53b399c0b0..6f044f7675 100644 --- a/examples/linearlite/src/shapes.ts +++ b/examples/linearlite/src/shapes.ts @@ -3,8 +3,8 @@ import { baseUrl, databaseId, token } from './electric' export const issueShape: ShapeStreamOptions = { url: `${baseUrl}/v1/shape/`, - table: `issue`, params: { + table: `issue`, token, database_id: databaseId, }, diff --git a/examples/nextjs-example/app/page.tsx b/examples/nextjs-example/app/page.tsx index 239ebe2a7d..50963659ba 100644 --- a/examples/nextjs-example/app/page.tsx +++ b/examples/nextjs-example/app/page.tsx @@ -5,17 +5,22 @@ import { useOptimistic } from "react" import { useShape, getShapeStream } from "@electric-sql/react" import "./Example.css" import { matchStream } from "./match-stream" +import { ShapeStreamOptions } from "@electric-sql/client/*" -const itemShape = () => { +const itemShape = (): ShapeStreamOptions => { if (typeof window !== `undefined`) { return { url: new URL(`/shape-proxy`, window?.location.origin).href, - table: `items`, + params: { + table: `items`, + }, } } else { return { url: new URL(`https://not-sure-how-this-works.com/shape-proxy`).href, - table: `items`, + params: { + table: `items`, + }, } } } diff --git a/examples/proxy-auth/app/page.tsx b/examples/proxy-auth/app/page.tsx index c845007fa6..2d77cd5612 100644 --- a/examples/proxy-auth/app/page.tsx +++ b/examples/proxy-auth/app/page.tsx @@ -22,7 +22,9 @@ const usersShape = (): ShapeStreamOptions => { return { url: new URL(`/shape-proxy?org_id=${org_id}`, window.location.origin) .href, - table: `users`, + params: { + table: `users`, + }, headers: { Authorization: org_id || ``, }, @@ -30,7 +32,9 @@ const usersShape = (): ShapeStreamOptions => { } else { return { url: new URL(`https://not-sure-how-this-works.com/shape-proxy`).href, - table: `items`, + params: { + table: `items`, + }, } } } diff --git a/examples/redis-sync/src/index.ts b/examples/redis-sync/src/index.ts index 178e54d6b7..2d6998e090 100644 --- a/examples/redis-sync/src/index.ts +++ b/examples/redis-sync/src/index.ts @@ -34,7 +34,9 @@ client.connect().then(async () => { const itemsStream = new ShapeStream({ url: `http://localhost:3000/v1/shape`, - table: `items`, + params: { + table: `items`, + }, }) itemsStream.subscribe(async (messages: Message[]) => { // Begin a Redis transaction diff --git a/examples/todo-app/src/routes/index.tsx b/examples/todo-app/src/routes/index.tsx index 7acae6ad62..0b3dd52813 100644 --- a/examples/todo-app/src/routes/index.tsx +++ b/examples/todo-app/src/routes/index.tsx @@ -20,7 +20,9 @@ type ToDo = { export default function Index() { const { data: todos } = useShape({ url: `http://localhost:3000/v1/shape`, - table: `todos`, + params: { + table: `todos`, + } }) todos.sort((a, b) => a.created_at - b.created_at) console.log({ todos }) diff --git a/packages/react-hooks/README.md b/packages/react-hooks/README.md index 95b4b7dd40..fb4f340f06 100644 --- a/packages/react-hooks/README.md +++ b/packages/react-hooks/README.md @@ -22,7 +22,9 @@ import { useShape } from "@electric-sql/react" export default function MyComponent () { const { isLoading, data } = useShape({ url: "http://my-api.com/shape", - table: `foo`, + params: { + table: `foo` + } }) if (isLoading) { diff --git a/packages/react-hooks/src/react-hooks.tsx b/packages/react-hooks/src/react-hooks.tsx index 6ad64cfb1f..f02125b5a8 100644 --- a/packages/react-hooks/src/react-hooks.tsx +++ b/packages/react-hooks/src/react-hooks.tsx @@ -23,8 +23,27 @@ export async function preloadShape = Row>( return shape } +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function sortObjectKeys(obj: any): any { + if (typeof obj !== `object` || obj === null) return obj + + if (Array.isArray(obj)) { + return obj.map(sortObjectKeys) + } + + return ( + Object.keys(obj) + .sort() + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .reduce>((sorted, key) => { + sorted[key] = sortObjectKeys(obj[key]) + return sorted + }, {}) + ) +} + export function sortedOptionsHash(options: ShapeStreamOptions): string { - return JSON.stringify(options, Object.keys(options).sort()) + return JSON.stringify(sortObjectKeys(options)) } export function getShapeStream>( diff --git a/packages/react-hooks/test/react-hooks.test-d.ts b/packages/react-hooks/test/react-hooks.test-d.ts index c5a5ab5ec3..9439b2fb02 100644 --- a/packages/react-hooks/test/react-hooks.test-d.ts +++ b/packages/react-hooks/test/react-hooks.test-d.ts @@ -5,7 +5,9 @@ import { Row } from 'packages/typescript-client/dist' describe(`useShape`, () => { it(`should infer correct return type when no selector is provided`, () => { const shape = useShape({ - table: ``, + params: { + table: ``, + }, url: ``, }) @@ -21,7 +23,9 @@ describe(`useShape`, () => { it(`should infer correct return type when a selector is provided`, () => { const shape = useShape({ - table: ``, + params: { + table: ``, + }, url: ``, selector: (_value: UseShapeResult) => { return { @@ -38,7 +42,9 @@ describe(`useShape`, () => { it(`should raise a type error if type argument does not equal inferred return type`, () => { const shape = useShape({ - table: ``, + params: { + table: ``, + }, url: ``, // @ts-expect-error - should have type mismatch, because doesn't match the declared `Number` type selector: (_value: UseShapeResult) => { diff --git a/packages/react-hooks/test/react-hooks.test.tsx b/packages/react-hooks/test/react-hooks.test.tsx index ba07052479..ac8c559b95 100644 --- a/packages/react-hooks/test/react-hooks.test.tsx +++ b/packages/react-hooks/test/react-hooks.test.tsx @@ -13,17 +13,41 @@ describe(`sortedOptionsHash`, () => { () => { const hash1 = sortedOptionsHash({ url: `http://whatever`, - table: `foo`, + params: { + table: `foo`, + }, offset: `-1`, }) const hash2 = sortedOptionsHash({ offset: `-1`, - table: `foo`, + params: { + table: `foo`, + }, url: `http://whatever`, }) expect(hash1).toEqual(hash2) } ) + bareIt( + `should create the different hashes from options with different params`, + () => { + const hash1 = sortedOptionsHash({ + url: `http://whatever`, + params: { + table: `foo`, + where: `1=1`, + }, + }) + const hash2 = sortedOptionsHash({ + params: { + table: `foo`, + where: `2=2`, + }, + url: `http://whatever`, + }) + expect(hash1).not.toEqual(hash2) + } + ) }) describe(`useShape`, () => { @@ -31,7 +55,9 @@ describe(`useShape`, () => { const { result } = renderHook(() => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, subscribe: false, }) @@ -53,7 +79,9 @@ describe(`useShape`, () => { const { result } = renderHook(() => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter?.signal, subscribe: false, }) @@ -73,7 +101,9 @@ describe(`useShape`, () => { renderHook(() => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: manualAborter.signal, subscribe: false, }) @@ -86,7 +116,9 @@ describe(`useShape`, () => { const { result } = renderHook(() => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter?.signal, subscribe: false, }) @@ -101,7 +133,9 @@ describe(`useShape`, () => { const { result } = renderHook(() => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, fetchClient: async (input, init) => { await sleep(10) return fetch(input, init) @@ -120,7 +154,9 @@ describe(`useShape`, () => { const { result } = renderHook(() => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, fetchClient: async (input, init) => { await sleep(50) return fetch(input, init) @@ -148,7 +184,9 @@ describe(`useShape`, () => { const { result } = renderHook(() => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, subscribe: true, }) @@ -181,8 +219,10 @@ describe(`useShape`, () => { const { result, rerender } = renderHook((options) => useShape(options), { initialProps: { url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, - where: `id = '${id}'`, + params: { + table: issuesTableUrl, + where: `id = '${id}'`, + }, signal: aborter.signal, subscribe: true, }, @@ -194,8 +234,10 @@ describe(`useShape`, () => { rerender({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, - where: `id = '${id2}'`, + params: { + table: issuesTableUrl, + where: `id = '${id2}'`, + }, signal: aborter.signal, subscribe: true, }) @@ -216,7 +258,9 @@ describe(`useShape`, () => { const { result } = renderHook(() => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, subscribe: true, selector: (result) => { @@ -263,7 +307,9 @@ describe(`useShape`, () => { ({ selector }) => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, subscribe: true, selector: selector, @@ -292,7 +338,9 @@ describe(`useShape`, () => { const { result, unmount } = renderHook(() => useShape({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, subscribe: true, }) diff --git a/packages/typescript-client/README.md b/packages/typescript-client/README.md index af2bf70534..6d8122bb8c 100644 --- a/packages/typescript-client/README.md +++ b/packages/typescript-client/README.md @@ -51,17 +51,19 @@ import { ShapeStream } from '@electric-sql/client' // Passes subscribers rows as they're inserted, updated, or deleted const stream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: `foo`, + params: { + table: `foo` + } }) // You can also add custom headers and URL parameters const streamWithParams = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: `foo`, headers: { 'Authorization': 'Bearer token' }, params: { + table: `foo`, 'custom-param': 'value' } }) @@ -80,7 +82,9 @@ import { ShapeStream, Shape } from '@electric-sql/client' const stream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: `foo`, + params: { + table: `foo` + } }) const shape = new Shape(stream) @@ -101,7 +105,9 @@ The ShapeStream provides two ways to handle errors: ```typescript const stream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: `foo`, + params: { + table: `foo` + }, onError: (error) => { // Handle all stream errors here console.error('Stream error:', error) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 68dc3aba62..d2944bf06c 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -39,18 +39,43 @@ import { } from './constants' const RESERVED_PARAMS = new Set([ - COLUMNS_QUERY_PARAM, LIVE_CACHE_BUSTER_QUERY_PARAM, SHAPE_HANDLE_QUERY_PARAM, LIVE_QUERY_PARAM, OFFSET_QUERY_PARAM, - TABLE_QUERY_PARAM, - WHERE_QUERY_PARAM, - REPLICA_PARAM, ]) type Replica = `full` | `default` +/** + * PostgreSQL-specific shape parameters that can be provided externally + */ +type PostgresParams = { + /** The root table for the shape. Not required if you set the table in your proxy. */ + table?: string + + /** + * The columns to include in the shape. + * Must include primary keys, and can only include valid columns. + */ + columns?: string[] + + /** The where clauses for the shape */ + where?: string + + /** + * If `replica` is `default` (the default) then Electric will only send the + * changed columns in an update. + * + * If it's `full` Electric will send the entire row with both changed and + * unchanged values. + * + * Setting `replica` to `full` will result in higher bandwidth + * usage and so is not generally recommended. + */ + replica?: Replica +} + type ReservedParamKeys = | typeof COLUMNS_QUERY_PARAM | typeof LIVE_CACHE_BUSTER_QUERY_PARAM @@ -61,12 +86,38 @@ type ReservedParamKeys = | typeof WHERE_QUERY_PARAM | typeof REPLICA_PARAM -type ParamsRecord = Omit, ReservedParamKeys> +/** + * External params type - what users provide. + * Includes documented PostgreSQL params and allows string or string[] values for any additional params. + */ +type ExternalParamsRecord = Partial & { + [K in string as K extends ReservedParamKeys ? never : K]: string | string[] +} + +/** + * Internal params type - used within the library. + * All values are converted to strings. + */ +type InternalParamsRecord = { + [K in string as K extends ReservedParamKeys ? never : K]: string +} + +/** + * Helper function to convert external params to internal format + */ +function toInternalParams(params: ExternalParamsRecord): InternalParamsRecord { + const result: InternalParamsRecord = {} + for (const [key, value] of Object.entries(params)) { + result[key] = Array.isArray(value) ? value.join(`,`) : value + } + return result +} type RetryOpts = { - params?: ParamsRecord + params?: ExternalParamsRecord headers?: Record } + type ShapeStreamErrorHandler = ( error: Error ) => void | RetryOpts | Promise @@ -81,34 +132,6 @@ export interface ShapeStreamOptions { */ url: string - /** - * The root table for the shape. Passed as a query parameter. Not required if you set the table in your proxy. - */ - table?: string - - /** - * The where clauses for the shape. - */ - where?: string - - /** - * The columns to include in the shape. - * Must include primary keys, and can only inlude valid columns. - */ - columns?: string[] - - /** - * If `replica` is `default` (the default) then Electric will only send the - * changed columns in an update. - * - * If it's `full` Electric will send the entire row with both changed and - * unchanged values. - * - * Setting `replica` to `full` will obviously result in higher bandwidth - * usage and so is not recommended. - */ - replica?: Replica - /** * The "offset" on the shape log. This is typically not set as the ShapeStream * will handle this automatically. A common scenario where you might pass an offset @@ -135,9 +158,12 @@ export interface ShapeStreamOptions { * Additional request parameters to attach to the URL. * These will be merged with Electric's standard parameters. * Note: You cannot use Electric's reserved parameter names - * (table, where, columns, offset, handle, live, cursor, replica). + * (offset, handle, live, cursor). + * + * PostgreSQL-specific options like table, where, columns, and replica + * should be specified here. */ - params?: ParamsRecord + params?: ExternalParamsRecord /** * Automatically fetch updates to the Shape. If you just want to sync the current @@ -153,7 +179,7 @@ export interface ShapeStreamOptions { /** * A function for handling shapestream errors. * This is optional, when it is not provided any shapestream errors will be thrown. - * If the function is provided and returns an object containing parameters and/or headers + * If the function returns an object containing parameters and/or headers * the shapestream will apply those changes and try syncing again. * If the function returns void the shapestream is stopped. */ @@ -239,7 +265,6 @@ export class ShapeStream = Row> #shapeHandle?: string #schema?: Schema #onError?: ShapeStreamErrorHandler - #replica?: Replica constructor(options: ShapeStreamOptions>) { this.options = { subscribe: true, ...options } @@ -248,7 +273,6 @@ export class ShapeStream = Row> this.#liveCacheBuster = `` this.#shapeHandle = this.options.handle this.#messageParser = new MessageParser(options.parser) - this.#replica = this.options.replica this.#onError = this.options.onError const baseFetchClient = @@ -292,7 +316,7 @@ export class ShapeStream = Row> (!this.options.signal?.aborted && !this.#isUpToDate) || this.options.subscribe ) { - const { url, table, where, columns, signal } = this.options + const { url, signal } = this.options const fetchUrl = new URL(url) @@ -308,16 +332,30 @@ export class ShapeStream = Row> ) } - for (const [key, value] of Object.entries(this.options.params)) { - fetchUrl.searchParams.set(key, value) + // Add PostgreSQL-specific parameters from params + const params = toInternalParams(this.options.params) + if (params.table) + fetchUrl.searchParams.set(TABLE_QUERY_PARAM, params.table) + if (params.where) + fetchUrl.searchParams.set(WHERE_QUERY_PARAM, params.where) + if (params.columns) + fetchUrl.searchParams.set(COLUMNS_QUERY_PARAM, params.columns) + if (params.replica) + fetchUrl.searchParams.set(REPLICA_PARAM, params.replica) + + // Add any remaining custom parameters + const customParams = { ...params } + delete customParams.table + delete customParams.where + delete customParams.columns + delete customParams.replica + + for (const [key, value] of Object.entries(customParams)) { + fetchUrl.searchParams.set(key, value as string) } } // Add Electric's internal parameters - if (table) fetchUrl.searchParams.set(TABLE_QUERY_PARAM, table) - if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where) - if (columns && columns.length > 0) - fetchUrl.searchParams.set(COLUMNS_QUERY_PARAM, columns.join(`,`)) fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset) if (this.#isUpToDate) { @@ -336,13 +374,6 @@ export class ShapeStream = Row> ) } - if ( - (this.#replica ?? ShapeStream.Replica.DEFAULT) != - ShapeStream.Replica.DEFAULT - ) { - fetchUrl.searchParams.set(REPLICA_PARAM, this.#replica as string) - } - // sort query params in-place for stable URLs and improved cache hits fetchUrl.searchParams.sort() diff --git a/packages/typescript-client/src/shape.ts b/packages/typescript-client/src/shape.ts index 43fc480791..553ee91878 100644 --- a/packages/typescript-client/src/shape.ts +++ b/packages/typescript-client/src/shape.ts @@ -21,7 +21,12 @@ export type ShapeChangedCallback = Row> = (data: { * @param {ShapeStream} - the underlying shape stream * @example * ``` - * const shapeStream = new ShapeStream<{ foo: number }>(url: `http://localhost:3000/v1/shape`, table: `foo`}) + * const shapeStream = new ShapeStream<{ foo: number }>({ + * url: `http://localhost:3000/v1/shape`, + * params: { + * table: `foo` + * } + * }) * const shape = new Shape(shapeStream) * ``` * diff --git a/packages/typescript-client/test/client.test-d.ts b/packages/typescript-client/test/client.test-d.ts index 0b7f9e80ad..615071e9e9 100644 --- a/packages/typescript-client/test/client.test-d.ts +++ b/packages/typescript-client/test/client.test-d.ts @@ -19,8 +19,10 @@ describe(`client`, () => { describe(`ShapeStream`, () => { it(`should infer generic row return type when no type is provided`, () => { const shapeStream = new ShapeStream({ - table: ``, url: ``, + params: { + table: ``, + }, }) expectTypeOf(shapeStream).toEqualTypeOf>() @@ -31,8 +33,10 @@ describe(`client`, () => { it(`should infer correct return type when provided`, () => { const shapeStream = new ShapeStream({ - table: ``, url: ``, + params: { + table: ``, + }, parser: { timestampz: (date: string) => { return new Date(date) @@ -52,8 +56,10 @@ describe(`client`, () => { describe(`Shape`, () => { it(`should infer generic row return type when no type is provided`, async () => { const shapeStream = new ShapeStream({ - table: ``, url: ``, + params: { + table: ``, + }, }) const shape = new Shape(shapeStream) @@ -70,8 +76,10 @@ describe(`client`, () => { it(`should infer correct return type when provided`, async () => { const shapeStream = new ShapeStream({ - table: ``, url: ``, + params: { + table: ``, + }, parser: { timestampz: (date: string) => { return new Date(date) diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index a4c92e6d0d..bef73bb4bf 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -13,7 +13,9 @@ describe(`Shape`, () => { const start = Date.now() const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, }) const shape = new Shape(shapeStream) @@ -28,8 +30,8 @@ describe(`Shape`, () => { expect(() => { const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: `foo`, params: { + table: `foo`, live: `false`, }, }) @@ -47,7 +49,9 @@ describe(`Shape`, () => { const start = Date.now() const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, }) const shape = new Shape(shapeStream) @@ -82,7 +86,9 @@ describe(`Shape`, () => { const start = Date.now() const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, }) const shape = new Shape(shapeStream) @@ -175,7 +181,9 @@ describe(`Shape`, () => { const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, fetchClient: fetchWrapper, }) @@ -212,7 +220,9 @@ describe(`Shape`, () => { const start = Date.now() const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, }) const shape = new Shape(shapeStream) @@ -247,7 +257,9 @@ describe(`Shape`, () => { it(`should support unsubscribe`, async ({ issuesTableUrl }) => { const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, }) const shape = new Shape(shapeStream) @@ -264,7 +276,9 @@ describe(`Shape`, () => { const aborter = new AbortController() const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, }) @@ -289,7 +303,9 @@ describe(`Shape`, () => { let fetchShouldFail = false const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, fetchClient: async (_input, _init) => { if (fetchShouldFail) throw new FetchError( @@ -333,7 +349,9 @@ describe(`Shape`, () => { const mockErrorHandler = vi.fn() new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, fetchClient: async (_input, _init) => { return new Response(undefined, { status: 401, @@ -367,8 +385,8 @@ describe(`Shape`, () => { new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, params: { + table: issuesTableUrl, todo: `fail`, }, fetchClient: async (input, _init) => { @@ -409,7 +427,9 @@ describe(`Shape`, () => { new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, headers: { Authorization: `invalid credentials`, }, @@ -445,7 +465,9 @@ describe(`Shape`, () => { const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, headers: { Authorization: `invalid credentials`, }, @@ -479,7 +501,9 @@ describe(`Shape`, () => { const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, fetchClient: async (input, _init) => { url = input.toString() const headers = new Headers() @@ -505,7 +529,9 @@ describe(`Shape`, () => { // Also check that electric-cursor is a required header for responses to live queries const shapeStreamLive = new ShapeStream({ url: `${BASE_URL}/v1/shape?live=true`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, fetchClient: async (input, _init) => { url = input.toString() const headers = new Headers() @@ -535,7 +561,9 @@ describe(`Shape`, () => { }) => { const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, subscribe: false, }) @@ -549,7 +577,9 @@ describe(`Shape`, () => { it(`should expose isLoading status`, async ({ issuesTableUrl }) => { const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, fetchClient: async (input, init) => { await sleep(20) return fetch(input, init) @@ -566,7 +596,9 @@ describe(`Shape`, () => { it(`should expose lastOffset`, async ({ issuesTableUrl }) => { const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, fetchClient: async (input, init) => { await sleep(20) return fetch(input, init) @@ -591,8 +623,10 @@ describe(`Shape`, () => { const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, - replica: `full`, + params: { + table: issuesTableUrl, + replica: `full`, + }, signal: aborter.signal, }) try { diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index da10e7361b..90542388a9 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -29,7 +29,9 @@ describe(`HTTP Sync`, () => { const shapeData = new Map() const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, subscribe: false, signal: aborter.signal, }) @@ -67,7 +69,9 @@ describe(`HTTP Sync`, () => { const shapeData = new Map() const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, fetchClient: fetchWrapper, }) @@ -149,7 +153,9 @@ describe(`HTTP Sync`, () => { const shapeData = new Map() const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, }) @@ -225,7 +231,9 @@ describe(`HTTP Sync`, () => { // Now fetch the data from the HTTP endpoint const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: tableUrl, + params: { + table: tableUrl, + }, signal: aborter.signal, }) const client = new Shape(issueStream) @@ -355,7 +363,9 @@ describe(`HTTP Sync`, () => { const shapeData = new Map() const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, }) let secondRowId = `` @@ -388,7 +398,6 @@ describe(`HTTP Sync`, () => { it(`should wait for processing before advancing stream`, async ({ aborter, issuesTableUrl, - insertIssues, }) => { // With initial data @@ -403,7 +412,9 @@ describe(`HTTP Sync`, () => { const shapeData = new Map() const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter.signal, fetchClient: fetchWrapper, }) @@ -446,7 +457,9 @@ describe(`HTTP Sync`, () => { const aborter1 = new AbortController() const issueStream1 = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter1.signal, }) @@ -454,7 +467,9 @@ describe(`HTTP Sync`, () => { const aborter2 = new AbortController() const issueStream2 = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, signal: aborter2.signal, }) @@ -495,9 +510,11 @@ describe(`HTTP Sync`, () => { let lastOffset: Offset = `-1` const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, - signal: aborter.signal, + params: { + table: issuesTableUrl, + }, subscribe: false, + signal: aborter.signal, }) await h.forEachMessage(issueStream, aborter, (res, msg) => { @@ -526,7 +543,9 @@ describe(`HTTP Sync`, () => { const newAborter = new AbortController() const newIssueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, subscribe: false, signal: newAborter.signal, offset: lastOffset, @@ -577,8 +596,7 @@ describe(`HTTP Sync`, () => { await sleep(40) const res2 = await fetch( - `${BASE_URL}/v1/shape?table=${issuesTableUrl}&offset=-1`, - {} + `${BASE_URL}/v1/shape?table=${issuesTableUrl}&offset=-1` ) const etag2Header = res2.headers.get(`etag`) expect(etag2Header, `Response should have etag header`).not.toEqual(null) @@ -658,9 +676,10 @@ describe(`HTTP Sync`, () => { const shapeData = new Map() const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, - where: `title LIKE 'foo%'`, - subscribe: true, + params: { + table: issuesTableUrl, + where: `title LIKE 'foo%'`, + }, signal: aborter.signal, }) @@ -695,8 +714,10 @@ describe(`HTTP Sync`, () => { const shapeData = new Map() const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: tableUrl, - columns: [`txt`, `i2`, `i4`], + params: { + table: tableUrl, + columns: [`txt`, `i2`, `i4`], + }, signal: aborter.signal, }) await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { @@ -740,7 +761,9 @@ describe(`HTTP Sync`, () => { let lastOffset: Offset = `-1` const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, subscribe: true, signal: aborter.signal, }) @@ -792,7 +815,9 @@ describe(`HTTP Sync`, () => { const newAborter = new AbortController() const newIssueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, subscribe: false, signal: newAborter.signal, offset: lastOffset, @@ -800,7 +825,7 @@ describe(`HTTP Sync`, () => { fetchClient: fetchWrapper, }) - await h.forEachMessage(newIssueStream, aborter, (res, msg) => { + await h.forEachMessage(newIssueStream, newAborter, (res, msg) => { if (isUpToDateMessage(msg)) { res() } @@ -829,7 +854,9 @@ describe(`HTTP Sync`, () => { }) => { const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, subscribe: true, signal: aborter.signal, }) @@ -841,10 +868,11 @@ describe(`HTTP Sync`, () => { let error: Error const invalidIssueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, - subscribe: true, + params: { + table: issuesTableUrl, + where: `1=1`, + }, handle: issueStream.shapeHandle, - where: `1=1`, onError: (err) => { error = err }, @@ -871,8 +899,8 @@ describe(`HTTP Sync`, () => { clearIssuesShape, }) => { // With initial data - const rowId = uuidv4() - const secondRowId = uuidv4() + const rowId = uuidv4(), + rowId2 = uuidv4() await insertIssues({ id: rowId, title: `foo1` }) const statusCodesReceived: number[] = [] @@ -882,7 +910,7 @@ describe(`HTTP Sync`, () => { // before any subsequent requests after the initial one, ensure // that the existing shape is deleted and some more data is inserted if (numRequests === 2) { - await insertIssues({ id: secondRowId, title: `foo2` }) + await insertIssues({ id: rowId2, title: `foo2` }) await clearIssuesShape(issueStream.shapeHandle) } @@ -898,7 +926,9 @@ describe(`HTTP Sync`, () => { const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, - table: issuesTableUrl, + params: { + table: issuesTableUrl, + }, subscribe: true, signal: aborter.signal, fetchClient: fetchWrapper, @@ -960,7 +990,7 @@ describe(`HTTP Sync`, () => { } else { // should get the second row as well with the new shape handle expect(msg.value).toEqual({ - id: secondRowId, + id: rowId2, title: `foo2`, priority: 10, }) diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index 8d333a070e..07f8d64bcc 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -25,7 +25,9 @@ describe(`ShapeStream`, () => { const aborter = new AbortController() new ShapeStream({ url: shapeUrl, - table: `foo`, + params: { + table: `foo`, + }, signal: aborter.signal, fetchClient: fetchWrapper, headers: { @@ -60,9 +62,11 @@ describe(`ShapeStream`, () => { const aborter = new AbortController() new ShapeStream({ url: shapeUrl, - table: `foo`, - where: `a=1`, - columns: [`id`], + params: { + table: `foo`, + where: `a=1`, + columns: [`id`], + }, handle: `potato`, signal: aborter.signal, fetchClient: fetchWrapper, diff --git a/website/docs/api/clients/typescript.md b/website/docs/api/clients/typescript.md index a74ee0cd57..c4e9561659 100644 --- a/website/docs/api/clients/typescript.md +++ b/website/docs/api/clients/typescript.md @@ -35,7 +35,9 @@ import { ShapeStream } from '@electric-sql/client' const stream = new ShapeStream({ url: `http://localhost:3000/v1/shape`, - table: 'items' + params: { + table: 'items' + } }) const shape = new Shape(stream) @@ -55,7 +57,9 @@ import { ShapeStream } from '@electric-sql/client' // Passes subscribers rows as they're inserted, updated, or deleted const stream = new ShapeStream({ url: `http://localhost:3000/v1/shape`, - table: `foo`, + params: { + table: `foo` + } }) stream.subscribe(messages => { @@ -88,32 +92,44 @@ export interface ShapeStreamOptions { databaseId?: string /** - * The root table for the shape. Passed as a query parameter. Not required if you set the table in your proxy. + * PostgreSQL-specific parameters for the shape. + * This includes table, where clause, columns, and replica settings. */ - table?: string - - /** - * The where clauses for the shape. - */ - where?: string - - /** - * The columns to include in the shape. - * Must include primary keys, and can only inlude valid columns. - */ - columns?: string[] - - /** - * If `replica` is `default` (the default) then Electric will only send the - * changed columns in an update. - * - * If it's `full` Electric will send the entire row with both changed and - * unchanged values. - * - * Setting `replica` to `full` will obviously result in higher bandwidth - * usage and so is not recommended. - */ - replica?: Replica + params: { + /** + * The root table for the shape. + */ + table: string + + /** + * The where clauses for the shape. + */ + where?: string + + /** + * The columns to include in the shape. + * Must include primary keys, and can only include valid columns. + */ + columns?: string[] + + /** + * If `replica` is `default` (the default) then Electric will only send the + * changed columns in an update. + * + * If it's `full` Electric will send the entire row with both changed and + * unchanged values. + * + * Setting `replica` to `full` will obviously result in higher bandwidth + * usage and so is not recommended. + */ + replica?: Replica + + /** + * Additional request parameters to attach to the URL. + * These will be merged with Electric's standard parameters. + */ + [key: string]: string | string[] | undefined + } /** * The "offset" on the shape log. This is typically not set as the ShapeStream @@ -137,14 +153,6 @@ export interface ShapeStreamOptions { */ headers?: Record - /** - * Additional request parameters to attach to the URL. - * These will be merged with Electric's standard parameters. - * Note: You cannot use Electric's reserved parameter names - * (table, where, columns, offset, etc.). - */ - params?: Record - /** * Automatically fetch updates to the Shape. If you just want to sync the current * shape and stop, pass false. @@ -169,7 +177,7 @@ export interface ShapeStreamOptions { /** * A function for handling errors. * This is optional, when it is not provided any shapestream errors will be thrown. - * If the function is provided and returns an object containing parameters and/or headers + * If the function returns an object containing parameters and/or headers * the shapestream will apply those changes and try syncing again. * If the function returns void the shapestream is stopped. */ @@ -189,40 +197,41 @@ type ShapeStreamErrorHandler = ( ``` Note that certain parameter names are reserved for Electric's internal use and cannot be used in custom params: -- `table` -- `where` -- `columns` - `offset` - `handle` - `live` - `cursor` - `database_id` -- `replica` - -Attempting to use these reserved names will throw an error. -### ShapeStream Configuration - -The ShapeStream constructor accepts several configuration options: +The following PostgreSQL-specific parameters should be included within the `params` object: +- `table` - The root table for the shape +- `where` - SQL where clause for filtering rows +- `columns` - List of columns to include +- `replica` - Controls whether to send full or partial row updates +Example with PostgreSQL-specific parameters: ```typescript const stream = new ShapeStream({ - // Required: URL to fetch shapes from url: 'http://localhost:3000/v1/shape', - table: 'items', - // E.g. add authentication header - headers: { - 'Authorization': 'Bearer token' - }, - // E.g. add custom URL parameters params: { - 'tenant': 'acme-corp', - 'version': '1.0' + table: 'users', + where: 'age > 18', + columns: ['id', 'name', 'email'], + replica: 'full' } }) ``` -Note: When using custom parameters, be careful not to use reserved parameter names as they may conflict with Electric's internal parameters. +You can also include additional custom parameters in the `params` object alongside the PostgreSQL-specific ones: +```typescript +const stream = new ShapeStream({ + url: 'http://localhost:3000/v1/shape', + params: { + table: 'users', + customParam: 'value' + } +}) +``` #### Messages @@ -258,7 +267,9 @@ You can extend this behaviour by configuring a custom parser. This is an object ```ts const stream = new ShapeStream({ url: `http://localhost:3000/v1/shape`, - table: `foo`, + params: { + table: `foo` + }, parser: { bool: (value: string) => value === `true` ? 1 : 0 } @@ -280,8 +291,10 @@ import { ShapeStream } from "@electric-sql/client" const stream = new ShapeStream({ url: `http://localhost:3000/v1/shape`, - table: `foo`, - replica: `full`, + params: { + table: `foo`, + replica: `full` + } }) ``` @@ -295,7 +308,9 @@ Using a custom error handler we can for instance refresh the authorization token ```ts const stream = new ShapeStream({ url: 'http://localhost:3000/v1/shape', - table: 'items', + params: { + table: 'items' + }, // Add authentication header headers: { 'Authorization': 'Bearer token' @@ -327,7 +342,9 @@ import { ShapeStream, Shape } from '@electric-sql/client' const stream = new ShapeStream({ url: `http://localhost:3000/v1/shape`, - table: `foo`, + params: { + table: `foo` + } }) const shape = new Shape(stream) @@ -349,7 +366,9 @@ The `subscribe` method allows you to receive updates whenever the shape changes. ```typescript const stream = new ShapeStream({ url: 'http://localhost:3000/v1/shape', - table: 'issues' + params: { + table: 'issues' + } }) // Subscribe to both message and error handlers @@ -392,7 +411,9 @@ The ShapeStream provides two ways to handle errors: ```typescript const stream = new ShapeStream({ url: 'http://localhost:3000/v1/shape', - table: 'issues', + params: { + table: 'issues' + }, onError: (error) => { // Handle all stream errors here if (error instanceof FetchError) { diff --git a/website/docs/integrations/react.md b/website/docs/integrations/react.md index 91c0e41929..f3e786b430 100644 --- a/website/docs/integrations/react.md +++ b/website/docs/integrations/react.md @@ -36,7 +36,9 @@ import { useShape } from '@electric-sql/react' const MyComponent = () => { const { isLoading, data } = useShape<{title: string}>({ url: `http://localhost:3000/v1/shape`, - table: 'items' + params: { + table: 'items' + } }) if (isLoading) { @@ -51,6 +53,22 @@ const MyComponent = () => { } ``` +You can also include additional PostgreSQL-specific parameters: + +```tsx +const MyFilteredComponent = () => { + const { isLoading, data } = useShape<{id: number, title: string}>({ + url: `http://localhost:3000/v1/shape`, + params: { + table: 'items', + where: 'status = \'active\'', + columns: ['id', 'title'] + } + }) + // ... +} +``` + `useShape` takes the same options as [ShapeStream](/docs/api/clients/typescript#options). The return value is a `UseShapeResult`: ```tsx @@ -88,7 +106,24 @@ export interface UseShapeResult = Row> { export const clientLoader = async () => { return await preloadShape({ url: `http://localhost:3000/v1/shape`, - table: 'items' + params: { + table: 'items' + } + }) +} +``` + +You can also preload filtered data: + +```tsx +export const filteredLoader = async () => { + return await preloadShape({ + url: `http://localhost:3000/v1/shape`, + params: { + table: 'items', + where: 'category = \'electronics\'', + columns: ['id', 'name', 'price'] + } }) } ``` @@ -102,7 +137,9 @@ It takes the same options as [ShapeStream](/docs/api/clients/typescript#options) ```tsx const itemsStream = getShapeStream({ url: `http://localhost:3000/v1/shape`, - table: 'items' + params: { + table: 'items' + } }) ``` @@ -113,7 +150,12 @@ This allows you to avoid consuming multiple streams for the same shape log. [`getShape`](https://github.com/electric-sql/electric/blob/main/packages/react-hooks/src/react-hooks.tsx#L49) get-or-creates a `Shape` off the global cache. ```tsx -const itemsShape = getShape(stream) +const itemsShape = getShape({ + url: `http://localhost:3000/v1/shape`, + params: { + table: 'items' + } +}) ``` This allows you to avoid materialising multiple shapes for the same stream.