Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(fail_on): Added fail_on list to enable failing plan on condition such as decreasing partition count #432

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
SASLAWSProfile string
SASLAWSCredsDebug bool
SASLTokenUrl string
FailOn []string
}

type OAuth2Config interface {
Expand Down Expand Up @@ -301,6 +302,7 @@ func (config *Config) copyWithMaskedSensitiveValues() Config {
config.SASLAWSRoleArn,
config.SASLAWSCredsDebug,
config.SASLTokenUrl,
config.FailOn,
}
return copy
}
19 changes: 19 additions & 0 deletions kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,22 @@ func Test_newTLSConfig(t *testing.T) {
})
}
}

func TestConfigFailOn(t *testing.T) {
// Test with a value
cfg := &Config{FailOn: []string{"partition_lower"}}
if len(cfg.FailOn) != 1 {
t.Fatalf("expected 1 fail_on condition, got %d", len(cfg.FailOn))
}
if !Contains(cfg.FailOn, "partition_lower") {
t.Fatalf("expected fail_on condition 'partition_lower', got %s", cfg.FailOn[0])
}
}

func TestConfigFailOnEmpty(t *testing.T) {
// Test with an empty value
cfgEmpty := &Config{FailOn: []string{}}
if len(cfgEmpty.FailOn) != 0 {
t.Fatalf("expected 0 fail_on conditions, got %d", len(cfgEmpty.FailOn))
}
}
15 changes: 15 additions & 0 deletions kafka/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ func Provider() *schema.Provider {
Default: 120,
Description: "Timeout in seconds",
},
"fail_on": {
Type: schema.TypeList,
Elem: &schema.Schema{Type: schema.TypeString},
Optional: true,
DefaultFunc: func() (interface{}, error) {
return []interface{}{}, nil
},
Description: "List of conditions to fail on.",
},
},

ConfigureFunc: providerConfigure,
Expand Down Expand Up @@ -160,6 +169,11 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {
return nil, fmt.Errorf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\", \"aws-iam\", \"oauthbearer\" or \"plain\"", saslMechanism)
}

var failOn []string
for _, v := range d.Get("fail_on").([]interface{}) {
failOn = append(failOn, v.(string))
}

