Skip to content

Commit

Permalink
Add schema registry implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewstucki committed Oct 15, 2024
1 parent 60373ad commit 0ac96b1
Show file tree
Hide file tree
Showing 44 changed files with 3,177 additions and 69 deletions.
38 changes: 38 additions & 0 deletions acceptance/features/schema-crds.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
@cluster:basic
Feature: Schema CRDs
Background: Cluster available
Given cluster "basic" is available

@skip:gke @skip:aks @skip:eks
Scenario: Managing Schemas
Given there is no schema "schema1" in cluster "basic"
When I apply Kubernetes manifest:
"""
---
apiVersion: cluster.redpanda.com/v1alpha2
kind: Schema
metadata:
name: schema1
spec:
cluster:
clusterRef:
name: basic
text: |
{
"type": "record",
"name": "test",
"fields":
[
{
"type": "string",
"name": "field1"
},
{
"type": "int",
"name": "field2"
}
]
}
"""
And schema "schema1" is successfully synced
Then I should be able to check compatibility against "schema1" in cluster "basic"
15 changes: 6 additions & 9 deletions acceptance/features/topic-crds.feature
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,17 @@ Feature: Topic CRDs
Given there is no topic "topic1" in cluster "basic"
When I apply Kubernetes manifest:
"""
# tag::basic-topic-example[]
# In this example manifest, a topic called "topic1" is created in a cluster called "basic". It has a replication factor of 1 and is distributed across a single partition.
---
apiVersion: cluster.redpanda.com/v1alpha2
kind: Topic
metadata:
name: topic1
name: topic1
spec:
cluster:
clusterRef:
name: basic
partitions: 1
replicationFactor: 1
# end::basic-topic-example[]
cluster:
clusterRef:
name: basic
partitions: 1
replicationFactor: 1
"""
And topic "topic1" is successfully synced
Then I should be able to produce and consume from "topic1" in cluster "basic"
64 changes: 27 additions & 37 deletions acceptance/features/user-crds.feature
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Feature: User CRDs
Given cluster "sasl" is available

@skip:gke @skip:aks @skip:eks
Scenario: Manage users
Scenario: Managing Users
Given there is no user "bob" in cluster "sasl"
And there is no user "james" in cluster "sasl"
And there is no user "alice" in cluster "sasl"
Expand All @@ -18,69 +18,59 @@ Feature: User CRDs
And "alice" should exist and be able to authenticate to the "sasl" cluster

@skip:gke @skip:aks @skip:eks
Scenario: Manage authentication-only users
Scenario: Managing Authentication-only Users
Given there is no user "jason" in cluster "sasl"
And there are already the following ACLs in cluster "sasl":
| user | acls |
| jason | [{"type":"allow","resource":{"type":"cluster"},"operations":["Read"]}] |
When I apply Kubernetes manifest:
"""
# tag::manage-authn-only-manifest[]
# In this example manifest, a user called "jason" is created in a cluster called "sasl".
# The user's password is defined in a Secret called "jason-password".
# This example assumes that you will create ACLs for this user separately.
---
apiVersion: cluster.redpanda.com/v1alpha2
kind: User
metadata:
name: jason
name: jason
spec:
cluster:
clusterRef:
name: sasl
authentication:
type: scram-sha-512
password:
valueFrom:
secretKeyRef:
name: jason-password
key: password
# end::manage-authn-only-manifest[]
cluster:
clusterRef:
name: sasl
authentication:
type: scram-sha-512
password:
valueFrom:
secretKeyRef:
name: jason-password
key: password
"""
And user "jason" is successfully synced
And I delete the CRD user "jason"
Then there should be ACLs in the cluster "sasl" for user "jason"

