Coverage for opportunities/models.py: 94%
229 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 14:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 14:02 +0000
1"""
2Opportunity model.
3"""
5from html import escape
6import tempfile
7import uuid
8from flask import jsonify, send_file, session
9import pandas as pd
10from core import handlers
11from employers.models import Employers
class Opportunity:
    """Operations on the "opportunities" collection.

    ``DATABASE_MANAGER`` is imported from ``app`` inside each method instead of
    at module level (presumably to avoid a circular import with ``app`` --
    confirm). Several methods return a ``(jsonify(...), status_code)`` tuple.
    """

    # Duration codes accepted by upload_opportunities.
    _VALID_DURATIONS = frozenset(
        {"1_day", "1_week", "1_month", "3_months", "6_months", "12_months"}
    )

    def add_update_opportunity(
        self,
        opportunity,
        is_admin=False,
    ):
        """Insert ``opportunity``, replacing any stored record with the same id.

        Args:
            opportunity: Mapping describing the opportunity; must contain "_id".
            is_admin: When true, the ownership check is skipped.

        Returns:
            ``(response, status)``: 200 with the opportunity, 401 when a
            non-admin tries to overwrite a record whose ``employer_id`` differs
            from the employer in the session, or 400 when ``opportunity`` is
            falsy.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        opportunity_id = opportunity["_id"]
        existing = DATABASE_MANAGER.get_one_by_id("opportunities", opportunity_id)
        if (
            existing
            and not is_admin
            and existing["employer_id"] != session["employer"]["_id"]
        ):
            return jsonify({"error": "Unauthorized Access."}), 401

        # Replace-by-id: remove any previous version before inserting the new one.
        DATABASE_MANAGER.delete_by_id("opportunities", opportunity_id)
        DATABASE_MANAGER.insert("opportunities", opportunity)

        if opportunity:
            return jsonify(opportunity), 200
        return jsonify({"error": "Opportunity not added"}), 400

    def get_opportunities_for_search(self, _id):
        """Return opportunities annotated for searching.

        Each opportunity gets a ``company_name`` ("Unknown Company" when its
        ``employer_id`` is not among the known employers); opportunities that
        have a "preferences" key are additionally flagged ``ranked = True``.

        Args:
            _id: Employer id to filter on; a falsy value returns everything.

        Returns:
            The annotated opportunities, optionally restricted to ``_id``.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        opportunities = DATABASE_MANAGER.get_all("opportunities")
        company_names = {
            employer["_id"]: employer["company_name"]
            for employer in Employers().get_employers()
        }
        for opportunity in opportunities:
            if "preferences" in opportunity:
                opportunity["ranked"] = True
            opportunity["company_name"] = company_names.get(
                opportunity["employer_id"], "Unknown Company"
            )

        if not _id:
            return opportunities
        return [
            opportunity
            for opportunity in opportunities
            if opportunity["employer_id"] == _id
        ]

    def get_opportunities_by_title(self, title):
        """Fetch opportunities whose title matches ``title`` case-insensitively.

        Returns an empty list when ``title`` is falsy or when the lookup raises.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        try:
            if not title:
                print("[DEBUG] No title provided.")
                return []

            # NOTE(review): ``title`` is handed to ``$regex`` unescaped, so regex
            # metacharacters typed by a user are interpreted as a pattern.
            query = {"title": {"$regex": title, "$options": "i"}}
            print(f"[DEBUG] Query for title: {query}")

            opportunities = DATABASE_MANAGER.get_all_by_field(
                "opportunities", "title", query["title"]
            )
            print(f"[DEBUG] Opportunities found: {len(opportunities)}")
            return opportunities
        except Exception as e:  # best-effort search: report and return nothing
            print(f"[ERROR] Failed to fetch opportunities by title: {e}")
            return []

    def get_opportunities_by_company(self, company_name):
        """Fetch the opportunities of the employer named exactly ``company_name``.

        Returns an empty list when the name is falsy, when no employer has that
        ``company_name``, or when the lookup raises.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        try:
            if not company_name:
                print("[DEBUG] No company name provided.")
                return []

            # Resolve the employer first; opportunities only store its id.
            company = DATABASE_MANAGER.get_one_by_field(
                "employers", "company_name", company_name
            )
            if not company:
                print(f"[DEBUG] No company found for name: {company_name}")
                return []

            employer_id = company["_id"]
            print(f"[DEBUG] Employer ID found: {employer_id}")

            opportunities = DATABASE_MANAGER.get_all_by_field(
                "opportunities", "employer_id", employer_id
            )
            print(f"[DEBUG] Opportunities found: {len(opportunities)}")
            return opportunities
        except Exception as e:  # best-effort search: report and return nothing
            print(f"[ERROR] Failed to fetch opportunities by company: {e}")
            return []

    def get_opportunity_by_company_id(self, company_id):
        """Return all opportunities whose ``employer_id`` equals ``company_id``."""
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        return DATABASE_MANAGER.get_all_by_field(
            "opportunities", "employer_id", company_id
        )

    def get_opportunity_by_id(self, _id):
        """Return the opportunity stored under ``_id``, or ``None`` if falsy."""
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        opportunity = DATABASE_MANAGER.get_one_by_id("opportunities", _id)
        return opportunity if opportunity else None

    def get_employer_by_id(self, _id):
        """Return the ``employer_id`` of opportunity ``_id`` ("" if not found)."""
        opportunity = self.get_opportunity_by_id(_id)
        return opportunity["employer_id"] if opportunity else ""

    def get_opportunities(self):
        """Return every record in the "opportunities" collection."""
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        return DATABASE_MANAGER.get_all("opportunities")

    def get_opportunities_by_duration(self, duration):
        """Return, as a JSON response, opportunities matching any given duration.

        Args:
            duration: String whose first and last characters are discarded and
                whose remainder is split on commas, with double quotes removed
                (presumably a JSON-style list such as ``["1_day","1_week"]`` --
                confirm against the caller).

        Returns:
            ``(jsonify(matches), 200)``.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        wanted = [
            item.strip().replace('"', "") for item in duration[1:-1].split(",")
        ]
        matches = DATABASE_MANAGER.get_all_by_in_list(
            "opportunities", "duration", wanted
        )
        return jsonify(matches), 200

    def delete_opportunity_by_id(self, opportunity_id):
        """Delete one opportunity and remove it from student preferences.

        Returns:
            ``(response, status)``: 404 when the opportunity does not exist, 401
            when the current user is an employer who does not own it, otherwise
            200 after the deletion.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        opportunity = DATABASE_MANAGER.get_one_by_id("opportunities", opportunity_id)

        # Existence must be checked before ownership: subscripting a missing
        # record would raise instead of producing the 404.
        if not opportunity:
            return jsonify({"error": "Opportunity not found"}), 404
        if (
            handlers.get_user_type() == "employer"
            and opportunity["employer_id"] != session["employer"]["_id"]
        ):
            return jsonify({"error": "Unauthorized Access."}), 401

        DATABASE_MANAGER.delete_by_id("opportunities", opportunity_id)

        # Drop the deleted id from every student that had ranked it.
        for student in DATABASE_MANAGER.get_all("students"):
            if "preferences" in student and opportunity_id in student["preferences"]:
                student["preferences"].remove(opportunity_id)
                DATABASE_MANAGER.update_one_by_id("students", student["_id"], student)

        return jsonify({"message": "Opportunity deleted"}), 200

    def get_valid_students(self, opportunity_id):
        """Return the students whose preferences contain ``opportunity_id``."""
        # pylint: disable=import-outside-toplevel
        from students.models import Student

        return [
            student
            for student in Student().get_students()
            if "preferences" in student and opportunity_id in student["preferences"]
        ]

    def rank_preferences(self, opportunity_id, preferences):
        """Store ``preferences`` on the opportunity with id ``opportunity_id``.

        Returns:
            ``(response, status)``: 404 when the opportunity does not exist,
            otherwise 200.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        if not DATABASE_MANAGER.get_one_by_id("opportunities", opportunity_id):
            return jsonify({"error": "Opportunity not found"}), 404

        DATABASE_MANAGER.update_one_by_field(
            "opportunities", "_id", opportunity_id, {"preferences": preferences}
        )
        return jsonify({"message": "Preferences updated"}), 200

    def delete_all_opportunities(self, is_admin):
        """Delete all opportunities (admin) or only the session employer's."""
        if is_admin:
            return self.delete_all_opportunities_admin()
        return self.delete_all_opportunities_employer()

    def delete_all_opportunities_employer(self):
        """Delete the session employer's opportunities and unrank them.

        Every student preference list that referenced one of the deleted
        opportunities is rewritten without those ids.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        owned = DATABASE_MANAGER.get_all_by_field(
            "opportunities", "employer_id", session["employer"]["_id"]
        )
        deleted_ids = {opportunity["_id"] for opportunity in owned}
        for opportunity_id in deleted_ids:
            DATABASE_MANAGER.delete_by_id("opportunities", opportunity_id)

        for student in DATABASE_MANAGER.get_all("students"):
            if "preferences" not in student:
                continue
            remaining = [
                preference
                for preference in student["preferences"]
                if preference not in deleted_ids
            ]
            # Only write back students whose list actually changed.
            if remaining != student["preferences"]:
                DATABASE_MANAGER.update_one_by_id(
                    "students", student["_id"], {"preferences": remaining}
                )

        return jsonify({"message": "All opportunities deleted"}), 200

    def delete_all_opportunities_admin(self):
        """Delete every opportunity and clear all student preferences."""
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        for student in DATABASE_MANAGER.get_all("students"):
            if "preferences" in student:
                DATABASE_MANAGER.delete_field_by_id(
                    "students", student["_id"], "preferences"
                )

        DATABASE_MANAGER.delete_all("opportunities")

        return jsonify({"message": "All opportunities deleted"}), 200

    def download_opportunities(self, is_admin):
        """Export opportunities as an ``opportunities.xlsx`` attachment.

        Admins get every opportunity plus an ``Employer_email`` column (when the
        employer is known); employers get only their own opportunities.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        employer_emails = {}
        if is_admin:
            employer_emails = {
                employer["_id"]: employer["email"]
                for employer in DATABASE_MANAGER.get_all("employers")
            }
            opportunities = DATABASE_MANAGER.get_all("opportunities")
        else:
            opportunities = DATABASE_MANAGER.get_all_by_field(
                "opportunities", "employer_id", session["employer"]["_id"]
            )

        rows = []
        for opportunity in opportunities:
            # Read with .get() so the fetched records are not mutated and a
            # record lacking an optional field does not abort the export.
            row = {
                "Title": opportunity.get("title"),
                "Description": opportunity.get("description"),
                "URL": opportunity.get("url"),
                "Modules_required": ",".join(
                    opportunity.get("modules_required") or []
                ),
                "Courses_required": ",".join(
                    opportunity.get("courses_required") or []
                ),
                "Spots_available": opportunity.get("spots_available"),
                "Location": opportunity.get("location"),
                "Duration": opportunity.get("duration"),
            }
            if is_admin:
                employer_email = employer_emails.get(opportunity.get("employer_id"))
                if employer_email:
                    row["Employer_email"] = employer_email
            rows.append(row)

        df = pd.DataFrame(rows)
        with tempfile.NamedTemporaryFile(suffix=".xlsx") as temp_file:
            df.to_excel(temp_file.name, index=False)
            # The temporary file is removed when this block exits, so the
            # response has to be created while it still exists.
            return send_file(
                temp_file.name, as_attachment=True, download_name="opportunities.xlsx"
            )

    @staticmethod
    def _parse_spots(raw, title, row_number):
        """Return ``raw`` as a whole number >= 1 or raise a row-specific error."""
        try:
            spots = int(raw)
        except (TypeError, ValueError, OverflowError):
            spots = 0  # blank (NaN), infinite or non-numeric cell
        if spots < 1 or (isinstance(raw, float) and raw != spots):
            raise ValueError(
                f"Invalid spots available value in opportunity: {title}, row {row_number}"
            )
        return spots

    @staticmethod
    def _split_id_list(cell):
        """Split a comma-separated cell into stripped, non-empty ids."""
        if not isinstance(cell, str):
            return []  # non-text cells (e.g. blank ones) carry no ids
        parts = (part.strip() for part in cell.replace('"', "").split(","))
        return [part for part in parts if part]

    def _clean_upload_row(
        self,
        row,
        row_number,
        is_admin,
        employer_ids_by_email,
        known_modules,
        known_courses,
    ):
        """Validate one uploaded record and return the document to insert."""
        title = row["Title"]
        spots_available = self._parse_spots(row["Spots_available"], title, row_number)

        if is_admin:
            raw_email = row["Employer_email"]
            # str() lets a blank (non-text) cell reach the "not found" error.
            employer_id = employer_ids_by_email.get(str(raw_email).strip().lower())
            if not employer_id:
                raise ValueError(
                    f"Employer email {raw_email} not found "
                    f"in database at row {row_number}"
                )
        else:
            employer_id = session["employer"]["_id"]

        duration = row["Duration"]
        if not duration:
            raise ValueError(
                f"Duration is required and cannot be empty in opportunity: "
                f"{title}, row {row_number}"
            )
        if duration not in self._VALID_DURATIONS:
            raise ValueError(
                f"Invalid duration value in opportunity: {title}, row {row_number}"
            )

        modules_required = self._split_id_list(row["Modules_required"])
        courses_required = self._split_id_list(row["Courses_required"])
        if not set(modules_required).issubset(known_modules):
            raise ValueError(
                f"Invalid module(s) in opportunity: {title}, row {row_number}"
            )
        if not set(courses_required).issubset(known_courses):
            raise ValueError(
                f"Invalid course(s) in opportunity: {title}, row {row_number}"
            )

        if not isinstance(title, str) or not title.strip():
            raise ValueError(
                f"Title is required and cannot be empty in opportunity at row {row_number}"
            )
        description = row["Description"]
        if not isinstance(description, str) or not description.strip():
            raise ValueError(
                f"Description is required and cannot be empty in opportunity at row {row_number}"
            )

        # URL and location are optional: anything that is not text becomes "".
        url = row["URL"] if isinstance(row["URL"], str) else ""
        location = row["Location"] if isinstance(row["Location"], str) else ""

        # Text fields are HTML-escaped before they are stored.
        return {
            "_id": uuid.uuid4().hex,
            "title": escape(title.strip()),
            "description": escape(description.strip()),
            "url": escape(url.strip()),
            "spots_available": spots_available,
            "location": escape(location.strip()),
            "duration": escape(duration),
            "employer_id": employer_id,
            "modules_required": [escape(module) for module in modules_required],
            "courses_required": [escape(course) for course in courses_required],
        }

    def upload_opportunities(self, file, is_admin):
        """Create opportunities from an uploaded Excel file.

        Args:
            file: Uploaded spreadsheet, passed to
                ``handlers.excel_verifier_and_reader``.
            is_admin: When true an ``Employer_email`` column is required and
                selects the owning employer per row; otherwise every row is
                owned by the employer in the session.

        Returns:
            ``(response, status)``: 200 when every row is valid (all rows are
            then inserted together), or 400 with the first error encountered;
            nothing is inserted in that case.
        """
        # pylint: disable=import-outside-toplevel
        from app import DATABASE_MANAGER

        expected_columns = {
            "Title",
            "Description",
            "URL",
            "Modules_required",
            "Courses_required",
            "Spots_available",
            "Location",
            "Duration",
        }
        if is_admin:
            expected_columns.add("Employer_email")

        try:
            df = handlers.excel_verifier_and_reader(file, expected_columns)
            rows = df.to_dict(orient="records")

            employer_ids_by_email = {
                employer["email"].lower(): employer["_id"]
                for employer in DATABASE_MANAGER.get_all("employers")
            }
            known_modules = {
                module["module_id"] for module in DATABASE_MANAGER.get_all("modules")
            }
            known_courses = {
                course["course_id"] for course in DATABASE_MANAGER.get_all("courses")
            }

            # index + 2 is the row number quoted in error messages (presumably
            # the spreadsheet row, counting one header row -- confirm).
            clean_data = [
                self._clean_upload_row(
                    row,
                    index + 2,
                    is_admin,
                    employer_ids_by_email,
                    known_modules,
                    known_courses,
                )
                for index, row in enumerate(rows)
            ]

            if clean_data:
                DATABASE_MANAGER.insert_many("opportunities", clean_data)

            return jsonify({"message": "Opportunities uploaded successfully"}), 200

        except Exception as e:  # request boundary: report any failure as a 400
            print(f"[ERROR] Failed to upload opportunities: {e}")
            return jsonify({"error": f"Failed to upload opportunities: {e}"}), 400