Question: avrogen for topics with nested schema differences #2411

Open
michaldivisprocore opened this issue Feb 9, 2025 · 0 comments

Hi,

Hopefully, this is the right place to ask this.

I'm trying to consume multiple CDC Avro topics. Each topic's schema defines a nested source record, but the source definitions differ between topics. I haven't found a way to generate the C# classes from the .avsc files so that all of the consumers work.

Note that I'm not in control of producing to or managing these topics. I can't change anything about them.

The schemas and other code used in the examples below are fictional but roughly represent the real schemas I'm dealing with.

Companies schema:

{
  "namespace": "coffeeandco.companies",
  "type": "record",
  "name": "value",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"},
    {
      "name": "source",
      "type": {
        "connect.name": "io.debezium.connector.postgresql.Source",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": "long"
          },
          {
            "name": "name",
            "type": "string"
          }
        ],
        "name": "Source",
        "namespace": "io.debezium.connector.postgresql",
        "type": "record"
      }
    }
  ]
}

Projects schema:

{
  "namespace": "coffeeandco.projects",
  "type": "record",
  "name": "value",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"},
    {
      "name": "source",
      "type": {
        "connect.name": "io.debezium.connector.postgresql.Source",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          }
        ],
        "name": "Source",
        "namespace": "io.debezium.connector.postgresql",
        "type": "record"
      }
    }
  ]
}

Generating the C# classes for companies with avrogen -s schema-companies.avsc . creates the files value.cs and Source.cs. The value.cs file isn't problematic, so I'm leaving it out.

Source.cs

// ------------------------------------------------------------------------------
// <auto-generated>
//    Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
//    Changes to this file may cause incorrect behavior and will be lost if code
//    is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace io.debezium.connector.postgresql
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using global::Avro;
    using global::Avro.Specific;
    
    [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
    public partial class Source : global::Avro.Specific.ISpecificRecord
    {
        public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse("""{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"ts_ms","type":"long"},{"name":"name","type":"string"}],"connect.name":"io.debezium.connector.postgresql.Source"}""");
        private string _version;
        private long _ts_ms;
        private string _name;
        public virtual global::Avro.Schema Schema
        {
            get
            {
                return Source._SCHEMA;
            }
        }
        public string version
        {
            get
            {
                return this._version;
            }
            set
            {
                this._version = value;
            }
        }
        public long ts_ms
        {
            get
            {
                return this._ts_ms;
            }
            set
            {
                this._ts_ms = value;
            }
        }
        public string name
        {
            get
            {
                return this._name;
            }
            set
            {
                this._name = value;
            }
        }
        public virtual object Get(int fieldPos)
        {
            switch (fieldPos)
            {
            case 0: return this.version;
            case 1: return this.ts_ms;
            case 2: return this.name;
            default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            };
        }
        public virtual void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
            case 0: this.version = (System.String)fieldValue; break;
            case 1: this.ts_ms = (System.Int64)fieldValue; break;
            case 2: this.name = (System.String)fieldValue; break;
            default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            };
        }
    }
}

Generating the C# classes for projects with avrogen -s schema-projects.avsc . replaces the Source.cs with:

Source.cs

// ------------------------------------------------------------------------------
// <auto-generated>
//    Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
//    Changes to this file may cause incorrect behavior and will be lost if code
//    is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace io.debezium.connector.postgresql
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using global::Avro;
    using global::Avro.Specific;

    [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
    public partial class Source : global::Avro.Specific.ISpecificRecord
    {
        public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse("""{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"name","type":"string"}],"connect.name":"io.debezium.connector.postgresql.Source"}""");
        private string _version;
        private string _name;
        public virtual global::Avro.Schema Schema
        {
            get
            {
                return Source._SCHEMA;
            }
        }
        public string version
        {
            get
            {
                return this._version;
            }
            set
            {
                this._version = value;
            }
        }
        public string name
        {
            get
            {
                return this._name;
            }
            set
            {
                this._name = value;
            }
        }
        public virtual object Get(int fieldPos)
        {
            switch (fieldPos)
            {
            case 0: return this.version;
            case 1: return this.name;
            default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            };
        }
        public virtual void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
            case 0: this.version = (System.String)fieldValue; break;
            case 1: this.name = (System.String)fieldValue; break;
            default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            };
        }
    }
}

Because both Source records share the same name and namespace in the Avro schema, the projects' Source class overwrites the companies' Source class. The remaining Source class has no ts_ms field, which makes the companies consumer fail on deserialization with an error similar to:

Confluent.Kafka.ConsumeException: Local: Value deserialization error ---> Avro.AvroException: Unable to cast object of type 'System.Nullable`1[System.Int64]' to type 'System.String'. in field schema in field source ---> Avro.AvroException: Unable to cast object of type 'System.Nullable`1[System.Int64]' to type 'System.String'. in field schema ---> System.InvalidCastException: Unable to cast object of type 'System.Nullable`1[System.Int64]' to type 'System.String'.
   at io.debezium.connector.postgresql.Source.Put(int fieldPos, object fieldValue) in Whatever/Source.cs:line 227
   at Avro.Specific.SpecificDefaultReader.ReadRecord(object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)   --- End of inner exception stack trace ---
   at Avro.Specific.SpecificDefaultReader.ReadRecord(object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)
   at Avro.Specific.SpecificDefaultReader.ReadRecord(object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)   --- End of inner exception stack trace ---
   at Avro.Specific.SpecificDefaultReader.ReadRecord(object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)
   at Avro.Generic.DefaultReader.Read<T>(T reuse, Decoder decoder)
   at Avro.Specific.SpecificReader<T>.Read(T reuse, Decoder dec)
   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl<T>.Read(DatumReader<T> datumReader, Decoder decoder)
   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl<T>+<Deserialize>d__7.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task, ConfigureAwaitOptions options)
   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl<T>+<DeserializeAsync>d__6.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task, ConfigureAwaitOptions options)
   at Confluent.SchemaRegistry.Serdes.AvroDeserializer<T>+<DeserializeAsync>d__7.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task, ConfigureAwaitOptions options)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable<TResult>+ConfiguredTaskAwaiter.GetResult()
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer<T>.Deserialize(ReadOnlySpan<T> data, bool isNull, SerializationContext context)
   at Confluent.Kafka.Consumer<TKey, TValue>.Consume(int millisecondsTimeout)   --- End of inner exception stack trace ---
   at Confluent.Kafka.Consumer<TKey, TValue>.Consume(int millisecondsTimeout)
   at Confluent.Kafka.Consumer<TKey, TValue>.Consume(CancellationToken cancellationToken)
   ...

My guess is that the reader, which is unaware of the ts_ms field, tries to read name at the position where the message actually contains ts_ms.
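One possible reading of the stack trace above: the exception is thrown from Source.Put rather than from the byte decoder, so the long value may have been decoded correctly and then handed to a class whose positional switch was generated from a different schema. The following is a minimal sketch of that mechanism only. ProjectsSource, PutMismatchDemo, and the sample values are hypothetical stand-ins, not the real Apache.Avro types or real data.

```csharp
using System;

// Hypothetical stand-in for the Source class generated from the projects schema:
// position 0 = version, position 1 = name (there is no ts_ms).
public class ProjectsSource
{
    public string version { get; set; } = "";
    public string name { get; set; } = "";

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: this.version = (string)fieldValue; break;
            case 1: this.name = (string)fieldValue; break;
            default: throw new InvalidOperationException("Bad index " + fieldPos + " in Put()");
        }
    }
}

public static class PutMismatchDemo
{
    // Illustrative values in the order the companies schema declares its fields:
    // version (0), ts_ms (1), name (2).
    public static readonly object[] CompaniesValues = { "1.0", 1700000000000L, "pg" };

    // Feeds values to Put by position and returns the first position
    // whose cast fails, or -1 if every value was accepted.
    public static int FirstFailingPosition(ProjectsSource target, object[] valuesInSchemaOrder)
    {
        for (int pos = 0; pos < valuesInSchemaOrder.Length; pos++)
        {
            try
            {
                target.Put(pos, valuesInSchemaOrder[pos]);
            }
            catch (InvalidCastException)
            {
                return pos;
            }
        }
        return -1;
    }

    public static void Main()
    {
        // Position 1 carries a long (ts_ms), but this class expects a string (name) there.
        Console.WriteLine(FirstFailingPosition(new ProjectsSource(), CompaniesValues)); // prints 1
    }
}
```

If this is what happens, the reader schema embedded in value.cs would still describe the three-field Source while the Source class on disk only knows two positions, which would match the cast error reported for the source field.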

Replacing Avro namespaces

I tried to adjust the namespaces of the generated C# classes so that each topic and its dependent schemas live under their own namespace. For example, the namespace Coffeeandco.Schemas.Companies would contain the value.cs and Source.cs for the companies topic, and so on. The command:

avrogen -s schema-companies.avsc . \
    --namespace coffeeandco.companies:Coffeeandco.Schemas.Companies \
    --namespace io.debezium.connector.postgresql:Coffeeandco.Schemas.Companies

This produces a Source.cs that looks like:

// ------------------------------------------------------------------------------
// <auto-generated>
//    Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
//    Changes to this file may cause incorrect behavior and will be lost if code
//    is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace Coffeeandco.Schemas.Companies
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using global::Avro;
    using global::Avro.Specific;

    [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
    public partial class Source : global::Avro.Specific.ISpecificRecord
    {
        // even this string now contains the Coffeeandco.Schemas.Companies namespace... huh...
        public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse("""{"type":"record","name":"Source","namespace":"Coffeeandco.Schemas.Companies","fields":[{"name":"version","type":"string"},{"name":"ts_ms","type":"long"},{"name":"name","type":"string"}],"connect.name":"io.debezium.connector.postgresql.Source"}""");
        private string _version;
        private long _ts_ms;
        private string _name;
        public virtual global::Avro.Schema Schema
        {
            get
            {
                return Source._SCHEMA;
            }
        }
        public string version
        {
            get
            {
                return this._version;
            }
            set
            {
                this._version = value;
            }
        }
        public long ts_ms
        {
            get
            {
                return this._ts_ms;
            }
            set
            {
                this._ts_ms = value;
            }
        }
        public string name
        {
            get
            {
                return this._name;
            }
            set
            {
                this._name = value;
            }
        }
        public virtual object Get(int fieldPos)
        {
            switch (fieldPos)
            {
            case 0: return this.version;
            case 1: return this.ts_ms;
            case 2: return this.name;
            default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            };
        }
        public virtual void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
            case 0: this.version = (System.String)fieldValue; break;
            case 1: this.ts_ms = (System.Int64)fieldValue; break;
            case 2: this.name = (System.String)fieldValue; break;
            default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            };
        }
    }
}

Notice that even the namespace in the Avro schema string (_SCHEMA field) changed to Coffeeandco.Schemas.Companies. That seems to be a problem, because the consumer now fails with:

Confluent.Kafka.ConsumeException: Local: Value deserialization error
---> Avro.AvroException: Schema mismatch
...

My guess is that the namespace mapping changed the Avro schema itself, so the Avro reader understandably complains.
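To illustrate why a remapped namespace could count as a different schema: Avro identifies a named type by its full name, which is the namespace and the name joined by a dot. The sketch below derives the full name from two trimmed copies of the _SCHEMA strings shown above (the fields are left empty for brevity). FullNameDemo and its members are hypothetical helpers written for this example, and the helper ignores dotted names and namespaces inherited from an enclosing record.

```csharp
using System;
using System.Text.Json;

public static class FullNameDemo
{
    // Trimmed copy of the schema as registered (original namespace).
    public const string Registered =
        """{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[]}""";

    // Trimmed copy of what avrogen emitted after the --namespace mapping.
    public const string Remapped =
        """{"type":"record","name":"Source","namespace":"Coffeeandco.Schemas.Companies","fields":[]}""";

    // Simplified full-name rule: "<namespace>.<name>", or just "<name>" without a namespace.
    public static string FullName(string schemaJson)
    {
        using JsonDocument doc = JsonDocument.Parse(schemaJson);
        JsonElement root = doc.RootElement;
        string name = root.GetProperty("name").GetString() ?? "";
        string ns = root.TryGetProperty("namespace", out JsonElement n) ? (n.GetString() ?? "") : "";
        return ns.Length == 0 ? name : ns + "." + name;
    }

    public static void Main()
    {
        Console.WriteLine(FullName(Registered)); // io.debezium.connector.postgresql.Source
        Console.WriteLine(FullName(Remapped));   // Coffeeandco.Schemas.Companies.Source
    }
}
```

The two full names differ, so a reader that compares writer and reader records by full name would see two unrelated types. Whether that comparison is what raises "Schema mismatch" here is an assumption.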

Revert _SCHEMA namespaces

Next, I tried reverting the namespace in the generated class's _SCHEMA field back to what is in Schema Registry.

Manually changing the Source.cs file to:

// ------------------------------------------------------------------------------
// <auto-generated>
//    Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
//    Changes to this file may cause incorrect behavior and will be lost if code
//    is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace Coffeeandco.Schemas.Companies
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using global::Avro;
    using global::Avro.Specific;

    [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
    public partial class Source : global::Avro.Specific.ISpecificRecord
    {
        // changed namespace here in the string...
        public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse("""{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"ts_ms","type":"long"},{"name":"name","type":"string"}],"connect.name":"io.debezium.connector.postgresql.Source"}""");
        private string _version;
        private long _ts_ms;
        private string _name;
        public virtual global::Avro.Schema Schema
        {
            get
            {
                return Source._SCHEMA;
            }
        }
        public string version
        {
            get
            {
                return this._version;
            }
            set
            {
                this._version = value;
            }
        }
        public long ts_ms
        {
            get
            {
                return this._ts_ms;
            }
            set
            {
                this._ts_ms = value;
            }
        }
        public string name
        {
            get
            {
                return this._name;
            }
            set
            {
                this._name = value;
            }
        }
        public virtual object Get(int fieldPos)
        {
            switch (fieldPos)
            {
            case 0: return this.version;
            case 1: return this.ts_ms;
            case 2: return this.name;
            default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            };
        }
        public virtual void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
            case 0: this.version = (System.String)fieldValue; break;
            case 1: this.ts_ms = (System.Int64)fieldValue; break;
            case 2: this.name = (System.String)fieldValue; break;
            default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            };
        }
    }
}

Now there's no schema mismatch, but the consumer still fails:

Confluent.Kafka.ConsumeException: Local: Value deserialization error
---> Avro.AvroException: Unable to find type 'staging.cdc.procore.public.companies.compact.Value' in all loaded assemblies in field before
---> Avro.AvroException: Unable to find type 'staging.cdc.procore.public.companies.compact.Value' in all loaded assemblies
...

For whatever reason, the code from the Apache.Avro NuGet package tries to find the class in the loaded assemblies using the namespace from the _SCHEMA field, and it fails when that namespace differs from the namespace of the C# class.
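The behaviour described above can be reproduced with plain reflection. This is a simplified stand-in for a lookup by schema full name, not the library's actual code: TypeLookupDemo and TypeExists are hypothetical names, and the empty Source class stands in for the hand-edited generated class.

```csharp
using System;
using System.Linq;

namespace Coffeeandco.Schemas.Companies
{
    // Stand-in for the generated class that now lives in the remapped C# namespace.
    public class Source { }
}

public static class TypeLookupDemo
{
    // Returns true if any loaded assembly defines a type with exactly this full name.
    public static bool TypeExists(string fullName)
    {
        return AppDomain.CurrentDomain.GetAssemblies()
            .Any(assembly => assembly.GetType(fullName) != null);
    }

    public static void Main()
    {
        // The C# full name of the class is found.
        Console.WriteLine(TypeExists("Coffeeandco.Schemas.Companies.Source"));    // True

        // The Avro full name from the reverted _SCHEMA string is not,
        // because no class is declared in that namespace.
        Console.WriteLine(TypeExists("io.debezium.connector.postgresql.Source")); // False
    }
}
```

A lookup of this kind can only succeed when the Avro full name and the C# full name are identical, which would explain why reverting only the _SCHEMA string trades the schema mismatch for a missing-type error.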

What next?

I know I can always use a generic consumer instead, but I really do want to take advantage of schema registry and the fact that our company uses it. How can I make this work? Thanks in advance for any advice.

@michaldivisprocore michaldivisprocore changed the title Question: consuming Avro topics with nested schema differences Question: avrogen for topics with nested schema differences Feb 9, 2025