@skip:gke @skip:aks @skip:eks
Scenario: Manage authorization-only users
Scenario: Managing Authorization-only Users
Given there are the following pre-existing users in cluster "sasl"
| name | password | mechanism |
| travis | password | SCRAM-SHA-256 |
When I apply Kubernetes manifest:
"""
# tag::manage-authz-only-manifest[]
# In this example manifest, an ACL called "travis" is created in a cluster called "sasl".
# The ACL give an existing user called "travis" permissions to read from all topics whose names start with some-topic.
# This example assumes that you already have a user called "travis" in your cluster.
---
apiVersion: cluster.redpanda.com/v1alpha2
kind: User
metadata:
name: travis
name: travis
spec:
cluster:
clusterRef:
name: sasl
authorization:
acls:
- type: allow
resource:
type: topic
name: some-topic
patternType: prefixed
operations: [Read]
# end::manage-authz-only-manifest[]
cluster:
clusterRef:
name: sasl
authorization:
acls:
- type: allow
resource:
type: topic
name: some-topic
patternType: prefixed
operations: [Read]
"""
And user "travis" is successfully synced
And I delete the CRD user "travis"
Then "travis" should be able to authenticate to the "sasl" cluster with password "password" and mechanism "SCRAM-SHA-256"
Then "travis" should be able to authenticate to the "sasl" cluster with password "password" and mechanism "SCRAM-SHA-256"
1 change: 1 addition & 0 deletions acceptance/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 // indirect
github.com/twmb/franz-go/pkg/sr v1.0.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
Expand Down
1 change: 1 addition & 0 deletions acceptance/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1C
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 h1:alKdbddkPw3rDh+AwmUEwh6HNYgTvDSFIe/GWYRR9RM=
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0/go.mod h1:k8BoBjyUbFj34f0rRbn+Ky12sZFAPbmShrg0karAIMo=
github.com/twmb/franz-go/pkg/sr v1.0.1 h1:hf3eRFDUWSfmR7JQCS/3JiqZEQwqbiDSS/DooewMHCE=
github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44=
github.com/twmb/tlscfg v1.2.1/go.mod h1:GameEQddljI+8Es373JfQEBvtI4dCTLKWGJbqT2kErs=
github.com/virtuald/go-ordered-json v0.0.0-20170621173500-b18e6e673d74 h1:JwtAtbp7r/7QSyGz8mKUbYJBg2+6Cd7OjM8o/GNOcVo=
Expand Down
24 changes: 24 additions & 0 deletions acceptance/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
redpandav1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha1"
redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2"
"github.com/stretchr/testify/require"
rbacv1 "k8s.io/api/rbac/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -74,10 +75,33 @@ func TestMain(m *testing.M) {
},
})
t.Log("Successfully installed Redpanda operator chart")

// hack to patch the RBAC policies we'll need for schemas
var role rbacv1.Role
require.NoError(t, t.Get(ctx, t.ResourceKey("redpanda-operator"), &role))
role.Rules = append(role.Rules, []rbacv1.PolicyRule{
{
Verbs: []string{"get", "list", "patch", "update", "watch"},
APIGroups: []string{"cluster.redpanda.com"},
Resources: []string{"schemas"},
},
{
Verbs: []string{"update"},
APIGroups: []string{"cluster.redpanda.com"},
Resources: []string{"schemas/finalizers"},
},
{
Verbs: []string{"get", "patch", "update"},
APIGroups: []string{"cluster.redpanda.com"},
Resources: []string{"schemas/status"},
},
}...)
require.NoError(t, t.Update(ctx, &role))
}).
RegisterTag("cluster", 1, ClusterTag).
ExitOnCleanupFailures().
Build()

