Coverage for opportunities/models.py: 94%

229 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-05 14:02 +0000

1""" 

2Opportunity model. 

3""" 

4 

5from html import escape 

6import tempfile 

7import uuid 

8from flask import jsonify, send_file, session 

9import pandas as pd 

10from core import handlers 

11from employers.models import Employers 

12 

13 

14class Opportunity: 

15 """Opportunity class.""" 

16 

17 def add_update_opportunity( 

18 self, 

19 opportunity, 

20 is_admin=False, 

21 ): 

22 """Adding new opportunity.""" 

23 from app import DATABASE_MANAGER 

24 

25 find_opportunity = DATABASE_MANAGER.get_one_by_id( 

26 "opportunities", opportunity["_id"] 

27 ) 

28 if find_opportunity and not is_admin: 

29 if find_opportunity["employer_id"] != session["employer"]["_id"]: 

30 return jsonify({"error": "Unauthorized Access."}), 401 

31 DATABASE_MANAGER.delete_by_id("opportunities", opportunity["_id"]) 

32 

33 DATABASE_MANAGER.insert("opportunities", opportunity) 

34 

35 if opportunity: 

36 return jsonify(opportunity), 200 

37 

38 return jsonify({"error": "Opportunity not added"}), 400 

39 

40 def get_opportunities_for_search(self, _id): 

41 """Get opportunities for search.""" 

42 from app import DATABASE_MANAGER 

43 

44 opportunities = DATABASE_MANAGER.get_all("opportunities") 

45 employers_map = { 

46 employer["_id"]: employer["company_name"] 

47 for employer in Employers().get_employers() 

48 } 

49 for opportunity in opportunities: 

50 if "preferences" in opportunity: 

51 opportunity["ranked"] = True 

52 opportunity["company_name"] = employers_map.get( 

53 opportunity["employer_id"], "Unknown Company" 

54 ) 

55 

56 if not _id: 

57 return opportunities 

58 

59 filtered_opportunities = [ 

60 opportunity 

61 for opportunity in opportunities 

62 if opportunity["employer_id"] == _id 

63 ] 

64 return filtered_opportunities 

65 

66 def get_opportunities_by_title(self, title): 

67 """Fetch opportunities by title.""" 

68 from app import DATABASE_MANAGER 

69 

70 try: 

71 if not title: 

72 print("[DEBUG] No title provided.") 

73 return [] 

74 

75 query = {"title": {"$regex": title, "$options": "i"}} 

76 print(f"[DEBUG] Query for title: {query}") 

77 

78 opportunities = DATABASE_MANAGER.get_all_by_field( 

79 "opportunities", "title", {"$regex": title, "$options": "i"} 

80 ) 

81 print(f"[DEBUG] Opportunities found: {len(opportunities)}") 

82 return opportunities 

83 except Exception as e: 

84 print(f"[ERROR] Failed to fetch opportunities by title: {e}") 

85 return [] 

86 

87 def get_opportunities_by_company(self, company_name): 

88 """Fetch opportunities by company.""" 

89 from app import DATABASE_MANAGER 

90 

91 try: 

92 if not company_name: 

93 print("[DEBUG] No company name provided.") 

94 return [] 

95 

96 # Find the employer by exact company name 

97 company = DATABASE_MANAGER.get_one_by_field( 

98 "employers", "company_name", company_name 

99 ) 

100 

101 if not company: 

102 print(f"[DEBUG] No company found for name: {company_name}") 

103 return [] 

104 

105 # Use the employer's _id to search for opportunities 

106 employer_id = company["_id"] 

107 print(f"[DEBUG] Employer ID found: {employer_id}") 

108 

109 # Query the opportunities collection with employer_id 

110 

111 opportunities = DATABASE_MANAGER.get_all_by_field( 

112 "opportunities", "employer_id", employer_id 

113 ) 

114 print(f"[DEBUG] Opportunities found: {len(opportunities)}") 

115 

116 return opportunities 

117 except Exception as e: 

118 print(f"[ERROR] Failed to fetch opportunities by company: {e}") 

119 return [] 

120 

121 def get_opportunity_by_company_id(self, company_id): 

122 """Get opportunity by company ID.""" 

123 from app import DATABASE_MANAGER 

124 

