diff --git a/src/SchemaRegistry.ts b/src/SchemaRegistry.ts index 3f31ffd..ff96bdb 100644 --- a/src/SchemaRegistry.ts +++ b/src/SchemaRegistry.ts @@ -20,21 +20,36 @@ import { ConfluentSubject, SchemaRegistryAPIClientOptions, AvroConfluentSchema, + ProtocolOptions, + LegacyOptions } from './@types' import { helperTypeFromSchemaType, schemaTypeFromString, schemaFromConfluentSchema, } from './schemaTypeResolver' +import { Type } from 'avsc/types' interface RegisteredSchema { id: number } +interface SchemaVersion { + subject: string + version: number +} + interface Opts { compatibility?: COMPATIBILITY separator?: string + subject: string, + referenceSchemaIds?: number[] +} + +interface Reference { + name: string subject: string + version: number } const DEFAULT_OPTS = { @@ -97,12 +112,23 @@ export default class SchemaRegistry { schema: RawAvroSchema | ConfluentSchema, userOpts?: Opts, ): Promise { - const { compatibility, separator } = { ...DEFAULT_OPTS, ...userOpts } + const { compatibility, separator, referenceSchemaIds } = { ...DEFAULT_OPTS, ...userOpts } + let opts = this.options; + + if (referenceSchemaIds) { + const referenceSchemas = Object.assign({}, ...await Promise.all(referenceSchemaIds.map(async (id) => { + const referenceSchema = await this.getSchema(id) as AvroSchema + return { [referenceSchema.name]: referenceSchema } + }))) + + opts = this.populateRegistryWithReferenceSchemas(referenceSchemas) + } const confluentSchema: ConfluentSchema = this.getConfluentSchema(schema) const helper = helperTypeFromSchemaType(confluentSchema.type) - const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options) + const schemaInstance = schemaFromConfluentSchema(confluentSchema, opts) + // const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options, referenceSchemas) helper.validate(schemaInstance) let subject: ConfluentSubject @@ -133,11 +159,26 @@ export default class SchemaRegistry { } } + let references; + if (referenceSchemaIds) { + references = await Promise.all(referenceSchemaIds.map(async (id) => { + const refResponseData: SchemaVersion[] = (await this.api.Schema.versions({ id })).data() + const name = refResponseData[0].subject.split(separator).slice(-1)[0]; + return { + name, + subject: refResponseData[0].subject, + version: refResponseData[0].version + } + })) + } + + const response = await this.api.Subject.register({ subject: subject.name, body: { schemaType: confluentSchema.type, schema: confluentSchema.schema, + references }, }) @@ -150,13 +191,26 @@ export default class SchemaRegistry { public async getSchema(registryId: number): Promise { const schema = this.cache.getSchema(registryId) + let opts = this.options if (schema) { return schema } const response = await this.getSchemaOriginRequest(registryId) - const foundSchema: { schema: string; schemaType: string } = response.data() + const foundSchema: { schema: string; schemaType: string; references?: Reference[]; } = response.data() + + if (foundSchema.references) { + const referenceSchemas = Object.assign({}, ...await Promise.all(foundSchema.references.map(async (reference) => { + const referenceSchemaId = await this.getRegistryId(reference.subject, reference.version) + // @ts-ignore TODO: Fix typings for Schema... + const referenceType: Type = await this.getSchema(referenceSchemaId) + return { [reference.subject]: referenceType } + }))); + + opts = this.populateRegistryWithReferenceSchemas(referenceSchemas); + } + const rawSchema = foundSchema.schema const schemaType = schemaTypeFromString(foundSchema.schemaType) @@ -168,10 +222,24 @@ export default class SchemaRegistry { type: schemaType, schema: rawSchema, } - const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options) + const schemaInstance = schemaFromConfluentSchema(confluentSchema, opts) return this.cache.setSchema(registryId, schemaInstance) } + private populateRegistryWithReferenceSchemas(referenceSchemas: { [x: string]: Type }) { + const schemaOptions = (this.options as LegacyOptions)?.forSchemaOptions || (this.options as ProtocolOptions)?.[SchemaType.AVRO] + + return { + [SchemaType.AVRO]: { + ...schemaOptions, + registry: { + ...schemaOptions?.registry, + ...referenceSchemas + } + } + } + } + public async encode(registryId: number, payload: any): Promise { if (!registryId) { throw new ConfluentSchemaRegistryArgumentError( diff --git a/src/api/index.ts b/src/api/index.ts index 4802411..ca662e3 100644 --- a/src/api/index.ts +++ b/src/api/index.ts @@ -28,6 +28,7 @@ export interface SchemaRegistryAPIClientArgs { export type SchemaRegistryAPIClient = Client<{ Schema: { find: (_: any) => any + versions: (_: any) => any } Subject: { all: (_: any) => any @@ -65,6 +66,10 @@ export default ({ method: 'get', path: '/schemas/ids/{id}', }, + versions: { + method: 'get', + path: '/schemas/ids/{id}/versions' + } }, Subject: { all: {