【发布时间】:2015-08-22 08:39:41
【问题描述】:
我正在尝试连接到 wss://api.poloniex.com 并订阅ticker。我在 python 中找不到任何工作示例。我曾尝试使用高速公路/扭曲和 websocket-client 0.32.0。
这样做的目的是获取实时代码数据并将其存储在 mysql 数据库中。
到目前为止,我已尝试使用库文档中提供的示例。它们适用于 localhost 或测试服务器,但如果我更改为 wss://api.poloniex.com,我会收到一堆错误。
这是我使用 websocket-client 0.32.0 的尝试:
from websocket import create_connection
ws = create_connection("wss://api.poloniex.com")
ws.send("ticker")
result = ws.recv()
print "Received '%s'" % result
ws.close()
这是使用高速公路/扭曲:
from autobahn.twisted.websocket import WebSocketClientProtocol
from autobahn.twisted.websocket import WebSocketClientFactory
class MyClientProtocol(WebSocketClientProtocol):
def onConnect(self, response):
print("Server connected: {0}".format(response.peer))
def onOpen(self):
print("WebSocket connection open.")
def hello():
self.sendMessage(u"ticker".encode('utf8'))
self.sendMessage(b"\x00\x01\x03\x04", isBinary=True)
self.factory.reactor.callLater(1, hello)
# start sending messages every second ..
hello()
def onMessage(self, payload, isBinary):
if isBinary:
print("Binary message received: {0} bytes".format(len(payload)))
else:
print("Text message received: {0}".format(payload.decode('utf8')))
def onClose(self, wasClean, code, reason):
print("WebSocket connection closed: {0}".format(reason))
if __name__ == '__main__':
import sys
from twisted.python import log
from twisted.internet import reactor
log.startLogging(sys.stdout)
factory = WebSocketClientFactory("wss://api.poloniex.com", debug=False)
factory.protocol = MyClientProtocol
reactor.connectTCP("wss://api.poloniex.com", 9000, factory)
reactor.run()
非常感谢一个完整但简单的示例,展示如何使用任何 python 库连接和订阅 websocket 推送 api。
【问题讨论】:
标签: api python-3.x websocket