-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
277 lines (221 loc) · 8.84 KB
/
server.py
File metadata and controls
277 lines (221 loc) · 8.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import html
import random
import secrets
import threading
from pathlib import Path

import duckdb
import nltk
from duckdb.typing import VARCHAR, INTEGER
from flask import Flask, render_template, request, g, send_file, Response
from werkzeug.exceptions import RequestEntityTooLarge
# Initialize the Flask application; the route and error-handler decorators
# further down register against this object.
app = Flask(__name__)
# Cap the accepted request body at 16 * 1000 * 1000 bytes (16 MB, decimal).
# Larger uploads surface as RequestEntityTooLarge.
app.config['MAX_CONTENT_LENGTH'] = 16 * 1000 * 1000
# Module-wide DuckDB connection; stays None until get_conn() first opens it.
DB_CONN = None
# Lazy DB Connection
def get_conn():
    """Return the shared DuckDB connection, opening it on first use.

    The first call connects to ``locker.duckdb``, registers the Python
    ``dist`` function under the SQL name ``dist`` and, when the database
    file was not present beforehand, runs the schema script ``./db.sql``.
    Every later call returns the cached connection.

    NOTE(review): the lazy initialisation is not guarded by a lock; confirm
    whether concurrent first requests are a concern for this deployment.
    """
    global DB_CONN
    if DB_CONN is not None:
        return DB_CONN

    print("Creating DB CONNECTION")
    # Probe for the file before connecting, so we know whether the schema
    # still has to be created.
    is_new_database = not Path("./locker.duckdb").exists()
    DB_CONN = duckdb.connect("locker.duckdb")
    # Add string distance, for searching
    DB_CONN.create_function("dist", dist)
    # Run DB Schema from SQueaL file
    if is_new_database:
        with open("./db.sql", "r") as schema_file:
            DB_CONN.sql(schema_file.read())
    return DB_CONN
# close connection whenever application closes
# @app.teardown_appcontext
# def close_connection(exception):
# print("CLOSING CONNECTION")
# db = DB_CONN
# if not (db is None): # only if connection exists
# db.close()
# Basic error screen for the runtime errors
@app.errorhandler(RuntimeError)
def handle_bad_request(e):
    """Render a minimal HTML error page for an uncaught ``RuntimeError``.

    Args:
        e: The exception that escaped a view function.

    Returns:
        A ``(body, status)`` tuple: the HTML page and HTTP status 500.
    """
    # The message is interpolated into HTML, so escape it: an exception text
    # containing markup must be displayed, not interpreted by the browser.
    message = html.escape(str(e))
    return f"""
<h1>An error occurred with the server</h1>
<code>{message}</code>
<br />
<a href="/">Go back to homepage</a>
""", 500
# Max file size error
@app.errorhandler(RequestEntityTooLarge)
def handle_too_big(e):
    """Render the "upload too large" page with HTTP status 413.

    Args:
        e: The ``RequestEntityTooLarge`` exception (unused).
    """
    page = """
<h1>The file uploaded was too big! Max 16 MB</h1>
<br />
<a href="/">Go back to homepage</a>
"""
    return page, 413
