55in the background and capturing their output for injection into chat streams.
66"""
77
8+ import codecs
9+ import os
10+ import platform
811import subprocess
912import threading
1013from collections import deque
1114from typing import Dict , Optional , Tuple
1215
16+ try :
17+ import pty
18+ import termios
19+
20+ HAS_PTY = True
21+ except ImportError :
22+ HAS_PTY = False
23+
1324
1425class CircularBuffer :
1526 """
@@ -89,14 +100,48 @@ def size(self) -> int:
89100 return sum (len (chunk ) for chunk in self .buffer )
90101
91102
103+ class InputBuffer :
104+ """
105+ Thread-safe buffer for queuing input to be sent to a process.
106+ """
107+
108+ def __init__ (self ):
109+ self .queue = deque ()
110+ self .lock = threading .Lock ()
111+
112+ def append (self , text : str ) -> None :
113+ """Add text to the input queue."""
114+ with self .lock :
115+ self .queue .append (text )
116+
117+ def pop_all (self ) -> str :
118+ """Get and clear all queued input."""
119+ with self .lock :
120+ result = "" .join (self .queue )
121+ self .queue .clear ()
122+ return result
123+
124+ def has_input (self ) -> bool :
125+ """Check if there is queued input."""
126+ with self .lock :
127+ return len (self .queue ) > 0
128+
129+
92130class BackgroundProcess :
93131 """
94132 Represents a background process with output capture.
95133 """
96134
97135 def __init__ (
98- self , command : str , process : subprocess .Popen , buffer : CircularBuffer , persist : bool = False
136+ self ,
137+ command : str ,
138+ process : subprocess .Popen ,
139+ buffer : CircularBuffer ,
140+ persist : bool = False ,
141+ input_buffer : Optional [InputBuffer ] = None ,
142+ master_fd : Optional [int ] = None ,
99143 ):
144+ self .master_fd = master_fd
100145 """
101146 Initialize background process wrapper.
102147
@@ -116,7 +161,11 @@ def __init__(
116161 self .start_time = time .time ()
117162 self .end_time = None
118163 self .persist = persist
164+ self .input_buffer = input_buffer or InputBuffer ()
165+ self .writer_thread = None
166+ self ._stop_event = threading .Event ()
119167 self ._start_output_reader ()
168+ self ._start_input_writer ()
120169
121170 def _start_output_reader (self ) -> None :
122171 """Start thread to read process output."""
@@ -128,22 +177,71 @@ def reader():
128177 # we're in a separate thread and the buffer will capture
129178 # output as soon as it's available
130179
131- # Read stdout
132- for line in iter (self .process .stdout .readline , "" ):
133- if line :
134- self .buffer .append (line )
180+ if self .master_fd is not None :
181+ while not self ._stop_event .is_set ():
182+ try :
183+ data = os .read (self .master_fd , 4096 ).decode (errors = "replace" )
184+ if not data :
185+ break
186+ self .buffer .append (data )
187+ except (OSError , EOFError ):
188+ break
189+ else :
190+ # Read stdout
191+ for line in iter (self .process .stdout .readline , "" ):
192+ if line :
193+ self .buffer .append (line )
135194
136- # Read stderr
137- for line in iter (self .process .stderr .readline , "" ):
138- if line :
139- self .buffer .append (line )
195+ # Read stderr
196+ for line in iter (self .process .stderr .readline , "" ):
197+ if line :
198+ self .buffer .append (line )
140199
141200 except Exception as e :
142201 self .buffer .append (f"\n [Error reading process output: { str (e )} ]\n " )
143202
144203 self .reader_thread = threading .Thread (target = reader , daemon = True )
145204 self .reader_thread .start ()
146205
206+ def _start_input_writer (self ) -> None :
207+ """Start thread to write input to process stdin."""
208+
209+ def writer ():
210+ try :
211+ while not self ._stop_event .is_set () and self .is_alive ():
212+ if self .input_buffer .has_input ():
213+ text = self .input_buffer .pop_all ()
214+ if text :
215+ if self .master_fd is not None :
216+ os .write (self .master_fd , text .encode ("latin-1" ))
217+ else :
218+ try :
219+ # Try to write to the binary buffer for lossless propagation
220+ self .process .stdin .buffer .write (text .encode ("latin-1" ))
221+ self .process .stdin .buffer .flush ()
222+ except (AttributeError , ValueError ):
223+ # Fallback to text mode if buffer is not available
224+ self .process .stdin .write (text )
225+ self .process .stdin .flush ()
226+ import time
227+
228+ time .sleep (0.1 )
229+ except (BrokenPipeError , OSError ):
230+ pass
231+ except Exception as e :
232+ self .buffer .append (f"\n [Error writing to process input: { str (e )} ]\n " )
233+ finally :
234+ try :
235+ if self .master_fd is not None :
236+ os .close (self .master_fd )
237+ else :
238+ self .process .stdin .close ()
239+ except Exception :
240+ pass
241+
242+ self .writer_thread = threading .Thread (target = writer , daemon = True )
243+ self .writer_thread .start ()
244+
147245 def get_output (self , clear : bool = False ) -> str :
148246 """
149247 Get current output buffer.
@@ -171,6 +269,11 @@ def is_alive(self) -> bool:
171269 """Check if process is running."""
172270 return self .process .poll () is None
173271
272+ def send_input (self , text : str ) -> None :
273+ """Queue input to be sent to the process."""
274+ if self .input_buffer :
275+ self .input_buffer .append (text )
276+
174277 def stop (self , timeout : float = 5.0 ) -> Tuple [bool , str , Optional [int ]]:
175278 """
176279 Stop the process gracefully.
@@ -184,6 +287,9 @@ def stop(self, timeout: float = 5.0) -> Tuple[bool, str, Optional[int]]:
184287 import time
185288
186289 try :
290+ # Signal threads to stop
291+ self ._stop_event .set ()
292+
187293 # Try SIGTERM first
188294 self .process .terminate ()
189295 self .process .wait (timeout = timeout )
@@ -192,7 +298,6 @@ def stop(self, timeout: float = 5.0) -> Tuple[bool, str, Optional[int]]:
192298 output = self .get_output (clear = True )
193299 exit_code = self .process .returncode
194300 self .end_time = time .time ()
195-
196301 return True , output , exit_code
197302
198303 except subprocess .TimeoutExpired :
@@ -262,6 +367,8 @@ def start_background_command(
262367 existing_process : Optional [subprocess .Popen ] = None ,
263368 existing_buffer : Optional [CircularBuffer ] = None ,
264369 persist : bool = False ,
370+ existing_input_buffer : Optional [InputBuffer ] = None ,
371+ use_pty : bool = False ,
265372 ) -> str :
266373 """
267374 Start a command in background.
@@ -283,23 +390,52 @@ def start_background_command(
283390 buffer = existing_buffer or CircularBuffer (max_size = max_buffer_size )
284391
285392 # Use existing process or start new one
286- if existing_process :
393+ master_fd = None
394+ if use_pty and HAS_PTY and platform .system () != "Windows" :
395+ master_fd , slave_fd = pty .openpty ()
396+
397+ # Disable echo on the slave PTY
398+ attr = termios .tcgetattr (slave_fd )
399+ attr [3 ] = attr [3 ] & ~ termios .ECHO
400+ termios .tcsetattr (slave_fd , termios .TCSANOW , attr )
401+
402+ process = subprocess .Popen (
403+ command ,
404+ shell = True ,
405+ stdout = slave_fd ,
406+ stderr = slave_fd ,
407+ stdin = slave_fd ,
408+ cwd = cwd ,
409+ close_fds = True ,
410+ text = True ,
411+ bufsize = 1 ,
412+ universal_newlines = True ,
413+ )
414+ os .close (slave_fd )
415+ elif existing_process :
287416 process = existing_process
288417 else :
289418 process = subprocess .Popen (
290419 command ,
291420 shell = True ,
292421 stdout = subprocess .PIPE ,
293422 stderr = subprocess .PIPE ,
294- stdin = subprocess .DEVNULL , # No stdin for background commands
423+ stdin = subprocess .PIPE ,
295424 cwd = cwd ,
296- text = True , # Use text mode for easier handling
297- bufsize = 1 , # Line buffered
425+ text = True ,
426+ bufsize = 1 ,
298427 universal_newlines = True ,
299428 )
300429
301430 # Create background process wrapper
302- bg_process = BackgroundProcess (command , process , buffer , persist = persist )
431+ bg_process = BackgroundProcess (
432+ command ,
433+ process ,
434+ buffer ,
435+ persist = persist ,
436+ input_buffer = existing_input_buffer ,
437+ master_fd = master_fd ,
438+ )
303439
304440 # Generate unique key and store
305441 command_key = cls ._generate_command_key (command )
@@ -367,6 +503,30 @@ def get_new_command_output(cls, command_key: str) -> str:
367503 return f"[Error] No background command found with key: { command_key } "
368504 return bg_process .get_new_output ()
369505
506+ @classmethod
507+ def send_command_input (cls , command_key : str , text : str ) -> bool :
508+ """
509+ Send input to a background command.
510+
511+ Args:
512+ command_key: Command key returned by start_background_command
513+ text: Text to send to the command's stdin
514+
515+ Returns:
516+ True if input was queued, False if command not found
517+ """
518+ with cls ._lock :
519+ bg_process = cls ._background_commands .get (command_key )
520+ if not bg_process :
521+ return False
522+ # Decode escape sequences (like \x1b) if present in the string
523+ try :
524+ text = codecs .decode (text , "unicode_escape" )
525+ except Exception :
526+ pass
527+ bg_process .send_input (text )
528+ return True
529+
370530 @classmethod
371531 def get_all_command_outputs (cls , clear : bool = False ) -> Dict [str , str ]:
372532 """
0 commit comments