@@ -204,7 +204,8 @@ class MCPGymRolloutProcessor(BaseRolloutProcessor):
204204 """
205205
206206 def __init__ (self ):
207- self .current_run_state : Dict [str , Any ] = {}
207+ self .server = None
208+ self .policy = None
208209
209210 def __call__ (self , rows : List [EvaluationRow ], config : RolloutProcessorConfig ) -> List [asyncio .Task [EvaluationRow ]]:
210211 """Process evaluation rows with MCP gym environments."""
@@ -215,52 +216,43 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) ->
215216 if config .server_script_path is None :
216217 raise ValueError ("server_script_path is required for MCPGymRolloutProcessor" )
217218
218- server = MCPServerManager (config .server_script_path , port = 9700 , ** (config .kwargs or {}))
219+ self . server = MCPServerManager (config .server_script_path , port = 9700 , ** (config .kwargs or {}))
219220
220221 try :
221- server .start ()
222+ self . server .start ()
222223
223- policy = ep .LiteLLMPolicy (
224+ self . policy = ep .LiteLLMPolicy (
224225 model_id = config .completion_params .get ("model" , None ),
225226 temperature = config .completion_params .get ("temperature" , 0.0 ),
226227 max_tokens = config .completion_params .get ("max_tokens" , 4096 ),
227228 reasoning_effort = config .completion_params .get ("reasoning_effort" , None ),
228229 )
229230
230- # Store in instance state for cleanup
231- self .current_run_state .update (
232- {
233- "server" : server ,
234- "policy" : policy ,
235- }
236- )
237-
238231 except Exception as e :
239- server .stop ()
240- self .current_run_state .clear ()
232+ if self .server :
233+ self .server .stop ()
234+ self .server = None
235+ self .policy = None
241236 raise e
242237
243238 else :
244239 # Reuse existing MCP environments for retry
245- if not self .current_run_state :
240+ if not self .server or not self . policy :
246241 raise RuntimeError (
247242 "Cannot retry without existing server/environments. Call with start_server=True first."
248243 )
249244
250- server = self .current_run_state ["server" ]
251- policy = self .current_run_state ["policy" ]
252-
253245 # Create MCP environments directly from evaluation_rows
254246 envs = ep .make (
255247 "http://localhost:9700/mcp/" ,
256248 evaluation_rows = rows ,
257- model_id = policy .model_id ,
249+ model_id = self . policy .model_id ,
258250 )
259251
260252 # Get rollout tasks from ep.rollout
261253 tasks = ep .rollout (
262254 envs ,
263- policy = policy ,
255+ policy = self . policy ,
264256 evaluation_rows = rows ,
265257 steps = config .steps ,
266258 max_concurrent_rollouts = config .max_concurrent_rollouts ,
@@ -269,6 +261,7 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) ->
269261
270262 def cleanup (self ) -> None :
271263 """Cleanup MCP server and environments."""
272- if self .current_run_state and "server" in self .current_run_state :
273- self .current_run_state ["server" ].stop ()
274- self .current_run_state .clear ()
264+ if self .server :
265+ self .server .stop ()
266+ self .server = None
267+ self .policy = None
0 commit comments