125 opportunities = DATABASE_MANAGER.get_all_by_field( 

126 "opportunities", "employer_id", company_id 

127 ) 

128 return opportunities 

129 

130 def get_opportunity_by_id(self, _id): 

131 """Getting opportunity.""" 

132 from app import DATABASE_MANAGER 

133 

134 opportunity = DATABASE_MANAGER.get_one_by_id("opportunities", _id) 

135 

136 if opportunity: 

137 return opportunity 

138 

139 return None 

140 

141 def get_employer_by_id(self, _id): 

142 """Get employer_id by ID.""" 

143 opportunity = self.get_opportunity_by_id(_id) 

144 if not opportunity: 

145 return "" 

146 return opportunity["employer_id"] 

147 

148 def get_opportunities(self): 

149 """Getting all opportunities.""" 

150 from app import DATABASE_MANAGER 

151 

152 return DATABASE_MANAGER.get_all("opportunities") 

153 

154 def get_opportunities_by_duration(self, duration): 

155 """Getting all opportunities that match duration.""" 

156 from app import DATABASE_MANAGER 

157 

158 duration_list = [d.strip().replace('"', "") for d in duration[1:-1].split(",")] 

159 data = DATABASE_MANAGER.get_all_by_in_list( 

160 "opportunities", "duration", duration_list 

161 ) 

162 

163 return jsonify(data), 200 

164 

165 def delete_opportunity_by_id(self, opportunity_id): 

166 """Deleting opportunity.""" 

167 from app import DATABASE_MANAGER 

168 

169 opportunity = DATABASE_MANAGER.get_one_by_id("opportunities", opportunity_id) 

170 

171 if ( 

172 handlers.get_user_type() == "employer" 

173 and opportunity["employer_id"] != session["employer"]["_id"] 

174 ): 

175 return jsonify({"error": "Unauthorized Access."}), 401 

176 if not opportunity: 

177 return jsonify({"error": "Opportunity not found"}), 404 

178 

179 DATABASE_MANAGER.delete_by_id("opportunities", opportunity_id) 

180 

181 students = DATABASE_MANAGER.get_all("students") 

182 

183 for student in students: 

184 if "preferences" in student and opportunity_id in student["preferences"]: 

185 student["preferences"].remove(opportunity_id) 

186 DATABASE_MANAGER.update_one_by_id("students", student["_id"], student) 

187 

188 return jsonify({"message": "Opportunity deleted"}), 200 

189 

190 def get_valid_students(self, opportunity_id): 

191 """Get valid students for an opportunity.""" 

192 # pylint: disable=import-outside-toplevel 

193 from students.models import Student 

194 

195 students = Student().get_students() 

196 valid_students = [] 

197 for student in students: 

198 if "preferences" in student and opportunity_id in student["preferences"]: 

199 valid_students.append(student) 

200 return valid_students 

201 

202 def rank_preferences(self, opportunity_id, preferences): 

203 """Sets a opportunity preferences.""" 

204 from app import DATABASE_MANAGER 

205 

206 opportunity = DATABASE_MANAGER.get_one_by_id("opportunities", opportunity_id) 

207 

208 if not opportunity: 

209 return jsonify({"error": "Opportunity not found"}), 404 

210 

211 DATABASE_MANAGER.update_one_by_field( 

212 "opportunities", "_id", opportunity_id, {"preferences": preferences} 

213 ) 

214 return jsonify({"message": "Preferences updated"}), 200 

215 

216 def delete_all_opportunities(self, is_admin): 

217 """Deleting all opportunities.""" 

218 if is_admin: 

219 return self.delete_all_opportunities_admin() 

220 

221 return self.delete_all_opportunities_employer() 

222 

223 def delete_all_opportunities_employer(self): 

224 """Deleting all opportunities.""" 

225 from app import DATABASE_MANAGER 

226 

227 opportunities = DATABASE_MANAGER.get_all_by_field( 

228 "opportunities", "employer_id", session["employer"]["_id"] 

229 ) 

230 opportunity_ids = set(opportunity["_id"] for opportunity in opportunities) 

231 

232 for opportunity in opportunities: 

233 DATABASE_MANAGER.delete_by_id("opportunities", opportunity["_id"]) 

234 

235 students = DATABASE_MANAGER.get_all("students") 

236 

237 student_updates = [] 

238 for student in students: 

239 if "preferences" in student: 

