Coverage for students/models.py: 77%

189 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-05 14:02 +0000

1""" 

2This module defines the Student class which handles student records, spreadsheet import/export, student login and placement preferences. 

3""" 

4 

5from html import escape 

6import tempfile 

7import uuid 

8from flask import jsonify, send_file, session 

9import pandas as pd 

10from core import email_handler, handlers 

11from opportunities.models import Opportunity 

12 

13 

class Student:
    """Database and request helpers for documents in the "students" collection.

    Every method imports ``DATABASE_MANAGER`` from ``app`` at call time rather
    than at module import time. Methods that answer a request return a
    ``(flask.Response, status_code)`` tuple; the plain getters return the raw
    documents instead.
    """

    def add_student(self, student, overwrite=False):
        """Insert a student document, optionally replacing an existing one.

        Args:
            student: Mapping describing the student. It must contain a
                ``"student_id"`` key; presumably it also carries ``_id``,
                ``first_name``, ``last_name`` and ``email`` (verify against
                the caller).
            overwrite: When true, any stored student with the same
                ``student_id`` is deleted before the insert. When false, an
                existing student with that ID is reported as an error.

        Returns:
            ``(response, 200)`` on success, ``(response, 400)`` when the
            payload is empty / lacks a student ID or when the student already
            exists and ``overwrite`` is false.
        """
        from app import DATABASE_MANAGER

        # Validate up front: previously the payload was only checked after it
        # had already been indexed and inserted, so this branch never ran.
        if not student or "student_id" not in student:
            return jsonify({"error": "Student not added"}), 400

        student_id = student["student_id"]

        if overwrite:
            DATABASE_MANAGER.delete_one_by_field(
                "students", "student_id", student_id
            )
        elif DATABASE_MANAGER.get_one_by_field(
            "students", "student_id", student_id
        ):
            return jsonify({"error": "Student already in database"}), 400

        DATABASE_MANAGER.insert("students", student)
        return jsonify({"message": "Student added"}), 200

    def get_student_by_id(self, student_id):
        """Return the student whose ``student_id`` matches, or ``None``.

        The ID is converted to ``str`` before the lookup.
        """
        from app import DATABASE_MANAGER

        student = DATABASE_MANAGER.get_one_by_field(
            "students", "student_id", str(student_id)
        )
        return student or None

    def get_student_by_uuid(self, _id: str):
        """Return the student whose database ``_id`` matches, or ``None``."""
        from app import DATABASE_MANAGER

        student = DATABASE_MANAGER.get_one_by_id("students", _id)
        return student or None

    def get_students(self):
        """Return all stored students, or an empty list when there are none."""
        from app import DATABASE_MANAGER

        students = DATABASE_MANAGER.get_all("students")
        return students or []

    def get_students_map(self):
        """Return all stored students as a dict keyed by their ``_id``."""
        from app import DATABASE_MANAGER

        students = DATABASE_MANAGER.get_all("students")
        if not students:
            return {}
        return {student["_id"]: student for student in students}

    def update_student_by_id(self, student_id, student_data):
        """Update the student identified by ``student_id``.

        Args:
            student_id: Student number; converted to ``str`` for the lookup.
            student_data: Data handed to the database manager for the update.

        Returns:
            ``(response, 200)`` when a document was matched,
            ``(response, 404)`` otherwise.
        """
        from app import DATABASE_MANAGER

        result = DATABASE_MANAGER.update_by_field(
            "students", "student_id", str(student_id), student_data
        )

        # A matched document counts as success even if nothing was modified.
        if result.matched_count > 0:
            return jsonify({"message": "Student updated"}), 200

        return jsonify({"error": "Student not found"}), 404

    def update_student_by_uuid(self, uuid, student_data):
        """Update the student identified by its database ``_id``.

        Args:
            uuid: The document ``_id``. Note that inside this method the
                parameter hides the module-level ``uuid`` import.
            student_data: Data handed to the database manager for the update.

        Returns:
            ``(response, 200)`` when a document was matched,
            ``(response, 404)`` otherwise.
        """
        from app import DATABASE_MANAGER

        result = DATABASE_MANAGER.update_one_by_id("students", uuid, student_data)

        # A matched document counts as success even if nothing was modified.
        if result.matched_count > 0:
            return jsonify({"message": "Student updated"}), 200
        return jsonify({"error": "Student not found"}), 404

    def delete_student_by_id(self, student_id):
        """Delete one student and remove them from opportunity preferences.

        Returns:
            ``(response, 200)`` after deletion, ``(response, 404)`` when no
            student has the given ``student_id``.
        """
        from app import DATABASE_MANAGER

        student = DATABASE_MANAGER.get_one_by_field(
            "students", "student_id", str(student_id)
        )

        if not student:
            return jsonify({"error": "Student not found"}), 404

        student_uuid = student["_id"]
        DATABASE_MANAGER.delete_by_id("students", student_uuid)

        # Opportunities reference students by "_id" in their "preferences";
        # drop the reference so no opportunity points at a deleted student.
        for opportunity in Opportunity().get_opportunities():
            if (
                "preferences" in opportunity
                and student_uuid in opportunity["preferences"]
            ):
                opportunity["preferences"].remove(student_uuid)
                DATABASE_MANAGER.update_one_by_id(
                    "opportunities", opportunity["_id"], opportunity
                )

        return jsonify({"message": "Student deleted"}), 200

    def delete_students(self):
        """Delete every student document.

        Unlike ``delete_all_students`` this leaves the ``preferences`` field
        of opportunities untouched.
        NOTE(review): confirm whether callers rely on that difference.
        """
        from app import DATABASE_MANAGER

        DATABASE_MANAGER.delete_all("students")
        return jsonify({"message": "All students deleted"}), 200

    def get_student_by_email(self, email):
        """Look a student up by email and return it as a JSON response.

        Returns:
            ``(response, 200)`` carrying the student, or ``(response, 404)``.
        """
        from app import DATABASE_MANAGER

        student = DATABASE_MANAGER.get_one_by_field("students", "email", email)

        if student:
            return jsonify(student), 200

        return jsonify({"error": "Student not found"}), 404

    def import_from_xlsx(self, base_email, file):
        """Import students from an Excel file.

        The sheet must provide the columns "First Name", "Last Name",
        "Email (Uni)" and "Student Number". All rows are validated first and
        nothing is inserted unless every row passes.

        Args:
            base_email: Required email domain (the part after the "@").
            file: The uploaded spreadsheet, passed on to
                ``handlers.excel_verifier_and_reader``.

        Returns:
            ``(response, 200)`` with the number of imported students, or
            ``(response, 400)`` for an invalid row, a duplicate student ID or
            email, or any exception raised while processing the file.
        """
        from app import DATABASE_MANAGER

        current_ids = set()
        current_emails = set()

        # Single pass over the stored students to collect both key sets.
        for existing in DATABASE_MANAGER.get_all("students"):
            current_ids.add(existing["student_id"])
            current_emails.add(existing["email"])

        try:
            df = handlers.excel_verifier_and_reader(
                file, {"First Name", "Last Name", "Email (Uni)", "Student Number"}
            )
            data = []
            for row in df.to_dict(orient="records"):
                first_name = row["First Name"]
                last_name = row["Last Name"]
                invalid_row = {
                    "error": f"Invalid student {first_name}, {last_name}"
                }

                # Everything after the first "@" must equal the allowed
                # domain. This rejects addresses without an "@" and ones that
                # smuggle the domain in as a middle segment ("a@uni@evil").
                email = str(row["Email (Uni)"])
                _, at_sign, domain = email.partition("@")
                if not at_sign or domain != base_email:
                    return jsonify(invalid_row), 400

                student_id = str(row["Student Number"])
                if len(student_id) != 8:
                    return jsonify(invalid_row), 400

                # Values are HTML-escaped before they are stored.
                temp_student = {
                    "_id": uuid.uuid4().hex,
                    "first_name": escape(first_name),
                    "last_name": escape(last_name),
                    "email": escape(email),
                    "student_id": escape(student_id),
                }

                if temp_student["student_id"] in current_ids:
                    error_msg = {
                        "error": (
                            f"Student {temp_student['first_name']} "
                            f"{temp_student['last_name']} student id already exists"
                        )
                    }
                    return jsonify(error_msg), 400

                if temp_student["email"] in current_emails:
                    error_msg = {
                        "error": (
                            f"Student {temp_student['first_name']} "
                            f"{temp_student['last_name']} email already exists"
                        )
                    }
                    return jsonify(error_msg), 400

                # Track the new keys so duplicates inside the file are caught.
                current_ids.add(temp_student["student_id"])
                current_emails.add(temp_student["email"])
                data.append(temp_student)

            for temp_student in data:
                DATABASE_MANAGER.insert("students", temp_student)

            return jsonify({"message": f"{len(data)} students imported"}), 200
        except Exception as e:
            # Request boundary: report any failure to the client as a 400.
            return jsonify({"error": f"Failed to read file: {str(e)}"}), 400

    def student_login(self, student_id):
        """Start a student login by sending a one-time password.

        Returns:
            ``(response, 200)`` once the OTP has been sent, or
            ``(response, 404)`` when no student has this ID.
        """
        from app import DATABASE_MANAGER

        # The student ID doubles as the login credential. It is converted to
        # str like every other student_id lookup in this class.
        student = DATABASE_MANAGER.get_one_by_field(
            "students", "student_id", str(student_id)
        )

        if student:
            email_handler.send_otp(student["email"])
            # NOTE(review): the student is stored in the session before the
            # OTP is verified here; confirm the OTP check gates access.
            session["student"] = student
            return jsonify({"message": "OTP sent"}), 200

        return jsonify({"error": "Student not found"}), 404

    def rank_preferences(self, student_id, preferences):
        """Store a student's ranked preferences.

        Returns:
            ``(response, 200)`` on success, ``(response, 404)`` when the
            student does not exist.
        """
        from app import DATABASE_MANAGER

        student = DATABASE_MANAGER.get_one_by_field(
            "students", "student_id", str(student_id)
        )

        if not student:
            return jsonify({"error": "Student not found"}), 404

        DATABASE_MANAGER.update_one_by_field(
            "students", "student_id", str(student_id), {"preferences": preferences}
        )

        return jsonify({"message": "Preferences updated"}), 200

    def get_opportunities_by_student(self, student_id):
        """Return the opportunities a student is eligible for.

        An opportunity qualifies when all of its required modules are among
        the student's modules, its required courses are empty or contain the
        student's course, and its duration is one of the student's placement
        durations. Students that have not recorded modules, a course or
        placement durations yet are handled without raising.

        Returns:
            A list of opportunity documents, or ``(response, 404)`` when the
            student does not exist.
        """
        student = self.get_student_by_id(student_id)

        if not student:
            return jsonify({"error": "Student not found"}), 404

        # Work on local values instead of rewriting the student document.
        student_modules = set(student.get("modules") or [])
        student_course = student.get("course")
        student_durations = student.get("placement_duration") or []

        valid_opportunities = []
        for opportunity in Opportunity().get_opportunities():
            # A list holding one empty string is normalised to an empty list
            # in place, so returned opportunities carry the cleaned value.
            for key in ("modules_required", "courses_required"):
                if opportunity.get(key) == [""]:
                    opportunity[key] = []

            modules_required = set(opportunity.get("modules_required") or [])
            # Assumes courses_required is a list of course names (it is
            # compared against [""] above) - TODO confirm.
            courses_required = opportunity.get("courses_required") or []

            if not modules_required.issubset(student_modules):
                continue
            if courses_required and student_course not in courses_required:
                continue
            if opportunity.get("duration") in student_durations:
                valid_opportunities.append(opportunity)

        return valid_opportunities

    def download_students(self):
        """Send every student as an ``students.xlsx`` attachment.

        The workbook is built in memory, so no temporary file has to outlive
        the response. Optional student fields export as empty strings and
        skill IDs missing from the "skills" collection are skipped.
        """
        import io

        from app import DATABASE_MANAGER

        students = DATABASE_MANAGER.get_all("students")

        skills_map = {
            skill["_id"]: skill["skill_name"]
            for skill in DATABASE_MANAGER.get_all("skills")
        }

        clean_data = []
        for student in students:
            clean_data.append(
                {
                    "First Name": student["first_name"],
                    "Last Name": student["last_name"],
                    "Email (Uni)": student["email"],
                    "Student Number": student["student_id"],
                    "Course": student.get("course", ""),
                    "Modules": ",".join(student.get("modules", [])),
                    "Skills": ",".join(
                        skills_map[skill]
                        for skill in student.get("skills", [])
                        if skill in skills_map
                    ),
                    "Comments": student.get("comments", ""),
                    "Placement Duration": ",".join(
                        student.get("placement_duration", [])
                    ),
                }
            )

        df = pd.DataFrame(clean_data)

        # Write to a buffer: a NamedTemporaryFile is removed when its context
        # exits, which would leave send_file with a path that is gone.
        buffer = io.BytesIO()
        df.to_excel(buffer, index=False)
        buffer.seek(0)

        return send_file(
            buffer,
            as_attachment=True,
            download_name="students.xlsx",
            mimetype=(
                "application/vnd.openxmlformats-officedocument"
                ".spreadsheetml.sheet"
            ),
        )

    def delete_all_students(self):
        """Delete every student and clear ``preferences`` on opportunities.

        Returns:
            ``(response, 200)`` once both collections have been processed.
        """
        from app import DATABASE_MANAGER

        DATABASE_MANAGER.delete_all("students")

        for opportunity in DATABASE_MANAGER.get_all("opportunities"):
            if "preferences" in opportunity:
                DATABASE_MANAGER.delete_field_by_id(
                    "opportunities", opportunity["_id"], "preferences"
                )

        return jsonify({"message": "All students deleted"}), 200