From 30e4890af0b5a3b57f3e117dc23a8c6c2430d65a Mon Sep 17 00:00:00 2001 From: Donald Propst Date: Mon, 14 Jul 2025 16:37:15 -0400 Subject: [PATCH 1/2] ENGTAI-64543: expose withGrpcConn to allow exporters to share connection --- instrumentation/opentelemetry/init.go | 64 ++++++++++++++++++++-- instrumentation/opentelemetry/init_test.go | 38 +++++++++++++ 2 files changed, 96 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index a55de9f..67d451e 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "crypto/x509" "fmt" - "go.uber.org/zap" "log" "maps" "net/http" @@ -41,7 +40,10 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/resolver" ) @@ -63,7 +65,8 @@ var ( type ServiceOption func(*ServiceOptions) type ServiceOptions struct { - headers map[string]string + headers map[string]string + grpcConn *grpc.ClientConn } func WithHeaders(headers map[string]string) ServiceOption { @@ -75,6 +78,49 @@ func WithHeaders(headers map[string]string) ServiceOption { } } +// Please ref https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc#WithGRPCConn +// To create the grpc connection with same logic as goagent please use CreateGrpcConn +func WithGrpcConn(conn *grpc.ClientConn) ServiceOption { + return func(opts *ServiceOptions) { + opts.grpcConn = conn + } +} + +// Can be used for external clients to reference the underlying connection for otlp grpc exporter +func CreateGrpcConn(cfg *config.AgentConfig) (*grpc.ClientConn, error) { + endpoint := removeProtocolPrefixForOTLP(cfg.GetReporting().GetEndpoint().GetValue()) + + dialOpts := []grpc.DialOption{} + + if !cfg.GetReporting().GetSecure().GetValue() { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + certFile := cfg.GetReporting().GetCertFile().GetValue() + if len(certFile) > 0 { + tlsCredentials, err := credentials.NewClientTLSFromFile(certFile, "") + if err != nil { + return nil, fmt.Errorf("error creating TLS credentials from cert path %s: %v", certFile, err) + } + dialOpts = append(dialOpts, grpc.WithTransportCredentials(tlsCredentials)) + } else { + // Default to system certs + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) + } + } + + if cfg.Reporting.GetEnableGrpcLoadbalancing().GetValue() { + resolver.SetDefaultScheme("dns") + dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [ { "round_robin": {} } ]}`)) + } + + conn, err := grpc.NewClient(endpoint, dialOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection to %s: %v", endpoint, err) + } + + return conn, nil +} + func makePropagator(formats []config.PropagationFormat) propagation.TextMapPropagator { var propagators []propagation.TextMapPropagator for _, format := range formats { @@ -177,6 +223,7 @@ func makeExporterFactory(cfg *config.AgentConfig) func(serviceOpts ...ServiceOpt zipkin.WithHeaders(serviceOpts.headers), ) } + case config.TraceReporterType_LOGGING: return func(opts ...ServiceOption) (sdktrace.SpanExporter, error) { // TODO: Define if endpoint could be a filepath to write into a file. @@ -211,7 +258,7 @@ func makeExporterFactory(cfg *config.AgentConfig) func(serviceOpts ...ServiceOpt return otlphttp.New(context.Background(), finalOpts...) } - default: + default: // OTLP GRPC standardOpts := []otlpgrpc.Option{ otlpgrpc.WithEndpoint(removeProtocolPrefixForOTLP(cfg.GetReporting().GetEndpoint().GetValue())), } @@ -246,6 +293,11 @@ func makeExporterFactory(cfg *config.AgentConfig) func(serviceOpts ...ServiceOpt finalOpts := append([]otlpgrpc.Option{}, standardOpts...) finalOpts = append(finalOpts, otlpgrpc.WithHeaders(serviceOpts.headers)) + // Important: gRPC connection takes precedence over other connection based options + if serviceOpts.grpcConn != nil { + finalOpts = append(finalOpts, otlpgrpc.WithGRPCConn(serviceOpts.grpcConn)) + } + return otlptrace.New( context.Background(), otlpgrpc.NewClient(finalOpts...), @@ -349,11 +401,11 @@ func InitWithSpanProcessorWrapperAndZap(cfg *config.AgentConfig, wrapper SpanPro // Initialize metrics metricsShutdownFn := initializeMetrics(cfg, versionInfoAttrs, opts...) - exporterFactory = makeExporterFactory(cfg) configFactory = makeConfigFactory(cfg) - exporter, err := exporterFactory() + exporter, err := exporterFactory(opts...) + if err != nil { log.Fatal(err) } @@ -374,8 +426,8 @@ func InitWithSpanProcessorWrapperAndZap(cfg *config.AgentConfig, wrapper SpanPro if err != nil { log.Fatal(err) } - sampler := sdktrace.AlwaysSample() + tp := sdktrace.NewTracerProvider( sdktrace.WithSampler(sampler), sdktrace.WithSpanProcessor(sp), diff --git a/instrumentation/opentelemetry/init_test.go b/instrumentation/opentelemetry/init_test.go index a3f0775..8b15482 100644 --- a/instrumentation/opentelemetry/init_test.go +++ b/instrumentation/opentelemetry/init_test.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" + "google.golang.org/protobuf/types/known/wrapperspb" coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" ) @@ -589,3 +590,40 @@ func TestMakeExporterFactory_Headers_ZipkinAndOTLPHTTP(t *testing.T) { }) } } + +func TestCreateGrpcConn(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + defer lis.Close() + + server := grpc.NewServer() + defer server.Stop() + + go func() { + if err := server.Serve(lis); err != nil { + t.Logf("Server exited with error: %v", err) + } + }() + + cfg := &v1.AgentConfig{ + Reporting: &v1.Reporting{ + Endpoint: &wrapperspb.StringValue{ + Value: lis.Addr().String(), + }, + Secure: &wrapperspb.BoolValue{ + Value: false, + }, + }, + } + + conn, err := CreateGrpcConn(cfg) + require.NoError(t, err) + require.NotNil(t, conn) + conn.Close() + + cfg.Reporting.EnableGrpcLoadbalancing = &wrapperspb.BoolValue{Value: true} + conn, err = CreateGrpcConn(cfg) + require.NoError(t, err) + require.NotNil(t, conn) + conn.Close() +} From cbf40569ecc5eafa20e5b37d5e7a245da5be1a1f Mon Sep 17 00:00:00 2001 From: Donald Propst Date: Mon, 14 Jul 2025 17:26:21 -0400 Subject: [PATCH 2/2] Allow service options to be wired through any Init method --- instrumentation/opentelemetry/init.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index 67d451e..ca6c0b6 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -345,20 +345,20 @@ func createCaCertPoolFromFile(certFile string) *x509.CertPool { // Init initializes opentelemetry tracing and returns a shutdown function to flush data immediately // on a termination signal. -func Init(cfg *config.AgentConfig) func() { - return InitWithSpanProcessorWrapper(cfg, nil, versionInfoAttributes) +func Init(cfg *config.AgentConfig, opts ...ServiceOption) func() { + return InitWithSpanProcessorWrapper(cfg, nil, versionInfoAttributes, opts...) } // InitWithSpanProcessorWrapper initializes opentelemetry tracing with a wrapper over span processor // and returns a shutdown function to flush data immediately on a termination signal. func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper, - versionInfoAttrs []attribute.KeyValue) func() { + versionInfoAttrs []attribute.KeyValue, opts ...ServiceOption) func() { logger, err := zap.NewProduction() if err != nil { logger = nil log.Printf("error while creating default zap logger %v", err) } - return InitWithSpanProcessorWrapperAndZap(cfg, wrapper, versionInfoAttrs, logger) + return InitWithSpanProcessorWrapperAndZap(cfg, wrapper, versionInfoAttrs, logger, opts...) } // InitWithSpanProcessorWrapperAndZap initializes opentelemetry tracing with a wrapper over span processor