【发布时间】:2019-12-01 23:43:14
【问题描述】:
我目前尝试使用 zeroMQ 设置一个简单的 Push-Pull-Socket 架构,而 Metatrader 4 (MQL) 充当 publisher,而我的 Python 后端充当 consumer。
我每秒从 Metatrader 4 终端推送数据,效果很好。但是,我很难在 pull socket 上接收数据。一旦我尝试从网络中提取该数据,原子script 包就会引发错误address already in use。
在开发过程中,我在本地计算机上同时运行 MT 4 终端和 Python 脚本。
元交易者 4:
extern string PROJECT_NAME = "Dashex.Feeder";
extern string ZEROMQ_PROTOCOL = "tcp";
extern string HOSTNAME = "*";
extern int PUSH_PORT = 32220;
extern string t0 = "--- Feeder Parameters ---";
input string DID = "insert your DID here";
extern string t1 = "--- ZeroMQ Configuration ---";
extern bool Publish_MarketData = false;
// ZeroMQ environment //
// CREATE ZeroMQ Context
Context context(PROJECT_NAME);
// CREATE ZMQ_PUSH SOCKET
Socket pushSocket(context, ZMQ_PUSH);
string Publish_Symbols[7] = {
"EURUSD","GBPUSD","USDJPY","USDCAD","AUDUSD","NZDUSD","USDCHF"
};
//+------------------------------------------------------------------+
//| Expert initialization function |
//+------------------------------------------------------------------+
int OnInit()
{
//---
EventSetTimer(1); // Set Millisecond Timer to get client socket input
context.setBlocky(false);
// Send responses to PULL_PORT that client is listening on.
Print("[PUSH] Connecting MT4 Server to Socket on Port " + IntegerToString(PUSH_PORT) + "..");
pushSocket.connect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));
pushSocket.setSendHighWaterMark(1);
pushSocket.setLinger(0);
//---
return(INIT_SUCCEEDED);
}
//+------------------------------------------------------------------+
//| Expert deinitialization function |
//+------------------------------------------------------------------+
void OnDeinit(const int reason)
{
//---
Print("[PUSH] Disconnecting MT4 Server from Socket on Port " + IntegerToString(PUSH_PORT) + "..");
pushSocket.disconnect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));
// Shutdown ZeroMQ Context
context.shutdown();
context.destroy(0);
EventKillTimer();
}
//+------------------------------------------------------------------+
//| Expert tick function |
//+------------------------------------------------------------------+
void OnTimer()
{
/*
Use this OnTimer() function to send market data to consumer.
*/
if(!IsStopped() && Publish_MarketData == true)
{
for(int s = 0; s < ArraySize(Publish_Symbols); s++)
{
// Python clients can subscribe to a price feed by setting
// socket options to the symbol name. For example:
string _tick = GetBidAsk(Publish_Symbols[s]);
Print("Sending " + Publish_Symbols[s] + " " + _tick + " to PUSH Socket");
ZmqMsg reply(StringFormat("%s %s", Publish_Symbols[s], _tick));
pushSocket.send(reply, true);
}
}
}
//+------------------------------------------------------------------+
string GetBidAsk(string symbol) {
MqlTick last_tick;
if(SymbolInfoTick(symbol,last_tick))
{
return(StringFormat("%f;%f", last_tick.bid, last_tick.ask));
}
// Default
return "";
}
推送数据按预期工作:
基于 Python 的拉式套接字:
import zmq
import time
context = zmq.Context()
zmq_socket = context.socket(zmq.PULL)
zmq_socket.bind("tcp://*:32220")
time.sleep(1)
while True:
result = zmq_socket.recv()
print(result)
time.sleep(1)
这是script 在控制台中报告的内容:
Netstat 输出:
注意:当我终止 Metatrader 推送脚本和 python 脚本时,端口在 netstats 中仍被标记为“已侦听”。当我在两个实例中将端口更改为 32225 (或任何其他)并重新运行它们时,我再次收到相同的错误。如果我首先运行拉动实例,我会在原子script 中弹出一个沙漏,然后当我运行 MT4 推动实例时,拉动侧没有任何反应。然后,当我重新运行拉取实例时,我再次遇到相同的错误。
更新:
后台的python.exe 实例占用了端口。我关闭了python执行,端口再次被释放。当我现在运行我的拉取实例时,我会收到以下控制台反馈:
1.)
2.)
然后我运行推送实例,它工作得很好。
3.)
pull 实例仍然显示沙漏,并且不会将任何数据打印到控制台中:
4.)
当我重新运行拉取实例时,它会引发错误 address in use,这现在是有意义的,因为 Python 仍在后台使用该端口。
但是为什么pull端没有打印任何数据?我是否必须更改pull客户端代码才能“抓取”推送的数据?
【问题讨论】:
-
您确定没有任何东西已经在端口
32200上侦听吗?起初我以为错误可能是由于套接字仍处于TIME_WAIT状态,但我可以连续多次运行相当于您的接收器而不会出现错误。 -
我现在在端口
32200上推送数据的推送套接字和绑定到同一个套接字的拉套接字。但不是在拉面打印结果,而是告诉我address is in use(添加了netstats pic)。我确信在端口 32220 上没有其他监听,因为我只在重启后启动推送和拉取实例,没有其他程序等。 -
@larsks 我更新了最初的帖子。我解决了端口问题,但无法“看到”拉方推送的数据。我是否需要明智地更改拉取实例代码才能“抓取”推送的数据并将其打印到控制台?谢谢!