Skip to content

Commit

Permalink
Merge branch 'outputs'
Browse files Browse the repository at this point in the history
  • Loading branch information
monicasarbu committed Jul 9, 2014
2 parents de814ce + 4de0613 commit 26b01e0
Show file tree
Hide file tree
Showing 9 changed files with 676 additions and 173 deletions.
5 changes: 3 additions & 2 deletions deps.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
github.com/mattbaird/elastigo/api
github.com/mattbaird/elastigo/core
github.com/packetbeat/elastigo/api
github.com/packetbeat/elastigo/core
github.com/nranchev/go-libGeoIP
github.com/BurntSushi/toml
github.com/akrennmair/gopcap
github.com/tsg/fsnotify
github.com/garyburd/redigo/redis
labix.org/v2/mgo/bson
16 changes: 8 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ const (
var protocolNames = []string{"unknown", "http", "mysql", "redis", "pgsql"}

type tomlConfig struct {
Interfaces tomlInterfaces
RunOptions tomlRunOptions
Protocols map[string]tomlProtocol
Procs tomlProcs
Elasticsearch tomlMothership
Agent tomlAgent
Logging tomlLogging
Passwords tomlPasswords
Interfaces tomlInterfaces
RunOptions tomlRunOptions
Protocols map[string]tomlProtocol
Procs tomlProcs
Output map[string]tomlMothership
Agent tomlAgent
Logging tomlLogging
Passwords tomlPasswords
}

type tomlInterfaces struct {
Expand Down
152 changes: 152 additions & 0 deletions output_elasticsearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package main

import (
"encoding/json"
"fmt"
"strings"
"github.com/packetbeat/elastigo/api"
"github.com/packetbeat/elastigo/core"
)

type ElasticsearchOutputType struct {
OutputInterface
Index string
TopologyExpire int

TopologyMap map[string]string
}

type PublishedTopology struct {
Name string
IPs string
}

var ElasticsearchOutput ElasticsearchOutputType

func (out *ElasticsearchOutputType) Init(config tomlMothership) error {

api.Domain = config.Host
api.Port = fmt.Sprintf("%d", config.Port)
api.Username = config.Username
api.Password = config.Password
api.BasePath = config.Path

if config.Protocol != "" {
api.Protocol = config.Protocol
}

if config.Index != "" {
out.Index = config.Index
} else {
out.Index = "packetbeat"
}

out.TopologyExpire = 15000
if _Config.Agent.Topology_expire != 0 {
out.TopologyExpire = _Config.Agent.Topology_expire /*sec*/ * 1000 // millisec
}

err := out.EnableTTL()
if err != nil {
ERR("Fail to set _ttl mapping: %s", err)
return err
}

INFO("[ElasticsearchOutput] Using Elasticsearch %s://%s:%s%s", api.Protocol, api.Domain, api.Port, api.BasePath)
INFO("[ElasticsearchOutput] Using index pattern [%s-]YYYY.MM.DD", out.Index)
INFO("[ElasticsearchOutput] Topology expires after %ds", out.TopologyExpire / 1000)

return nil
}

func (out *ElasticsearchOutputType) EnableTTL() error {
setting := map[string]interface{}{
"server-ip": map[string]interface{}{
"_ttl": map[string]string{"enabled": "true", "default": "15000"},
},
}

// Make sure the index exists
_, err := core.Index("packetbeat-topology", "", "", nil, nil)
if err != nil {
return err
}

_, err = core.Index("packetbeat-topology", "server-ip", "_mapping", nil, setting)
if err != nil {
return err
}
return nil
}

func (out *ElasticsearchOutputType) GetNameByIP(ip string) string {
name, exists := out.TopologyMap[ip]
if !exists {
return ""
}
return name
}
func (out *ElasticsearchOutputType) PublishIPs(name string, localAddrs []string) error {
DEBUG("output_elasticsearch", "Publish IPs %s with expiration time %d", localAddrs, out.TopologyExpire)
_, err := core.IndexWithParameters(
"packetbeat-topology", /*index*/
"server-ip", /*type*/
name, /* id */
"", /*parent id */
0, /* version */
"", /* op_type */
"", /* routing */
"", /* timestamp */
out.TopologyExpire, /*ttl*/
"", /* percolate */
"", /* timeout */
false, /*refresh */
nil, /*args */
PublishedTopology{name, strings.Join(localAddrs, ",")} /* data */)

if err != nil {
ERR("Fail to publish IP addresses: %s", err)
return err
}

out.UpdateLocalTopologyMap()

return nil
}

func (out *ElasticsearchOutputType) UpdateLocalTopologyMap() {

// get all agents IPs from Elasticsearch
TopologyMapTmp := make(map[string]string)

res, err := core.SearchUri("packetbeat-topology", "server-ip", nil)
if err == nil {
for _, server := range res.Hits.Hits {
var pub PublishedTopology
err = json.Unmarshal([]byte(*server.Source), &pub)
if err != nil {
ERR("json.Unmarshal fails with: %s", err)
}
// add mapping
ipaddrs := strings.Split(pub.IPs, ",")
for _, addr := range ipaddrs {
TopologyMapTmp[addr] = pub.Name
}
}
} else {
ERR("Getting topology map fails with: %s", err)
}

// update topology map
out.TopologyMap = TopologyMapTmp

DEBUG("output_elasticsearch", "Topology map %s", out.TopologyMap)
}

func (out *ElasticsearchOutputType) PublishEvent(event *Event) error {

index := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, event.Timestamp.Year(), event.Timestamp.Month(), event.Timestamp.Day())
_, err := core.Index(index, event.Type, "", nil, event)
DEBUG("output_elasticsearch", "Publish event")
return err
}
70 changes: 36 additions & 34 deletions publish_test.go → output_elasticsearch_test.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,58 @@
package main

import (
"github.com/packetbeat/elastigo/api"
"github.com/packetbeat/elastigo/core"
"testing"
"time"
)

func TestTopology(t *testing.T) {
const elasticsearchAddr = "localhost"
const elasticsearchPort = 9200

func createElasticsearchConnection() ElasticsearchOutputType {

var elasticsearchOutput ElasticsearchOutputType
elasticsearchOutput.Init(tomlMothership{
Enabled: true,
Save_topology: true,
Host: elasticsearchAddr,
Port: elasticsearchPort,
Username: "",
Password: "",
Path: "",
Index: "packetbeat",
Protocol: "",
})

return elasticsearchOutput
}

func TestTopologyInES(t *testing.T) {
if testing.Short() {
t.Skip("Skipping topology tests in short mode, because they require Elasticsearch")
}

api.Domain = "localhost"
api.Port = "9200"

_, _ = core.Delete("packetbeat-topology", "server-ip", "", nil)
var publisher1 PublisherType = PublisherType{name: "proxy1"}
var publisher2 PublisherType = PublisherType{name: "proxy2"}
var publisher3 PublisherType = PublisherType{name: "proxy3"}

elasticsearchOutput1 := createElasticsearchConnection()
elasticsearchOutput2 := createElasticsearchConnection()
elasticsearchOutput3 := createElasticsearchConnection()

publisher1.TopologyOutput = OutputInterface(&elasticsearchOutput1)
publisher2.TopologyOutput = OutputInterface(&elasticsearchOutput2)
publisher3.TopologyOutput = OutputInterface(&elasticsearchOutput3)

publisher1.PublishTopology("10.1.0.4")
publisher2.PublishTopology("10.1.0.9", "fe80::4e8d:79ff:fef2:de6a")
publisher3.PublishTopology("10.1.0.10")

// give some time to Elasticsearch to add the IPs
time.Sleep(1 * time.Second)

publisher1.UpdateTopology()
publisher2.UpdateTopology()
publisher3.UpdateTopology()
elasticsearchOutput3.UpdateLocalTopologyMap()

name2 := publisher1.GetServerName("10.1.0.9")
name2 := publisher3.GetServerName("10.1.0.9")
if name2 != "proxy2" {
t.Error("Failed to update proxy2 in topology: name=%s", name2)
}
Expand All @@ -48,41 +69,22 @@ func TestTopology(t *testing.T) {
// give some time to Elasticsearch to add the IPs
time.Sleep(1 * time.Second)

publisher1.UpdateTopology()
publisher2.UpdateTopology()
publisher3.UpdateTopology()
elasticsearchOutput3.UpdateLocalTopologyMap()

name3 := publisher1.GetServerName("192.168.1.2")
name3 := publisher3.GetServerName("192.168.1.2")
if name3 != "proxy3" {
t.Error("Failed to add a new IP")
}

name3 = publisher1.GetServerName("10.1.0.10")
name3 = publisher3.GetServerName("10.1.0.10")
if name3 != "" {
t.Error("Failed to delete old IP of proxy3: %s", name3)
}

name2 = publisher1.GetServerName("fe80::4e8d:79ff:fef2:de6a")
name2 = publisher3.GetServerName("fe80::4e8d:79ff:fef2:de6a")
if name2 != "" {
t.Error("Failed to delete old IP of proxy2: %s", name2)
}
}

func TestGetServerName(t *testing.T) {

if testing.Short() {
t.Skip("Skipping topology tests in short mode, because they require Elasticsearch")
}

LogInit(LOG_DEBUG, "" /*!toSyslog*/, true, []string{})
// TODO: delete old topology
api.Domain = "localhost"
api.Port = "9200"

var publisher PublisherType = PublisherType{name: "proxy1", RefreshTopologyTimer: time.Tick(1 * time.Second)}

name := publisher.GetServerName("127.0.0.1")
if name != "proxy1" {
t.Error("GetServerName should return the agent name in case of localhost: %s", name)
}
}
Loading

0 comments on commit 26b01e0

Please sign in to comment.