forked from vincentdj/energyapi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
84 lines (71 loc) · 3.19 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import re
from typing import Any

from bson.objectid import ObjectId
from fastapi import FastAPI, HTTPException, Depends, Query, Body
from pymongo.errors import DuplicateKeyError

from app.auth import create_access_token, get_current_user, get_current_active_user, get_current_active_admin
from app.database import user_collection
from app.models import UserModel
from app.schema import UserCreate, UserUpdate
# ASGI application object; the route decorators in this module register
# their handlers on it.
app = FastAPI()

# MongoDB connection string: taken from the MONGO_URL environment variable,
# falling back to a local instance when the variable is not set.
MONGO_DETAILS = os.environ.get(
    "MONGO_URL",
    "mongodb://localhost:27017/mydatabase",
)
@app.get("/users/")
async def get_all_users():
    """Return the names of all users whose role is ``"user"``.

    Only the ``name`` field is requested from the collection, so the
    response is a flat list of name values. This route declares no
    authentication dependency.
    """
    # Projection: keep "name", drop "_id".
    name_only = {"name": 1, "_id": 0}
    matches = user_collection.find({"role": "user"}, name_only)
    return [doc["name"] async for doc in matches]
@app.get("/admin/")
async def get_all_users(current_user: dict = Depends(get_current_active_admin)):
    """Return every document in the user collection as a ``UserModel``.

    Access is gated by the ``get_current_active_admin`` dependency; the
    injected ``current_user`` value itself is not read here.
    """
    # Each document's ObjectId is replaced by its string form before the
    # model is built.
    return [
        UserModel(**{**doc, "_id": str(doc["_id"])})
        async for doc in user_collection.find()
    ]
@app.get("/users/{user_id}")
async def get_user_by_id(user_id: str, current_user: dict = Depends(get_current_active_user)):
    """Return the user whose ``_id`` equals ``user_id``.

    Args:
        user_id: String form of the document's ObjectId, taken from the path.
        current_user: Injected by ``get_current_active_user``; only used to
            gate access, its value is not read here.

    Returns:
        A ``UserModel`` built from the stored document, with ``_id``
        converted to ``str``.

    Raises:
        HTTPException: 404 when ``user_id`` is not a well-formed ObjectId or
            when no document has that id.
    """
    # ObjectId() raises on malformed input, which would otherwise surface as
    # an unhandled 500. A malformed id cannot identify any user, so it is
    # reported the same way as a missing one.
    if not ObjectId.is_valid(user_id):
        raise HTTPException(status_code=404, detail="User not found")

    user = await user_collection.find_one({"_id": ObjectId(user_id)})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    user["_id"] = str(user["_id"])  # Convert ObjectId to str
    return UserModel(**user)
# Admin-only user search; the search term is matched as literal text.
@app.get("/search/")
async def search_users(name: str = Query(None), current_user: dict = Depends(get_current_active_admin)):
    """Return users whose ``name`` contains the given text.

    Args:
        name: Optional search text. When missing or empty, every user is
            returned.
        current_user: Injected by ``get_current_active_admin``; only used to
            gate access.

    Returns:
        A list of ``UserModel`` objects with ``_id`` converted to ``str``.
    """
    # The term comes straight from the request. Escaping it keeps the
    # substring-match behaviour while preventing the caller from supplying
    # an arbitrary (possibly very expensive) regular expression.
    query = {"name": {"$regex": re.escape(name)}} if name else {}

    users = []
    async for user in user_collection.find(query):
        user["_id"] = str(user["_id"])  # Convert ObjectId to str
        users.append(UserModel(**user))
    return users
# Login via JSON body; credentials are matched exactly.
@app.post("/login/")
async def login(name: str = Body(...), password: Any = Body(...)):
    """Exchange a name/password pair for a bearer token.

    Args:
        name: Exact user name.
        password: The user's password. Declared as ``Any`` in the signature,
            but only string values are accepted.

    Returns:
        ``{"access_token": <token>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 400 when either value is missing/empty or is not a
            string; 401 when no user matches the credentials.
    """
    if not name or not password:
        raise HTTPException(status_code=400, detail="Name and password must be provided")

    # Because `password` is typed Any, a JSON body may carry an object such
    # as {"$ne": null}. Placed into the filter it would be interpreted as a
    # query operator and match without knowing the password, so anything
    # that is not a plain string is rejected up front.
    if not isinstance(name, str) or not isinstance(password, str):
        raise HTTPException(status_code=400, detail="Name and password must be strings")

    # Exact match on the name: treating it as a regex would let ".*" match
    # any account.
    # NOTE(review): the filter compares the stored "password" field directly
    # with the submitted value, which suggests passwords are stored
    # unhashed - confirm and migrate to hashed passwords.
    user = await user_collection.find_one({"name": name, "password": password})
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = create_access_token({
        "sub": user["name"],
        "role": user["role"]
    })
    return {"access_token": token, "token_type": "bearer"}
# Login via query string; credentials are matched exactly.
# NOTE(review): credentials sent as query parameters are part of the URL and
# are commonly recorded in access logs and browser history - prefer the POST
# variant of this route.
@app.get("/login/")
async def login(name: str = Query(None), password: Any = Query(None)):
    """Exchange a name/password pair, passed as query parameters, for a bearer token.

    Args:
        name: Exact user name.
        password: The user's password; only string values are accepted.

    Returns:
        ``{"access_token": <token>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 400 when either value is missing/empty or is not a
            string; 401 when no user matches the credentials.
    """
    if not name or not password:
        raise HTTPException(status_code=400, detail="Name and password must be provided")

    # Only plain strings may reach the database filter, so a value can never
    # be interpreted as a query operator.
    if not isinstance(name, str) or not isinstance(password, str):
        raise HTTPException(status_code=400, detail="Name and password must be strings")

    # Exact match on the name: as a regex, ".*" combined with a common
    # password would log the caller in as whichever user has that password.
    user = await user_collection.find_one({"name": name, "password": password})
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = create_access_token({
        "sub": user["name"],
        "role": user["role"]
    })
    return {"access_token": token, "token_type": "bearer"}