from flask import Flask, request, jsonify import time import psutil from database import app, db, Books import argparse def performance_test(is_get=False): def decorator(func): def wrapper(*args, **kwargs): process = psutil.Process() cpu_count = psutil.cpu_count() # 1. parse request process.cpu_percent() # cpu_start = process.cpu_times() parse_start = time.perf_counter() if is_get: data = request.args.to_dict() else: data = request.get_json() parse_end = time.perf_counter() cpu_percent_mid = process.cpu_percent() # cpu_mid = process.cpu_times() # process result = func(*args, **kwargs, request_data=data) # 2. serialize response serialize_start = time.perf_counter() process.cpu_percent() # cpu_serialize_start = process.cpu_times() response_data = jsonify(result) serialize_end = time.perf_counter() cpu_end_percent = process.cpu_percent() # cpu_end = process.cpu_times() parse_time = parse_end - parse_start # parse_cpu_time = (cpu_mid.user - cpu_start.user) + (cpu_mid.system - cpu_start.system) server_serialize_time = serialize_end - serialize_start # server_serialize_cpu_time = (cpu_end.user - cpu_serialize_start.user) + (cpu_end.system - cpu_serialize_start.system) # timing timing_data = { 'server_deserialize': { 'time': parse_time, 'cpu': cpu_percent_mid / cpu_count # / parse_time * 100 }, 'server_serialize': { 'time': server_serialize_time, 'cpu': cpu_end_percent / cpu_count # / server_serialize_time * 100 }, 'server_protocol_total_time': parse_end - parse_start + serialize_end - serialize_start, 'response_data': result, } # print(f"Server timing: {timing_data}") return jsonify(timing_data), response_data.status_code wrapper.__name__ = func.__name__ return wrapper return decorator @app.route('/api/get_list', methods=['GET']) @performance_test(is_get=True) def get_list(request_data): page_count = int(request_data.get("pages", 1)) per_page = int(request_data.get("per_page", 100)) list_data_limit = int(request_data.get("list_data_limit", per_page)) if list_data_limit <= 0: list_data_limit = per_page if per_page <= 0: return [] data = Books.query.order_by(Books.id).paginate(page=page_count, per_page=min(per_page, list_data_limit), error_out=False) books = [book.to_dict() for book in data.items] if len(books) < per_page: books = books * (per_page // len(books)) + books[:per_page % len(books)] return books @app.route('/api/add_book', methods=['POST']) @performance_test(is_get=False) def add_book(request_data): books_data = request_data.get('books', []) test_only = request_data.get('test_only', False) if test_only: return {"message": "Books added successfully"} for book in books_data: new_book = Books(**book) db.session.add(new_book) db.session.commit() return {"message": "Books added successfully"} @app.route('/api/delete_books', methods=['DELETE']) @performance_test(is_get=False) def delete_books(request_data): book_ids = request_data.get('book_ids', []) delete_last_count = request_data.get('delete_last_count', -1) if delete_last_count > 0: last_ids = db.session.query(Books.id).order_by(Books.id.desc()).limit(delete_last_count).subquery() Books.query.filter(Books.id.in_( db.session.query(last_ids.c.id) )).delete(synchronize_session=False) else: Books.query.filter(Books.id.in_(book_ids)).delete(synchronize_session=False) db.session.commit() return {"message": "Books deleted successfully"} @app.route('/api/update_book', methods=['PUT']) @performance_test(is_get=False) def update_book(request_data): book_data = request_data.get('book', {}) book_id = book_data.get('id') book = db.session.get(Books, book_id) if not book: return {"message": "Book not found"} for key, value in book_data.items(): if key == 'id': continue setattr(book, key, value) db.session.commit() return {"message": "Book updated successfully"} @app.route('/api/ping', methods=['GET']) def ping(): return jsonify({"message": "pong"}) if __name__ == '__main__': arg_parse = argparse.ArgumentParser(description="Run REST server") arg_parse.add_argument("--port", "-p", type=int, default=9500, help="Port to run the server on") args = arg_parse.parse_args() app.run("0.0.0.0", port=args.port)