-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutils.py
More file actions
291 lines (246 loc) · 11.1 KB
/
utils.py
File metadata and controls
291 lines (246 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
import requests
import json
import base64
import logging
import os
import re
from cache_manager import load_cache, save_cache
from dotenv import load_dotenv
# Configure root logging at DEBUG so module and library logs are visible.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
load_dotenv() # Load environment variables from .env file
# Cache for stock tickers
TICKER_CACHE_KEY = 'ticker'
# Persistent company-name -> ticker map; starts empty when no cache file exists yet.
ticker_cache = load_cache(TICKER_CACHE_KEY) or {}
def search_with_perplexity(query):
    """
    Performs a search using the Perplexity chat-completions API.

    Args:
        query: Natural-language question sent as the user message.

    Returns:
        The decoded JSON response dict on success, or None on any
        request/parse error (errors are logged, never raised).
    """
    url = "https://api.perplexity.ai/chat/completions"
    headers = {
        "Authorization": f"Bearer {os.getenv('PERPLEXITY_API_KEY')}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "sonar",
        "messages": [
            {
                "role": "system",
                "content": "You are a financial data assistant. Return only factual information about stock tickers and company listings. If uncertain, say so."
            },
            {
                "role": "user",
                "content": query
            }
        ],
        # Low temperature: we want factual, deterministic answers.
        "temperature": 0.2,
        "max_tokens": 150,
        "search_recency_filter": "month"
    }
    try:
        # Explicit timeout so a stalled connection cannot hang the process.
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"Perplexity API error: {str(e)}")
        return None
def parse_ticker_with_openrouter(perplexity_response):
    """
    Uses OpenRouter with Gemini to parse a ticker symbol out of a
    Perplexity response.

    Args:
        perplexity_response: Free-form text from Perplexity that may
            mention one or more ticker symbols.

    Returns:
        The parsed ticker string, or None if the model answered 'null'
        or any request error occurred (errors are logged, never raised).
    """
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://watermelon-api.com",
        "X-Title": "Watermelon API Ticker Parser"
    }
    prompt = f"""Return just the ticker symbol. If more than one ticker is mentioned, return the most prominent one. If no valid ticker is found, return 'null'.
{perplexity_response}
Response:"""
    payload = {
        "model": "google/gemini-2.5-flash",
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ],
        # Near-zero temperature and a tiny token budget: we expect a
        # single short token back, not an explanation.
        "temperature": 0.1,
        "max_tokens": 10
    }
    try:
        # Explicit timeout so a stalled connection cannot hang the process.
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        result = response.json()
        if 'choices' in result and len(result['choices']) > 0:
            ticker = result['choices'][0]['message']['content'].strip()
            logger.info(f"OpenRouter parsed ticker: {ticker}")
            # The model signals "no ticker found" with the literal 'null'.
            return ticker if ticker.lower() != 'null' else None
        return None
    except Exception as e:
        logger.error(f"OpenRouter API error: {str(e)}")
        return None
def is_valid_ticker(ticker):
    """
    Checks whether a string looks like a valid stock ticker.

    Accepted shapes:
    - 1-5 uppercase letters (NYSE/NASDAQ)
    - 4-5 letters ending in Y (ADRs)
    - 2-6 letters ending in F (foreign ordinary shares)
    - 4-6 letters with an optional .XX exchange suffix (OTC/ADR)
    - 1-4 letters plus 1-2 trailing digits, optional .XX suffix
    """
    # Reject None/empty values and anything implausibly long (the 10-char
    # ceiling leaves room for OTC/ADR symbols with exchange suffixes).
    if not ticker or len(ticker) > 10:
        return False
    ticker_shapes = (
        r'^[A-Z]{1,5}$',                        # Regular NYSE/NASDAQ
        r'^[A-Z]{4,5}Y$',                       # ADRs ending in Y
        r'^[A-Z]{2,6}F$',                       # Foreign ordinary shares
        r'^[A-Z]{4,6}(\.[A-Z]{1,2})?$',         # OTC/ADR tickers (like ABBNY)
        r'^[A-Z]{1,4}\d{1,2}(\.[A-Z]{1,2})?$',  # Tickers with numbers
    )
    for shape in ticker_shapes:
        if re.match(shape, ticker):
            return True
    return False