config := &Config{
BootstrapServers: brokers,
CACert: d.Get("ca_cert").(string),
Expand All @@ -178,6 +192,7 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {
SASLMechanism: saslMechanism,
TLSEnabled: d.Get("tls_enabled").(bool),
Timeout: d.Get("timeout").(int),
FailOn: failOn,
}

if config.CACert == "" {
Expand Down
84 changes: 82 additions & 2 deletions kafka/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -44,10 +45,15 @@ func overrideProviderFactory() map[string]func() (*schema.Provider, error) {
}

func overrideProvider() (*schema.Provider, error) {
return overrideProviderFull(map[string]interface{}{})
}

func overrideProviderFull(configOverrides map[string]interface{}) (*schema.Provider, error) {

log.Println("[INFO] Setting up override for a provider")
provider := Provider()

rc, err := accTestProviderConfig()
rc, err := accTestProviderConfig(configOverrides)
if err != nil {
return nil, err
}
Expand All @@ -60,7 +66,7 @@ func overrideProvider() (*schema.Provider, error) {
return provider, nil
}

func accTestProviderConfig() (*terraform.ResourceConfig, error) {
func accTestProviderConfig(overrides map[string]interface{}) (*terraform.ResourceConfig, error) {
bootstrapServers := bootstrapServersFromEnv()
bs := make([]interface{}, len(bootstrapServers))

Expand All @@ -72,6 +78,24 @@ func accTestProviderConfig() (*terraform.ResourceConfig, error) {
"bootstrap_servers": bs,
}

jsonOverrides, err := json.Marshal(overrides)
if err != nil {
panic(fmt.Sprintf("failed to marshal overrides: %v", err))
}

var newOverrides map[string]interface{}
err = json.Unmarshal(jsonOverrides, &newOverrides)
if err != nil {
panic(fmt.Sprintf("failed to unmarshal JSON: %v", err))
}

// Apply overrides
for k, v := range newOverrides {
if _, exists := raw[k]; !exists {
raw[k] = v
}
}

return terraform.NewResourceConfigRaw(raw), nil
}

Expand All @@ -92,3 +116,59 @@ func bootstrapServersFromEnv() []string {

return bootstrapServers
}

func ProviderChecker(t *testing.T, configOverrides map[string]interface{}) *Config {
provider, err := overrideProviderFull(configOverrides)
if err != nil {
t.Fatalf("could not configure provider: %v", err)
}

meta := provider.Meta()
if meta == nil {
t.Fatal("meta is nil")
}

cfg := meta.(*LazyClient).Config
if cfg == nil {
t.Fatal("expected non-nil Config")
}

return cfg
}

func Test_ProviderBaseConfig(t *testing.T) {
cfg := ProviderChecker(t, map[string]interface{}{})

if len(cfg.FailOn) != 0 {
t.Fatalf("expected 0 fail_on conditions, got %d", len(cfg.FailOn))
}
if Contains(cfg.FailOn, "partition_lower") {
t.Fatalf("fail_on contains 'partition_lower' but shouldn't: %+v", cfg.FailOn)
}
}

func Test_ProviderFailOnConfig(t *testing.T) {
var failOn []string
failOn = append(failOn, "partition_lower")
extraConfig := map[string]interface{}{
"fail_on": failOn,
}
cfg := ProviderChecker(t, extraConfig)

if len(cfg.FailOn) == 0 {
t.Fatalf("expected 1 fail_on conditions, got %d", len(cfg.FailOn))
}
if !Contains(cfg.FailOn, "partition_lower") {
t.Fatalf("fail_on missing 'partition_lower' but shouldn't: %+v", cfg.FailOn)
}
}

func Test_ProviderFailOnEmptyConfig(t *testing.T) {
cfg := ProviderChecker(t, map[string]interface{}{
"fail_on": []string{},
})

if len(cfg.FailOn) != 0 {
t.Fatalf("expected 0 fail_on conditions, got %d", len(cfg.FailOn))
}
}
31 changes: 23 additions & 8 deletions kafka/resource_kafka_acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,12 @@ resource "kafka_acl" "test" {
}
`

// lintignore:AT004
func cfg(t *testing.T, bs string, extraCfg string) string {
return full_cfg(t, bs, extraCfg, []string{})
}

// lintignore:AT004
func full_cfg(t *testing.T, bs string, extraCfg string, fail_on []string) string {
_, err := os.ReadFile("../secrets/ca.crt")
if err != nil {
t.Fatal(err)
Expand All @@ -270,11 +274,22 @@ func cfg(t *testing.T, bs string, extraCfg string) string {
t.Fatal(err)
}

return fmt.Sprintf(`
provider "kafka" {
bootstrap_servers = ["%s"]
}

%s
`, bs, extraCfg)
if len(fail_on) > 0 {
return fmt.Sprintf(`
provider "kafka" {
bootstrap_servers = ["%s"]
fail_on = %q
}

%s
`, bs, fail_on, extraCfg)
} else {
return fmt.Sprintf(`
provider "kafka" {
bootstrap_servers = ["%s"]
}

%s
`, bs, extraCfg)
}
}
9 changes: 7 additions & 2 deletions kafka/resource_kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,20 @@ func customDiff(ctx context.Context, diff *schema.ResourceDiff, v interface{}) e
if diff.Id() == "" {
return nil
}

client := v.(*LazyClient)
failOnPartitionLower := Contains(client.Config.FailOn, "partition_lower")

if diff.HasChange("partitions") {
log.Printf("[INFO] Partitions have changed!")
o, n := diff.GetChange("partitions")
oi := o.(int)
ni := n.(int)
log.Printf("[INFO] Partitions is changing from %d to %d", oi, ni)
if ni < oi {
if failOnPartitionLower {
return fmt.Errorf("decreasing the number of partitions is not allowed when fail_on includes 'partition_lower'")
}
log.Printf("Partitions decreased from %d to %d. Forcing new resource", oi, ni)
if err := diff.ForceNew("partitions"); err != nil {
return err
Expand All @@ -304,8 +311,6 @@ func customDiff(ctx context.Context, diff *schema.ResourceDiff, v interface{}) e

if diff.HasChange("replication_factor") {
log.Printf("[INFO] Checking the diff!")
client := v.(*LazyClient)

canAlterRF, err := client.CanAlterReplicationFactor()
if err != nil {
return err
Expand Down
Loading
Loading