Skip to content

Commit

Permalink
Merge pull request #85 from zhonghui12/nested-partition-key
Browse files Browse the repository at this point in the history
Add support for nested partition_key
  • Loading branch information
zhonghui12 authored Oct 29, 2020
2 parents 84e8b2a + 1da9c7c commit 2ea8765
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ If you think you’ve found a potential security issue, please do not post it in

* `region`: The region which your Kinesis Data Stream is in.
* `stream`: The name of the Kinesis Data Stream that you want log records sent to.
* `partition_key`: A partition key is used to group data by shard within a stream. A Kinesis Data Stream uses the partition key that is associated with each data record to determine which shard a given data record belongs to. For example, if your logs come from Docker containers, you can use container_id as the partition key, and the logs will be grouped and stored on different shards depending upon the id of the container they were generated from. As the data within a shard are coarsely ordered, you will get all your logs from one container in one shard roughly in order. If you don't set a partition key or put an invalid one, a random key will be generated, and the logs will be directed to random shards. If the partition key is invalid, the plugin will print an warning message.
* `partition_key`: A partition key is used to group data by shard within a stream. A Kinesis Data Stream uses the partition key that is associated with each data record to determine which shard a given data record belongs to. For example, if your logs come from Docker containers, you can use container_id as the partition key, and the logs will be grouped and stored on different shards depending upon the id of the container they were generated from. As the data within a shard are coarsely ordered, you will get all your logs from one container in one shard roughly in order. Nested partition keys are supported: use `->` to point to a target key that is nested under another key. For example, your `partition_key` could be `kubernetes->pod_name`. If you don't set a partition key or put an invalid one, a random key will be generated, and the logs will be directed to random shards. If the partition key is invalid, the plugin will print a warning message.
* `data_keys`: By default, the whole log record will be sent to Kinesis. If you specify key name(s) with this option, then only those keys and values will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify `data_keys log` and only the log message will be sent to Kinesis. If you specify multiple keys, they should be comma delimited.
* `log_key`: By default, the whole log record will be sent to Kinesis. If you specify a key name with this option, then only the value of that key will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify `log_key log` and only the log message will be sent to Kinesis.
* `role_arn`: ARN of an IAM role to assume (for cross account access).
Expand Down
29 changes: 25 additions & 4 deletions kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques
}

partitionKey := outputPlugin.getPartitionKey(record)
logrus.Debugf("[kinesis %d] Got value: %s for a given partition key.\n", outputPlugin.PluginID, partitionKey)
data, err := outputPlugin.processRecord(record, partitionKey)
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
Expand Down Expand Up @@ -562,22 +563,42 @@ func (outputPlugin *OutputPlugin) randomString() string {
return string(outputPlugin.random.buffer)
}

// getFromMap scans record for an entry whose key, once converted with
// stringOrByteArray, equals dataKey, and returns that entry's value.
// The scan is linear because keys are compared in their converted form
// rather than looked up directly. If no key matches, it returns the empty
// string.
func getFromMap(dataKey string, record map[interface{}]interface{}) interface{} {
	for key, value := range record {
		if stringOrByteArray(key) != dataKey {
			continue
		}
		return value
	}

	return ""
}

// getPartitionKey returns the value for a given valid key
// if the given key is empty or invalid, it returns a random string
func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string {
partitionKey := outputPlugin.partitionKey
if partitionKey != "" {
for k, v := range record {
dataKey := stringOrByteArray(k)
if dataKey == partitionKey {
value := stringOrByteArray(v)
partitionKeys := strings.Split(partitionKey, "->")
num := len(partitionKeys)
for count, dataKey := range partitionKeys {
newRecord := getFromMap(dataKey, record)
if count == num-1 {
value := stringOrByteArray(newRecord)
if value != "" {
if len(value) > partitionKeyMaxLength {
value = value[0:partitionKeyMaxLength]
}
return value
}
}
_, ok := newRecord.(map[interface{}]interface{})
if ok {
record = newRecord.(map[interface{}]interface{})
} else {
logrus.Errorf("[kinesis %d] The partition key could not be found in the record, using a random string instead", outputPlugin.PluginID)
return outputPlugin.randomString()
}
}
}
if outputPlugin.isAggregate {
Expand Down
42 changes: 42 additions & 0 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,45 @@ func TestDotReplace(t *testing.T) {
assert.Equal(t, "some message", log["message-key"]["message-value/one"])
assert.Equal(t, "some message", log["message-key"]["message-value/two"])
}

// TestGetPartitionKey exercises getPartitionKey against a record that holds a
// top-level value, a value nested one level deep, and a value nested two
// levels deep. Nested lookups are written with "->" between key names.
func TestGetPartitionKey(t *testing.T) {
	record := map[interface{}]interface{}{
		"testKey": []byte("test value with no nested keys"),
		"testKeyWithOneNestedKey": map[interface{}]interface{}{
			"nestedKey": []byte("test value with one nested key"),
		},
		"testKeyWithNestedKeys": map[interface{}]interface{}{
			"outerKey": map[interface{}]interface{}{
				"innerKey": []byte("test value with inner key"),
			},
		},
	}

	outputPlugin, _ := newMockOutputPlugin(nil, false)

	// Partition keys that resolve to a value stored in the record.
	resolved := []struct {
		partitionKey string
		want         string
	}{
		{partitionKey: "testKey", want: "test value with no nested keys"},
		{partitionKey: "testKeyWithOneNestedKey->nestedKey", want: "test value with one nested key"},
		{partitionKey: "testKeyWithNestedKeys->outerKey->innerKey", want: "test value with inner key"},
	}
	for _, tc := range resolved {
		outputPlugin.partitionKey = tc.partitionKey
		value := outputPlugin.getPartitionKey(record)
		// assert.Equal takes (t, expected, actual); keeping that order makes
		// the failure output label the two values correctly.
		assert.Equal(t, tc.want, value, "partition key %q", tc.partitionKey)
	}

	// Partition keys that do not resolve to a usable value: a key absent from
	// the record, a key whose value is itself a map, and a nested path whose
	// last segment is missing. Each is expected to yield an 8-character
	// random string instead.
	unresolved := []string{
		"some key",
		"testKeyWithOneNestedKey",
		"testKeyWithOneNestedKey->someKey",
	}
	for _, partitionKey := range unresolved {
		outputPlugin.partitionKey = partitionKey
		value := outputPlugin.getPartitionKey(record)
		assert.Len(t, value, 8, "This should be a random string")
	}
}

0 comments on commit 2ea8765

Please sign in to comment.