From 4e32ae50b1ff4778de643e471db9e6e74dd79791 Mon Sep 17 00:00:00 2001 From: Brent Echols Date: Tue, 28 Oct 2025 10:04:26 -0700 Subject: [PATCH 1/8] Make ser crate public --- avro/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/avro/src/lib.rs b/avro/src/lib.rs index 6ec1a337..742e23ed 100644 --- a/avro/src/lib.rs +++ b/avro/src/lib.rs @@ -868,7 +868,6 @@ mod decode; mod duration; mod encode; mod reader; -mod ser; mod ser_schema; mod util; mod writer; @@ -879,6 +878,7 @@ pub mod rabin; pub mod schema; pub mod schema_compatibility; pub mod schema_equality; +pub mod ser; pub mod types; pub mod validator; From bea500cb1a015981504db855fb4d2bad573b60b4 Mon Sep 17 00:00:00 2001 From: Brent Echols Date: Tue, 28 Oct 2025 10:57:16 -0700 Subject: [PATCH 2/8] Added ability to disable header writes, so buffers can be composed --- avro/src/writer.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/avro/src/writer.rs b/avro/src/writer.rs index 9c879918..8442b316 100644 --- a/avro/src/writer.rs +++ b/avro/src/writer.rs @@ -430,6 +430,15 @@ impl<'a, W: Write> Writer<'a, W> { } } + /// Disables writing the header into the writer. + /// This is useful for contexts when you just want to serialize blocks of Avro data + /// without creating a well formed Avro file. + /// + /// Please use at your own risk + pub fn disable_header_write(&mut self) { + self.has_header = true; + } + /// Create an Avro header based on schema, codec and sync marker. fn header(&self) -> Result, Error> { let schema_bytes = serde_json::to_string(self.schema) From 1870bc5f99d0a000fdefa3a0a93de64dca26a251 Mon Sep 17 00:00:00 2001 From: Brent Echols Date: Tue, 28 Oct 2025 15:18:00 -0700 Subject: [PATCH 3/8] wip --- avro/src/writer.rs | 68 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/avro/src/writer.rs b/avro/src/writer.rs index 8442b316..22b964ea 100644 --- a/avro/src/writer.rs +++ b/avro/src/writer.rs @@ -63,6 +63,11 @@ pub struct Writer<'a, W: Write> { user_metadata: HashMap, } +pub struct AvroSerializedBuffer { + buffer: Vec, + num_values: usize, +} + impl<'a, W: Write> Writer<'a, W> { /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write /// to. @@ -304,6 +309,69 @@ impl<'a, W: Write> Writer<'a, W> { Ok(num_bytes) } + /** + * Writes a previously serialized bundle of rows directly to the writer (see self::serialize_ser). + * This will not flush any intermediate buffers - only write the provided buffer + * directly to the underlying writer. + */ + pub fn extend_avro_serialized_buffer( + &mut self, + avro_serialized_buffer: AvroSerializedBuffer, + ) -> AvroResult { + let mut num_bytes = self.maybe_write_header()?; + let buffer = avro_serialized_buffer.buffer; + let stream_len = buffer.len(); + let num_values = avro_serialized_buffer.num_values; + + num_bytes += self.append_raw(&num_values.into(), &Schema::Long)? + + self.append_raw(&stream_len.into(), &Schema::Long)? + + self + .writer + .write(buffer.as_ref()) + .map_err(Details::WriteBytes)? + + self.append_marker()?; + + self.writer.flush().map_err(Details::FlushWriter)?; + + Ok(num_bytes) + } + + /** + * Serialize an iterator of serde::Serialize objects into an AvroSerializedBuffer. This call + * does not need a `mut` self - so it is safe to call from multiple threads to prepare data + * for writing. + */ + pub fn serialize_ser(&self, values: I) -> AvroResult + where + I: IntoIterator, + { + let rs = match self.resolved_schema { + Some(ref rs) => rs, + None => &ResolvedSchema::try_from(self.schema)?, + }; + + let mut buffer = Vec::new(); + let mut count = 0; + let mut serializer = SchemaAwareWriteSerializer::new( + &mut buffer, + self.schema, + rs.get_names(), + None, + ); + + for value in values { + value.serialize(&mut serializer)?; + count += 1; + } + + self.codec.compress(&mut buffer)?; + + Ok(AvroSerializedBuffer { + buffer, + num_values: count, + }) + } + /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema /// validation on each value appended. /// From a335ec41e8ad12ae2103b69cb18aafc067fd75a4 Mon Sep 17 00:00:00 2001 From: Brent Echols Date: Tue, 28 Oct 2025 15:20:01 -0700 Subject: [PATCH 4/8] make AvroSerializedBuffer private --- avro/src/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/avro/src/writer.rs b/avro/src/writer.rs index 22b964ea..f15ce69d 100644 --- a/avro/src/writer.rs +++ b/avro/src/writer.rs @@ -63,7 +63,7 @@ pub struct Writer<'a, W: Write> { user_metadata: HashMap, } -pub struct AvroSerializedBuffer { +struct AvroSerializedBuffer { buffer: Vec, num_values: usize, } From 6eb77470aa31af34a419a87fccfb453f1a0a4a6a Mon Sep 17 00:00:00 2001 From: Brent Echols Date: Tue, 28 Oct 2025 15:22:31 -0700 Subject: [PATCH 5/8] revert unneeded changes --- avro/src/lib.rs | 2 +- avro/src/writer.rs | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/avro/src/lib.rs b/avro/src/lib.rs index 742e23ed..6ec1a337 100644 --- a/avro/src/lib.rs +++ b/avro/src/lib.rs @@ -868,6 +868,7 @@ mod decode; mod duration; mod encode; mod reader; +mod ser; mod ser_schema; mod util; mod writer; @@ -878,7 +879,6 @@ pub mod rabin; pub mod schema; pub mod schema_compatibility; pub mod schema_equality; -pub mod ser; pub mod types; pub mod validator; diff --git a/avro/src/writer.rs b/avro/src/writer.rs index f15ce69d..c74eab87 100644 --- a/avro/src/writer.rs +++ b/avro/src/writer.rs @@ -498,15 +498,6 @@ impl<'a, W: Write> Writer<'a, W> { } } - /// Disables writing the header into the writer. - /// This is useful for contexts when you just want to serialize blocks of Avro data - /// without creating a well formed Avro file. - /// - /// Please use at your own risk - pub fn disable_header_write(&mut self) { - self.has_header = true; - } - /// Create an Avro header based on schema, codec and sync marker. fn header(&self) -> Result, Error> { let schema_bytes = serde_json::to_string(self.schema) From d2e585a4bce20e5f3442b52e924cb38c21532c28 Mon Sep 17 00:00:00 2001 From: Brent Echols Date: Tue, 28 Oct 2025 17:44:28 -0700 Subject: [PATCH 6/8] Make AvroSerializedBuffer pub again --- avro/src/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/avro/src/writer.rs b/avro/src/writer.rs index c74eab87..afae6310 100644 --- a/avro/src/writer.rs +++ b/avro/src/writer.rs @@ -63,7 +63,7 @@ pub struct Writer<'a, W: Write> { user_metadata: HashMap, } -struct AvroSerializedBuffer { +pub struct AvroSerializedBuffer { buffer: Vec, num_values: usize, } From 25a4ee3b8c2cc42bc0df176a4f0669b3e7852786 Mon Sep 17 00:00:00 2001 From: brent-statsig <126510059+brent-statsig@users.noreply.github.com> Date: Tue, 28 Oct 2025 22:57:09 -0700 Subject: [PATCH 7/8] Update avro/src/writer.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- avro/src/writer.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/avro/src/writer.rs b/avro/src/writer.rs index afae6310..e3438ac5 100644 --- a/avro/src/writer.rs +++ b/avro/src/writer.rs @@ -309,11 +309,9 @@ impl<'a, W: Write> Writer<'a, W> { Ok(num_bytes) } - /** - * Writes a previously serialized bundle of rows directly to the writer (see self::serialize_ser). - * This will not flush any intermediate buffers - only write the provided buffer - * directly to the underlying writer. - */ + /// Writes a previously serialized bundle of rows directly to the writer (see self::serialize_ser). + /// This will not flush any intermediate buffers - only write the provided buffer + /// directly to the underlying writer. pub fn extend_avro_serialized_buffer( &mut self, avro_serialized_buffer: AvroSerializedBuffer, From 06e27e6dc0266b42cb50ad202edb651e301646f3 Mon Sep 17 00:00:00 2001 From: Brent Echols Date: Tue, 28 Oct 2025 23:37:54 -0700 Subject: [PATCH 8/8] Updated docs and copilot review --- avro/src/writer.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/avro/src/writer.rs b/avro/src/writer.rs index e3438ac5..799d2043 100644 --- a/avro/src/writer.rs +++ b/avro/src/writer.rs @@ -63,6 +63,8 @@ pub struct Writer<'a, W: Write> { user_metadata: HashMap, } +/// A buffer containing Avro serialized data ready to be written to a Writer +/// See [Writer::serialize_ser] and [Writer::extend_avro_serialized_buffer] pub struct AvroSerializedBuffer { buffer: Vec, num_values: usize, @@ -309,7 +311,7 @@ impl<'a, W: Write> Writer<'a, W> { Ok(num_bytes) } - /// Writes a previously serialized bundle of rows directly to the writer (see self::serialize_ser). + /// Writes a previously serialized bundle of rows directly to the writer (see [serialize_ser](Self::serialize_ser)). /// This will not flush any intermediate buffers - only write the provided buffer /// directly to the underlying writer. pub fn extend_avro_serialized_buffer( @@ -334,11 +336,9 @@ impl<'a, W: Write> Writer<'a, W> { Ok(num_bytes) } - /** - * Serialize an iterator of serde::Serialize objects into an AvroSerializedBuffer. This call - * does not need a `mut` self - so it is safe to call from multiple threads to prepare data - * for writing. - */ + /// Serialize an iterator of serde::Serialize objects into an AvroSerializedBuffer. This call + /// does not need a `mut` self - so it is safe to call from multiple threads to prepare data + /// for writing. pub fn serialize_ser(&self, values: I) -> AvroResult where I: IntoIterator, @@ -350,12 +350,8 @@ impl<'a, W: Write> Writer<'a, W> { let mut buffer = Vec::new(); let mut count = 0; - let mut serializer = SchemaAwareWriteSerializer::new( - &mut buffer, - self.schema, - rs.get_names(), - None, - ); + let mut serializer = + SchemaAwareWriteSerializer::new(&mut buffer, self.schema, rs.get_names(), None); for value in values { value.serialize(&mut serializer)?;