Coverage for employers/models.py: 84%

140 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-05 14:02 +0000

1"""Employer model.""" 

2 

3from html import escape 

4import tempfile 

5import uuid 

6from flask import redirect, jsonify, session, send_file 

7import pandas as pd 

8from core import email_handler, handlers 

9 

10 

class Employers:
    """Operations on employer records held by the application's database manager.

    ``DATABASE_MANAGER`` and ``DEADLINE_MANAGER`` are imported from ``app``
    inside each method rather than at module level.
    NOTE(review): presumably this avoids a circular import with ``app`` - confirm.
    """

    def start_session(self):
        """Flag the current session as an employer session.

        Returns:
            The response produced by ``redirect`` for ``/employers/home``.
        """
        session["employer_logged_in"] = True
        # NOTE(review): 200 is not a redirect status, so clients will not
        # follow the Location header. Kept as-is because callers may rely on it.
        return redirect("/employers/home", code=200)

    def register_employer(self, employer):
        """Insert a new employer unless its email or company name is taken.

        Args:
            employer: Mapping with at least ``email`` and ``company_name``.
                Its ``email`` is lower-cased in place once the email check
                has passed.

        Returns:
            ``(response, status)``: the employer with 200 after insertion, or
            an error payload with 400 when the email or company name exists.
        """
        from app import DATABASE_MANAGER

        email = employer["email"].lower()
        if DATABASE_MANAGER.get_one_by_field_strict("employers", "email", email):
            return jsonify({"error": "Email already in use"}), 400

        employer["email"] = email
        if DATABASE_MANAGER.get_one_by_field_strict(
            "employers", "company_name", employer["company_name"]
        ):
            return jsonify({"error": "Company name already exists"}), 400

        DATABASE_MANAGER.insert("employers", employer)
        return jsonify(employer), 200

    def get_company_name(self, _id):
        """Return the company name of employer ``_id``, or ``""`` if not found."""
        from app import DATABASE_MANAGER

        employer = DATABASE_MANAGER.get_one_by_id("employers", _id)
        return employer["company_name"] if employer else ""

    def employer_login(self, email):
        """Begin an employer login by sending a one-time password.

        Calls ``handlers.clear_session_save_theme()`` first, then looks the
        employer up by email. When found, ``email_handler.send_otp`` is called
        with the stored email and the employer is placed in
        ``session["employer"]``.

        Args:
            email: Email address supplied by the user.

        Returns:
            ``(response, status)``: 200 when the OTP was sent, 404 when no
            employer has that email.
        """
        handlers.clear_session_save_theme()
        from app import DATABASE_MANAGER

        # NOTE(review): registration and upload store emails lower-cased, but
        # this lookup uses the email as given - confirm get_by_email is
        # case-insensitive.
        employer = DATABASE_MANAGER.get_by_email("employers", email)
        if not employer:
            return jsonify({"error": "Email not found"}), 404

        email_handler.send_otp(employer["email"])
        session["employer"] = employer
        return jsonify({"message": "OTP sent"}), 200

    def get_employers(self):
        """Return every record in the ``employers`` collection."""
        from app import DATABASE_MANAGER

        return DATABASE_MANAGER.get_all("employers")

    def get_employer_by_id(self, employer_id):
        """Return the employer with ``employer_id``, or ``None`` if there is none."""
        from app import DATABASE_MANAGER

        return DATABASE_MANAGER.get_one_by_id("employers", employer_id) or None

    def delete_employer_by_id(self, _id):
        """Delete an employer and every opportunity that references it.

        Args:
            _id: Identifier of the employer to delete.

        Returns:
            ``(response, status)``: 200 on success, 404 when the employer does
            not exist.
        """
        from app import DATABASE_MANAGER
        from opportunities.models import Opportunity

        if not DATABASE_MANAGER.get_one_by_id("employers", _id):
            return jsonify({"error": "Employer not found"}), 404
        DATABASE_MANAGER.delete_by_id("employers", _id)

        for opportunity in DATABASE_MANAGER.get_all("opportunities"):
            # .get(): an opportunity without an employer_id must not abort the
            # cascade after the employer itself has already been removed.
            if opportunity.get("employer_id") == _id:
                Opportunity().delete_opportunity_by_id(opportunity["_id"])

        return jsonify({"message": "Employer deleted"}), 200

    def update_employer(self, employer_id, update_data):
        """Apply ``update_data`` to the employer with ``employer_id``.

        Returns:
            ``(response, status)``: 200 on success, 404 when the employer does
            not exist.
        """
        from app import DATABASE_MANAGER

        if not DATABASE_MANAGER.get_one_by_id("employers", employer_id):
            return jsonify({"error": "Employer not found"}), 404

        DATABASE_MANAGER.update_one_by_id("employers", employer_id, update_data)
        return jsonify({"message": "Employer updated successfully"}), 200

    def rank_preferences(self, opportunity_id, preferences):
        """Store ``preferences`` in the ``preferences`` field of an opportunity.

        Args:
            opportunity_id: Identifier of the opportunity to update.
            preferences: Value written to the opportunity's ``preferences``.

        Returns:
            ``(response, status)``: 200 on success, 404 when the opportunity
            does not exist.
        """
        from app import DATABASE_MANAGER

        if not DATABASE_MANAGER.get_one_by_id("opportunities", opportunity_id):
            return jsonify({"error": "Opportunity not found"}), 404

        DATABASE_MANAGER.update_one_by_id(
            "opportunities", opportunity_id, {"preferences": preferences}
        )
        return jsonify({"message": "Preferences updated"}), 200

    def get_company_email_by_id(self, _id):
        """Return the email of employer ``_id``, or ``""`` if not found."""
        from app import DATABASE_MANAGER

        employer = DATABASE_MANAGER.get_one_by_id("employers", _id)
        return employer["email"] if employer else ""

    def delete_all_employers(self):
        """Delete all employers and opportunities and clear student preferences.

        The ``preferences`` field is removed from every student that has one.

        Returns:
            ``(response, 200)`` confirming the deletion.
        """
        from app import DATABASE_MANAGER

        DATABASE_MANAGER.delete_all("employers")
        DATABASE_MANAGER.delete_all("opportunities")

        for student in DATABASE_MANAGER.get_all("students"):
            if "preferences" in student:
                DATABASE_MANAGER.delete_field_by_id(
                    "students", student["_id"], "preferences"
                )

        return jsonify({"message": "All employers deleted"}), 200

    def download_all_employers(self):
        """Send every employer as an ``employers.xlsx`` attachment.

        ``_id`` is dropped, and ``company_name`` / ``email`` are exported under
        the ``Company_name`` / ``Email`` headers as the last two columns. Any
        other fields keep their names and order.

        Returns:
            The ``send_file`` response carrying the workbook.
        """
        from io import BytesIO

        from app import DATABASE_MANAGER

        rows = []
        for employer in DATABASE_MANAGER.get_all("employers"):
            # Build a fresh dict so the records handed out by the database
            # layer are not mutated by the export.
            row = {
                key: value
                for key, value in employer.items()
                if key not in ("_id", "company_name", "email")
            }
            row["Company_name"] = employer["company_name"]
            row["Email"] = employer["email"]
            rows.append(row)

        # Write to memory rather than a delete=False temp file, which was
        # never removed and accumulated one file per download.
        buffer = BytesIO()
        pd.DataFrame(rows).to_excel(buffer, index=False)
        buffer.seek(0)

        return send_file(buffer, as_attachment=True, download_name="employers.xlsx")

    @staticmethod
    def _clean_cell(value):
        """Return a spreadsheet cell as a stripped, HTML-escaped string.

        Blank cells (``None`` or NaN) become ``""`` and non-string values are
        converted with ``str`` so that ``escape`` never receives a non-string.
        """
        if value is None or pd.isna(value):
            return ""
        return escape(str(value).strip())

    @classmethod
    def _validate_upload_rows(cls, rows, existing_names, existing_emails):
        """Validate uploaded rows and build the records to insert.

        Args:
            rows: Dicts with ``Company_name`` and ``Email`` keys, one per
                spreadsheet row.
            existing_names: Lower-cased company names already stored.
            existing_emails: Lower-cased emails already stored.

        Returns:
            ``(records, None)`` when every row is valid, otherwise
            ``(None, message)`` describing the first offending row.
        """
        seen_names = set()
        seen_emails = set()
        records = []
        # Rows are numbered from 2 in messages (the first data row follows the
        # header row in the spreadsheet).
        for row_number, row in enumerate(rows, start=2):
            company_name = cls._clean_cell(row["Company_name"])
            email = cls._clean_cell(row["Email"]).lower()
            if not company_name or not email:
                return None, "Company name and email are required"

            name_key = company_name.lower()
            name_error = (
                f"Company name {company_name} already exists as row {row_number}"
            )
            email_error = f"Email {email} already exists as row {row_number}"

            # Check order is deliberate: stored name, stored email, in-file
            # email, in-file name - it decides which message wins.
            if name_key in existing_names:
                return None, name_error
            if email in existing_emails or email in seen_emails:
                return None, email_error
            if name_key in seen_names:
                return None, name_error

            records.append(
                {
                    "_id": uuid.uuid4().hex,
                    "company_name": company_name,
                    "email": email,
                }
            )
            seen_emails.add(email)
            seen_names.add(name_key)

        return records, None

    def upload_employers(self, file):
        """Bulk-insert employers from an uploaded spreadsheet.

        The file is read with ``handlers.excel_verifier_and_reader``, which is
        asked for the ``Company_name`` and ``Email`` columns. The upload is
        all-or-nothing: the first invalid or duplicate row aborts it.

        Args:
            file: Uploaded file object passed through to the reader.

        Returns:
            ``(response, status)``: 200 with the number of employers inserted,
            or 400 with an error message.
        """
        from app import DATABASE_MANAGER

        try:
            df = handlers.excel_verifier_and_reader(file, {"Company_name", "Email"})
        except ValueError as e:
            return jsonify({"error": f"Failed to read file: {str(e)}"}), 400

        current_employers = DATABASE_MANAGER.get_all("employers")
        existing_names = {
            current["company_name"].lower() for current in current_employers
        }
        existing_emails = {current["email"].lower() for current in current_employers}

        clean_data, error = Employers._validate_upload_rows(
            df.to_dict(orient="records"), existing_names, existing_emails
        )
        if error:
            return jsonify({"error": error}), 400

        # NOTE(review): some database drivers reject an empty bulk insert;
        # skipping the call for a header-only file is safe either way.
        if clean_data:
            DATABASE_MANAGER.insert_many("employers", clean_data)

        return (
            jsonify({"message": f"{len(clean_data)} employers uploaded successfully"}),
            200,
        )

    def get_deadlines_for_employer_dashboard(self):
        """Return the next pending deadline for the employer dashboard.

        Deadlines are checked in order: details, student ranking, then
        opportunities ranking. The first one not yet passed is reported.

        Returns:
            A ``(title, deadline, message)`` tuple. When every deadline has
            passed, ``deadline`` is ``None``.
        """
        from app import DEADLINE_MANAGER

        if not DEADLINE_MANAGER.is_past_details_deadline():
            return (
                "Add Any Number of Opportunities Deadline",
                DEADLINE_MANAGER.get_details_deadline(),
                "Add your placements now!",
            )

        if not DEADLINE_MANAGER.is_past_student_ranking_deadline():
            return (
                "Students Ranking Opportunities/Roles Deadline",
                DEADLINE_MANAGER.get_student_ranking_deadline(),
                "Wait till this deadline passes.",
            )

        if not DEADLINE_MANAGER.is_past_opportunities_ranking_deadline():
            return (
                "Ranking Student For Individual Opportunities/Roles Deadline",
                DEADLINE_MANAGER.get_opportunities_ranking_deadline(),
                "Rank students for your placements now!",
            )

        return "All Deadlines Passed", None, "All deadlines have passed."