Coverage for employers/models.py: 84%
140 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 14:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 14:02 +0000
1"""Employer model."""
3from html import escape
4import tempfile
5import uuid
6from flask import redirect, jsonify, session, send_file
7import pandas as pd
8from core import email_handler, handlers
class Employers:
    """Employer operations backed by the application's ``DATABASE_MANAGER``.

    Most methods return a ``(jsonify(...), status_code)`` tuple so they can be
    returned directly from a Flask view. ``DATABASE_MANAGER`` and
    ``DEADLINE_MANAGER`` are imported from ``app`` inside each method rather
    than at module level.
    """

    def start_session(self):
        """Flag the session as a logged-in employer and point them at home.

        Returns:
            The response produced by ``redirect`` for ``/employers/home``.
        """
        session["employer_logged_in"] = True
        # NOTE(review): 200 is not a redirect status, so a browser will not
        # follow the Location header. Kept unchanged because existing callers
        # may depend on the 200 -- confirm before switching to 302.
        return redirect("/employers/home", code=200)

    def register_employer(self, employer):
        """Insert a new employer unless its email or company name is taken.

        The email is lower-cased before the uniqueness check and before it is
        stored; the company name is compared as given.

        Args:
            employer: Mapping with at least ``"email"`` and ``"company_name"``.
                Its ``"email"`` entry is rewritten in lower case.

        Returns:
            ``(json, 200)`` with the employer on success, or ``(json, 400)``
            with an ``"error"`` message when the email or name already exists.
        """
        from app import DATABASE_MANAGER

        email = employer["email"].lower()
        if DATABASE_MANAGER.get_one_by_field_strict("employers", "email", email):
            return jsonify({"error": "Email already in use"}), 400

        existing_employer = DATABASE_MANAGER.get_one_by_field_strict(
            "employers", "company_name", employer["company_name"]
        )
        employer["email"] = email

        if existing_employer:
            return jsonify({"error": "Company name already exists"}), 400

        DATABASE_MANAGER.insert("employers", employer)
        return jsonify(employer), 200

    def get_company_name(self, _id):
        """Return the company name for employer ``_id``, or ``""`` if absent."""
        from app import DATABASE_MANAGER

        employer = DATABASE_MANAGER.get_one_by_id("employers", _id)
        if not employer:
            return ""

        return employer["company_name"]

    def employer_login(self, email):
        """Start an OTP login for the employer registered under ``email``.

        ``handlers.clear_session_save_theme()`` is called first, whether or
        not the email is found. On a match an OTP is sent to the stored email
        and the employer record is placed in ``session["employer"]``.

        Returns:
            ``(json, 200)`` when the OTP was sent, ``(json, 404)`` when no
            employer has that email.
        """
        handlers.clear_session_save_theme()
        from app import DATABASE_MANAGER

        employer = DATABASE_MANAGER.get_by_email("employers", email)
        if employer:
            email_handler.send_otp(employer["email"])
            session["employer"] = employer
            return jsonify({"message": "OTP sent"}), 200

        return jsonify({"error": "Email not found"}), 404

    def get_employers(self):
        """Return everything ``DATABASE_MANAGER.get_all("employers")`` yields."""
        from app import DATABASE_MANAGER

        return DATABASE_MANAGER.get_all("employers")

    def get_employer_by_id(self, employer_id):
        """Return the employer with ``employer_id``, or ``None`` if not found."""
        from app import DATABASE_MANAGER

        employer = DATABASE_MANAGER.get_one_by_id("employers", employer_id)
        return employer or None

    def delete_employer_by_id(self, _id):
        """Delete an employer and every opportunity that references it.

        Returns:
            ``(json, 200)`` on success, ``(json, 404)`` when the employer does
            not exist (nothing is deleted in that case).
        """
        from app import DATABASE_MANAGER
        from opportunities.models import Opportunity

        employer = DATABASE_MANAGER.get_one_by_id("employers", _id)
        if not employer:
            return jsonify({"error": "Employer not found"}), 404
        DATABASE_MANAGER.delete_by_id("employers", _id)

        for opportunity in DATABASE_MANAGER.get_all("opportunities"):
            # .get(): an opportunity without an employer_id must not abort the
            # clean-up after the employer itself has already been removed.
            if opportunity.get("employer_id") == _id:
                Opportunity().delete_opportunity_by_id(opportunity["_id"])

        return jsonify({"message": "Employer deleted"}), 200

    def update_employer(self, employer_id, update_data):
        """Apply ``update_data`` to the employer with ``employer_id``.

        Returns:
            ``(json, 200)`` on success, ``(json, 404)`` when the employer does
            not exist.
        """
        from app import DATABASE_MANAGER

        employer = DATABASE_MANAGER.get_one_by_id("employers", employer_id)
        if not employer:
            return jsonify({"error": "Employer not found"}), 404

        DATABASE_MANAGER.update_one_by_id("employers", employer_id, update_data)

        return jsonify({"message": "Employer updated successfully"}), 200

    def rank_preferences(self, opportunity_id, preferences):
        """Store ``preferences`` on the opportunity with ``opportunity_id``.

        Returns:
            ``(json, 200)`` on success, ``(json, 404)`` when the opportunity
            does not exist.
        """
        from app import DATABASE_MANAGER

        opportunity = DATABASE_MANAGER.get_one_by_id("opportunities", opportunity_id)

        if not opportunity:
            return jsonify({"error": "Opportunity not found"}), 404

        DATABASE_MANAGER.update_one_by_id(
            "opportunities", opportunity_id, {"preferences": preferences}
        )
        return jsonify({"message": "Preferences updated"}), 200

    def get_company_email_by_id(self, _id):
        """Return the email for employer ``_id``, or ``""`` if absent."""
        from app import DATABASE_MANAGER

        employer = DATABASE_MANAGER.get_one_by_id("employers", _id)
        if not employer:
            return ""

        return employer["email"]

    def delete_all_employers(self):
        """Delete all employers and opportunities and clear student rankings.

        Besides emptying the ``employers`` and ``opportunities`` collections,
        the ``preferences`` field is removed from every student that has one.

        Returns:
            ``(json, 200)``.
        """
        from app import DATABASE_MANAGER

        DATABASE_MANAGER.delete_all("employers")
        DATABASE_MANAGER.delete_all("opportunities")

        for student in DATABASE_MANAGER.get_all("students"):
            if "preferences" in student:
                DATABASE_MANAGER.delete_field_by_id(
                    "students", student["_id"], "preferences"
                )

        return jsonify({"message": "All employers deleted"}), 200

    def download_all_employers(self):
        """Send every employer as an ``employers.xlsx`` attachment.

        ``_id`` is dropped and ``company_name`` / ``email`` are exported under
        the ``Company_name`` / ``Email`` headers that ``upload_employers``
        expects. The workbook is built in memory, so no temporary file is left
        behind on disk.

        Returns:
            The ``send_file`` response.
        """
        from io import BytesIO

        from app import DATABASE_MANAGER

        rows = []
        for employer in DATABASE_MANAGER.get_all("employers"):
            # Work on a copy so the records handed out by the database layer
            # are not modified as a side effect of exporting them.
            row = dict(employer)
            row.pop("_id", None)
            row["Company_name"] = row.pop("company_name")
            row["Email"] = row.pop("email")
            rows.append(row)

        buffer = BytesIO()
        pd.DataFrame(rows).to_excel(buffer, index=False)
        buffer.seek(0)  # send_file reads from the current position

        return send_file(
            buffer, as_attachment=True, download_name="employers.xlsx"
        )

    @staticmethod
    def _cell_to_text(value):
        """Return a spreadsheet cell as text; missing cells become ``""``."""
        if value is None or pd.isna(value):
            return ""
        return str(value)

    def upload_employers(self, file):
        """Bulk-create employers from an uploaded spreadsheet.

        The file is read through ``handlers.excel_verifier_and_reader`` with
        ``Company_name`` and ``Email`` as the required columns. Values are
        HTML-escaped, emails are lower-cased, and the whole upload is rejected
        at the first row that is incomplete or that duplicates (ignoring case)
        an existing employer or an earlier row of the same file.

        Args:
            file: The uploaded file, passed straight to the reader helper.

        Returns:
            ``(json, 200)`` with the number of employers inserted, or
            ``(json, 400)`` with an ``"error"`` message; nothing is inserted
            on error.
        """
        from app import DATABASE_MANAGER

        try:
            df = handlers.excel_verifier_and_reader(file, {"Company_name", "Email"})
        except ValueError as e:
            return jsonify({"error": f"Failed to read file: {str(e)}"}), 400

        employers = df.to_dict(orient="records")

        # Single pass, so this also works if get_all returns a one-shot iterator.
        current_employer_names = set()
        current_employer_emails = set()
        for existing in DATABASE_MANAGER.get_all("employers"):
            current_employer_names.add(existing["company_name"].lower())
            current_employer_emails.add(existing["email"].lower())

        emails = set()
        company_names = set()
        clean_data = []
        for i, employer in enumerate(employers):
            # Empty cells (NaN) and numeric cells are not strings; convert them
            # before escaping so they reach the validation below.
            temp = {
                "_id": uuid.uuid4().hex,
                "company_name": escape(self._cell_to_text(employer["Company_name"])),
                "email": escape(self._cell_to_text(employer["Email"])),
            }
            if not temp["company_name"] or not temp["email"]:
                return jsonify({"error": "Company name and email are required"}), 400
            temp["email"] = temp["email"].lower()

            name_key = temp["company_name"].lower()
            email_key = temp["email"]
            # Presumably +2 turns the 0-based index into the spreadsheet row
            # number after a header row -- confirm against the reader helper.
            row = i + 2
            name_error = {
                "error": f"Company name {temp['company_name']} already exists as row {row}"
            }
            email_error = {"error": f"Email {temp['email']} already exists as row {row}"}

            if name_key in current_employer_names:
                return jsonify(name_error), 400
            if email_key in current_employer_emails or email_key in emails:
                return jsonify(email_error), 400
            if name_key in company_names:
                return jsonify(name_error), 400

            clean_data.append(temp)
            emails.add(email_key)
            company_names.add(name_key)

        # Skip the bulk insert for a file with no data rows.
        if clean_data:
            DATABASE_MANAGER.insert_many("employers", clean_data)

        return (
            jsonify({"message": f"{len(clean_data)} employers uploaded successfully"}),
            200,
        )

    def get_deadlines_for_employer_dashboard(self):
        """Return the next deadline that has not passed, for the dashboard.

        The details, student-ranking and opportunities-ranking deadlines are
        checked in that order and the first one not yet past is reported.

        Returns:
            A ``(title, deadline, message)`` tuple; ``deadline`` is whatever the
            matching ``DEADLINE_MANAGER`` getter returns, or ``None`` once all
            three deadlines have passed.
        """
        from app import DEADLINE_MANAGER

        if not DEADLINE_MANAGER.is_past_details_deadline():
            return (
                "Add Any Number of Opportunities Deadline",
                DEADLINE_MANAGER.get_details_deadline(),
                "Add your placements now!",
            )

        if not DEADLINE_MANAGER.is_past_student_ranking_deadline():
            return (
                "Students Ranking Opportunities/Roles Deadline",
                DEADLINE_MANAGER.get_student_ranking_deadline(),
                "Wait till this deadline passes.",
            )

        if not DEADLINE_MANAGER.is_past_opportunities_ranking_deadline():
            return (
                "Ranking Student For Individual Opportunities/Roles Deadline",
                DEADLINE_MANAGER.get_opportunities_ranking_deadline(),
                "Rank students for your placements now!",
            )

        return "All Deadlines Passed", None, "All deadlines have passed."