@@ -46,6 +46,18 @@ fn source_line_number(line: usize) -> i64 {
4646 i64:: try_from ( line) . unwrap_or ( i64:: MAX )
4747}
4848
49+ fn record_db_query_success ( span : & tracing:: Span , returned_rows : i64 ) {
50+ span. record ( "db.response.returned_rows" , returned_rows) ;
51+ span. record ( "otel.status_code" , "OK" ) ;
52+ }
53+
54+ fn record_db_query_error ( span : & tracing:: Span , returned_rows : i64 , error : & anyhow:: Error ) {
55+ span. record ( "db.response.returned_rows" , returned_rows) ;
56+ span. record ( "otel.status_code" , "ERROR" ) ;
57+ span. record ( "exception.message" , tracing:: field:: display ( error) ) ;
58+ span. record ( "exception.details" , tracing:: field:: debug ( error) ) ;
59+ }
60+
4961impl Database {
5062 pub ( crate ) async fn prepare_with (
5163 & self ,
@@ -87,6 +99,9 @@ pub fn stream_query_results_with_conn<'a>(
8799 db. system. name = request. app_state. db. info. database_type. otel_name( ) ,
88100 code. file. path = %source_file. display( ) ,
89101 code. line. number = source_line_number( stmt. query_position. start. line) ,
102+ otel. status_code = tracing:: field:: Empty ,
103+ exception. message = tracing:: field:: Empty ,
104+ exception. details = tracing:: field:: Empty ,
90105 db. response. returned_rows = tracing:: field:: Empty ,
91106 ) ;
92107 record_query_params( & query_span, & query. param_values) ;
@@ -103,16 +118,24 @@ pub fn stream_query_results_with_conn<'a>(
103118 returned_rows += 1 ;
104119 }
105120 apply_json_columns( & mut query_result, & stmt. json_columns) ;
106- apply_delayed_functions( request, & stmt. delayed_functions, & mut query_result) . instrument( query_span. clone( ) ) . await ?;
121+ if let Err ( err) = apply_delayed_functions( request, & stmt. delayed_functions, & mut query_result)
122+ . instrument( query_span. clone( ) )
123+ . await
124+ {
125+ error = Some ( err) ;
126+ break ;
127+ }
107128 for db_item in parse_dynamic_rows( query_result) {
108129 yield db_item;
109130 }
110131 }
111132 drop( stream) ;
112- query_span. record( "db.response.returned_rows" , returned_rows) ;
113133 if let Some ( error) = error {
134+ record_db_query_error( & query_span, returned_rows, & error) ;
114135 try_rollback_transaction( connection) . await ;
115136 yield DbItem :: Error ( error) ;
137+ } else {
138+ record_db_query_success( & query_span, returned_rows) ;
116139 }
117140 } ,
118141 ParsedStatement :: SetVariable { variable, value} => {
@@ -254,6 +277,9 @@ async fn execute_set_variable_query<'a>(
254277 db. system. name = request. app_state. db. info. database_type. otel_name( ) ,
255278 code. file. path = %source_file. display( ) ,
256279 code. line. number = source_line_number( statement. query_position. start. line) ,
280+ otel. status_code = tracing:: field:: Empty ,
281+ exception. message = tracing:: field:: Empty ,
282+ exception. details = tracing:: field:: Empty ,
257283 db. response. returned_rows = tracing:: field:: Empty ,
258284 ) ;
259285 record_query_params ( & query_span, & query. param_values ) ;
@@ -263,16 +289,17 @@ async fn execute_set_variable_query<'a>(
263289 . await
264290 {
265291 Ok ( Some ( row) ) => {
266- query_span . record ( "db.response.returned_rows" , 1_i64 ) ;
292+ record_db_query_success ( & query_span , 1_i64 ) ;
267293 row_to_string ( & row)
268294 }
269295 Ok ( None ) => {
270- query_span . record ( "db.response.returned_rows" , 0_i64 ) ;
296+ record_db_query_success ( & query_span , 0_i64 ) ;
271297 None
272298 }
273299 Err ( e) => {
274300 try_rollback_transaction ( connection) . await ;
275301 let err = display_stmt_db_error ( source_file, statement, e) ;
302+ record_db_query_error ( & query_span, 0_i64 , & err) ;
276303 return Err ( err) ;
277304 }
278305 } ;
@@ -599,7 +626,15 @@ impl<'q> sqlx::Execute<'q, Any> for StatementWithParams<'q> {
599626#[ cfg( test) ]
600627mod tests {
601628 use super :: * ;
629+ use std:: collections:: HashMap ;
630+ use std:: sync:: { Arc , Mutex } ;
631+
602632 use serde_json:: { json, Value } ;
633+ use tracing:: field:: { Field , Visit } ;
634+ use tracing_subscriber:: layer:: Context ;
635+ use tracing_subscriber:: prelude:: * ;
636+ use tracing_subscriber:: registry:: LookupSpan ;
637+ use tracing_subscriber:: Layer ;
603638
604639 fn create_row_item ( value : Value ) -> DbItem {
605640 DbItem :: Row ( value)
@@ -672,4 +707,125 @@ mod tests {
672707 assert_json_value ( & item, "json_col" , json ! ( { "key" : "value" } ) ) ;
673708 assert_json_value ( & item, "normal_col" , json ! ( "text" ) ) ;
674709 }
710+
711+ #[ derive( Default ) ]
712+ struct RecordedFields ( HashMap < & ' static str , String > ) ;
713+
714+ #[ derive( Clone , Default ) ]
715+ struct TestSpanLayer {
716+ closed_spans : Arc < Mutex < Vec < HashMap < & ' static str , String > > > > ,
717+ }
718+
719+ struct TestFieldVisitor < ' a > ( & ' a mut HashMap < & ' static str , String > ) ;
720+
721+ impl Visit for TestFieldVisitor < ' _ > {
722+ fn record_debug ( & mut self , field : & Field , value : & dyn std:: fmt:: Debug ) {
723+ self . 0 . insert ( field. name ( ) , format ! ( "{value:?}" ) ) ;
724+ }
725+
726+ fn record_str ( & mut self , field : & Field , value : & str ) {
727+ self . 0 . insert ( field. name ( ) , value. to_owned ( ) ) ;
728+ }
729+
730+ fn record_i64 ( & mut self , field : & Field , value : i64 ) {
731+ self . 0 . insert ( field. name ( ) , value. to_string ( ) ) ;
732+ }
733+ }
734+
735+ impl < S > Layer < S > for TestSpanLayer
736+ where
737+ S : tracing:: Subscriber + for < ' a > LookupSpan < ' a > ,
738+ {
739+ fn on_new_span (
740+ & self ,
741+ attrs : & tracing:: span:: Attributes < ' _ > ,
742+ id : & tracing:: span:: Id ,
743+ ctx : Context < ' _ , S > ,
744+ ) {
745+ let mut fields = RecordedFields :: default ( ) ;
746+ attrs. record ( & mut TestFieldVisitor ( & mut fields. 0 ) ) ;
747+ if let Some ( span) = ctx. span ( id) {
748+ span. extensions_mut ( ) . insert ( fields) ;
749+ }
750+ }
751+
752+ fn on_record (
753+ & self ,
754+ id : & tracing:: span:: Id ,
755+ values : & tracing:: span:: Record < ' _ > ,
756+ ctx : Context < ' _ , S > ,
757+ ) {
758+ if let Some ( span) = ctx. span ( id) {
759+ let mut extensions = span. extensions_mut ( ) ;
760+ let fields = extensions
761+ . get_mut :: < RecordedFields > ( )
762+ . expect ( "recorded fields" ) ;
763+ values. record ( & mut TestFieldVisitor ( & mut fields. 0 ) ) ;
764+ }
765+ }
766+
767+ fn on_close ( & self , id : tracing:: span:: Id , ctx : Context < ' _ , S > ) {
768+ if let Some ( span) = ctx. span ( & id) {
769+ let extensions = span. extensions ( ) ;
770+ let fields = extensions. get :: < RecordedFields > ( ) . expect ( "recorded fields" ) ;
771+ self . closed_spans . lock ( ) . unwrap ( ) . push ( fields. 0 . clone ( ) ) ;
772+ }
773+ }
774+ }
775+
776+ fn with_recorded_span_fields (
777+ f : impl FnOnce ( ) + Send + ' static ,
778+ ) -> HashMap < & ' static str , String > {
779+ let layer = TestSpanLayer :: default ( ) ;
780+ let closed_spans = layer. closed_spans . clone ( ) ;
781+ let subscriber = tracing_subscriber:: registry ( ) . with ( layer) ;
782+ tracing:: subscriber:: with_default ( subscriber, f) ;
783+ let fields = closed_spans
784+ . lock ( )
785+ . unwrap ( )
786+ . pop ( )
787+ . expect ( "closed span fields" ) ;
788+ fields
789+ }
790+
791+ #[ test]
792+ fn db_query_success_records_ok_status_and_row_count ( ) {
793+ let fields = with_recorded_span_fields ( || {
794+ let span = tracing:: info_span!(
795+ "db.query" ,
796+ otel. status_code = tracing:: field:: Empty ,
797+ exception. message = tracing:: field:: Empty ,
798+ exception. details = tracing:: field:: Empty ,
799+ db. response. returned_rows = tracing:: field:: Empty ,
800+ ) ;
801+ record_db_query_success ( & span, 3 ) ;
802+ drop ( span) ;
803+ } ) ;
804+
805+ assert_eq ! ( fields[ "otel.status_code" ] , "OK" ) ;
806+ assert_eq ! ( fields[ "db.response.returned_rows" ] , "3" ) ;
807+ assert ! ( !fields. contains_key( "exception.message" ) ) ;
808+ assert ! ( !fields. contains_key( "exception.details" ) ) ;
809+ }
810+
811+ #[ test]
812+ fn db_query_error_records_error_status_and_exception_fields ( ) {
813+ let fields = with_recorded_span_fields ( || {
814+ let span = tracing:: info_span!(
815+ "db.query" ,
816+ otel. status_code = tracing:: field:: Empty ,
817+ exception. message = tracing:: field:: Empty ,
818+ exception. details = tracing:: field:: Empty ,
819+ db. response. returned_rows = tracing:: field:: Empty ,
820+ ) ;
821+ let error = anyhow ! ( "query failed" ) . context ( "while executing SELECT 1" ) ;
822+ record_db_query_error ( & span, 2 , & error) ;
823+ drop ( span) ;
824+ } ) ;
825+
826+ assert_eq ! ( fields[ "otel.status_code" ] , "ERROR" ) ;
827+ assert_eq ! ( fields[ "db.response.returned_rows" ] , "2" ) ;
828+ assert ! ( fields[ "exception.message" ] . contains( "while executing SELECT 1" ) ) ;
829+ assert ! ( fields[ "exception.details" ] . contains( "query failed" ) ) ;
830+ }
675831}
0 commit comments