240 new_preferences = [ 

241 preference 

242 for preference in student["preferences"] 

243 if preference not in opportunity_ids 

244 ] 

245 if new_preferences != student["preferences"]: 

246 student_updates.append((student["_id"], new_preferences)) 

247 

248 for student_id, new_preferences in student_updates: 

249 DATABASE_MANAGER.update_one_by_id( 

250 "students", student_id, {"preferences": new_preferences} 

251 ) 

252 

253 return jsonify({"message": "All opportunities deleted"}), 200 

254 

255 def delete_all_opportunities_admin(self): 

256 """Deleting all opportunities.""" 

257 from app import DATABASE_MANAGER 

258 

259 students = DATABASE_MANAGER.get_all("students") 

260 

261 for student in students: 

262 if "preferences" in student: 

263 DATABASE_MANAGER.delete_field_by_id( 

264 "students", student["_id"], "preferences" 

265 ) 

266 

267 DATABASE_MANAGER.delete_all("opportunities") 

268 

269 return jsonify({"message": "All opportunities deleted"}), 200 

270 

271 def download_opportunities(self, is_admin): 

272 """Download all opportunities.""" 

273 from app import DATABASE_MANAGER 

274 

275 opportunities_data = [] 

276 if is_admin: 

277 employers = DATABASE_MANAGER.get_all("employers") 

278 employers_dict = { 

279 employer["_id"]: employer["email"] for employer in employers 

280 } 

281 opportunities = DATABASE_MANAGER.get_all("opportunities") 

282 else: 

283 opportunities = DATABASE_MANAGER.get_all_by_field( 

284 "opportunities", "employer_id", session["employer"]["_id"] 

285 ) 

286 

287 for opportunity in opportunities: 

288 opportunity_data = { 

289 "Title": opportunity.pop("title"), 

290 "Description": opportunity.pop("description"), 

291 "URL": opportunity.pop("url"), 

292 "Modules_required": ",".join(opportunity.pop("modules_required")), 

293 "Courses_required": ",".join(opportunity.pop("courses_required")), 

294 "Spots_available": opportunity.pop("spots_available"), 

295 "Location": opportunity.pop("location"), 

296 "Duration": opportunity.pop("duration"), 

297 } 

298 if is_admin: 

299 employer_email = employers_dict.get(opportunity["employer_id"]) 

300 if employer_email: 

301 opportunity_data["Employer_email"] = employer_email 

302 del opportunity["_id"] 

303 del opportunity["employer_id"] 

304 opportunities_data.append(opportunity_data) 

305 

306 df = pd.DataFrame(opportunities_data) 

307 with tempfile.NamedTemporaryFile(suffix=".xlsx") as temp_file: 

308 df.to_excel(temp_file.name, index=False) 

309 temp_file_path = temp_file.name 

310 

311 return send_file( 

312 temp_file_path, as_attachment=True, download_name="opportunities.xlsx" 

313 ) 

314 

315 def upload_opportunities(self, file, is_admin): 

316 """Upload opportunities from an Excel file.""" 

317 from app import DATABASE_MANAGER 

318 

319 expected_columns = { 

320 "Title", 

321 "Description", 

322 "URL", 

323 "Modules_required", 

324 "Courses_required", 

325 "Spots_available", 

326 "Location", 

327 "Duration", 

328 } 

329 if is_admin: 

330 expected_columns.add("Employer_email") 

331 

332 try: 

333 df = handlers.excel_verifier_and_reader(file, expected_columns) 

334 opportunities = df.to_dict(orient="records") 

335 

336 email_to_employers_map = { 

337 employer["email"].lower(): employer["_id"] 

338 for employer in DATABASE_MANAGER.get_all("employers") 

339 } 

340 modules = set( 

341 module["module_id"] for module in DATABASE_MANAGER.get_all("modules") 

342 ) 

343 courses = set( 

344 course["course_id"] for course in DATABASE_MANAGER.get_all("courses") 

345 ) 

346 clean_data = [] 

347 for i, opportunity in enumerate(opportunities): 

348 temp = { 

349 "_id": uuid.uuid4().hex, 

350 "title": opportunity["Title"], 

351 "description": opportunity["Description"], 

352 "url": opportunity["URL"], 

353 "spots_available": int(opportunity["Spots_available"]), 

354 "location": opportunity["Location"], 

355 "duration": opportunity["Duration"], 

356 } 