# render borrowing page
@app.route("/")
@app.route("/index")
def front_page():
    """Serve the borrowing page, rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page
# render lending page
@app.route("/lend")
def submit_page():
    """Serve the lending form, rendered from the ``lend.html`` template."""
    page = render_template("lend.html")
    return page
# establishes admin console accessible through `$ deno run --allow-net sql_client.js`
@app.route("/admin-execute-sql", methods=["POST"])
def get_locations():
    """Run the SQL text sent in the request body and return the result as text.

    NOTE(review): this executes arbitrary SQL for any client that can reach
    the endpoint; no authentication is visible in this file. Confirm it is
    never exposed outside a trusted network.
    """
    statement = request.data.decode()
    print("EXECUTING:", statement)
    result = get_conn().sql(statement)
    return str(result)
# Generate combination code for unlocking
def code_gen(num_digits = 6):
    """Generate a random numeric unlock code.

    The digit rules are unchanged from the original implementation: the
    first digit is never 0 (so the result always has exactly ``num_digits``
    decimal digits) and no digit is immediately followed by itself.

    The code guards access to a cubby, so digits are drawn with the
    ``secrets`` module (OS-backed CSPRNG) instead of the predictable,
    seedable ``random`` generator.

    Args:
        num_digits: Number of decimal digits in the code. Values below 1
            yield 0.

    Returns:
        The code as a non-negative ``int``.
    """
    code = 0
    # The last slot of the pool is never drawn. It starts out holding 0 and
    # afterwards always holds the digit that was just used, which is what
    # rules out a leading zero and back-to-back repeats.
    pool = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
    for _ in range(num_digits):
        digit = pool.pop(secrets.randbelow(len(pool) - 1))
        code = code * 10 + digit
        pool.append(digit)
    return code
@app.route("/69")
def meow():
    """Return a freshly generated 6-digit code as 4 little-endian unsigned bytes."""
    code = code_gen(6)
    return code.to_bytes(4, "little", signed=False)
# File extensions accepted for uploaded item images. Only the file *name* is
# inspected; the file contents are not checked.
ALLOWED_EXTENSIONS=["png", "jpg", "jpeg", "jxl", "gif", "webp", "avif", "heif"]


def get_fext(filename):
    """Return the lower-cased extension of ``filename`` without the dot.

    Args:
        filename: File name to inspect; may be empty or ``None``.

    Returns:
        The text after the last ``.`` in lower case, or ``""`` when the name
        is empty/``None`` or contains no dot (previously this raised
        ``IndexError`` / ``TypeError``).
    """
    if not filename:
        return ""
    _, dot, extension = filename.rpartition(".")
    # rpartition puts the whole name in the last slot when there is no dot,
    # so only trust it when a separator was actually found.
    return extension.lower() if dot else ""


def allowed_file(filename):
    """Return True when ``filename`` ends in one of ``ALLOWED_EXTENSIONS``."""
    return get_fext(filename) in ALLOWED_EXTENSIONS
# Form action for submitting a lend
@app.post("/lend-item")
def lend_item():
    """Store a lent item in a free cubby and show the cubby's unlock code.

    Reads ``location`` (defaults to 1), ``item-name``, ``item-description``
    and the uploaded ``image`` from the request, inserts the item, assigns it
    to the first free cubby of the location together with a fresh code, and
    renders ``lend_response.html``.

    Raises:
        RuntimeError: If the name or description is missing, if no image with
            a supported extension was uploaded, or if the location has no
            free cubby.
    """
    loc = request.values.get("location", type=int) or 1
    item_name = request.values.get("item-name", type=str)
    item_description = request.values.get("item-description", type=str)
    if not item_name or not item_description:
        raise RuntimeError("Item name and description required!")

    image = request.files.get("image")
    # Validate before touching the database: a missing upload used to crash
    # on image.read(), and a name without an extension crashed in get_fext().
    if image is None or not image.filename or not allowed_file(image.filename):
        raise RuntimeError("An image file of a supported type is required!")

    print(f"Creating item {item_name} for location {loc}")
    conn = get_conn()
    # NOTE(review): the three statements below are issued separately, not as
    # one transaction; confirm whether concurrent lends can pick the same cubby.
    free_cubby = conn.execute("SELECT cubby_id FROM Cubby WHERE location_id == $1 AND item_id is NULL", [loc]).fetchone()
    if free_cubby is None:
        raise RuntimeError("Could not find a free cubby in this location")
    cubby_id = free_cubby[0]
    print(f"using cubby id: {cubby_id}")

    generated_item_id = conn.execute("INSERT INTO Item VALUES(DEFAULT, $1, $2, $3, $4) RETURNING (item_id)", [item_name, item_description, image.read(), get_fext(image.filename)]).fetchone()[0]
    print("generated item id:", generated_item_id)

    res = conn.execute("UPDATE cubby SET item_id = $1, code = $2 WHERE cubby_id == $3 RETURNING code, cubby_id", [generated_item_id, code_gen(), cubby_id]).fetchone()
    print("RETURNING:", res)
    return render_template("lend_response.html", cubby_id=res[1], cubby_code=res[0])
# Item image endpoint
@app.get("/item-image/<id>")
def item_img(id: str):
    """Serve the stored image of an item.

    Args:
        id: Item id taken from the URL. The route uses no converter, so the
            value arrives as a string and is passed to the query as-is.

    Returns:
        A ``Response`` carrying the image bytes, or an HTML page with status
        404 when no item has this id.
    """
    # Get image from database
    row = get_conn().execute("SELECT image, image_ext FROM Item WHERE item_id == $1", [id]).fetchone()
    if row is None:
        return """
