-
Notifications
You must be signed in to change notification settings - Fork 635
fix(js): Compatibility with cfworker env #4116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @ssbushi, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly improves the Genkit JavaScript SDK's adaptability to various runtime environments, particularly serverless platforms like Cloudflare Workers. It achieves this by providing an alternative Reflection API handler that adheres to standard Web API interfaces and by implementing a flexible schema validation mechanism that can operate without relying on dynamic code generation, which is often restricted in such environments. These changes broaden the deployment possibilities for Genkit applications, making them more versatile across different JavaScript runtimes. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces compatibility with Cloudflare Worker environments by adding an alternative schema validation mechanism using @cfworker/json-schema and a new reflection API handler based on the standard Web API Request and Response objects. The changes are well-structured, including updates to dependency management and feature flags to enable this new mode. My review includes a couple of suggestions to refactor duplicated code in the new reflection API handler to improve maintainability.
js/core/src/reflection.ts
Outdated
| const actions = await registry.listResolvableActions(); | ||
| const convertedActions: any = {}; | ||
| Object.keys(actions).forEach((key) => { | ||
| const action = actions[key]; | ||
| convertedActions[key] = { | ||
| key, | ||
| name: action.name, | ||
| description: action.description, | ||
| metadata: action.metadata, | ||
| }; | ||
| if (action.inputSchema || action.inputJsonSchema) { | ||
| convertedActions[key].inputSchema = toJsonSchema({ | ||
| schema: action.inputSchema, | ||
| jsonSchema: action.inputJsonSchema, | ||
| }); | ||
| } | ||
| if (action.outputSchema || action.outputJsonSchema) { | ||
| convertedActions[key].outputSchema = toJsonSchema({ | ||
| schema: action.outputSchema, | ||
| jsonSchema: action.outputJsonSchema, | ||
| }); | ||
| } | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an opportunity to reduce code duplication here. The logic for converting actions to a serializable format is also present in the ReflectionServer class (lines 165-187).
To improve maintainability, you could extract this logic into a shared helper function that both ReflectionServer and createReflectionApiHandler can use.
For example:
function convertActionsForApi(actions: Record<string, Action>): any {
const convertedActions: any = {};
Object.keys(actions).forEach((key) => {
const action = actions[key];
convertedActions[key] = {
key,
name: action.name,
description: action.description,
metadata: action.metadata,
};
if (action.inputSchema || action.inputJsonSchema) {
convertedActions[key].inputSchema = toJsonSchema({
schema: action.inputSchema,
jsonSchema: action.inputJsonSchema,
});
}
if (action.outputSchema || action.outputJsonSchema) {
convertedActions[key].outputSchema = toJsonSchema({
schema: action.outputSchema,
jsonSchema: action.outputJsonSchema,
});
}
});
return convertedActions;
}
js/core/src/reflection.ts
Outdated
| if (stream) { | ||
| const encoder = new TextEncoder(); | ||
| const readable = new ReadableStream({ | ||
| async start(controller) { | ||
| try { | ||
| const onTraceStartCallback = (metadata: any) => { | ||
| traceId = metadata.traceId; | ||
| activeActions.set(traceId!, { | ||
| abortController, | ||
| startTime: new Date(), | ||
| }); | ||
| }; | ||
|
|
||
| const result = await action.run(input, { | ||
| context, | ||
| onChunk: (chunk) => { | ||
| controller.enqueue( | ||
| encoder.encode(JSON.stringify(chunk) + '\n') | ||
| ); | ||
| }, | ||
| telemetryLabels, | ||
| onTraceStart: onTraceStartCallback, | ||
| abortSignal: abortController.signal, | ||
| }); | ||
|
|
||
| await flushTracing(); | ||
| controller.enqueue( | ||
| encoder.encode( | ||
| JSON.stringify({ | ||
| result: result.result, | ||
| telemetry: { traceId: result.telemetry.traceId }, | ||
| } as RunActionResponse) | ||
| ) | ||
| ); | ||
| controller.close(); | ||
| } catch (err: any) { | ||
| const errorResponse: Status = { | ||
| code: isAbortError(err) | ||
| ? StatusCodes.CANCELLED | ||
| : StatusCodes.INTERNAL, | ||
| message: isAbortError(err) | ||
| ? 'Action was cancelled' | ||
| : err.message, | ||
| details: { stack: err.stack }, | ||
| }; | ||
| if (err.traceId) errorResponse.details.traceId = err.traceId; | ||
| controller.enqueue( | ||
| encoder.encode( | ||
| JSON.stringify({ error: errorResponse } as RunActionResponse) | ||
| ) | ||
| ); | ||
| controller.close(); | ||
| } finally { | ||
| if (traceId) activeActions.delete(traceId); | ||
| } | ||
| }, | ||
| }); | ||
|
|
||
| return new Response(readable, { | ||
| headers: { 'Content-Type': 'text/plain', ...corsHeaders }, | ||
| }); | ||
| } else { | ||
| // Non-streaming | ||
| let result; | ||
| try { | ||
| result = await action.run(input, { | ||
| context, | ||
| telemetryLabels, | ||
| onTraceStart: (metadata) => { | ||
| traceId = metadata.traceId; | ||
| activeActions.set(traceId!, { | ||
| abortController, | ||
| startTime: new Date(), | ||
| }); | ||
| }, | ||
| abortSignal: abortController.signal, | ||
| }); | ||
| await flushTracing(); | ||
| } catch (err: any) { | ||
| const errorResponse: Status = { | ||
| code: isAbortError(err) | ||
| ? StatusCodes.CANCELLED | ||
| : StatusCodes.INTERNAL, | ||
| message: isAbortError(err) ? 'Action was cancelled' : err.message, | ||
| details: { stack: err.stack, traceId }, | ||
| }; | ||
| return new Response( | ||
| JSON.stringify({ error: errorResponse } as RunActionResponse), | ||
| { | ||
| headers: { 'Content-Type': 'application/json', ...corsHeaders }, | ||
| } | ||
| ); | ||
| } finally { | ||
| if (traceId) activeActions.delete(traceId); | ||
| } | ||
|
|
||
| return new Response( | ||
| JSON.stringify({ | ||
| result: result.result, | ||
| telemetry: { traceId: result.telemetry.traceId }, | ||
| } as RunActionResponse), | ||
| { | ||
| headers: { 'Content-Type': 'application/json', ...corsHeaders }, | ||
| } | ||
| ); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic for handling streaming and non-streaming requests in runAction has significant code duplication. Specifically, the onTraceStart callback, error response creation, and the finally block for cleanup are very similar in both the if (stream) and else branches.
Consider refactoring this to reduce duplication and improve maintainability. For instance:
- Define the
onTraceStartcallback once before theif (stream)block. - Create a helper function to build the error
Statusobject. - Structure the code to avoid duplicating the
try...catch...finallyblocks as much as possible.
This will make the code cleaner and easier to maintain in the future.
JS Fix for #3968
Cfworker restrictions on:
https://developers.cloudflare.com/workers/runtime-apis/nodejs/
Checklist (if applicable):