Skip to content

Add log support to Splunk HEC exporter #875

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Sep 29, 2020
Merged
40 changes: 35 additions & 5 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/url"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
Expand Down Expand Up @@ -109,14 +110,23 @@ func (c *client) pushTraceData(
return numDroppedSpans, nil
}

err = c.sendSplunkEvents(splunkEvents)
if err != nil {
return td.SpanCount(), err
}

return numDroppedSpans, nil
}

func (c *client) sendSplunkEvents(splunkEvents []*splunkEvent) error {
body, compressed, err := encodeBodyEvents(&c.zippers, splunkEvents, c.config.DisableCompression)
if err != nil {
return td.SpanCount(), consumererror.Permanent(err)
return consumererror.Permanent(err)
}

req, err := http.NewRequest("POST", c.url.String(), body)
if err != nil {
return td.SpanCount(), consumererror.Permanent(err)
return consumererror.Permanent(err)
}

for k, v := range c.headers {
Expand All @@ -129,7 +139,7 @@ func (c *client) pushTraceData(

resp, err := c.client.Do(req)
if err != nil {
return td.SpanCount(), err
return err
}

io.Copy(ioutil.Discard, resp.Body)
Expand All @@ -141,10 +151,26 @@ func (c *client) pushTraceData(
"HTTP %d %q",
resp.StatusCode,
http.StatusText(resp.StatusCode))
return td.SpanCount(), err
return err
}
return nil
}

return numDroppedSpans, nil
func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (numDroppedLogs int, err error) {
c.wg.Add(1)
defer c.wg.Done()

splunkEvents, numDroppedLogs := logDataToSplunk(c.logger, ld, c.config)
if len(splunkEvents) == 0 {
return numDroppedLogs, nil
}

err = c.sendSplunkEvents(splunkEvents)
if err != nil {
return ld.LogRecordCount(), err
}

return numDroppedLogs, nil
}

func encodeBodyEvents(zippers *sync.Pool, evs []*splunkEvent, disableCompression bool) (bodyReader io.Reader, compressed bool, err error) {
Expand Down Expand Up @@ -197,6 +223,10 @@ func (c *client) stop(context context.Context) error {
return nil
}

func (c *client) start(context.Context, component.Host) (err error) {
return nil
}

func numMetricPoint(md pdata.Metrics) int {
_, numPoints := md.MetricAndDataPointCount()
return numPoints
Expand Down
120 changes: 120 additions & 0 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math"
"net"
"net/http"
"net/url"
"sync"
"testing"
"time"
Expand All @@ -31,12 +32,16 @@ import (
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/testutil/metricstestutil"
"go.opentelemetry.io/collector/translator/conventions"
"go.opentelemetry.io/collector/translator/internaldata"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
)

func createMetricsData(numberOfDataPoints int) pdata.Metrics {
Expand Down Expand Up @@ -98,6 +103,31 @@ func createTraceData(numberOfTraces int) pdata.Traces {
})
}

func createLogData(numberOfLogs int) pdata.Logs {
logs := pdata.NewLogs()
rl := pdata.NewResourceLogs()
rl.InitEmpty()
logs.ResourceLogs().Append(rl)
ill := pdata.NewInstrumentationLibraryLogs()
ill.InitEmpty()
rl.InstrumentationLibraryLogs().Append(ill)

ts := pdata.TimestampUnixNano(123)
for i := 0; i < numberOfLogs; i++ {
logRecord := pdata.NewLogRecord()
logRecord.InitEmpty()
logRecord.Body().SetStringVal("mylog")
logRecord.Attributes().InsertString(conventions.AttributeServiceName, "myapp")
logRecord.Attributes().InsertString(splunk.SourcetypeLabel, "myapp-type")
logRecord.Attributes().InsertString(conventions.AttributeHostHostname, "myhost")
logRecord.Attributes().InsertString("custom", "custom")
logRecord.SetTimestamp(ts)
ill.Logs().Append(logRecord)
}

return logs
}

type CapturingData struct {
testing *testing.T
receivedRequest chan string
Expand Down Expand Up @@ -193,6 +223,42 @@ func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) (
}
}

func runLogExport(disableCompression bool, numberOfLogs int, t *testing.T) (string, error) {
receivedRequest := make(chan string)
capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !disableCompression}
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
s := &http.Server{
Handler: &capture,
}
go func() {
panic(s.Serve(listener))
}()

factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.DisableCompression = disableCompression
cfg.Token = "1234-1234"

params := component.ExporterCreateParams{Logger: zap.NewNop()}
exporter, err := factory.CreateLogsExporter(context.Background(), params, cfg)
assert.NoError(t, err)

ld := createLogData(numberOfLogs)

err = exporter.ConsumeLogs(context.Background(), ld)
assert.NoError(t, err)
select {
case request := <-receivedRequest:
return request, nil
case <-time.After(5 * time.Second):
return "", errors.New("Timeout")
}
}