if err != nil {
fmt.Printf("error running test suite: %v\n", err)
os.Exit(1)
Expand Down
43 changes: 43 additions & 0 deletions acceptance/steps/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -65,6 +66,14 @@ func (c *clusterClients) Kafka(ctx context.Context) *kgo.Client {
return client
}

func (c *clusterClients) SchemaRegistry(ctx context.Context) *sr.Client {
t := framework.T(ctx)

client, err := c.factory.SchemaRegistryClient(ctx, c.resourceTarget)
require.NoError(t, err)
return client
}

func (c *clusterClients) RedpandaAdmin(ctx context.Context) *rpadmin.AdminAPI {
t := framework.T(ctx)

Expand Down Expand Up @@ -114,6 +123,40 @@ func (c *clusterClients) ExpectNoUser(ctx context.Context, user string) {
t.Logf("Found no user %q in cluster %q", user, c.cluster)
}

func (c *clusterClients) ExpectSchema(ctx context.Context, schema string) {
t := framework.T(ctx)

t.Logf("Checking that schema %q exists in cluster %q", schema, c.cluster)
c.checkSchema(ctx, schema, true, fmt.Sprintf("Schema %q does not exist in cluster %q", schema, c.cluster))
t.Logf("Found schema %q in cluster %q", schema, c.cluster)
}

func (c *clusterClients) ExpectNoSchema(ctx context.Context, schema string) {
t := framework.T(ctx)

t.Logf("Checking that schema %q does not exist in cluster %q", schema, c.cluster)
c.checkSchema(ctx, schema, false, fmt.Sprintf("Schema %q still exists in cluster %q", schema, c.cluster))
t.Logf("Found no schema %q in cluster %q", schema, c.cluster)
}

func (c *clusterClients) checkSchema(ctx context.Context, schema string, exists bool, message string) {
t := framework.T(ctx)

var subjects []string
var err error

if !assert.Eventually(t, func() bool {
t.Logf("Pulling list of schema subjects from cluster")
schemaRegistry := c.SchemaRegistry(ctx)
subjects, err = schemaRegistry.Subjects(ctx)
require.NoError(t, err)

return exists == slices.Contains(subjects, schema)
}, 10*time.Second, 1*time.Second, message) {
t.Errorf("Final list of schema subjects: %v", subjects)
}
}

func (c *clusterClients) ExpectTopic(ctx context.Context, topic string) {
t := framework.T(ctx)

Expand Down
5 changes: 5 additions & 0 deletions acceptance/steps/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ func init() {

framework.RegisterStep(`^I apply Kubernetes manifest:$`, iApplyKubernetesManifest)

// Schema scenario steps
framework.RegisterStep(`^there is no schema "([^"]*)" in cluster "([^"]*)"$`, thereIsNoSchema)
framework.RegisterStep(`^schema "([^"]*)" is successfully synced$`, schemaIsSuccessfullySynced)
framework.RegisterStep(`^I should be able to check compatibility against "([^"]*)" in cluster "([^"]*)"$`, iShouldBeAbleToCheckCompatibilityAgainst)

// Topic scenario steps
framework.RegisterStep(`^there is no topic "([^"]*)" in cluster "([^"]*)"$`, thereIsNoTopic)
framework.RegisterStep(`^topic "([^"]*)" is successfully synced$`, topicIsSuccessfullySynced)
Expand Down
43 changes: 43 additions & 0 deletions acceptance/steps/schemas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package steps

import (
"context"

framework "github.com/redpanda-data/redpanda-operator/harpoon"
redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func schemaIsSuccessfullySynced(ctx context.Context, t framework.TestingT, schema string) {
var schemaObject redpandav1alpha2.Schema
require.NoError(t, t.Get(ctx, t.ResourceKey(schema), &schemaObject))

// make sure the resource is stable
checkStableResource(ctx, t, &schemaObject)

// make sure it's synchronized
t.RequireCondition(metav1.Condition{
Type: redpandav1alpha2.ResourceConditionTypeSynced,
Status: metav1.ConditionTrue,
Reason: redpandav1alpha2.ResourceConditionReasonSynced,
}, schemaObject.Status.Conditions)
}

func thereIsNoSchema(ctx context.Context, schema, cluster string) {
clientsForCluster(ctx, cluster).ExpectNoSchema(ctx, schema)
}

func iShouldBeAbleToCheckCompatibilityAgainst(ctx context.Context, t framework.TestingT, schema, cluster string) {
clients := clientsForCluster(ctx, cluster)
clients.ExpectSchema(ctx, schema)
}
4 changes: 2 additions & 2 deletions acceptance/steps/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func iCreateCRDbasedUsers(ctx context.Context, t framework.TestingT, cluster str

// make sure it's synchronized
t.RequireCondition(metav1.Condition{
Type: redpandav1alpha2.UserConditionTypeSynced,
Type: redpandav1alpha2.ResourceConditionTypeSynced,
Status: metav1.ConditionTrue,
Reason: redpandav1alpha2.UserConditionReasonSynced,
Reason: redpandav1alpha2.ResourceConditionReasonSynced,
}, user.Status.Conditions)

t.Cleanup(func(ctx context.Context) {
Expand Down
Loading

0 comments on commit 0ac96b1

Please sign in to comment.