def get_stock_ticker(company_name):
    """
    Finds the stock ticker for a publicly traded company via Perplexity
    search, with Gemini-assisted parsing of the answer.

    Results — including negative results — are cached persistently so
    repeated lookups never re-hit the APIs.

    Returns:
        The ticker string, or None if the company is not publicly
        traded or the ticker cannot be determined.
    """
    global ticker_cache

    # Serve from the persistent cache when possible; a cached None is a
    # valid "not publicly traded" answer.
    if company_name in ticker_cache:
        return ticker_cache[company_name]

    def _cache_and_return(value):
        # Persist the lookup result (positive or negative) and hand it back.
        ticker_cache[company_name] = value
        save_cache(ticker_cache, TICKER_CACHE_KEY)
        return value

    try:
        # Strip punctuation that tends to confuse the search.
        search_name = company_name.replace(".", "").replace(",", "")
        # Search focusing on major exchanges and ADRs.
        search_query = f"What is the stock ticker symbol for {search_name} in the US (NYSE, NASDAQ, OTC markets, or as an ADR)? Only return the ticker symbol, no explanation. Return null if not found."
        logger.info(f"Searching for ticker with query: {search_query}")
        response = search_with_perplexity(search_query)
        if not response or 'choices' not in response:
            return None
        content = response['choices'][0]['message']['content'].strip()
        logger.info(f"Perplexity returned '{content}' for {company_name}")

        # Treat null-like answers as "not publicly traded" and cache that.
        if content.lower().strip('.') in ['null', 'none', '-', 'n/a']:
            return _cache_and_return(None)

        # Let Gemini (via OpenRouter) extract the ticker from the answer;
        # fall back to the first whitespace-separated token if that fails.
        ticker = parse_ticker_with_openrouter(content)
        if not ticker:
            tokens = content.split()
            ticker = tokens[0] if tokens else None

        if not is_valid_ticker(ticker):
            logger.warning(f"Invalid ticker format received for {company_name}: {ticker}")
            return _cache_and_return(None)

        return _cache_and_return(ticker)
    except Exception as e:
        logger.error(f"Error getting stock ticker for {company_name}: {str(e)}")
        return None
def flatten_and_standardize(data):
    """
    Flattens and standardizes the input JSON data into a list of JSON
    entries representing companies and their associated data.

    Args:
        data: Decoded snapshot dict with 'Sheet1' (company rows) and
            'Campaigns' (campaign rows) lists; each row holds its fields
            under a 'data' key.

    Returns:
        A list of flat company dicts, enriched with campaign fields for
        any company referenced by a campaign.
    """
    companies = []
    # Process Sheet1 data
    for company_data in data['Sheet1']:
        # Collect all populated source fields, preserving their order.
        sources = []
        source_fields = ['Source', 'Second source', 'Information source 3', 'Information source 4']
        for field in source_fields:
            value = company_data['data'].get(field, '')
            if value:
                sources.append(value)
        company = {
            'companyName': company_data['data']['Company Name'],
            # NOTE(review): 'companyId' reads the differently-cased
            # 'Company name' field — presumably a distinct ID column in the
            # sheet; confirm against the upstream schema.
            'companyId': company_data['data']['Company name'],
            'sector': company_data['data']['Sector'],
            'complicityDetails': company_data['data']['Complicity details'],
            'recordLastUpdated': company_data['data']['Record last updated']['repr'],
            'sources': sources,  # Add sources array
            'stockTicker': get_stock_ticker(company_data['data']['Company Name'])  # Add stock ticker
        }
        # Add complicity categories. Note that some categories may be absent.
        for category in ["Military", "Settlement production", "Population control", "Economic exploitation", "Cultural"]:
            if category in company_data['data']:
                company[category.lower().replace(' ', '_')] = company_data['data'][category]
        companies.append(company)
    # Process Campaigns data, adding campaign-related information to companies
    for campaign_data in data['Campaigns']:
        # Check if 'Companies' field exists in campaign data
        companies_field = campaign_data['data'].get('Companies', '')
        if not companies_field:
            logger.warning(f"Campaign {campaign_data.get('id', 'unknown')} has no 'Companies' field, skipping")
            continue
        # Use the logger instead of a stray debug print.
        logger.debug(f"Campaign companies field: {companies_field}")
        company_ids = [x.strip() for x in companies_field.split(',')]  # Handle potential multiple companies
        for company_id in company_ids:
            for company in companies:
                if company['companyId'] == company_id:
                    campaign_info = campaign_data['data']
                    company['campaignName'] = campaign_info.get('Campaign Name', '')
                    company['campaignId'] = campaign_data['id']
                    company['campaignDescription'] = campaign_info.get('Description', '')
                    company['campaignLocation'] = campaign_info.get('Location', '')
                    company['campaignOutcomes'] = campaign_info.get('Outcomes', '')
                    company['campaignAimsAchieved'] = campaign_info.get('Aims achieved', '')
                    company['campaignGroups'] = campaign_info.get('Campaign Groups', '')
                    # Opaque hash key comes from the upstream Glide schema.
                    company['campaignMethods'] = campaign_info.get('9f119b48c6e3251dc6be2ae8a8b969c4', '')
                    campaign_links = campaign_info.get('Campaign link', {}).get('$arrayItems', [])
                    company['campaignLinks'] = [item for item in campaign_links if item]
                    company['targetAim'] = campaign_info.get('Target aim: Divestment,Contract,Sponsor,Supply,Operations,Position,Other', '')
                    break
    return companies
