112 lines
4.0 KiB
Python
112 lines
4.0 KiB
Python
import os
import random
from enum import Enum

from faker import Faker
from flask import Flask
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
|
|
|
|
# Module-level application setup: the Flask app, the database engine options,
# CORS, and the shared SQLAlchemy handle used by the models in this file.
app = Flask(__name__)

# SQLite alternative kept for local experiments:
# app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///rest_server_books.db'

# The connection string can be overridden with the DATABASE_URL environment
# variable so that credentials do not have to live in source control. An unset
# or empty variable falls back to the previous hard-coded value, which keeps
# existing setups working unchanged.
# NOTE(review): the fallback still embeds a password; consider dropping it once
# every environment provides DATABASE_URL.
app.config['SQLALCHEMY_DATABASE_URI'] = (
    os.environ.get('DATABASE_URL')
    or 'postgresql://postgres:liuyanfeng66@localhost:5432/BookRestGrpcCompare'
)
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 30,  # number of main (persistent) pool connections
    'max_overflow': 50,  # number of extra connections allowed on top of that
    'pool_pre_ping': True,  # detect dead connections and reconnect automatically
    # 'connect_args': {
    #     'check_same_thread': False,
    #     'timeout': 30  # max seconds to wait on the file lock; tune as needed
    # }
}
CORS(app, supports_credentials=True)

db = SQLAlchemy(app)
|
|
|
|
|
|
class BaseModel(db.Model):
    """Abstract base for the models in this file; adds dict serialisation."""

    __abstract__ = True

    def to_dict(self, exclude: set = None):
        """Return the row's column values as a plain ``dict``.

        Args:
            exclude: Column names to leave out of the result. ``None`` (or any
                empty collection) means no column is skipped.

        Returns:
            A dict keyed by column name, in table column order. Values that
            are ``Enum`` members are replaced by their ``.value``; every other
            value is returned as stored on the instance.
        """
        skipped = exclude if exclude else set()

        # Lazily pair each retained column name with its current value so the
        # attributes are read in table column order.
        pairs = (
            (column.name, getattr(self, column.name))
            for column in self.__table__.columns
            if column.name not in skipped
        )
        return {
            name: (value.value if isinstance(value, Enum) else value)
            for name, value in pairs
        }
|
|
|
|
|
|
class Books(BaseModel):
    """ORM model for the ``books`` table: one row per book record."""

    __tablename__ = 'books'

    # --- Identifiers -------------------------------------------------------
    # Auto-incrementing 64-bit primary key.
    id = db.Column(db.BigInteger, primary_key=True, autoincrement=True)
    # Optional ISBN, indexed for lookups.
    isbn: str = db.Column(db.String(20), nullable=True, index=True)
    barcode: str = db.Column(db.String(50), nullable=True)

    # --- Bibliographic details ---------------------------------------------
    # Only title and author are mandatory; every other column is nullable.
    title: str = db.Column(db.Text, nullable=False)
    subtitle: str = db.Column(db.Text, nullable=True)
    author: str = db.Column(db.Text, nullable=False)
    translator: str = db.Column(db.Text, nullable=True)
    editor: str = db.Column(db.Text, nullable=True)
    publisher: str = db.Column(db.String(200), nullable=True)
    # Stored as an integer rather than a date type.
    # NOTE(review): presumably a Unix timestamp in seconds (the seed data in
    # this file uses values in that range) — confirm.
    publication_date = db.Column(db.BigInteger, nullable=True)
    edition: str = db.Column(db.String(50), nullable=True)
    pages: int = db.Column(db.Integer, nullable=True)
    language: str = db.Column(db.String(50), nullable=True)

    # --- Classification and descriptive text -------------------------------
    # Indexed integer; no foreign-key constraint is declared on it here.
    category_id: int = db.Column(db.Integer, nullable=True, index=True)
    subject: str = db.Column(db.String(200), nullable=True)
    keywords: str = db.Column(db.Text, nullable=True)
    description: str = db.Column(db.Text, nullable=True)
    abstract: str = db.Column(db.Text, nullable=True)

    # --- Physical attributes and artwork -----------------------------------
    format: str = db.Column(db.String(50), nullable=True)
    binding: str = db.Column(db.String(20), nullable=True)
    # NOTE(review): the unit is not stated anywhere in this file — confirm.
    weight: float = db.Column(db.Float, nullable=True)
    # Up to 500 characters; the seed data stores a URL here.
    cover_image: str = db.Column(db.String(500), nullable=True)
|
|
|
|
|
|
def generate_random_book_data(book_id: int, fake: Faker) -> dict:
    """Build one book's worth of random column values.

    Args:
        book_id: Value stored under the ``'id'`` key.
        fake: Faker instance that supplies the textual fields.

    Returns:
        A dict whose keys match the ``Books`` column names. Text fields come
        from ``fake``; numeric and choice fields come from the ``random``
        module; ``'cover_image'`` is always the same placeholder URL.
    """
    languages = ['English', 'Français', 'Deutsch']
    formats = ['16mo', '8vo', '4to', 'Folio', 'Quarto']
    bindings = ['expensive', 'cheap', 'paperback', 'hardcover']

    # Keyword arguments are evaluated left to right, so the faker/random calls
    # run in exactly the order the fields are listed.
    return dict(
        id=book_id,
        isbn=fake.isbn13(separator='-'),
        barcode=fake.ean(length=13),
        title=fake.sentence(nb_words=6),
        subtitle=fake.sentence(nb_words=3),
        author=fake.name(),
        translator=fake.name(),
        editor=fake.name(),
        publisher=fake.company(),
        # NOTE(review): the bounds look like Unix timestamps in seconds — confirm.
        publication_date=random.randint(-2190472676, 1754206067),
        edition=fake.word(),
        pages=random.randint(100, 1000),
        language=random.choice(languages),
        category_id=random.randint(1, 100),
        subject=fake.word(),
        keywords=", ".join(fake.words(nb=5, unique=True, ext_word_list=None)),
        description=fake.text(max_nb_chars=200),
        abstract=fake.text(max_nb_chars=100),
        format=random.choice(formats),
        binding=random.choice(bindings),
        weight=random.uniform(100, 2000),
        cover_image="https://example.com/cover_image.jpg",
    )
|
|
|
|
def check_and_generate_random_data(count: int = 10000):
    """Create the tables and, if ``books`` is empty, seed it with random rows.

    Intended to be called inside an application context, because it uses the
    Flask-SQLAlchemy session and metadata.

    Args:
        count: Number of rows to generate when the table is empty. Defaults
            to 10000, the size that used to be hard-coded. Rows receive the
            ids ``0 .. count - 1``.

    Returns:
        None. All generated rows are written in a single commit.
    """
    db.create_all()

    # Seed only an empty table so repeated calls never duplicate data.
    if Books.query.count() != 0:
        return

    fake = Faker('en_GB')
    # NOTE(review): ids are assigned explicitly. On PostgreSQL an explicit id
    # does not advance the column's sequence, so a later insert that relies on
    # autoincrement may collide with a seeded row — confirm whether that
    # matters for this project.
    db.session.add_all(
        Books(**generate_random_book_data(book_id, fake))
        for book_id in range(count)
    )
    db.session.commit()
|
|
|
|
|
|
# with app.app_context():
|
|
# check_and_generate_random_data()
|