-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
399 lines (329 loc) · 13.1 KB
/
Copy pathapp.py
File metadata and controls
399 lines (329 loc) · 13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
import streamlit as st
import os
import tempfile
import shutil
from pathlib import Path
from dotenv import load_dotenv
# Import custom modules
from src.github_loader import GitHubLoader
from src.code_parser import CodeParser
from src.chunking import CodeChunker
from src.embeddings import EmbeddingGenerator
from src.vector_store import VectorStore
from src.retriever import CodeRetriever
from src.generator import ResponseGenerator
from config.settings import settings
# Pull variables from a local .env file into the process environment.
# NOTE(review): this call runs after `config.settings` has already been
# imported above; if that module reads environment variables at import time,
# values from .env would not be visible to it -- confirm.
load_dotenv()

# Streamlit page-level options (must be applied before other st.* output).
_PAGE_CONFIG = {
    "page_title": "Code Documentation RAG",
    "page_icon": "📚",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_CONFIG)

# Stylesheet injected into the page: main padding, alert spacing, dark <pre>
# blocks, and the `.source-box` class used when listing answer sources.
_CUSTOM_CSS = """
<style>
.main {
padding: 0rem 1rem;
}
.stAlert {
margin-top: 1rem;
}
pre {
background-color: #2b2b2b;
padding: 1rem;
border-radius: 5px;
overflow-x: auto;
}
.source-box {
background-color: #f0f2f6;
padding: 0.5rem;
border-radius: 5px;
margin: 0.5rem 0;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# Seed every session-state slot the app relies on. A slot is only written when
# it is missing, so values survive Streamlit's script reruns.
_SESSION_DEFAULTS = {
    'vector_store': None,
    'embeddings_generator': None,
    'retriever': None,
    'generator': None,
    'repo_loaded': False,
    'chat_history': [],
}
for _slot, _initial in _SESSION_DEFAULTS.items():
    if _slot not in st.session_state:
        setattr(st.session_state, _slot, _initial)
def initialize_components():
    """Create the RAG components on first use and cache them in session state.

    The embedding generator, vector store, retriever and response generator
    are each constructed only while their ``st.session_state`` slot is still
    ``None``; later calls reuse the stored instances.

    Returns:
        A ``CodeParser`` instance. Note that a new parser is constructed on
        every call, whereas the cached retriever keeps the parser it was
        given when it was first created.
    """
    state = st.session_state

    # Compare against None explicitly: a truthiness test would treat an
    # existing-but-falsy component (e.g. an object whose __len__ is 0) as
    # missing and rebuild it on every call.
    if state.embeddings_generator is None:
        state.embeddings_generator = EmbeddingGenerator(
            model_type="openai",
            model_name=settings.EMBEDDING_MODEL
        )

    if state.vector_store is None:
        state.vector_store = VectorStore(
            collection_name=settings.COLLECTION_NAME,
            persist_directory=settings.PERSIST_DIRECTORY
        )

    parser = CodeParser()

    # The retriever is wired to the cached store and embedding generator.
    if state.retriever is None:
        state.retriever = CodeRetriever(
            vector_store=state.vector_store,
            embeddings_generator=state.embeddings_generator,
            parser=parser
        )

    if state.generator is None:
        state.generator = ResponseGenerator(
            model_name=settings.LLM_MODEL
        )

    return parser
def process_repository(repo_url: str, progress_callback=None):
    """Clone a GitHub repository, chunk its files and add them to the vector store.

    Args:
        repo_url: URL of the repository to clone and index.
        progress_callback: Optional callable invoked as
            ``progress_callback(fraction, message)`` at each stage, where
            ``fraction`` runs from 0.1 to 1.0.

    Returns:
        On success, a dict with ``'success': True``, ``'repo_info'``,
        ``'total_files'`` and ``'total_chunks'``. If any step raises, a dict
        with ``'success': False`` and ``'error'`` set to ``str(exception)``;
        exceptions are never propagated to the caller.
    """
    def report(fraction, message):
        # Single place for the "callback is optional" check.
        if progress_callback:
            progress_callback(fraction, message)

    temp_dir = None
    try:
        # Make sure the session-level components exist before using them.
        parser = initialize_components()
        loader = GitHubLoader(github_token=settings.GITHUB_TOKEN)

        report(0.1, "Cloning repository...")
        temp_dir = loader.clone_repository(repo_url)
        repo_info = loader.get_repository_info(repo_url)

        report(0.2, "Loading files...")
        all_extensions = list(settings.CODE_EXTENSIONS.keys()) + settings.DOC_EXTENSIONS
        documents = loader.load_repository_files(temp_dir, all_extensions)
        if not documents:
            raise ValueError("No supported files found in the repository")

        report(0.4, "Parsing and chunking code...")
        chunker = CodeChunker(
            chunk_size=settings.CHUNK_SIZE,
            chunk_overlap=settings.CHUNK_OVERLAP
        )

        all_chunks = []
        total = len(documents)
        for index, doc in enumerate(documents):
            # Per-file progress spans the 0.4 -> 0.7 portion of the bar.
            report(0.4 + (0.3 * (index / total)), f"Processing file {index+1}/{total}...")

            # Code files and documentation files use different chunkers.
            metadata = doc['metadata']
            if metadata['file_type'] in settings.CODE_EXTENSIONS:
                chunks = chunker.chunk_code(doc['content'], metadata)
            else:
                chunks = chunker.chunk_documentation(doc['content'], metadata)
            all_chunks.extend(chunks)

        report(0.7, "Creating semantic chunks...")
        all_chunks.extend(chunker.create_semantic_chunks(documents, parser))

        report(0.8, "Generating embeddings and storing...")
        st.session_state.vector_store.add_documents(
            all_chunks,
            st.session_state.embeddings_generator
        )

        report(1.0, "Repository processed successfully!")
        return {
            'success': True,
            'repo_info': repo_info,
            'total_files': total,
            'total_chunks': len(all_chunks)
        }
    except Exception as e:
        # UI boundary: report the failure as data instead of raising.
        return {
            'success': False,
            'error': str(e)
        }
    finally:
        # Best-effort cleanup of the clone. An exception raised here would
        # replace the result dict being returned above, so removal errors
        # (e.g. read-only files in the checkout) are deliberately ignored.
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)
def _render_sidebar():
    """Draw the sidebar (credentials, repository loader, settings, stats).

    Returns:
        The OpenAI API key currently entered in the sidebar (may be empty).
    """
    with st.sidebar:
        st.header("⚙️ Configuration")

        api_key = st.text_input(
            "OpenAI API Key",
            value=os.getenv("OPENAI_API_KEY", ""),
            type="password",
            help="Enter your OpenAI API key"
        )
        if api_key:
            # Publish the key through both the environment and settings.
            os.environ["OPENAI_API_KEY"] = api_key
            settings.OPENAI_API_KEY = api_key

        st.divider()

        st.header("📦 Repository Setup")
        repo_url = st.text_input(
            "GitHub Repository URL",
            placeholder="https://github.com/owner/repo",
            help="Enter the GitHub repository URL to analyze"
        )
        github_token = st.text_input(
            "GitHub Token (Optional)",
            value=os.getenv("GITHUB_TOKEN", ""),
            type="password",
            help="For private repositories or to avoid rate limits"
        )
        if github_token:
            settings.GITHUB_TOKEN = github_token

        if st.button("🚀 Load Repository", type="primary", disabled=not repo_url):
            progress_bar = st.progress(0)
            status_text = st.empty()

            def update_progress(progress, status):
                progress_bar.progress(progress)
                status_text.text(status)

            result = process_repository(repo_url, update_progress)
            if result['success']:
                st.success("✅ Repository loaded successfully!")
                st.info(f"📊 Processed {result['total_files']} files into {result['total_chunks']} chunks")
                if result['repo_info']:
                    with st.expander("Repository Information"):
                        for key, value in result['repo_info'].items():
                            if value:
                                st.write(f"**{key.title()}:** {value}")
                # Switches the main area from the welcome page to the chat UI.
                st.session_state.repo_loaded = True
            else:
                st.error(f"❌ Error: {result['error']}")

        st.divider()

        with st.expander("🔧 Advanced Settings"):
            response_type = st.selectbox(
                "Response Type",
                ["explanation", "code_example", "api_reference", "implementation_guide"],
                help="Choose the type of response generation"
            )
            num_results = st.slider(
                "Number of Results",
                min_value=1,
                max_value=10,
                value=5,
                help="Number of relevant chunks to retrieve"
            )
        # Stored on every run so the chat view can read the current choices.
        st.session_state.response_type = response_type
        st.session_state.num_results = num_results

        # Explicit None check: the store object itself may evaluate falsy.
        if st.session_state.vector_store is not None:
            stats = st.session_state.vector_store.get_collection_stats()
            if stats:
                st.divider()
                st.metric("Documents in Store", stats.get('total_documents', 0))

    return api_key


def _render_welcome():
    """Show the landing page displayed before any repository is loaded."""
    st.info("👋 Welcome! Please load a GitHub repository from the sidebar to get started.")

    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown("""