def fetch_raw_data():
    """Fetches the raw app snapshot from the Glide API endpoint.

    Returns:
        The base64-decoded, JSON-parsed snapshot.

    Raises:
        requests.exceptions.RequestException: on HTTP failure.
        ValueError: if the response lacks a dataSnapshot URL.
    """
    url = 'https://watermelonindex.glide.page/api/container/playerFunctionCritical/getAppSnapshot?reqid=cRtoCkoYLvPuumH1tQ03'
    headers = {
        'accept': '*/*',
        'content-type': 'application/json',
        'origin': 'https://watermelonindex.glide.page',
        'referer': 'https://watermelonindex.glide.page/dl/companies',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
    }
    payload = {"appID": "57dVVMXNFIuBOYtiLIaP"}
    # Explicit timeouts so a stalled connection cannot hang the process.
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    data_snapshot_url = response.json().get('dataSnapshot')
    if not data_snapshot_url:
        raise ValueError("dataSnapshot URL not found in response")
    snapshot_response = requests.get(data_snapshot_url, timeout=30)
    snapshot_response.raise_for_status()
    # The snapshot body is base64-encoded JSON.
    decoded_bytes = base64.b64decode(snapshot_response.text)
    return json.loads(decoded_bytes)
def fetch_and_decode_data():
    """Main function to fetch and process data with caching.

    Returns:
        A dict exposing the raw snapshot under both 'data' and
        'raw_data' (the two paths previously returned different keys;
        both are now provided for backward compatibility) plus the
        flattened company list under 'processed_data'.

    Raises:
        Re-raises any request or processing error after logging it.
    """
    cached_data = load_cache()
    if cached_data and 'processed_data' in cached_data:
        logger.info("Returning processed data from cache")
        raw = cached_data['raw_data']
        return {'data': raw, 'raw_data': raw, 'processed_data': cached_data['processed_data']}
    try:
        decoded_json = fetch_raw_data()
        processed_data = flatten_and_standardize(decoded_json['data'])
        cache_data = {
            'raw_data': decoded_json,
            'processed_data': processed_data
        }
        save_cache(cache_data)
        # Mirror the cache-hit return shape without changing what is cached.
        return {'data': decoded_json, 'raw_data': decoded_json, 'processed_data': processed_data}
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {str(e)}")
        # e.response is None when no HTTP response was received (e.g. a
        # connection error) — guard before dereferencing .text, since
        # hasattr(e, 'response') is always true for RequestException.
        if getattr(e, 'response', None) is not None:
            logger.error(f"Error response: {e.response.text}")
        raise
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        raise