2025-08-20 11:54:40 +01:00

87 lines
2.9 KiB
Python

import grpc
from server.grpc.proto import Book_pb2, Book_pb2_grpc
from utils import get_size_from_dict, NetworkingSize
class TrafficInterceptor(grpc.UnaryUnaryClientInterceptor):
def __init__(self):
self.last_call_info = None
def intercept_unary_unary(self, continuation, client_call_details, request):
request_size = len(request.SerializeToString())
response = continuation(client_call_details, request)
response_size = len(response.result().SerializeToString())
self.last_call_info = {
'request_bytes': request_size,
'response_bytes': response_size
}
return response
class GrpcClient:
def __init__(self, address='localhost:50051'):
self.traffic_interceptor = TrafficInterceptor()
base_channel = grpc.insecure_channel(address, options=[
('grpc.max_receive_message_length', -1),
('grpc.max_send_message_length', -1),
])
self.channel = grpc.intercept_channel(
base_channel,
self.traffic_interceptor
)
self.stub = Book_pb2_grpc.BookServiceStub(self.channel)
def get_list(self, pages=1, per_page=10000, list_data_limit=30):
try:
request = Book_pb2.GetListRequest(pages=pages, per_page=per_page, list_data_limit=list_data_limit)
response = self.stub.GetList(request)
return response, get_size_from_dict(self.traffic_interceptor.last_call_info)
except grpc.RpcError as e:
print(f"Error: {e}")
def add_books(self, books, test_only=False):
try:
request = Book_pb2.AddBookRequest(books=books, test_only=test_only)
response = self.stub.AddBooks(request)
return response, get_size_from_dict(self.traffic_interceptor.last_call_info)
except grpc.RpcError as e:
print(f"Error: {e}")
def delete_books(self, book_ids, delete_last_count=-1):
try:
request = Book_pb2.DeleteBookRequest(book_ids=book_ids, delete_last_count=delete_last_count)
response = self.stub.DeleteBooks(request)
return response, get_size_from_dict(self.traffic_interceptor.last_call_info)
except grpc.RpcError as e:
print(f"Error: {e}")
def update_book(self, book):
try:
request = Book_pb2.UpdateBookRequest(book=book)
response = self.stub.UpdateBook(request)
return response, get_size_from_dict(self.traffic_interceptor.last_call_info)
except grpc.RpcError as e:
print(f"Error: {e}")
def ping(self):
try:
self.stub.Ping(Book_pb2.Empty())
except grpc.RpcError as e:
print(f"Error: {e}")
def close(self):
# 程序结束时关闭连接
self.channel.close()
def run_client():
GrpcClient().get_list(1, 10000)
if __name__ == '__main__':
run_client()