<h1>Image not found</h1>
<br />
<a href="/">Go back to homepage</a>
""", 404

    img, ext = row
    # "jpg" is only a file extension; the registered media type is image/jpeg.
    subtype = "jpeg" if ext == "jpg" else ext
    return Response(img, mimetype=f"image/{subtype}")
# borrow item
@app.get("/borrow/<id>")
def borrow_item(id: str):
    """Hand an item over to a borrower and free its cubby.

    Looks up the cubby holding the item, clears the cubby's ``item_id``,
    deletes the item row and renders ``lend_response.html`` with the cubby id
    and its code. The cubby's stored code is left in place.

    Args:
        id: Item id taken from the URL (a string; the route has no converter).

    Returns:
        The rendered template, or an HTML page with status 404 when no cubby
        holds an item with this id.
    """
    conn = get_conn()
    # Find the cubby that currently holds the item
    row = conn.execute("SELECT cubby_id, code, item_id FROM Cubby WHERE item_id == $1", [id]).fetchone()
    if row is None:
        # The old message said "Image not found", copied from the image endpoint.
        return """
<h1>Item not found</h1>
<br />
<a href="/">Go back to homepage</a>
""", 404

    cubby_id, code, _ = row
    # Detach the item from the cubby first, then remove the item itself.
    conn.execute("UPDATE cubby SET item_id = NULL WHERE cubby_id == $1", [cubby_id]).fetchone()
    conn.execute("DELETE FROM item WHERE item_id == $1", [id]).fetchone()
    # Return successful borrow template with cubby details
    return render_template("lend_response.html", cubby_id=cubby_id, cubby_code=code)
# Endpoint to verify if entered code matches cubby code
# Returns HTTP 200 if code matches, 401 if invalid
@app.get("/check-code")
def check_code():
    """Check an entered code against the code stored for a cubby.

    Request parameters:
        code: The entered code; parsed as an integer.
        cubby_id: The cubby to check against; parsed as an integer.

    Returns:
        ``("True", 200)`` when the codes match, ``("False", 401)`` otherwise.

    Raises:
        RuntimeError: If no cubby with the given id exists.
    """
    # A missing or non-numeric parameter becomes None here
    code = request.values.get("code", type=int)
    # Location ID param currently disabled but kept for future use
    #location_id = request.values.get("location_id", type=int)
    cubby_id = request.values.get("cubby_id", type=int)

    # Parameterized query for the code stored on this cubby
    row = get_conn().execute("SELECT code FROM Cubby WHERE cubby_id == $1", [cubby_id]).fetchone()
    if row is None:
        raise RuntimeError("Could not find cubby")

    stored_code = row[0]
    # Both sides must be present: otherwise a request without a code would
    # "match" a cubby whose stored code is NULL (None == None).
    if code is not None and stored_code is not None and stored_code == code:
        return "True", 200
    return "False", 401
# Module-level flag apparently intended as a stop signal for the recursive
# string-distance calculation.
# NOTE(review): nothing in the visible code reads this flag or rebinds it via
# `global`, so it appears to stay False -- confirm before relying on it.
end_flag = False
# Helper function for Levenshtein distance calculation
def tail(s1, s2):
    """Return ``s1`` with the prefix it shares with ``s2`` removed."""
    # Count how many leading characters the two strings have in common
    shared = 0
    for char_1, char_2 in zip(s1, s2):
        if char_1 != char_2:
            break
        shared += 1
    return s1[shared:]
def lev_dist(s, target, stop_dist) -> int:
    """Return the Levenshtein distance between ``s`` and ``target``, capped.

    The previous recursive version could not return a usable value: it
    evaluated ``int("inf")`` (a ``ValueError``), stripped the common prefix
    from ``s`` twice instead of once from each string, and reported 0 when
    one string was empty. This version uses the standard row-by-row dynamic
    programme.

    Args:
        s: First string.
        target: Second string.
        stop_dist: Upper bound on the reported distance. The result is
            ``min(true distance, stop_dist)``, and the computation stops as
            soon as the bound is known to be reached. Negative values are
            treated as 0.

    Returns:
        The number of single-character insertions, deletions and
        substitutions needed to turn ``s`` into ``target``, never more than
        ``stop_dist``.
    """
    cap = max(stop_dist, 0)
    # The length difference is a lower bound on the distance.
    if abs(len(s) - len(target)) >= cap:
        return cap

    # previous[j] is the distance between the processed prefix of s and target[:j]
    previous = list(range(len(target) + 1))
    for i, char_s in enumerate(s, start=1):
        current = [i]
        for j, char_t in enumerate(target, start=1):
            substitution = previous[j - 1] + (char_s != char_t)
            current.append(min(previous[j] + 1, current[j - 1] + 1, substitution))
        # Row minima never decrease and bound the final answer from below,
        # so once a whole row reaches the cap the result is the cap.
        if min(current) >= cap:
            return cap
        previous = current
    return min(previous[-1], cap)
def dist(s: str, target: str) -> int:
    """Return the edit distance between ``s`` and ``target`` using NLTK.

    This function is registered with DuckDB under the SQL name ``dist``
    without explicit SQL types, so the annotations on the signature are kept
    exactly as they were (presumably DuckDB derives the types from them).
    """
    # Hand-rolled alternative, kept for reference:
    # lev_dist(s, target, max(len(s), len(target)) + 1)
    edit_distance = nltk.edit_distance(s, target)
    return edit_distance
@app.post("/search")
def search_handler():
    """Dispatch a search request to the location or item search.

    Request parameters:
        type: ``"Location"`` (default) or ``"Item"``.
        query: Text to search for; defaults to the empty string.
        count: Maximum number of results; defaults to 5, also when the value
            is missing, zero, negative or not a number.

    Returns:
        The response of the selected search, or an HTML page with status 400
        for an unsupported ``type`` (previously the view returned ``None``,
        which Flask turns into a server error).
    """
    query_type = request.values.get("type", type=str) or "Location"
    query = request.values.get("query", type=str) or ""
    count = request.values.get("count", type=int) or 5
    # The count ends up in a SQL LIMIT clause, which must not be negative.
    if count < 1:
        count = 5

    if query_type == "Location":
        return search_location(query, count)
    if query_type == "Item":
        return search_item(query, count)
    return """
