wechat-automation

安装量: 164
排名: #5277

安装

npx skills add https://github.com/cacr92/wereply --skill wechat-automation
WeChat Automation Skill
Expert guidance for WeChat monitoring and automation using wxauto (Windows) and Accessibility API (macOS).
Overview
WeReply uses Platform-specific Agents to monitor WeChat conversations and control the input box:
Windows Agent
Python 3.12 + wxauto v4
macOS Agent
Swift + Accessibility API
Communication
JSON protocol via stdin/stdout with Rust Orchestrator Architecture Pattern 微信窗口 ↓ (UI Automation) Platform Agent ├→ 监听消息(定时轮询) ├→ 提取消息内容 ├→ 发送到 Orchestrator (JSON via stdout) └→ 接收命令 (JSON via stdin) ↓ 控制输入框(写入建议) Windows Agent - wxauto v4 Installation and Setup

安装依赖

pip install wxauto == 4.0 .0

确保微信已登录且窗口可见

Message Monitoring Pattern import json import time import sys from wxauto import WeChat class WeChatMonitor : def init ( self , interval_ms : int = 500 ) : """ 初始化微信监听器 Args: interval_ms: 监听间隔(毫秒),默认 500ms """ self . wechat = WeChat ( ) self . interval_ms = interval_ms self . last_message_id = None def start_monitoring ( self ) : """开始监听微信消息""" try : while True :

获取当前聊天窗口的最新消息

messages

self . wechat . GetAllMessage ( ) if messages and len ( messages )

0 : latest_message = messages [ - 1 ]

检查是否是新消息(避免重复处理)

message_id

self . _generate_message_id ( latest_message ) if message_id != self . last_message_id : self . last_message_id = message_id self . _send_message_to_orchestrator ( latest_message )

间隔等待

time . sleep ( self . interval_ms / 1000.0 ) except KeyboardInterrupt : self . _send_error ( "监听被用户中断" ) except Exception as e : self . _send_error ( f"监听错误: { str ( e ) } " ) def _generate_message_id ( self , message ) -

str : """生成消息唯一ID(用于去重)"""

结合时间戳、发送者、内容生成ID

content

message . get ( 'content' , '' ) sender = message . get ( 'sender' , '' ) timestamp = message . get ( 'time' , '' ) return f" { sender } : { timestamp } : { hash ( content ) } " def _send_message_to_orchestrator ( self , message ) : """ 发送消息到 Rust Orchestrator 格式: { "type": "MessageNew", "content": "消息内容", "sender": "发送者", "timestamp": "2024-01-23T10:30:00" } """ payload = { "type" : "MessageNew" , "content" : message . get ( 'content' , '' ) , "sender" : message . get ( 'sender' , '' ) , "timestamp" : message . get ( 'time' , '' ) }

输出到 stdout(Rust 会读取)

print ( json . dumps ( payload , ensure_ascii = False ) , flush = True ) def _send_error ( self , error_message : str ) : """发送错误信息到 Orchestrator""" payload = { "type" : "Error" , "message" : error_message } print ( json . dumps ( payload , ensure_ascii = False ) , flush = True )

使用示例

if name == 'main' : monitor = WeChatMonitor ( interval_ms = 500 ) monitor . start_monitoring ( ) Input Box Control Pattern class WeChatInputWriter : def init ( self ) : self . wechat = WeChat ( ) def write_to_input ( self , content : str ) -

bool : """ 写入内容到微信输入框 Args: content: 要写入的文本 Returns: bool: 写入是否成功 """ try :

使用 wxauto 写入输入框

self . wechat . SendMsg ( content ) return True except Exception as e : self . _send_error ( f"写入失败: { str ( e ) } " ) return False def clear_input ( self ) -

bool : """清空输入框""" try :

wxauto v4 提供的清空方法