### 🎯 Features
- Multi-language support
- Intelligent code parsing
- Semantic search
- Context-aware responses
""")
    with col2:
        st.markdown("""
### 🔧 Capabilities
- Code explanation
- API documentation
- Implementation guides
- Example generation
""")
    with col3:
        st.markdown("""
### 📚 Supported Languages
- Python
- JavaScript/TypeScript
- Java, C/C++
- Go, Rust, Ruby, PHP
""")


def _render_history():
    """Render stored question/answer pairs, most recent first."""
    # Function-scope import keeps this block self-contained.
    import html

    if not st.session_state.chat_history:
        return

    st.divider()
    st.header("📝 Conversation History")

    for item in reversed(st.session_state.chat_history):
        with st.container():
            st.markdown(f"**🧑 You:** {item['query']}")
            st.markdown("**🤖 Assistant:**")

            # NOTE(review): the formatted response is rendered as raw HTML;
            # presumably format_response_with_citations escapes model and
            # repository text itself -- confirm.
            response_html = st.session_state.generator.format_response_with_citations(item['response'])
            st.markdown(response_html, unsafe_allow_html=True)

            sources = item['response'].get('sources')
            if sources:
                with st.expander(f"📎 Sources ({len(sources)} files)"):
                    for source in sources:
                        # These values originate from the analysed repository
                        # and are placed inside raw HTML, so escape them to
                        # prevent markup injection via crafted file names.
                        file_name = html.escape(str(source['file']))
                        language = html.escape(str(source['language']))
                        chunk_type = html.escape(str(source['type']))
                        st.markdown(f"""
