| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- # -*- coding:utf-8 -*-
- # /usr/bin/env python
- """
- Date: 2025/2/26 21:52
- Desc: HTTP 模式主文件
- """
- import os
- import json
- import logging
- import urllib.parse
- from logging.handlers import TimedRotatingFileHandler
- from fastapi import APIRouter
- from fastapi import status
- from fastapi import Request
- from fastapi.responses import JSONResponse
- import ak as ak
- app_core = APIRouter()
- # 日志目录和文件名
- log_dir = "logs"
- log_name = "app.log"
- # 确保日志目录存在
- if not os.path.exists(log_dir):
- os.makedirs(log_dir)
- # 日志文件路径
- log_file_path = os.path.join(log_dir, log_name)
- # 创建一个日志记录器
- logger = logging.getLogger(name='AKLog')
- logger.setLevel(logging.INFO)
- # 创建一个TimedRotatingFileHandler来进行日志轮转
- handler = TimedRotatingFileHandler(
- filename=log_file_path, when='midnight', interval=1, backupCount=7, encoding='utf-8'
- )
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- @app_core.get(path="/public/{item_id}", description="公开接口", summary="该接口主要提供公开访问来获取数据")
- def root(request: Request, item_id: str):
- """
- 接收请求参数及接口名称并返回 JSON 数据
- 此处由于 AKShare 的请求中是同步模式,所以这边在定义 root 函数中没有使用 asyncio 来定义,这样可以开启多线程访问
- :param request: 请求信息
- :type request: Request
- :param item_id: 必选参数; 测试接口名 stock_dxsyl_em 来获取 打新收益率 数据
- :type item_id: str
- :return: 指定 接口名称 和 参数 的数据
- :rtype: json
- """
- # 获取支持的接口列表
- interface_list = dir(ak)
- decode_params = urllib.parse.unquote(str(request.query_params))
- if item_id not in interface_list:
- logger.info("未找到该接口,请检查是否拼写有误")
- return JSONResponse(
- status_code=status.HTTP_404_NOT_FOUND,
- content={
- "error": "未找到该接口,请检查是否拼写有误"
- },
- )
- if "cookie" in decode_params:
- eval_str = (
- decode_params.split(sep="=", maxsplit=1)[0]
- + "='"
- + decode_params.split(sep="=", maxsplit=1)[1]
- + "'"
- )
- eval_str = eval_str.replace("+", " ")
- else:
- eval_str = decode_params.replace("&", '", ').replace("=", '="') + '"'
- eval_str = eval_str.replace("+", " ") # 处理传递的参数中带空格的情况
- try:
- received_df = eval("ak." + item_id + ("()" if not bool(request.query_params) else f"({eval_str})"))
- if received_df is None:
- logger.info("该接口返回数据为空,请确认参数是否正确")
- return JSONResponse(
- status_code=status.HTTP_404_NOT_FOUND,
- content={"error": "该接口返回数据为空,请确认参数是否正确"},
- )
- temp_df = received_df.to_json(orient="records", date_format="iso")
- except KeyError as e:
- logger.info(
- f"请输入正确的参数,错误 {e}")
- return JSONResponse(
- status_code=status.HTTP_404_NOT_FOUND,
- content={
- "error": f"请输入正确的参数,错误 {e}"
- },
- )
- logger.info(f"获取到 {item_id} 的数据")
- return JSONResponse(status_code=status.HTTP_200_OK, content=json.loads(temp_df))
|