self . wechat . ClearMsg ( ) return True except Exception as e : self . _send_error ( f"清空失败: { str ( e ) } " ) return False def _send_error ( self , error_message : str ) : """发送错误到 Orchestrator""" payload = { "type" : "Error" , "message" : error_message } print ( json . dumps ( payload , ensure_ascii = False ) , flush = True ) Command Handling Pattern import sys import json import threading class AgentCommandHandler : def init ( self ) : self . input_writer = WeChatInputWriter ( ) self . running = True def start_command_listener ( self ) : """监听来自 Orchestrator 的命令(stdin)""" thread = threading . Thread ( target = self . _listen_commands , daemon = True ) thread . start ( ) def _listen_commands ( self ) : """从 stdin 读取命令""" try : for line in sys . stdin : if not self . running : break try : command = json . loads ( line . strip ( ) ) self . _handle_command ( command ) except json . JSONDecodeError : self . _send_error ( f"无效的 JSON 命令: { line } " ) except Exception as e : self . _send_error ( f"命令监听错误: { str ( e ) } " ) def _handle_command ( self , command : dict ) : """处理命令""" cmd_type = command . get ( 'type' ) if cmd_type == 'WriteInput' : content = command . get ( 'content' , '' ) success = self . input_writer . write_to_input ( content ) self . _send_response ( success ) elif cmd_type == 'ClearInput' : success = self . input_writer . clear_input ( ) self . _send_response ( success ) elif cmd_type == 'HealthCheck' : self . _send_health_status ( ) else : self . _send_error ( f"未知命令类型: { cmd_type } " ) def _send_response ( self , success : bool ) : """发送命令执行结果""" payload = { "type" : "CommandResponse" , "success" : success } print ( json . dumps ( payload , ensure_ascii = False ) , flush = True ) def _send_health_status ( self ) : """发送健康状态""" payload = { "type" : "HealthStatus" , "status" : "ok" , "agent_type" : "windows_wxauto" } print ( json . dumps ( payload , ensure_ascii = False ) , flush = True ) def _send_error ( self , error_message : str ) : """发送错误""" payload = { "type" : "Error" , "message" : error_message } print ( json . dumps ( payload , ensure_ascii = False ) , flush = True ) macOS Agent - Accessibility API Swift Implementation Pattern import Cocoa import ApplicationServices class WeChatMonitor { private var monitoringTimer : Timer ? private var lastMessageId : String ? private let intervalMs : Int init ( intervalMs : Int = 500 ) { self . intervalMs = intervalMs } func startMonitoring ( ) { // 请求 Accessibility 权限 if ! AXIsProcessTrusted ( ) { let options = [ kAXTrustedCheckOptionPrompt . takeUnretainedValue ( ) as String : true ] AXIsProcessTrustedWithOptions ( options as CFDictionary ) return } // 启动定时器 monitoringTimer = Timer . scheduledTimer ( withTimeInterval : TimeInterval ( intervalMs ) / 1000.0 , repeats : true ) { [ weak self ] _ in self ? . checkForNewMessages ( ) } RunLoop . main . run ( ) } private func checkForNewMessages ( ) { guard let wechatApp = getWeChatApplication ( ) else { return } // 使用 Accessibility API 获取消息 if let messages = extractMessages ( from : wechatApp ) { if let latestMessage = messages . last { let messageId = generateMessageId ( message : latestMessage ) if messageId != lastMessageId { lastMessageId = messageId sendMessageToOrchestrator ( message : latestMessage ) } } } } private func getWeChatApplication ( ) -> AXUIElement ? { let runningApps = NSWorkspace . shared . runningApplications guard let wechatApp = runningApps . first ( where : { $0 . bundleIdentifier == "com.tencent.xinWeChat" } ) else { return nil } return AXUIElementCreateApplication ( wechatApp . processIdentifier ) } private func extractMessages ( from app : AXUIElement ) -> [ [ String : String ] ] ? { // 使用 Accessibility API 提取消息列表 // 这需要深入分析微信的 UI 层次结构 var messagesValue : AnyObject ? let result = AXUIElementCopyAttributeValue ( app , kAXChildrenAttribute as CFString , & messagesValue ) guard result == . success , let windows = messagesValue as ? [ AXUIElement ] else { return nil } // 遍历窗口,找到聊天窗口,提取消息 // 具体实现需要根据微信的 UI 结构调整 return nil // Placeholder } private func generateMessageId ( message : [ String : String ] ) -> String { let content = message [ "content" ] ?? "" let sender = message [ "sender" ] ?? "" let timestamp = message [ "timestamp" ] ?? "" return " ( sender ) : ( timestamp ) : ( content . hashValue ) " } private func sendMessageToOrchestrator ( message : [ String : String ] ) { let payload : [ String : Any ] = [ "type" : "MessageNew" , "content" : message [ "content" ] ?? "" , "sender" : message [ "sender" ] ?? "" , "timestamp" : message [ "timestamp" ] ?? "" ] if let jsonData = try ? JSONSerialization . data ( withJSONObject : payload ) , let jsonString = String ( data : jsonData , encoding : . utf8 ) { print ( jsonString , terminator : "\n" ) fflush ( stdout ) } } } Input Writer (Swift) class WeChatInputWriter { func writeToInput ( content : String ) -> Bool { guard let wechatApp = getWeChatApplication ( ) else { sendError ( message : "未找到微信应用" ) return false } // 查找输入框 guard let inputField = findInputField ( in : wechatApp ) else { sendError ( message : "未找到输入框" ) return false } // 写入内容 var value = content as CFTypeRef let result = AXUIElementSetAttributeValue ( inputField , kAXValueAttribute as CFString , value ) if result == . success { return true } else { sendError ( message : "写入失败: ( result . rawValue ) " ) return false } } private func findInputField ( in app : AXUIElement ) -> AXUIElement ? { // 使用 Accessibility API 查找输入框 // 需要遍历 UI 层次结构找到输入框元素 return nil // Placeholder } private func getWeChatApplication ( ) -> AXUIElement ? { // 同上 return nil } private func sendError ( message : String ) { let payload : [ String : Any ] = [ "type" : "Error" , "message" : message ] if let jsonData = try ? JSONSerialization . data ( withJSONObject : payload ) , let jsonString = String ( data : jsonData , encoding : . utf8 ) { print ( jsonString , terminator : "\n" ) fflush ( stdout ) } } } Message Deduplication Strategy Time-based Deduplication from datetime import datetime , timedelta class MessageDeduplicator : def init ( self , window_seconds : int = 5 ) : """ 消息去重器 Args: window_seconds: 去重时间窗口(秒) """ self . seen_messages = { }

{message_id: timestamp}

self . window_seconds = window_seconds def is_duplicate ( self , message_id : str ) -

bool : """检查消息是否重复""" now = datetime . now ( )

清理过期的消息记录

self . _clean_old_messages ( now )

检查是否已见过

if message_id in self . seen_messages : return True

记录新消息

self . seen_messages [ message_id ] = now return False def _clean_old_messages ( self , now : datetime ) : """清理过期的消息记录""" cutoff = now - timedelta ( seconds = self . window_seconds ) self . seen_messages = { msg_id : timestamp for msg_id , timestamp in self . seen_messages . items ( ) if timestamp

cutoff } Performance Optimization Polling Interval Tuning class AdaptiveMonitor : def init ( self , min_interval_ms : int = 200 , max_interval_ms : int = 1000 ) : """ 自适应监听间隔 当有活跃消息时,使用较短间隔(200ms) 当长时间无消息时,逐渐增加到最大间隔(1000ms) """ self . min_interval = min_interval_ms / 1000.0 self . max_interval = max_interval_ms / 1000.0 self . current_interval = self . min_interval self . idle_count = 0 def get_next_interval ( self , has_new_message : bool ) -

float : """获取下次轮询间隔""" if has_new_message :

有新消息,使用最短间隔

self . current_interval = self . min_interval self . idle_count = 0 else :

无新消息,逐渐增加间隔

self . idle_count += 1 if self . idle_count

5 :

5次无消息后开始增加间隔

self . current_interval = min ( self . current_interval * 1.2 , self . max_interval ) return self . current_interval Memory Optimization import gc class MemoryEfficientMonitor : def init ( self ) : self . message_buffer_size = 100

只保留最近100条消息

self . message_buffer = [ ] def add_message ( self , message ) : """添加消息到缓冲区""" self . message_buffer . append ( message )

超过缓冲区大小,清理旧消息

if len ( self . message_buffer )

self . message_buffer_size : self . message_buffer = self . message_buffer [ - self . message_buffer_size : ] gc . collect ( )

触发垃圾回收

Error Handling and Recovery Graceful Degradation class RobustAgent : def init ( self ) : self . max_retries = 3 self . retry_delay_seconds = 2 def monitor_with_retry ( self ) : """带重试的监听""" retry_count = 0 while retry_count < self . max_retries : try : self . start_monitoring ( ) break

成功,跳出循环

except Exception as e : retry_count += 1 self . _send_error ( f"监听失败 (尝试 { retry_count } / { self . max_retries } ): { str ( e ) } " ) if retry_count < self . max_retries : time . sleep ( self . retry_delay_seconds ) else : self . _send_error ( "监听失败次数过多,Agent 退出" ) sys . exit ( 1 ) Health Check class HealthMonitor : def init ( self ) : self . last_heartbeat = time . time ( ) self . heartbeat_interval = 10

每10秒发送一次心跳

def send_heartbeat ( self ) : """发送心跳到 Orchestrator""" payload = { "type" : "Heartbeat" , "timestamp" : time . time ( ) , "status" : "ok" } print ( json . dumps ( payload , ensure_ascii = False ) , flush = True ) self . last_heartbeat = time . time ( ) Security Considerations Input Validation def validate_command ( command : dict ) -

bool : """验证来自 Orchestrator 的命令"""

检查命令类型

if 'type' not in command : return False cmd_type = command [ 'type' ]

只接受预定义的命令类型

valid_types

[ 'WriteInput' , 'ClearInput' , 'HealthCheck' , 'Stop' ] if cmd_type not in valid_types : return False

验证内容长度(防止恶意超长内容)

if cmd_type == 'WriteInput' : content = command . get ( 'content' , '' ) if len ( content )

10000 :

最大10KB

return False return True Privacy Protection def sanitize_message_for_logging ( message : dict ) -

dict : """清理消息中的敏感信息(用于日志)""" sanitized = message . copy ( )

不记录完整的消息内容

if 'content' in sanitized : content = sanitized [ 'content' ] if len ( content )

50 : sanitized [ 'content' ] = content [ : 50 ] + '...' return sanitized Testing Guidelines Unit Testing import unittest from unittest . mock import Mock , patch class TestWeChatMonitor ( unittest . TestCase ) : def test_message_deduplication ( self ) : """测试消息去重""" deduplicator = MessageDeduplicator ( window_seconds = 5 ) message_id = "test_message_1"

第一次应该不是重复

self . assertFalse ( deduplicator . is_duplicate ( message_id ) )

第二次应该是重复

self . assertTrue ( deduplicator . is_duplicate ( message_id ) ) @patch ( 'wxauto.WeChat' ) def test_monitor_initialization ( self , mock_wechat ) : """测试监听器初始化""" monitor = WeChatMonitor ( interval_ms = 500 ) self . assertEqual ( monitor . interval_ms , 500 ) self . assertIsNone ( monitor . last_message_id ) When to Use This Skill Activate this skill when: Implementing WeChat message monitoring Developing Platform Agents (Windows/macOS) Working with wxauto or Accessibility API Handling message extraction and deduplication Implementing input box control Optimizing Agent performance Handling Agent errors and recovery Setting up IPC communication with Orchestrator

返回排行榜