Skip to content

implement schema-serde event streams#641

Merged
lucix-aws merged 4 commits intofeat-serde2-eventstreamfrom
feat-serde2-eventstream-impl
Apr 3, 2026
Merged

implement schema-serde event streams#641
lucix-aws merged 4 commits intofeat-serde2-eventstreamfrom
feat-serde2-eventstream-impl

Conversation

@lucix-aws
Copy link
Copy Markdown
Contributor

@lucix-aws lucix-aws commented Mar 19, 2026

implements schema-serde event stream support for aws-framed protocols (i.e. all of them)

https://smithy.io/2.0/aws/amazon-eventstream.html#amazon-eventstream

This is fundamentally the exact same thing that we had before, with all of the pieces of event stream wireup split out to allow for schema-serde. If you open up an existing generated event stream body, e.g. https://github.com/aws/aws-sdk-go-v2/blob/main/service/transcribestreaming/eventstream.go, you will see how things sort of map out in the new runtime. I made a point of preserving the structure of the old code as much as possible.

at a glance:

  • ClientProtocol is extended with new event stream APIs. in practice since all the protocols use the same one, our implementations can just embed the new eventstream.Codec to get support
  • the "generated" part of event streams is now just a type marshaler that figures out what variant schema to pass to the underlying event stream writer which is now in the runtime. pasted an example of that below
  • as you saw in a previous PR i migrated the aws eventstream/ module over to smithy-go. in this patch i've also flattened it a bit by dropping the eventstreamapi/ sub-package, which had middleware stuff in it which caused an import cycle. that got moved over to transport/http and the constants got promoted up into smithy-go/eventstream/
  • i ended up just adapting the SDK's sigv4 for event streams. I extended the auth resolution stuff to where smithyhttp.Signer can optionally implement a message signer, and the SDK provides support for that.

example of generated type adapter:

type audioStreamWriter struct {
    // smithyhttp.EventStreamWriter is new, previously basically its entire body of logic was generated inline per-op
    writer *smithyhttp.EventStreamWriter
}

var _ AudioStreamWriter = (*audioStreamWriter)(nil)

func (w *audioStreamWriter) Send(ctx context.Context, event types.AudioStream) error {
    var variant *smithy.Schema
    switch event.(type) {
    case *types.AudioStreamMemberAudioEvent:
        variant = schemas.AudioStream_AudioEvent
    case *types.AudioStreamMemberConfigurationEvent:
        variant = schemas.AudioStream_ConfigurationEvent
    default:
        return fmt.Errorf("unknown event type: %T", event)
    }   
    sv, ok := event.(smithy.Serializable)
    if !ok {
        return fmt.Errorf("event %T is not serializable", event)
    }   
   // pass it along to the new runtime, which handles delegating to the protocol
    return w.writer.Send(ctx, variant, sv) 
}

I did manual e2e tests w/ transcribestreaming,bedrockruntime, and cloudwatchlogs.

@lucix-aws lucix-aws requested review from a team as code owners March 19, 2026 14:50
@lucix-aws
Copy link
Copy Markdown
Contributor Author

relates #458

Symbol opEventStreamConstructor,
Symbol opEventStreamSymbol) {

// For v2 (early-return) event streams, we must:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3

// DeserializeInitialResponse reads the first event stream message and
// deserializes it as the operation output.
func (c *Codec) DeserializeInitialResponse(schema *smithy.Schema, r io.Reader, out smithy.Deserializable) error {
c.payloadBuf = c.payloadBuf[0:0]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL this pattern

}

func (d *ShapeDeserializer) ReadString(s *smithy.Schema, v *string) error {
if d.inBindings {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly a question for me, but why inBindings mean look into event stream stuff?

if d.inBindings && isEventHeader(s) {
return readEventHeaderInt(d, s, v)
}
return d.inner.ReadInt8(s, v)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do any bounds check?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So instead of doing bounds check I made it actually type assert for Int8Value, Int16Value, etc. to match what the old code did. so they should be functionally equivalent now

@lucix-aws lucix-aws merged commit ac2d590 into feat-serde2-eventstream Apr 3, 2026
1 check passed
@lucix-aws lucix-aws deleted the feat-serde2-eventstream-impl branch April 3, 2026 15:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants