Bootcamp Tutorial 3: Mastering Decorators & Database Communication in Flask API
Introduction
In this tutorial, we’ll dive deeper into decorators, advanced API communication, and database operations. By the end of this guide, you’ll be able to:
- Implement custom decorators for role-based access control
- Enhance database communication with SQLAlchemy
- Perform CRUD operations on users and tasks with Flask-RESTful
- Securely assign tasks and manage user approvals
1. Understanding Python Decorators
What is a Decorator?
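A decorator in Python is a function that takes another function and returns a replacement (usually a wrapper) that adds behavior before or after the original call, without editing the original function's code. This lets us reuse cross-cutting logic such as logging or access checks instead of repeating it in every function.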
Example of a Basic Decorator

    def my_decorator(func):
        def wrapper():
            print("Something before the function runs")
            func()
            print("Something after the function runs")
        return wrapper

    @my_decorator
    def say_hello():
        print("Hello, World!")

    say_hello()

Output:

    Something before the function runs
    Hello, World!
    Something after the function runs

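The wrapper above takes no arguments and discards the return value, so it only suits functions like say_hello(). Here is a minimal sketch of the more general pattern, which forwards *args and **kwargs and uses functools.wraps to keep the original function's name. The names log_calls and add are made up for illustration.

```python
import functools

def log_calls(func):
    """Print a line before and after each call to func."""
    @functools.wraps(func)  # copy __name__ and __doc__ from func onto wrapper
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__}")
        result = func(*args, **kwargs)  # forward all arguments unchanged
        print(f"Finished {func.__name__}")
        return result  # hand the return value back to the caller
    return wrapper

@log_calls
def add(a, b):
    return a + b

total = add(2, 3)
```

Without functools.wraps, add.__name__ would report "wrapper", which makes debugging and introspection harder.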
Now, let’s implement a decorator for role-based access control in our Flask API.
2. Implementing Role-Based Access Control (RBAC)
Helper Function: Get Current User
Before enforcing access control, we need a helper function to fetch the currently logged-in user.
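
    def get_current_user():
        # The JWT identity is the username that was stored in the token at login
        username = get_jwt_identity()
        return User.query.filter_by(username=username).first()

This function reads the identity stored in the JWT (the username) and looks up the matching User row. It returns None if no such user exists, and it must be called inside a request that has already been verified by @jwt_required().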
Decorator: Restrict Access by Role
We create a role_required decorator that restricts API access based on user roles.
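
    from functools import wraps

    def role_required(required_roles):
        def decorator(func):
            @wraps(func)  # keep the wrapped method's name and docstring
            def wrapper(*args, **kwargs):
                user = get_current_user()
                if user is None or user.role not in required_roles:
                    # Flask-RESTful tries to JSON-encode the first element of a
                    # (body, status) tuple, so return a finished Response instead
                    return make_response(jsonify({"message": "Unauthorized access"}), 403)
                return func(*args, **kwargs)
            return wrapper
        return decorator
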
How It Works
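- It looks up the current user and checks whether that user's role is in required_roles.
- If there is no matching user or the role is not allowed, it returns a 403 Forbidden response; otherwise it calls the original method.
- @jwt_required() must be listed above @role_required(...) so that the token is verified before get_jwt_identity() is called.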
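To see the three nested functions in isolation, here is a framework-free sketch of the same pattern. The FAKE_USERS table, the current_username variable, and the (body, status) tuples are stand-ins for the database, the JWT identity, and the Flask response. They are assumptions for illustration, not part of the API in this tutorial.

```python
import functools

# Stand-ins for the database and the JWT identity (illustrative only)
FAKE_USERS = {"alice": "admin", "bob": "employee"}
current_username = None

def role_required(required_roles):
    """Decorator factory: the outer call receives the allowed roles."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            role = FAKE_USERS.get(current_username)  # None for unknown users
            if role is None or role not in required_roles:
                return {"message": "Unauthorized access"}, 403
            return func(*args, **kwargs)
        return wrapper
    return decorator

@role_required(["admin"])
def list_pending_users():
    return {"pending": ["carol"]}, 200

current_username = "alice"
admin_result = list_pending_users()

current_username = "bob"
employee_result = list_pending_users()

current_username = "mallory"  # not present in FAKE_USERS
unknown_result = list_pending_users()
```

The role check runs on every call, not when the decorator is applied, which is why changing current_username between calls changes the outcome.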
3. Advanced Database Communication with API
Managing User Approvals (Admin Only)
In a multi-role system, we want admins to approve or reject users before they can access tasks.
Fetching Pending Approvals

    class UserApprovalResource(Resource):
        @jwt_required()
        @role_required(["admin"])
        def get(self):
            users = User.query.filter_by(is_approved=False).all()
            return jsonify([user.serialize() for user in users])

🔹 Only admins can retrieve the list of users pending approval.
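The handler relies on each user object having a serialize() method that returns a JSON-safe dictionary. Below is a minimal sketch of what such a method might return, using a plain dataclass as a stand-in for the SQLAlchemy model so that it runs without a database. The class name UserSketch and the field selection are assumptions, and the password hash is deliberately left out.

```python
from dataclasses import dataclass, field
from datetime import datetime
import json

@dataclass
class UserSketch:
    """Stand-in for the User model (illustrative, not the real model)."""
    id: int
    username: str
    email: str
    password_hash: str
    role: str = "employee"
    is_approved: bool = False
    created_at: datetime = field(default_factory=datetime.now)

    def serialize(self):
        # Only expose fields that are safe to send to a client
        return {
            "id": self.id,
            "username": self.username,
            "email": self.email,
            "role": self.role,
            "is_approved": self.is_approved,
            "created_at": self.created_at.isoformat(),  # datetime -> string
        }

user = UserSketch(1, "alice", "alice@example.com", "not-a-real-hash")
payload = json.dumps(user.serialize())  # works because every value is JSON-safe
```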
Approving Users

    # Method of UserApprovalResource
    @jwt_required()
    @role_required(["admin"])
    def put(self, user_id):
        user = User.query.get(user_id)
        if not user:
            # Report a missing user with 404 instead of the default 200
            return make_response(jsonify({"message": "User not found"}), 404)

        user.is_approved = True
        db.session.commit()
        return jsonify({"message": "User approved successfully"})

🔹 Admins can approve users by setting is_approved = True.
Rejecting Users

    # Method of UserApprovalResource
    @jwt_required()
    @role_required(["admin"])
    def delete(self, user_id):
        user = User.query.get(user_id)
        if not user:
            return make_response(jsonify({"message": "User not found"}), 404)

        db.session.delete(user)
        db.session.commit()
        return jsonify({"message": "User rejected and removed"})

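🔹 Admins can reject a user, which deletes the record from the database. Note that this handler does not check is_approved, so it removes any user with the given ID.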
4. Task Management APIs
CRUD Operations on Tasks
Fetch All Tasks or a Single Task

    class TaskResource(Resource):
        @jwt_required()
        @role_required(["admin", "manager"])
        def get(self, task_id=None):
            if task_id is not None:
                task = Task.query.get(task_id)
                if not task:
                    return make_response(jsonify({"message": "Task not found"}), 404)
                return jsonify(task.serialize())

            tasks = Task.query.all()
            return jsonify([task.serialize() for task in tasks])

🔹 Admins & Managers can fetch tasks.
Create a New Task

    # Method of TaskResource
    @jwt_required()
    @role_required(["manager", "admin"])
    def post(self):
        data = request.get_json()
        new_task = Task(
            title=data['title'],
            description=data.get('description'),
            # The deadline is optional; when present it must use the YYYY-MM-DD format
            deadline=datetime.strptime(data['deadline'], '%Y-%m-%d') if data.get('deadline') else None
        )
        db.session.add(new_task)
        db.session.commit()
        return jsonify({"message": "Task created successfully"})

🔹 Managers & Admins can create tasks.
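The deadline must arrive as a YYYY-MM-DD string. Any other format makes datetime.strptime raise ValueError, which would surface as a server error. Here is a small sketch of a helper that separates the three cases (missing, valid, malformed). The name parse_deadline is hypothetical and not part of the tutorial's code.

```python
from datetime import datetime

def parse_deadline(raw):
    """Return a (deadline, error) pair; both are None when no deadline was sent."""
    if not raw:
        return None, None  # the deadline is optional
    try:
        return datetime.strptime(raw, "%Y-%m-%d"), None
    except (ValueError, TypeError):
        # ValueError: wrong format; TypeError: value is not a string
        return None, "deadline must use the YYYY-MM-DD format"
```

In post(), a non-empty error could be turned into a 400 response before the Task object is created.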
Update Task Details

    # Method of TaskResource
    @jwt_required()
    @role_required(["manager", "admin"])
    def put(self, task_id):
        task = Task.query.get(task_id)
        if not task:
            return make_response(jsonify({"message": "Task not found"}), 404)

        data = request.get_json()
        # Fields missing from the request body keep their current values
        task.title = data.get('title', task.title)
        task.description = data.get('description', task.description)
        task.status = data.get('status', task.status)
        db.session.commit()
        return jsonify({"message": "Task updated successfully"})

🔹 Managers & Admins can edit tasks.
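The update uses data.get(key, current_value), so fields missing from the request body keep their current values. Below is a sketch of the same partial-update logic on a plain dictionary, with an added check that status is one of the three values named in the Task model's comment (pending, in_progress, completed). The function apply_task_update and the validation step are assumptions, not part of the original handler.

```python
ALLOWED_STATUSES = {"pending", "in_progress", "completed"}

def apply_task_update(task, data):
    """Return an updated copy of task, or raise ValueError for a bad status."""
    status = data.get("status", task["status"])
    if status not in ALLOWED_STATUSES:
        raise ValueError(f"invalid status: {status!r}")
    updated = dict(task)  # copy, so the original stays untouched on failure
    for key in ("title", "description"):
        updated[key] = data.get(key, task[key])
    updated["status"] = status
    return updated

task = {"title": "Write report", "description": None, "status": "pending"}
updated = apply_task_update(task, {"status": "completed"})
```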
Delete a Task (Admin Only)

    # Method of TaskResource
    @jwt_required()
    @role_required(["admin"])
    def delete(self, task_id):
        task = Task.query.get(task_id)
        if not task:
            return make_response(jsonify({"message": "Task not found"}), 404)

        db.session.delete(task)
        db.session.commit()
        return jsonify({"message": "Task deleted successfully"})

🔹 Only Admins can delete tasks.
5. Assigning Tasks to Users

    class AssignTaskResource(Resource):
        @jwt_required()
        @role_required(["manager"])
        def put(self, task_id):
            data = request.get_json()
            user_id = data.get("user_id")

            task = Task.query.get(task_id)
            user = User.query.get(user_id)

            if not task or not user:
                return make_response(jsonify({"message": "Task or User not found"}), 404)

            # Link the task to the user through the foreign key column
            task.assigned_user_id = user.id
            db.session.commit()
            return jsonify({"message": "Task assigned successfully"})

🔹 Managers can assign tasks to employees.
6. Generating API Statistics

    class StatsResource(Resource):
        @jwt_required()
        @role_required(["admin", "manager"])
        def get(self):
            total_users = User.query.count()
            total_tasks = Task.query.count()
            completed_tasks = Task.query.filter_by(status="completed").count()

            return jsonify({
                "total_users": total_users,
                "total_tasks": total_tasks,
                "completed_tasks": completed_tasks
            })

🔹 Admins & Managers can view system-wide stats.
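Each .count() call asks the database for a row count instead of loading the rows into Python. As a rough illustration of what the database is doing, here are the same three counts written with the standard-library sqlite3 module against an in-memory database. The table layout is a simplified assumption based on the models, and the sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, title TEXT, status TEXT)")
conn.executemany("INSERT INTO user (username) VALUES (?)", [("alice",), ("bob",)])
conn.executemany(
    "INSERT INTO task (title, status) VALUES (?, ?)",
    [("Write report", "completed"), ("Fix bug", "pending"), ("Deploy", "completed")],
)

def count(sql, params=()):
    """Run a COUNT(*) query and return the single integer result."""
    return conn.execute(sql, params).fetchone()[0]

stats = {
    "total_users": count("SELECT COUNT(*) FROM user"),
    "total_tasks": count("SELECT COUNT(*) FROM task"),
    "completed_tasks": count("SELECT COUNT(*) FROM task WHERE status = ?", ("completed",)),
}
```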
Endpoint | Method | Roles Allowed | Description |
---|---|---|---|
/tasks | GET | Admin, Manager | Fetch all tasks |
/tasks | POST | Manager, Admin | Create a task |
/task/<task_id> | GET | Admin, Manager | Fetch a single task |
/task/<task_id> | PUT | Manager, Admin | Update a task |
/task/<task_id> | DELETE | Admin | Delete a task |
/users/pending | GET | Admin | Fetch unapproved users |
/users/<user_id>/approve | PUT | Admin | Approve a user |
/users/<user_id>/reject | DELETE | Admin | Reject and remove a user |
/task/<task_id>/assign | PUT | Manager | Assign a task to a user |
/stats | GET | Admin, Manager | View statistics |
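Every endpoint in the table expects the JWT returned by /login. By default, Flask-JWT-Extended reads it from an Authorization: Bearer <token> header. Here is a sketch of how a client might build the task-assignment request with the standard library. The token value, task ID, and user ID are placeholders, and the base URL assumes the development server settings used in this tutorial (127.0.0.1, port 5000).

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:5000"
token = "<token from /login>"  # placeholder, not a real token

def build_assign_request(task_id, user_id):
    """Build (but do not send) the PUT request for /task/<task_id>/assign."""
    body = json.dumps({"user_id": user_id}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/task/{task_id}/assign",
        data=body,
        method="PUT",
        headers={
            "Authorization": f"Bearer {token}",
            # JSON content type so that request.get_json() accepts the body
            "Content-Type": "application/json",
        },
    )

req = build_assign_request(task_id=1, user_id=2)
# urllib.request.urlopen(req) would send it once the server is running
```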
Final code

    from datetime import datetime, timedelta
    from functools import wraps

    from flask import Flask, request, jsonify, make_response
    from flask_sqlalchemy import SQLAlchemy
    from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
    from werkzeug.security import generate_password_hash, check_password_hash
    from flask_restful import Api, Resource

    # ================================================= app configuration =========================================================
    app = Flask(__name__)
    app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///project.db"
    app.config["JWT_SECRET_KEY"] = "aStrongSecretKey"  # placeholder; use a real secret outside of development
    app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(hours=24)

    db = SQLAlchemy(app)
    jwt = JWTManager(app)
    api = Api(app)
    # ================================================= XXXXXXXXXXXXXXXXXX =========================================================

    # ================================================= models =========================================================
    # User Model
    class User(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        username = db.Column(db.String(80), unique=True, nullable=False)
        email = db.Column(db.String(120), unique=True, nullable=False)
        password_hash = db.Column(db.String(256), nullable=False)  # current Werkzeug hashes can exceed 128 characters
        role = db.Column(db.String(20), default="employee")  # employee, manager, admin
        is_approved = db.Column(db.Boolean, default=False)  # Admin approval required
        created_at = db.Column(db.DateTime, default=datetime.utcnow)

        def set_password(self, password):
            self.password_hash = generate_password_hash(password)

        def check_password(self, password):
            return check_password_hash(self.password_hash, password)

        def serialize(self):
            # Called by UserApprovalResource; the field selection is an assumption.
            # The password hash is deliberately left out.
            return {
                "id": self.id,
                "username": self.username,
                "email": self.email,
                "role": self.role,
                "is_approved": self.is_approved,
                "created_at": self.created_at.isoformat() if self.created_at else None,
            }

    # Task Model
    class Task(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        title = db.Column(db.String(255), nullable=False)
        description = db.Column(db.Text, nullable=True)
        status = db.Column(db.String(20), default="pending")  # pending, in_progress, completed
        deadline = db.Column(db.DateTime, nullable=True)
        created_at = db.Column(db.DateTime, default=datetime.utcnow)
        assigned_user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)

        def serialize(self):
            # Called by TaskResource; the field selection is an assumption.
            return {
                "id": self.id,
                "title": self.title,
                "description": self.description,
                "status": self.status,
                "deadline": self.deadline.isoformat() if self.deadline else None,
                "created_at": self.created_at.isoformat() if self.created_at else None,
                "assigned_user_id": self.assigned_user_id,
            }

    with app.app_context():
        db.create_all()
    # ================================================= XXXXXXXXXXXXXXXXXX =========================================================

    # Helper function to fetch the current user
    def get_current_user():
        username = get_jwt_identity()
        return User.query.filter_by(username=username).first()

    # Decorator function to check the user's role
    def role_required(required_roles):
        def decorator(func):
            @wraps(func)  # keep the wrapped method's name and docstring
            def wrapper(*args, **kwargs):
                user = get_current_user()
                if user is None or user.role not in required_roles:
                    # Flask-RESTful tries to JSON-encode the first element of a
                    # (body, status) tuple, so return a finished Response instead
                    return make_response(jsonify({"message": "Unauthorized access"}), 403)
                return func(*args, **kwargs)
            return wrapper
        return decorator

    # ================================================= Authentication =========================================================
    # User Signup
    class SignupResource(Resource):
        def post(self):
            data = request.get_json()
            username = data['username']
            email = data['email']
            password = data['password']

            if User.query.filter_by(username=username).first() or User.query.filter_by(email=email).first():
                return make_response(jsonify({'message': 'Username or Email already exists'}), 409)

            hashed_password = generate_password_hash(password)
            new_user = User(username=username, email=email, password_hash=hashed_password)
            db.session.add(new_user)
            db.session.commit()

            return jsonify({'message': 'User registered successfully'})

    # User Login
    class LoginResource(Resource):
        def post(self):
            data = request.get_json()
            email = data['email']
            password = data['password']

            user = User.query.filter_by(email=email).first()
            if user and check_password_hash(user.password_hash, password):
                # The username becomes the JWT identity read by get_current_user()
                access_token = create_access_token(identity=user.username)
                return jsonify({"msg": "successfully logged in", "token": access_token})

            return make_response(jsonify({"msg": "email or password incorrect."}), 401)
    # ================================================= XXXXXXXXXXXXXXXXXX =========================================================

    # ================================================= User Verification =========================================================
    # User Approval (Admin only)
    class UserApprovalResource(Resource):
        @jwt_required()
        @role_required(["admin"])
        def get(self):
            users = User.query.filter_by(is_approved=False).all()
            return jsonify([user.serialize() for user in users])

        @jwt_required()
        @role_required(["admin"])
        def put(self, user_id):
            user = User.query.get(user_id)
            if not user:
                return make_response(jsonify({"message": "User not found"}), 404)

            user.is_approved = True
            db.session.commit()
            return jsonify({"message": "User approved successfully"})

        @jwt_required()
        @role_required(["admin"])
        def delete(self, user_id):
            user = User.query.get(user_id)
            if not user:
                return make_response(jsonify({"message": "User not found"}), 404)

            db.session.delete(user)
            db.session.commit()
            return jsonify({"message": "User rejected and removed"})
    # ================================================= XXXXXXXXXXXXXXXXXX =========================================================

    # ================================================= Task Management =========================================================
    # Fetch all tasks, create a new task, update, delete tasks
    class TaskResource(Resource):
        @jwt_required()
        @role_required(["admin", "manager"])
        def get(self, task_id=None):
            if task_id is not None:
                task = Task.query.get(task_id)
                if not task:
                    return make_response(jsonify({"message": "Task not found"}), 404)
                return jsonify(task.serialize())

            tasks = Task.query.all()
            return jsonify([task.serialize() for task in tasks])

        @jwt_required()
        @role_required(["manager", "admin"])
        def post(self):
            data = request.get_json()
            new_task = Task(
                title=data['title'],
                description=data.get('description'),
                # The deadline is optional; when present it must use the YYYY-MM-DD format
                deadline=datetime.strptime(data['deadline'], '%Y-%m-%d') if data.get('deadline') else None
            )
            db.session.add(new_task)
            db.session.commit()
            return jsonify({"message": "Task created successfully"})

        @jwt_required()
        @role_required(["manager", "admin"])
        def put(self, task_id):
            task = Task.query.get(task_id)
            if not task:
                return make_response(jsonify({"message": "Task not found"}), 404)

            data = request.get_json()
            # Fields missing from the request body keep their current values
            task.title = data.get('title', task.title)
            task.description = data.get('description', task.description)
            task.status = data.get('status', task.status)
            db.session.commit()
            return jsonify({"message": "Task updated successfully"})

        @jwt_required()
        @role_required(["admin"])
        def delete(self, task_id):
            task = Task.query.get(task_id)
            if not task:
                return make_response(jsonify({"message": "Task not found"}), 404)

            db.session.delete(task)
            db.session.commit()
            return jsonify({"message": "Task deleted successfully"})
    # ================================================= XXXXXXXXXXXXXXXXXX =========================================================

    # ================================================= Task Assignment =========================================================
    # Assign Task to User
    class AssignTaskResource(Resource):
        @jwt_required()
        @role_required(["manager"])
        def put(self, task_id):
            data = request.get_json()
            user_id = data.get("user_id")

            task = Task.query.get(task_id)
            user = User.query.get(user_id)

            if not task or not user:
                return make_response(jsonify({"message": "Task or User not found"}), 404)

            task.assigned_user_id = user.id
            db.session.commit()
            return jsonify({"message": "Task assigned successfully"})
    # ================================================= XXXXXXXXXXXXXXXXXX =========================================================

    # Stats API
    class StatsResource(Resource):
        @jwt_required()
        @role_required(["admin", "manager"])
        def get(self):
            total_users = User.query.count()
            total_tasks = Task.query.count()
            completed_tasks = Task.query.filter_by(status="completed").count()

            return jsonify({
                "total_users": total_users,
                "total_tasks": total_tasks,
                "completed_tasks": completed_tasks
            })
    # ================================================= XXXXXXXXXXXXXXXXXX =========================================================

    # Register API Endpoints
    api.add_resource(SignupResource, '/register')
    api.add_resource(LoginResource, '/login')
    api.add_resource(TaskResource, '/tasks', '/task/<int:task_id>')
    api.add_resource(UserApprovalResource, '/users/pending', '/users/<int:user_id>/approve', '/users/<int:user_id>/reject')
    api.add_resource(StatsResource, '/stats')
    api.add_resource(AssignTaskResource, '/task/<int:task_id>/assign')

    if __name__ == '__main__':
        app.run(debug=True, port=5000, host="127.0.0.1")
