1 通过 pip 安装 wechat-python-sdk , Requests 以及 Tornado
pip install tornado
pip install wechat-sdk
pip install requests
2 订阅号申请
要搭建订阅号,首先需要在微信公众平台官网进行注册,注册网址: 微信公众平台。
目前个人用户可以免费申请微信订阅号,虽然很多权限申请不到,但是基本的消息回复是没有问题的。
- 服务器接入
具体的接入步骤可以参考官网上的接入指南。
本订阅号的配置为:
进行修改配置,提交时,需要验证服务器地址的有效性
wechat.py
import tornado.escape
import tornado.web
from wechat_sdk import WechatConf
conf = WechatConf(
token=\'your_token\', # 你的公众号Token
appid=\'your_appid\', # 你的公众号的AppID
appsecret=\'your_appsecret\', # 你的公众号的AppSecret
encrypt_mode=\'safe\', # 可选项:normal/compatible/safe,分别对应于 明文/兼容/安全 模式
encoding_aes_key=\'your_encoding_aes_key\' # 如果传入此值则必须保证同时传入 token, appid
)
from wechat_sdk import WechatBasic
wechat = WechatBasic(conf=conf)
class WX(tornado.web.RequestHandler):
def get(self):
signature = self.get_argument(\'signature\', \'default\')
timestamp = self.get_argument(\'timestamp\', \'default\')
nonce = self.get_argument(\'nonce\', \'default\')
echostr = self.get_argument(\'echostr\', \'default\')
if signature != \'default\' and timestamp != \'default\' and nonce != \'default\' and echostr != \'default\' \
and wechat.check_signature(signature, timestamp, nonce):
self.write(echostr)
else:
self.write(\'Not Open\')
wechat_main.py
#!/usr/bin/env python
#coding:utf-8
import tornado.web
import tornado.httpserver
from tornado.options import define, options
import os
import wechat
settings = {
\'static_path\': os.path.join(os.path.dirname(__file__), \'static\'),
\'template_path\': os.path.join(os.path.dirname(__file__), \'view\'),
\'cookie_secret\': \'xxxxxxxxxxx\',
\'login_url\': \'/\',
\'session_secret\': "xxxxxxxxxxxxxxxxxxxxxxx",
\'session_timeout\': 3600,
\'port\': 8888,
\'wx_token\': \'your_token\',
}
web_handlers = [
(r\'/wechat\', wechat.WX),
]
#define("port", default=settings[\'port\'], help="run on the given port", type=int)
from tornado.options import define, options
define ("port", default=8888, help="run on the given port", type=int)
if __name__ == \'__main__\':
app = tornado.web.Application(web_handlers, **settings)
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()
cookie_secret session_secret 可以随便填写;
配置好程序源代码后运行,确认运行无误后再在公众号设置页面点击 提交 ,如果程序运行没问题,会显示接入成功。
3 接入图灵机器人
要接入图灵机器人,首先需要在官网申请API Key。
class TulingAutoReply:
def __init__(self, tuling_key, tuling_url):
self.key = tuling_key
self.url = tuling_url
def reply(self, unicode_str):
body = {\'key\': self.key, \'info\': unicode_str.encode(\'utf-8\')}
r = requests.post(self.url, data=body)
r.encoding = \'utf-8\'
resp = r.text
if resp is None or len(resp) == 0:
return None
try:
js = json.loads(resp)
if js[\'code\'] == 100000:
return js[\'text\'].replace(\'<br>\', \'\n\')
elif js[\'code\'] == 200000:
return js[\'url\']
else:
return None
except Exception:
traceback.print_exc()
return None
4 编写公众号自动回复代码
auto_reply = TulingAutoReply(key, url) # key和url填入自己申请到的图灵key以及图灵请求url
class WX(tornado.web.RequestHandler):
def wx_proc_msg(self, body):
try:
wechat.parse_data(body)
except ParseError:
print(\'Invalid Body Text\')
return
if isinstance(wechat.message, TextMessage): # 消息为文本消息
content = wechat.message.content
reply = auto_reply.reply(content)
if reply is not None:
return wechat.response_text(content=reply)
else:
return wechat.response_text(content=u"不知道你说的什么")
return wechat.response_text(content=u\'知道了\')
def post(self):
signature = self.get_argument(\'signature\', \'default\')
timestamp = self.get_argument(\'timestamp\', \'default\')
nonce = self.get_argument(\'nonce\', \'default\')
if signature != \'default\' and timestamp != \'default\' and nonce != \'default\' \
and wechat.check_signature(signature, timestamp, nonce):
body = self.request.body.decode(\'utf-8\')
try:
result = self.wx_proc_msg(body)
if result is not None:
self.write(result)
except IOError as e:
return
最终 wechat.py 代码如下:
import tornado.escape
import tornado.web
#from goose import Goose, ParseError
import json
import requests
import traceback
from wechat_sdk import WechatConf
conf = WechatConf(
token=\'your_token\', # 你的公众号Token
appid=\'your_appid\', # 你的公众号的AppID
appsecret=\'your_appsecret\', # 你的公众号的AppSecret
encrypt_mode=\'safe\', # 可选项:normal/compatible/safe,分别对应于 明文/兼容/安全 模式
encoding_aes_key=\'your_encoding_aes_key\' # 如果传入此值则必须保证同时传入 token, appid
)
from wechat_sdk import WechatBasic
wechat = WechatBasic(conf=conf)
class TulingAutoReply:
def __init__(self, tuling_key, tuling_url):
self.key = tuling_key
self.url = tuling_url
def reply(self, unicode_str):
body = {\'key\': self.key, \'info\': unicode_str.encode(\'utf-8\')}
r = requests.post(self.url, data=body)
r.encoding = \'utf-8\'
resp = r.text
if resp is None or len(resp) == 0:
return None
try:
js = json.loads(resp)
if js[\'code\'] == 100000:
return js[\'text\'].replace(\'<br>\', \'\n\')
elif js[\'code\'] == 200000:
return js[\'url\']
else:
return None
except Exception:
traceback.print_exc()
return None
auto_reply = TulingAutoReply(key, url) # key和url填入自己申请到的图灵key以及图灵请求url
class WX(tornado.web.RequestHandler):
def wx_proc_msg(self, body):
try:
wechat.parse_data(body)
except ParseError:
print(\'Invalid Body Text\')
return
if isinstance(wechat.message, TextMessage): # 消息为文本消息
content = wechat.message.content
reply = auto_reply.reply(content)
if reply is not None:
return wechat.response_text(content=reply)
else:
return wechat.response_text(content=u"不知道你说的什么")
return wechat.response_text(content=u\'知道了\')
def post(self):
signature = self.get_argument(\'signature\', \'default\')
timestamp = self.get_argument(\'timestamp\', \'default\')
nonce = self.get_argument(\'nonce\', \'default\')
if signature != \'default\' and timestamp != \'default\' and nonce != \'default\' \
and wechat.check_signature(signature, timestamp, nonce):
body = self.request.body.decode(\'utf-8\')
try:
result = self.wx_proc_msg(body)
if result is not None:
self.write(result)
except IOError as e:
return