本文为 其乐用户(UID:1302508) 发布的原创文章,转摘前请联系该用户获得许可
本帖最后由 1302508 于 2024-10-5 23:22 编辑
站里没找到可以用的和asf交互的qq机器人。
这里的前提是已经能部署了一个可以提供正向ws的qq机器人,并且你的asf开启了ipc
第一行写ws地址,第二行是你的qq号,第三行是asf地址加端口,第四行是asf的ipc的密码,
- 注意下面python代码的这块部分
- NAPCAT_WS_URL = os.environ.get('NAPCAT_WS_URL')
- TARGET_USER_ID = os.environ.get('TARGET_USER_ID')
- ASF_API_URL = os.environ.get('ASF_API_URL')
- ASF_API_KEY = os.environ.get('ASF_API_KEY')
- 上述代码代码修改成下面的形式,就可以直接运行了,不需要别的文件了,要记得改成自己的地址
- NAPCAT_WS_URL = "ws://1.1.1.1:3001"
- TARGET_USER_ID = "123141270"
- ASF_API_URL = "http://1.1.1.1:1242/Api"
- ASF_API_KEY = "casdasq"
复制代码
文件夹下放三个文件,三个代码块为三个文件
main.py
- import asyncio
- import websockets
- import json
- import requests
- import os
- # 配置 WebSocket 和 ASF API 的相关信息
- NAPCAT_WS_URL = os.environ.get('NAPCAT_WS_URL')
- TARGET_USER_ID = os.environ.get('TARGET_USER_ID')
- ASF_API_URL = os.environ.get('ASF_API_URL')
- ASF_API_KEY = os.environ.get('ASF_API_KEY')
- # 发送消息给指定用户
- async def send_message_to_user(user_id, message):
- async with websockets.connect(NAPCAT_WS_URL) as ws:
- # 构造发送消息的 JSON 数据
- data = {
- "action": "send_msg",
- "params": {
- "user_id": user_id,
- "message": message
- }
- }
- await ws.send(json.dumps(data))
- # 调用 ASF 的 play API
- def asf_play(botname, gameid):
- url = f"{ASF_API_URL}/Command"
- headers = {
- "Authentication": ASF_API_KEY,
- "Content-Type": "application/json"
- }
- payload = {
- "Command": f"play {botname} {gameid}"
- }
- response = requests.post(url, headers=headers, json=payload)
- return response.json()
- # 调用 ASF 的 stop API (实际上是 play 0 命令)
- def asf_stop(botname):
- url = f"{ASF_API_URL}/Command"
- headers = {
- "Authentication": ASF_API_KEY,
- "Content-Type": "application/json"
- }
- payload = {
- "Command": f"play {botname} 0"
- }
- response = requests.post(url, headers=headers, json=payload)
- return response.json()
- # 调用 ASF 的 redeem API
- def asf_redeem(botname, key):
- url = f"{ASF_API_URL}/Command"
- headers = {
- "Authentication": ASF_API_KEY,
- "Content-Type": "application/json"
- }
- payload = {
- "Command": f"redeem {botname} {key}"
- }
- response = requests.post(url, headers=headers, json=payload)
- return response.json()
- # 处理接收到的消息
- async def handle_message(message):
- try:
- msg_data = json.loads(message)
- user_id = str(msg_data.get("user_id"))
- msg_content = msg_data.get("message")
- # 只处理来自指定用户的消息
- if user_id == TARGET_USER_ID:
- if msg_content.startswith("asf play"):
- parts = msg_content.split()
- if len(parts) == 4:
- botname = parts[2]
- gameid = parts[3]
- # 调用 ASF 的 play API
- result = asf_play(botname, gameid)
- # 把结果发送给用户
- await send_message_to_user(user_id, f"ASF Play Result: {result}")
- elif msg_content.startswith("asf stop"):
- parts = msg_content.split()
- if len(parts) == 4:
- botname = parts[2]
- # 调用 ASF 的 stop API (实际上是 play 0)
- result = asf_stop(botname)
- # 把结果发送给用户
- await send_message_to_user(user_id, f"ASF Stop Result: {result}")
- elif msg_content.startswith("asf"):
- parts = msg_content.split()
- if len(parts) == 3:
- botname = parts[1]
- key = parts[2]
- # 调用 ASF 的 redeem API
- result = asf_redeem(botname, key)
- # 把结果发送给用户
- await send_message_to_user(user_id, f"ASF Redeem Result: {result}")
- except Exception as e:
- print(f"Error handling message: {e}")
- # 监听 WebSocket 消息
- async def listen_to_ws():
- async with websockets.connect(NAPCAT_WS_URL) as ws:
- while True:
- try:
- message = await ws.recv()
- await handle_message(message)
- except websockets.ConnectionClosed:
- print("Connection closed, attempting to reconnect...")
- await asyncio.sleep(5)
- await listen_to_ws() # 尝试重新连接
- except Exception as e:
- print(f"Error: {e}")
- # 主函数
- async def main():
- await listen_to_ws()
- # 运行事件循环
- if __name__ == "__main__":
- asyncio.run(main())
复制代码 Dockerfile
- FROM python:3.12.4
- WORKDIR /app
- COPY . /app
- RUN pip install -r requirements.txt
- ENV NAPCAT_WS_URL="ws://localhost:3001"
- ENV TARGET_USER_ID="default_user_id"
- ENV ASF_API_URL="http://localhost:1242/Api"
- ENV ASF_API_KEY="default_key"
- CMD ["python", "main.py"]
复制代码
requirements.txt
- websockets==13.1
- requests==2.32.3
复制代码
在三个文件都有的目录下执行下述代码,既可打包成docker镜像
- docker build -t asf-bot .
复制代码- docker run -d --name asf-bot \
- -e NAPCAT_WS_URL="ws://1.1.1.1:3001" \
- -e TARGET_USER_ID="12312312" \
- -e ASF_API_URL="http://1.1.1.1:1242/Api" \
- -e ASF_API_KEY="123123123" \
- asf-bot
复制代码 再执行上述的就可以运行了,注意这里需要改成自己的地址和密码
asf play [botname] [appid]就是开始游戏
asf stop [botname] [任意数字]就是停止所有游戏
asf [botname] [key] 就是激活游戏
目前只有这两个功能,够和群友从群里抢明key了(
感谢4ks提供的激活码,供我完成激活码测试的工作
|