357 

358 if ( 

359 not isinstance(temp["spots_available"], int) 

360 or temp["spots_available"] < 1 

361 ): 

362 raise ValueError( 

363 f"Invalid spots available value in opportunity: {temp['title']}, row {i+2}" 

364 ) 

365 if is_admin: 

366 employer_id = email_to_employers_map.get( 

367 opportunity["Employer_email"].lower().strip() 

368 ) 

369 if employer_id: 

370 temp["employer_id"] = employer_id 

371 else: 

372 raise ValueError( 

373 f"Employer email {opportunity['Employer_email']} not found " 

374 f"in database at row {i+2}" 

375 ) 

376 else: 

377 temp["employer_id"] = session["employer"]["_id"] 

378 

379 if not temp["duration"]: 

380 raise ValueError( 

381 f"Duration is required and cannot be empty in opportunity: " 

382 f"{temp['title']}, row {i+2}" 

383 ) 

384 if temp["duration"] not in set( 

385 [ 

386 "1_day", 

387 "1_week", 

388 "1_month", 

389 "3_months", 

390 "6_months", 

391 "12_months", 

392 ] 

393 ): 

394 raise ValueError( 

395 f"Invalid duration value in opportunity: {temp['title']}, row {i+2}" 

396 ) 

397 modules_required_string = opportunity["Modules_required"] 

398 if not isinstance(modules_required_string, str): 

399 temp["modules_required"] = [] 

400 else: 

401 temp["modules_required"] = [ 

402 module.strip() 

403 for module in modules_required_string.replace('"', "").split( 

404 "," 

405 ) 

406 ] 

407 

408 courses_required_string = opportunity["Courses_required"] 

409 if not isinstance(courses_required_string, str): 

410 temp["courses_required"] = [] 

411 else: 

412 temp["courses_required"] = [ 

413 course.strip() 

414 for course in courses_required_string.replace('"', "").split( 

415 "," 

416 ) 

417 ] 

418 

419 if not set(temp["modules_required"]).issubset(modules): 

420 raise ValueError( 

421 f"Invalid module(s) in opportunity: {temp['title']}, row {i+2}" 

422 ) 

423 if not set(temp["courses_required"]).issubset(courses): 

424 raise ValueError( 

425 f"Invalid course(s) in opportunity: {temp['title']}, row {i+2}" 

426 ) 

427 if not isinstance(temp["title"], str) or not temp["title"].strip(): 

428 raise ValueError( 

429 f"Title is required and cannot be empty in opportunity at row {i+2}" 

430 ) 

431 if ( 

432 not isinstance(temp["description"], str) 

433 or not temp["description"].strip() 

434 ): 

435 raise ValueError( 

436 f"Description is required and cannot be empty in opportunity at row {i+2}" 

437 ) 

438 if not isinstance(temp["location"], str): 

439 temp["location"] = "" 

440 if not isinstance(temp["url"], str): 

441 temp["url"] = "" 

442 

443 temp["title"] = temp["title"].strip() 

444 temp["description"] = temp["description"].strip() 

445 temp["url"] = ( 

446 temp["url"].strip() if isinstance(temp["url"], str) else "" 

447 ) 

448 temp["location"] = ( 

449 temp["location"].strip() 

450 if isinstance(temp["location"], str) 

451 else "" 

452 ) 

453 

454 temp["title"] = escape(temp["title"]) 

455 temp["description"] = escape(temp["description"]) 

456 temp["url"] = escape(temp["url"]) 

457 temp["location"] = escape(temp["location"]) 

458 temp["duration"] = escape(temp["duration"]) 

459 temp["modules_required"] = [ 

460 escape(module) for module in temp["modules_required"] 

461 ] 

462 temp["courses_required"] = [ 

463 escape(course) for course in temp["courses_required"] 

464 ] 

465 clean_data.append(temp) 

466 

467 if clean_data: 

468 DATABASE_MANAGER.insert_many("opportunities", clean_data) 

469 

470 return jsonify({"message": "Opportunities uploaded successfully"}), 200 

471 

472 except Exception as e: 

473 print(f"[ERROR] Failed to upload opportunities: {e}") 

474 return jsonify({"error": f"Failed to upload opportunities: {e}"}), 400