| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 
 import datetime
 import time

 import jmespath
 import requests
 import urllib3
 from pywchat import Sender
 
 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 
 
 class BondsCalendar(object):
     """Convertible-bond calendar client (jisilu.cn).

     Uses the investment-calendar endpoint of jisilu.cn to fetch today's
     convertible-bond subscription entries and posts the resulting list to
     the qyapi.weixin.qq.com webhook.
     """
     HEADERS = {
         'authority': 'www.jisilu.cn',
         'accept': 'application/json, text/javascript, */*; q=0.01',
         'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
         'referer': 'https://www.jisilu.cn/data/calendar/',
         'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
         'sec-ch-ua-mobile': '?0',
         'sec-ch-ua-platform': '"Windows"',
         'sec-fetch-dest': 'empty',
         'sec-fetch-mode': 'cors',
         'sec-fetch-site': 'same-origin',
         'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41',
         'x-requested-with': 'XMLHttpRequest',
     }

     # Title prefix that marks a subscription-day calendar entry.
     SUBSCRIBE_TAG = "【申购日】"
     # Weekday labels indexed by ``date.weekday()`` (Monday == 0).
     WEEKDAYS = ("星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期天")
     # Webhook endpoint; ``{key}`` is substituted with ``self.webhook_key``.
     WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={key}"
     # Default key: the literal placeholder, so the default URL is unchanged.
     WEBHOOK_KEY = "{key}"
     # Seconds to wait on each HTTP call before giving up instead of hanging.
     REQUEST_TIMEOUT = 10

     def __init__(self, webhook_key=None):
         """Create the HTTP session and remember today's date.

         :param webhook_key: str: optional webhook key substituted into
             ``WEBHOOK_URL``; defaults to the ``WEBHOOK_KEY`` placeholder
         """
         self.session = requests.session()
         # Merge into the session's own header mapping rather than replacing
         # it with the shared class-level dict.
         self.session.headers.update(BondsCalendar.HEADERS)
         self.domain = "https://www.jisilu.cn"
         self.day = datetime.date.today()
         self.webhook_key = self.WEBHOOK_KEY if webhook_key is None else webhook_key

     @staticmethod
     def get_calendar():
         """
         Build the timestamps of today's 00:00:00 and 23:59:59 (local time).
         :return: tuple: start_tamp, end_tamp (integer seconds)
         """
         dt = time.strftime("%Y-%m-%d")

         start_array = time.strptime(f"{dt} 00:00:00", "%Y-%m-%d %H:%M:%S")
         day_start_tamp = int(time.mktime(start_array))

         end_array = time.strptime(f"{dt} 23:59:59", "%Y-%m-%d %H:%M:%S")
         day_end_tamp = int(time.mktime(end_array))

         return day_start_tamp, day_end_tamp

     def get_api(self, start_tamp, end_tamp):
         """
         Fetch the calendar entries inside a time range.
         :param start_tamp: timestamp (10 digits): range start
         :param end_tamp: timestamp (10 digits): range end
         :return: the decoded JSON response, or None when the request or the
             JSON decoding fails (the error is printed)
         """
         API = "/data/calendar/get_calendar_data/"
         params = {
             "qtype": "CNV",
             "start": start_tamp,
             "end": end_tamp,
             "_": int(time.time() * 1000)
         }
         try:
             # NOTE(review): verify=False disables TLS certificate checks; it
             # is kept because the module silences the matching urllib3
             # warning on purpose - confirm this is still required.
             rsp = self.session.get(self.domain + API, params=params,
                                    verify=False, timeout=self.REQUEST_TIMEOUT)
             rsp.raise_for_status()
             return rsp.json()
         except (requests.RequestException, ValueError) as e:
             # ValueError covers a body that is not valid JSON.
             print(e)
             return None

     @staticmethod
     def handle_rsp(rsp):
         """
         Collect the titles that contain the subscription-day tag and strip
         that tag from the front of each one.
         :param rsp: decoded JSON response; expected to be a list of dicts
             with a "title" key, anything else yields an empty result
         :return: str: the names joined with newlines ("" when none match)
         """
         tag = BondsCalendar.SUBSCRIBE_TAG
         if not isinstance(rsp, list):
             # Covers the None returned by get_api after a failed request.
             return ""

         names = []
         for item in rsp:
             title = item.get("title") if isinstance(item, dict) else None
             if not isinstance(title, str) or tag not in title:
                 continue
             # Remove the tag as a prefix. str.lstrip(tag) would strip any
             # leading run of the tag's characters and eat into names that
             # begin with one of them.
             names.append(title[len(tag):] if title.startswith(tag) else title)
         return "\n".join(names)

     def message_format(self, info):
         """
         Assemble the outgoing message.
         :param info: str: the text to place under the header
         :return: str: header (today's date and weekday) followed by info
         """
         template = f"今天({self.day}) {self.WEEKDAYS[self.day.weekday()]}\n\n可转债申购列表:\n"
         return template + info

     def send_msg(self, msg):
         """
         Post a text message to the webhook.
         :param msg: str: message content
         :return: None
         """
         data = {
             "msgtype": "text",
             "text": {
                 "content": msg
             }
         }
         key = getattr(self, "webhook_key", self.WEBHOOK_KEY)
         rsp = requests.post(self.WEBHOOK_URL.format(key=key),
                             json=data,
                             headers={"Content-Type": "application/json"},
                             timeout=self.REQUEST_TIMEOUT)
         if not rsp.ok:
             # Report the failure the same way get_api does instead of
             # discarding it silently.
             print(f"webhook request failed: HTTP {rsp.status_code}")

     def run(self):
         """Fetch today's entries and send them; do nothing when there are none."""
         tamps = self.get_calendar()
         rsp = self.get_api(*tamps)
         # handle_rsp returns "" for a failed fetch (rsp is None) as well.
         bonds_names = self.handle_rsp(rsp)

         if not bonds_names:
             return
         msg_info = self.message_format(bonds_names)
         self.send_msg(msg_info)
 
 
 # Script driver: build the calendar client and run one fetch-and-notify pass.
 # NOTE(review): there is no ``if __name__ == "__main__":`` guard, so this
 # also executes whenever the module is imported - confirm that is intended.
 bond = BondsCalendar()
 bond.run()
 
 |