import grpc

from server.grpc.proto import Book_pb2, Book_pb2_grpc
from utils import get_size_from_dict, NetworkingSize


class TrafficInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Client interceptor that records serialized request/response sizes.

    After each successfully completed unary-unary call, ``last_call_info``
    holds a dict with the keys ``'request_bytes'`` and ``'response_bytes'``.
    """

    def __init__(self):
        # Sizes of the most recent successful call; None until one completes
        # (and again None while/after a call that failed).
        self.last_call_info = None

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Forward the call and record the serialized payload sizes.

        Args:
            continuation: Callable that proceeds with the RPC.
            client_call_details: Call metadata, passed through untouched.
            request: The protobuf request message.

        Returns:
            The call object produced by ``continuation``.
        """
        # Clear the previous measurement so a failing call can never be
        # mistaken for the one whose sizes are stored here.
        self.last_call_info = None

        request_size = len(request.SerializeToString())
        response = continuation(client_call_details, request)
        # result() yields the response message; for a failed call it raises
        # instead, which leaves last_call_info as None.
        response_size = len(response.result().SerializeToString())

        self.last_call_info = {
            'request_bytes': request_size,
            'response_bytes': response_size
        }
        return response


class GrpcClient:
    """Thin client for the Book gRPC service that also reports traffic size.

    Every data method returns ``(response, size)`` on success, where ``size``
    is whatever ``get_size_from_dict`` produces from the interceptor's byte
    counts. On ``grpc.RpcError`` the error is printed and ``None`` is
    returned.

    The client can be used as a context manager so the channel is always
    closed::

        with GrpcClient() as client:
            client.get_list()
    """

    def __init__(self, address: str = 'localhost:50051'):
        self.traffic_interceptor = TrafficInterceptor()
        # -1 removes the message size limits in both directions.
        base_channel = grpc.insecure_channel(address, options=[
            ('grpc.max_receive_message_length', -1),
            ('grpc.max_send_message_length', -1),
        ])
        self.channel = grpc.intercept_channel(
            base_channel, self.traffic_interceptor
        )
        self.stub = Book_pb2_grpc.BookServiceStub(self.channel)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the ``with`` body.
        return False

    def _invoke(self, rpc, request):
        """Run one stub method and pair its response with the traffic size.

        Returns ``(response, size)`` on success, or ``None`` after printing
        the error when the RPC raises ``grpc.RpcError``.
        """
        try:
            response = rpc(request)
        except grpc.RpcError as e:
            print(f"Error: {e}")
            return None
        return response, get_size_from_dict(self.traffic_interceptor.last_call_info)

    def get_list(self, pages: int = 1, per_page: int = 10000, list_data_limit: int = 30):
        """Call ``GetList``; returns ``(response, size)`` or ``None`` on RPC error."""
        request = Book_pb2.GetListRequest(
            pages=pages, per_page=per_page, list_data_limit=list_data_limit
        )
        return self._invoke(self.stub.GetList, request)

    def add_books(self, books, test_only: bool = False):
        """Call ``AddBooks``; returns ``(response, size)`` or ``None`` on RPC error."""
        request = Book_pb2.AddBookRequest(books=books, test_only=test_only)
        return self._invoke(self.stub.AddBooks, request)

    def delete_books(self, book_ids, delete_last_count: int = -1):
        """Call ``DeleteBooks``; returns ``(response, size)`` or ``None`` on RPC error."""
        request = Book_pb2.DeleteBookRequest(
            book_ids=book_ids, delete_last_count=delete_last_count
        )
        return self._invoke(self.stub.DeleteBooks, request)

    def update_book(self, book):
        """Call ``UpdateBook``; returns ``(response, size)`` or ``None`` on RPC error."""
        request = Book_pb2.UpdateBookRequest(book=book)
        return self._invoke(self.stub.UpdateBook, request)

    def ping(self):
        """Call ``Ping`` with an empty message; prints the error if it fails."""
        try:
            self.stub.Ping(Book_pb2.Empty())
        except grpc.RpcError as e:
            print(f"Error: {e}")

    def close(self):
        """Close the connection; call this when the program is finished."""
        self.channel.close()


def run_client():
    """Fetch one page of up to 10000 books, then release the channel."""
    with GrpcClient() as client:
        client.get_list(1, 10000)


if __name__ == '__main__':
    run_client()