This repository has been archived by the owner on Feb 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtopic.go
122 lines (97 loc) · 3.05 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package kafkalock
import (
"context"
"errors"
"fmt"
"github.com/Shopify/sarama"
)
// clusterAdminFactory constructs the sarama.ClusterAdmin that validateTopic
// uses to list, create and delete topics. It defaults to
// sarama.NewClusterAdmin.
// NOTE(review): presumably kept as a package-level variable so tests can swap
// in a fake admin — confirm against the test suite.
var clusterAdminFactory = sarama.NewClusterAdmin
// validateTopic checks that the Kafka topic config.Topic exists with
// config.MaxHolders partitions and config.ReplicationFactor replicas, and
// reconciles it according to config.TopicPolicy:
//
//   - TopicPolicyNoValidation: returns nil without contacting the cluster.
//   - TopicPolicyValidateOnly: returns an error if the topic is missing or
//     misconfigured.
//   - TopicPolicyDropAndCreate: creates a missing topic; deletes and
//     re-creates a misconfigured one.
//   - any other policy: creates a missing topic, but returns an error for a
//     misconfigured one.
//
// The returned error wraps the underlying admin error, so callers can inspect
// it with errors.Is / errors.As.
func validateTopic(config *Config) error {
	if config.TopicPolicy == TopicPolicyNoValidation {
		return nil
	}

	// Work on a shallow copy so that forcing full metadata does not modify the
	// caller's Kafka configuration.
	kafkaConfig := *config.KafkaConfig
	kafkaConfig.Metadata.Full = true

	admin, err := clusterAdminFactory(config.BootstrapServers, &kafkaConfig)
	if err != nil {
		return fmt.Errorf("failed to create cluster admin: %w", err)
	}
	defer admin.Close()

	topics, err := admin.ListTopics()
	if err != nil {
		return fmt.Errorf("failed to list topics: %w", err)
	}

	expectedTopic := &sarama.TopicDetail{
		NumPartitions:     int32(config.MaxHolders),
		ReplicationFactor: int16(config.ReplicationFactor),
	}

	topic, ok := topics[config.Topic]
	if !ok {
		if config.TopicPolicy == TopicPolicyValidateOnly {
			return fmt.Errorf("topic \"%s\" doesn't exist", config.Topic)
		}

		config.Logger.Infow("topic doesn't exist, going to create it", "partitions", config.MaxHolders)
		return createTopic(admin, config.Topic, expectedTopic)
	}

	if isTopicValid(&topic, expectedTopic) {
		return nil
	}

	// Only the drop-and-create policy is allowed to destroy an existing topic.
	if config.TopicPolicy != TopicPolicyDropAndCreate {
		return fmt.Errorf(
			"topic \"%s\" isn't configured properly. "+
				"Expected number of partitions to be %d (got %d) and replication factor to be %d (got %d)",
			config.Topic,
			config.MaxHolders, topic.NumPartitions,
			config.ReplicationFactor, topic.ReplicationFactor,
		)
	}

	config.Logger.Infow("topic is invalid, going to re-create it",
		"oldPartitions", topic.NumPartitions,
		"partitions", config.MaxHolders,
		"oldReplicationFactor", topic.ReplicationFactor,
		"replicationFactor", expectedTopic.ReplicationFactor,
	)

	if err := admin.DeleteTopic(config.Topic); err != nil {
		return fmt.Errorf("failed to delete topic: %w", err)
	}

	err = waitForTopicDeletionAndCreateTopic(config, admin, config.Topic, expectedTopic)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, context.DeadlineExceeded):
		// The retry loop gave up because its deadline expired while the broker
		// still reported the old topic as existing.
		return fmt.Errorf(
			"topic deletion didn't propage through the cluster before configured timeout was reached: %w",
			err,
		)
	default:
		// Any other failure is a genuine creation error; reporting it as a
		// timeout would send the operator looking in the wrong place.
		return fmt.Errorf("topic was deleted but couldn't be re-created: %w", err)
	}
}
// isTopicValid reports whether topic has the same partition count and
// replication factor as expectedTopic. No other fields are compared.
func isTopicValid(topic *sarama.TopicDetail, expectedTopic *sarama.TopicDetail) bool {
	if topic.NumPartitions != expectedTopic.NumPartitions {
		return false
	}
	return topic.ReplicationFactor == expectedTopic.ReplicationFactor
}
// createTopic asks admin to create the topic called name with the settings in
// topic. On failure the admin error is wrapped (with %w) so callers can still
// inspect the underlying cause.
func createTopic(admin sarama.ClusterAdmin, name string, topic *sarama.TopicDetail) error {
	// The final argument is sarama's validateOnly flag; false means the topic
	// is really created rather than just validated.
	if createErr := admin.CreateTopic(name, topic, false); createErr != nil {
		return fmt.Errorf("failed to create topic: %w", createErr)
	}
	return nil
}
// waitForTopicDeletionAndCreateTopic repeatedly tries to create the topic
// called name until it succeeds, on the assumption that a preceding delete may
// not have taken effect yet.
//
// While the broker answers with sarama.ErrTopicAlreadyExists the call is
// retried after sleeping for a tenth of config.KafkaConfig.Admin.Timeout. Any
// other creation error is returned immediately. The loop gives up once three
// times Admin.Timeout has elapsed and then returns the context's error
// (context.DeadlineExceeded).
//
// The sleep goes through clock, which is declared elsewhere in this package.
func waitForTopicDeletionAndCreateTopic(
	config *Config, admin sarama.ClusterAdmin, name string, topic *sarama.TopicDetail,
) error {
	adminTimeout := config.KafkaConfig.Admin.Timeout

	ctx, cancel := context.WithTimeout(context.Background(), adminTimeout*3)
	defer cancel()

	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		err := createTopic(admin, name, topic)
		if err == nil {
			return nil
		}

		// errors.As searches the whole wrap chain, so this keeps working
		// regardless of how many layers of context sit on top of the
		// *sarama.TopicError.
		var topicErr *sarama.TopicError
		if !errors.As(err, &topicErr) || topicErr.Err != sarama.ErrTopicAlreadyExists {
			return err
		}

		// The old topic is still visible to the broker; back off and retry.
		clock.Sleep(adminTimeout / 10)
	}
}