func TestReceiveTraces(t *testing.T) {
actual, err := runTraceExport(true, 3, t)
assert.NoError(t, err)
Expand All @@ -205,6 +271,18 @@ func TestReceiveTraces(t *testing.T) {
assert.Equal(t, expected, actual)
}

func TestReceiveLogs(t *testing.T) {
actual, err := runLogExport(true, 3, t)
assert.NoError(t, err)
expected := `{"time":0,"host":"myhost","source":"myapp","sourcetype":"myapp-type","event":"mylog","fields":{"custom":"custom"}}`
expected += "\n\r\n\r\n"
expected += `{"time":0,"host":"myhost","source":"myapp","sourcetype":"myapp-type","event":"mylog","fields":{"custom":"custom"}}`
expected += "\n\r\n\r\n"
expected += `{"time":0,"host":"myhost","source":"myapp","sourcetype":"myapp-type","event":"mylog","fields":{"custom":"custom"}}`
expected += "\n\r\n\r\n"
assert.Equal(t, expected, actual)
}

func TestReceiveMetrics(t *testing.T) {
actual, err := runMetricsExport(true, 3, t)
assert.NoError(t, err)
Expand All @@ -223,6 +301,12 @@ func TestReceiveTracesWithCompression(t *testing.T) {
assert.NotEqual(t, "", request)
}

func TestReceiveLogsWithCompression(t *testing.T) {
request, err := runLogExport(false, 5000, t)
assert.NoError(t, err)
assert.NotEqual(t, "", request)
}

func TestReceiveMetricsWithCompression(t *testing.T) {
request, err := runMetricsExport(false, 5000, t)
assert.NoError(t, err)
Expand Down Expand Up @@ -271,6 +355,11 @@ func TestInvalidTraces(t *testing.T) {
assert.Error(t, err)
}

func TestInvalidLogs(t *testing.T) {
_, err := runLogExport(false, 0, t)
assert.Error(t, err)
}

func TestInvalidMetrics(t *testing.T) {
_, err := runMetricsExport(false, 0, t)
assert.Error(t, err)
Expand Down Expand Up @@ -311,3 +400,34 @@ func TestInvalidJson(t *testing.T) {
reader, _, err := encodeBodyEvents(&syncPool, evs, false)
assert.Error(t, err, reader)
}

func TestStartAlwaysReturnsNil(t *testing.T) {
c := client{}
err := c.start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
}

func TestInvalidJsonClient(t *testing.T) {
badEvent := badJSON{
Foo: math.Inf(1),
}
evs := []*splunkEvent{
{
Event: badEvent,
},
nil,
}
c := client{url: nil, zippers: sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}}, config: &Config{Timeout: time.Microsecond}}
err := c.sendSplunkEvents(evs)
assert.EqualError(t, err, "Permanent error: json: unsupported value: +Inf")
}

func TestInvalidURLClient(t *testing.T) {
c := client{url: &url.URL{Host: "in va lid"}, zippers: sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}}, config: &Config{Timeout: time.Microsecond}}
err := c.sendSplunkEvents([]*splunkEvent{})
assert.EqualError(t, err, "Permanent error: parse \"//in%20va%20lid\": invalid URL escape \"%20\"")
}
17 changes: 15 additions & 2 deletions exporter/splunkhecexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ const (
type splunkExporter struct {
pushMetricsData func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error)
pushTraceData func(ctx context.Context, td pdata.Traces) (numDroppedSpans int, err error)
pushLogData func(ctx context.Context, td pdata.Logs) (numDroppedSpans int, err error)
stop func(ctx context.Context) (err error)
start func(ctx context.Context, host component.Host) (err error)
}

type exporterOptions struct {
Expand Down Expand Up @@ -70,7 +72,9 @@ func createExporter(
return &splunkExporter{
pushMetricsData: client.pushMetricsData,
pushTraceData: client.pushTraceData,
pushLogData: client.pushLogData,
stop: client.stop,
start: client.start,
}, nil
}

Expand Down Expand Up @@ -108,8 +112,8 @@ func buildClient(options *exporterOptions, config *Config, logger *zap.Logger) *
}
}

func (se splunkExporter) Start(context.Context, component.Host) error {
return nil
func (se splunkExporter) Start(ctxt context.Context, host component.Host) error {
return se.start(ctxt, host)
}

func (se splunkExporter) Shutdown(ctxt context.Context) error {
Expand All @@ -134,3 +138,12 @@ func (se splunkExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) err
obsreport.EndTraceDataExportOp(ctx, td.SpanCount(), numDroppedSpans, err)
return err
}

func (se splunkExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
ctx = obsreport.StartLogsExportOp(ctx, typeStr)

numDroppedLogs, err := se.pushLogData(ctx, ld)

obsreport.EndLogsExportOp(ctx, ld.LogRecordCount(), numDroppedLogs, err)
return err
}
Loading