<h1>Unknown search type</h1>
<br />
<a href="/">Go back to homepage</a>
""", 400
def search_location(query, count):
    """Return the ``count`` locations whose names are closest to ``query``.

    Rows are ordered by the SQL function ``dist`` applied to the location
    name; the ``(name, address)`` rows are returned as their ``str()`` form.
    """
    sql = 'SELECT name, address FROM Location ORDER BY dist(name, $1) LIMIT $2'
    params = [str(query), count]
    rows = get_conn().execute(sql, params).fetchall()
    return str(rows)
def search_item(query, count):
    """Render the ``count`` items whose names are closest to ``query``.

    Rows are ordered by the SQL function ``dist`` applied to the item name;
    the ``(item_id, item_name)`` rows are passed to ``search_results.html``
    as ``results``.
    """
    sql = 'SELECT item_id, item_name FROM Item ORDER BY dist(item_name, $1) LIMIT $2'
    params = [str(query), count]
    rows = get_conn().execute(sql, params).fetchall()
    return render_template("search_results.html", results=rows)
# Start the development server only when this file is run as a script.
# NOTE(review): debug=True enables Flask's debug mode -- confirm this entry
# point is used for local testing only.
if __name__ == "__main__":
    # Ensure threading is enabled for testing
    app.run(threaded=True, debug=True)