2025-08-20 11:54:40 +01:00

151 lines
4.9 KiB
Python

from flask import Flask, request, jsonify
import time
import psutil
from database import app, db, Books
import argparse
def performance_test(is_get=False):
def decorator(func):
def wrapper(*args, **kwargs):
process = psutil.Process()
cpu_count = psutil.cpu_count()
# 1. parse request
process.cpu_percent()
# cpu_start = process.cpu_times()
parse_start = time.perf_counter()
if is_get:
data = request.args.to_dict()
else:
data = request.get_json()
parse_end = time.perf_counter()
cpu_percent_mid = process.cpu_percent()
# cpu_mid = process.cpu_times()
# process
result = func(*args, **kwargs, request_data=data)
# 2. serialize response
serialize_start = time.perf_counter()
process.cpu_percent()
# cpu_serialize_start = process.cpu_times()
response_data = jsonify(result)
serialize_end = time.perf_counter()
cpu_end_percent = process.cpu_percent()
# cpu_end = process.cpu_times()
parse_time = parse_end - parse_start
# parse_cpu_time = (cpu_mid.user - cpu_start.user) + (cpu_mid.system - cpu_start.system)
server_serialize_time = serialize_end - serialize_start
# server_serialize_cpu_time = (cpu_end.user - cpu_serialize_start.user) + (cpu_end.system - cpu_serialize_start.system)
# timing
timing_data = {
'server_deserialize': {
'time': parse_time,
'cpu': cpu_percent_mid / cpu_count # / parse_time * 100
},
'server_serialize': {
'time': server_serialize_time,
'cpu': cpu_end_percent / cpu_count # / server_serialize_time * 100
},
'server_protocol_total_time': parse_end - parse_start + serialize_end - serialize_start,
'response_data': result,
}
# print(f"Server timing: {timing_data}")
return jsonify(timing_data), response_data.status_code
wrapper.__name__ = func.__name__
return wrapper
return decorator
@app.route('/api/get_list', methods=['GET'])
@performance_test(is_get=True)
def get_list(request_data):
page_count = int(request_data.get("pages", 1))
per_page = int(request_data.get("per_page", 100))
list_data_limit = int(request_data.get("list_data_limit", per_page))
if list_data_limit <= 0:
list_data_limit = per_page
if per_page <= 0:
return []
data = Books.query.order_by(Books.id).paginate(page=page_count, per_page=min(per_page, list_data_limit), error_out=False)
books = [book.to_dict() for book in data.items]
if len(books) < per_page:
books = books * (per_page // len(books)) + books[:per_page % len(books)]
return books
@app.route('/api/add_book', methods=['POST'])
@performance_test(is_get=False)
def add_book(request_data):
books_data = request_data.get('books', [])
test_only = request_data.get('test_only', False)
if test_only:
return {"message": "Books added successfully"}
for book in books_data:
new_book = Books(**book)
db.session.add(new_book)
db.session.commit()
return {"message": "Books added successfully"}
@app.route('/api/delete_books', methods=['DELETE'])
@performance_test(is_get=False)
def delete_books(request_data):
book_ids = request_data.get('book_ids', [])
delete_last_count = request_data.get('delete_last_count', -1)
if delete_last_count > 0:
last_ids = db.session.query(Books.id).order_by(Books.id.desc()).limit(delete_last_count).subquery()
Books.query.filter(Books.id.in_(
db.session.query(last_ids.c.id)
)).delete(synchronize_session=False)
else:
Books.query.filter(Books.id.in_(book_ids)).delete(synchronize_session=False)
db.session.commit()
return {"message": "Books deleted successfully"}
@app.route('/api/update_book', methods=['PUT'])
@performance_test(is_get=False)
def update_book(request_data):
book_data = request_data.get('book', {})
book_id = book_data.get('id')
book = db.session.get(Books, book_id)
if not book:
return {"message": "Book not found"}
for key, value in book_data.items():
if key == 'id':
continue
setattr(book, key, value)
db.session.commit()
return {"message": "Book updated successfully"}
@app.route('/api/ping', methods=['GET'])
def ping():
return jsonify({"message": "pong"})
if __name__ == '__main__':
arg_parse = argparse.ArgumentParser(description="Run REST server")
arg_parse.add_argument("--port", "-p", type=int, default=9500, help="Port to run the server on")
args = arg_parse.parse_args()
app.run("0.0.0.0", port=args.port)