Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 62 additions & 10 deletions instrumentation/opentelemetry/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"crypto/tls"
"crypto/x509"
"fmt"
"go.uber.org/zap"
"log"
"maps"
"net/http"
Expand Down Expand Up @@ -41,7 +40,10 @@
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
)

Expand All @@ -63,7 +65,8 @@
type ServiceOption func(*ServiceOptions)

type ServiceOptions struct {
headers map[string]string
headers map[string]string
grpcConn *grpc.ClientConn
}

func WithHeaders(headers map[string]string) ServiceOption {
Expand All @@ -75,6 +78,49 @@
}
}

// Please ref https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc#WithGRPCConn
// To create the grpc connection with same logic as goagent please use CreateGrpcConn
func WithGrpcConn(conn *grpc.ClientConn) ServiceOption {
return func(opts *ServiceOptions) {
opts.grpcConn = conn
}

Check warning on line 86 in instrumentation/opentelemetry/init.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/opentelemetry/init.go#L83-L86

Added lines #L83 - L86 were not covered by tests
}

// Can be used for external clients to reference the underlying connection for otlp grpc exporter
func CreateGrpcConn(cfg *config.AgentConfig) (*grpc.ClientConn, error) {
endpoint := removeProtocolPrefixForOTLP(cfg.GetReporting().GetEndpoint().GetValue())

dialOpts := []grpc.DialOption{}

if !cfg.GetReporting().GetSecure().GetValue() {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
certFile := cfg.GetReporting().GetCertFile().GetValue()
if len(certFile) > 0 {
tlsCredentials, err := credentials.NewClientTLSFromFile(certFile, "")
if err != nil {
return nil, fmt.Errorf("error creating TLS credentials from cert path %s: %v", certFile, err)
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(tlsCredentials))
} else {
// Default to system certs
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
}

Check warning on line 108 in instrumentation/opentelemetry/init.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/opentelemetry/init.go#L98-L108

Added lines #L98 - L108 were not covered by tests
}

if cfg.Reporting.GetEnableGrpcLoadbalancing().GetValue() {
resolver.SetDefaultScheme("dns")
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [ { "round_robin": {} } ]}`))
}

conn, err := grpc.NewClient(endpoint, dialOpts...)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to %s: %v", endpoint, err)
}

Check warning on line 119 in instrumentation/opentelemetry/init.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/opentelemetry/init.go#L118-L119

Added lines #L118 - L119 were not covered by tests

return conn, nil
}

func makePropagator(formats []config.PropagationFormat) propagation.TextMapPropagator {
var propagators []propagation.TextMapPropagator
for _, format := range formats {
Expand Down Expand Up @@ -177,6 +223,7 @@
zipkin.WithHeaders(serviceOpts.headers),
)
}

case config.TraceReporterType_LOGGING:
return func(opts ...ServiceOption) (sdktrace.SpanExporter, error) {
// TODO: Define if endpoint could be a filepath to write into a file.
Expand Down Expand Up @@ -211,7 +258,7 @@
return otlphttp.New(context.Background(), finalOpts...)
}

default:
default: // OTLP GRPC
standardOpts := []otlpgrpc.Option{
otlpgrpc.WithEndpoint(removeProtocolPrefixForOTLP(cfg.GetReporting().GetEndpoint().GetValue())),
}
Expand Down Expand Up @@ -246,6 +293,11 @@
finalOpts := append([]otlpgrpc.Option{}, standardOpts...)
finalOpts = append(finalOpts, otlpgrpc.WithHeaders(serviceOpts.headers))

// Important: gRPC connection takes precedence over other connection based options
if serviceOpts.grpcConn != nil {
finalOpts = append(finalOpts, otlpgrpc.WithGRPCConn(serviceOpts.grpcConn))
}

Check warning on line 299 in instrumentation/opentelemetry/init.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/opentelemetry/init.go#L298-L299

Added lines #L298 - L299 were not covered by tests

return otlptrace.New(
context.Background(),
otlpgrpc.NewClient(finalOpts...),
Expand Down Expand Up @@ -293,20 +345,20 @@

// Init initializes opentelemetry tracing and returns a shutdown function to flush data immediately
// on a termination signal.
func Init(cfg *config.AgentConfig) func() {
return InitWithSpanProcessorWrapper(cfg, nil, versionInfoAttributes)
func Init(cfg *config.AgentConfig, opts ...ServiceOption) func() {
return InitWithSpanProcessorWrapper(cfg, nil, versionInfoAttributes, opts...)
}

// InitWithSpanProcessorWrapper initializes opentelemetry tracing with a wrapper over span processor
// and returns a shutdown function to flush data immediately on a termination signal.
func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper,
versionInfoAttrs []attribute.KeyValue) func() {
versionInfoAttrs []attribute.KeyValue, opts ...ServiceOption) func() {
logger, err := zap.NewProduction()
if err != nil {
logger = nil
log.Printf("error while creating default zap logger %v", err)
}
return InitWithSpanProcessorWrapperAndZap(cfg, wrapper, versionInfoAttrs, logger)
return InitWithSpanProcessorWrapperAndZap(cfg, wrapper, versionInfoAttrs, logger, opts...)
}

// InitWithSpanProcessorWrapperAndZap initializes opentelemetry tracing with a wrapper over span processor
Expand Down Expand Up @@ -349,11 +401,11 @@

// Initialize metrics
metricsShutdownFn := initializeMetrics(cfg, versionInfoAttrs, opts...)

exporterFactory = makeExporterFactory(cfg)
configFactory = makeConfigFactory(cfg)

exporter, err := exporterFactory()
exporter, err := exporterFactory(opts...)

if err != nil {
log.Fatal(err)
}
Expand All @@ -374,8 +426,8 @@
if err != nil {
log.Fatal(err)
}

sampler := sdktrace.AlwaysSample()

tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sampler),
sdktrace.WithSpanProcessor(sp),
Expand Down
38 changes: 38 additions & 0 deletions instrumentation/opentelemetry/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/types/known/wrapperspb"

coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
)
Expand Down Expand Up @@ -589,3 +590,40 @@ func TestMakeExporterFactory_Headers_ZipkinAndOTLPHTTP(t *testing.T) {
})
}
}

func TestCreateGrpcConn(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
defer lis.Close()

server := grpc.NewServer()
defer server.Stop()

go func() {
if err := server.Serve(lis); err != nil {
t.Logf("Server exited with error: %v", err)
}
}()

cfg := &v1.AgentConfig{
Reporting: &v1.Reporting{
Endpoint: &wrapperspb.StringValue{
Value: lis.Addr().String(),
},
Secure: &wrapperspb.BoolValue{
Value: false,
},
},
}

conn, err := CreateGrpcConn(cfg)
require.NoError(t, err)
require.NotNil(t, conn)
conn.Close()

cfg.Reporting.EnableGrpcLoadbalancing = &wrapperspb.BoolValue{Value: true}
conn, err = CreateGrpcConn(cfg)
require.NoError(t, err)
require.NotNil(t, conn)
conn.Close()
}
Loading