Coverage for core/database_mongo_manager.py: 96%
79 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 14:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 14:02 +0000
1"""Module for managing MongoDB database operations."""
3import sys
4from dotenv import load_dotenv
5import pymongo
6from pymongo.errors import (
7 ConfigurationError,
8 OperationFailure,
9 ServerSelectionTimeoutError,
10)
11from colorama import Fore, Style
13from core import shared
14from .database_interface import DatabaseInterface
class DatabaseMongoManager(DatabaseInterface):
    """Manage MongoDB database operations through a pymongo client.

    Method names use relational vocabulary: a "table" is a MongoDB
    collection and a "record" is a document in that collection.
    """

    # Milliseconds pymongo waits for a reachable server before raising
    # ServerSelectionTimeoutError.
    _SERVER_SELECTION_TIMEOUT_MS = 5000
    # Database selected when the caller supplies no database name.
    _DEFAULT_DATABASE = "cs3528_testing"

    def __init__(self, connection, database):
        """Initialize the manager and connect immediately.

        Args:
            connection: MongoDB connection URI, or None for pymongo's default
            database: Name of the database to use
        """
        super().__init__()

        self.connect(connection, database)

    def connect(self, connection, database):
        """Connect to MongoDB, ping the server and select the database.

        The given connection URI is only used when the IS_GITHUB_ACTION
        setting equals "False", the URI is not None, and the OFFLINE setting
        is not "True". In every other case the client is created without a
        URI, so pymongo falls back to its default host.

        The process exits with status 1 when the client cannot be configured,
        the ping is rejected, or no server answers within the timeout.

        Args:
            connection: MongoDB connection URI, or None
            database: Name of the database to use; an empty value or None
                selects "cs3528_testing"
        """
        load_dotenv()

        use_given_uri = (
            shared.getenv("IS_GITHUB_ACTION") == "False"
            and connection is not None
            and shared.getenv("OFFLINE") != "True"
        )
        # Build exactly one client. Creating a default client first and then
        # replacing it would leave the first one open and never closed.
        client_args = (connection,) if use_given_uri else ()

        try:
            # Construction is inside the try block because pymongo can raise
            # ConfigurationError while parsing/resolving the URI, before any
            # command is sent.
            self.connection = pymongo.MongoClient(
                *client_args,
                serverSelectionTimeoutMS=self._SERVER_SELECTION_TIMEOUT_MS,
            )
            # The client connects lazily; the ping forces a round trip so
            # connection problems surface here rather than on first use.
            self.connection.admin.command("ping")
            print(
                Fore.GREEN
                + "Pinged your deployment. You successfully connected to MongoDB!"
                + Style.RESET_ALL
            )
        except ConfigurationError as e:
            print(Fore.RED + f"Configuration error: {e}" + Style.RESET_ALL)
            sys.exit(1)
        except OperationFailure as e:
            print(Fore.RED + f"Operation failure: {e}" + Style.RESET_ALL)
            sys.exit(1)
        except ServerSelectionTimeoutError as e:
            print(Fore.RED + f"Server selection timeout error: {e}" + Style.RESET_ALL)
            sys.exit(1)

        # Treat None like the empty string so a missing name falls back to
        # the default database instead of failing on the lookup below.
        if not database:
            database = self._DEFAULT_DATABASE
        self.database = self.connection[database]

    def get_all(self, table):
        """Get all records from a table.

        Args:
            table: The table to read

        Returns:
            A list with every record in the table.
        """
        return list(self.database[table].find())

    def get_one_by_id(self, table, id_val):
        """Get one record by ID.

        Args:
            table: The table to search
            id_val: The "_id" value of the record

        Returns:
            The matching record, or None when there is no match.
        """
        return self.database[table].find_one({"_id": id_val})

    def insert(self, table, data):
        """Insert a record into a table.

        Args:
            table: The table to insert into
            data: The record to insert

        Returns:
            The result object returned by pymongo's insert_one.
        """
        return self.database[table].insert_one(data)

    def update_one_by_id(self, table, id_val, data):
        """Update a record by ID.

        Only the fields present in data are overwritten ("$set").

        Args:
            table: The table to update
            id_val: The ID of the record to update
            data: The data to update

        Returns:
            The result object returned by pymongo's update_one.
        """
        return self.database[table].update_one({"_id": id_val}, {"$set": data})

    def update_one_by_field(self, table, field, value, data):
        """Update the first record whose field equals value.

        Only the fields present in data are overwritten ("$set").

        Args:
            table: The table to update
            field: The field to search
            value: The value to search
            data: The data to update

        Returns:
            The result object returned by pymongo's update_one.
        """
        return self.database[table].update_one({field: value}, {"$set": data})

    def increment(self, table, id_val, field, increment):
        """Increment a field by a value.

        Args:
            table: The table to update
            id_val: The ID of the record to update
            field: The field to increment
            increment: The value to increment by

        Returns:
            The result object returned by pymongo's update_one.
        """
        return self.database[table].update_one(
            {"_id": id_val}, {"$inc": {field: increment}}
        )

    def delete_by_id(self, table, id_val):
        """Delete a record by ID.

        Args:
            table: The table to delete from
            id_val: The ID of the record to delete

        Returns:
            The result object returned by pymongo's delete_one.
        """
        return self.database[table].delete_one({"_id": id_val})

    def delete_one_by_field(self, table, field, value):
        """Delete the first record whose field equals value.

        Args:
            table: The table to delete from
            field: The field to search
            value: The value to search

        Returns:
            The result object returned by pymongo's delete_one.
        """
        return self.database[table].delete_one({field: value})

    def delete_all_by_field(self, table, field, value):
        """Delete every record whose field equals value.

        Args:
            table: The table to delete from
            field: The field to search
            value: The value to search

        Returns:
            The result object returned by pymongo's delete_many.
        """
        return self.database[table].delete_many({field: value})

    def delete_all(self, table):
        """Delete all records from a table.

        The empty filter matches every record; the table itself is kept.

        Args:
            table: The table to delete from

        Returns:
            The result object returned by pymongo's delete_many.
        """
        return self.database[table].delete_many({})

    def get_by_email(self, table, email):
        """Get a record by email.

        Args:
            table: The table to search
            email: The value of the "email" field to search for

        Returns:
            The matching record, or None when there is no match.
        """
        return self.database[table].find_one({"email": email})

    def delete_field_by_id(self, table, id_val, field):
        """Remove a single field from the record with the given ID.

        Args:
            table: The table to update
            id_val: The ID of the record to update
            field: The field to delete

        Returns:
            The result object returned by pymongo's update_one.
        """
        # "$unset" ignores the value given for the field; "" is a placeholder.
        return self.database[table].update_one({"_id": id_val}, {"$unset": {field: ""}})

    def get_one_by_field(self, table, field, value):
        """Get one record whose field equals value.

        Args:
            table: The table to search
            field: The field to search
            value: The value to search

        Returns:
            The matching record, or None when there is no match.
        """
        return self.database[table].find_one({field: value})

    def get_one_by_field_strict(self, table, field, value):
        """Get one record whose field matches value in full, ignoring case.

        The value is matched literally: regular-expression metacharacters in
        it (for example "." or "+") carry no special meaning.

        Args:
            table: The table to search
            field: The field to search
            value: The value to search; non-string values are converted
                with str()

        Returns:
            The matching record, or None when there is no match.
        """
        # Imported locally so this method does not depend on the module's
        # import block providing "re".
        import re

        # Escape the value so it cannot change the pattern's meaning, then
        # anchor it so the whole field has to match.
        literal = re.escape(str(value))
        return self.database[table].find_one(
            {field: {"$regex": f"^{literal}$", "$options": "i"}}
        )

    def is_table(self, table):
        """Check if a table exists.

        Membership is tested against self.table_list, which is not set in
        this class - presumably provided by DatabaseInterface; confirm there.

        Args:
            table: The table name to look for

        Returns:
            True when the name is in self.table_list, otherwise False.
        """
        return table in self.table_list

    def get_all_by_two_fields(self, table, field1, value1, field2, value2):
        """Get all records matching both field/value pairs.

        Args:
            table: The table to search
            field1: The first field to search
            value1: The value required for the first field
            field2: The second field to search
            value2: The value required for the second field

        Returns:
            A list of the matching records.
        """
        return list(self.database[table].find({field1: value1, field2: value2}))

    def get_all_by_in_list(self, table, field, values_list):
        """Get all records whose field equals any value in a list.

        Args:
            table: The table to search
            field: The field to search
            values_list: The accepted values for the field

        Returns:
            A list of the matching records.
        """
        return list(self.database[table].find({field: {"$in": values_list}}))

    def update_by_field(self, table, field, value, data):
        """Update the first record whose field equals value.

        Same operation as update_one_by_field; kept as a separate name for
        existing callers.

        Args:
            table: The table to update
            field: The field to search
            value: The value to search
            data: The data to update

        Returns:
            The result object returned by pymongo's update_one.
        """
        return self.update_one_by_field(table, field, value, data)

    def get_all_by_field(self, table, field, value):
        """Get all records whose field equals value.

        Args:
            table: The table to search
            field: The field to search
            value: The value to search

        Returns:
            A list of the matching records.
        """
        return list(self.database[table].find({field: value}))

    def create_index(self, table, field):
        """Create an index on a field.

        Args:
            table: The table to index
            field: The index specification, passed unchanged to pymongo

        Returns:
            The value returned by pymongo's create_index.
        """
        return self.database[table].create_index(field)

    def get_all_by_text_search(self, table, search_text):
        """Get all records matching a "$text" search.

        MongoDB only answers "$text" queries on collections that have a
        text index.

        Args:
            table: The table to search
            search_text: The text to search for

        Returns:
            A list of the matching records.
        """
        return list(self.database[table].find({"$text": {"$search": search_text}}))

    def close_connection(self):
        """Close the connection to the database."""
        self.connection.close()

    def delete_collection(self, table):
        """Drop a table, removing it together with all of its records.

        Args:
            table: The table to drop

        Returns:
            The value returned by pymongo's drop.
        """
        return self.database[table].drop()

    def insert_many(self, table, data):
        """Insert many records into a table.

        Args:
            table: The table to insert into
            data: The records to insert

        Returns:
            The result object returned by pymongo's insert_many.
        """
        return self.database[table].insert_many(data)