<div class="source-box">
<b>File:</b> {file_name}<br>
<b>Language:</b> {language}<br>
<b>Type:</b> {chunk_type}
</div>
""", unsafe_allow_html=True)
            st.divider()


def _render_chat():
    """Render the question box, run retrieval + generation, then the history."""
    st.header("💬 Ask Questions About the Code")

    query = st.text_area(
        "Enter your question:",
        placeholder="e.g., How does the authentication system work? Show me examples of API endpoints.",
        height=100
    )

    col1, col2 = st.columns([1, 5])
    with col1:
        search_button = st.button("🔍 Search", type="primary", disabled=not query)
    with col2:
        clear_button = st.button("🗑️ Clear History")

    if clear_button:
        st.session_state.chat_history = []
        st.rerun()

    if search_button and query:
        with st.spinner("Searching and generating response..."):
            retrieved_docs = st.session_state.retriever.retrieve(
                query,
                k=st.session_state.num_results
            )
            if retrieved_docs:
                response = st.session_state.generator.generate_response(
                    query,
                    retrieved_docs,
                    response_type=st.session_state.response_type
                )
                st.session_state.chat_history.append({
                    'query': query,
                    'response': response
                })
            else:
                st.warning("No relevant documents found. Try rephrasing your query.")

    _render_history()


def main():
    """Entry point of the Streamlit app.

    Draws the title and sidebar, halts the script run with a warning while no
    OpenAI API key is entered, and otherwise shows either the welcome page
    (no repository loaded yet) or the chat interface.
    """
    st.title("🔍 Code Documentation RAG System")
    st.markdown("*Intelligent code understanding and documentation retrieval*")

    api_key = _render_sidebar()

    if not api_key:
        st.warning("⚠️ Please enter your OpenAI API key in the sidebar to continue")
        st.stop()

    if not st.session_state.repo_loaded:
        _render_welcome()
    else:
        _render_chat()
# Run the app only when this file is executed as the main script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()