【问题标题】:Serial Port Json Server - A porthole to TinyG - Programmers interface串行端口 Json 服务器 - TinyG 的端口 - 程序员接口
【发布时间】:2022-11-11 20:30:49
【问题描述】:

第一次在 StackExchange 作为提问者!希望我做对了。如果这是 TLDR,则为 Apols。

对于熟悉 TinyG SPJS 的人来说,你会知道我的上下文。对于其他人:“某人”创建了一个名为 SPJS(串行端口 Json 服务器)here 的程序,它有一个 websocket 接口。除其他外,它主要管理两件事

  1. 进出 TinyG CNC 控制器的串行端口流量。它管理完整性(不丢失数据)和命令缓冲,以确保向 TinyG 持续提供 G 代码命令,以确保平滑的 cnc 运动控制。
  2. 多个 pc CAM 客户端以 pub sub websocket 接口的形式。

    我的问题是 2 倍:

    1. 有谁知道在哪里可以找到我的 SPJS 协议 真的会感激吗?

    2. 我想使用 Python 编写自己的客户端。但是我在连接时遇到了一个问题:

      import asyncio
      
      import websockets as ws
      
      async def WebsocSPJSConnection():
         uri = "ws://localhost:8989"
         async with  ws.connect(uri) as webs:
            print(webs.recv())
      

      如果姓名== "主要的“: asyncio.run(WebsocSPJSConnection())

      我知道服务器在 SPJS 端看到连接尝试和失败,但是客户端只是超时:

      2022/10/25 12:07:48 cayenn.go:438: TCP Received  GET / HTTP/1.1
      Host: localhost:8988
      Upgrade: websocket
      Connection: Upgrade
      Sec-WebSocket-Key: Q3SDgBW/JHBXDE6ctEdC/g==
      Sec-WebSocket-Version: 13
      Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
      User-Agent: Python/3.9 websockets/10.3
      
        from  [::1]:60677
      Checking if from me [::1]<>10.71.193.1412022/10/25 12:07:48 cayenn.go:475: Err unmarshalling TCP inbound message from device. err: <nil>
      

      Wireshark 显示: Wireshark Trace

      似乎 SPJS 正在从我的客户那里收到一条未编组的消息。但是这是一个连接命令,除了上面的连接命令和 .有什么想法吗?

      注意 另一个客户端中的 Javascript 服务器似乎连接正确。如果我在端口 8988 上创建一个虚拟 python 服务器,一切正常。

【问题讨论】:

  • 我有一个doh时刻。我想到了。和所有问题一样,它的根源是令人尴尬的琐碎。我使用了错误的uri。它应该是“ws://localhost:8989/ws”。作为忏悔,当我完成后我会发布我的代码供其他人使用。

标签: connection webserver cnc


【解决方案1】:

按照承诺。我的忏悔是连接到 TinyG SPJS 并在 SPJS 上打开 com4 以连接到 TinyG 板的代码。原谅,这是一个更大代码的 sn-p,但它是独立的。

.kv 文件:

    CNCMainScreenBoxLayout:

<CNCMainScreenBoxLayout>:
    orientation: "horizontal"
    spacing: "20dp"
    BoxLayout:
        orientation: "vertical"
        Label:
            id: Status_Label
            size_hint: 1,0.09
            text: root.StatusText
            #on_update: root.on_Update()

    Button:
        background_color: (0, 0, 0, 0)
        size_hint: .4,1
        on_press: root.on_Click_Emergency_pressed()
        on_release: root.on_Click_Emergency_released()
        Image:
            id: EmergencyStop_Image
            source: "imagesEmergencyStop_released.png"
            size: self.parent.size
            background: None
            y: self.parent.y
            x: self.parent.x
            allow_stretch: True

和python文件:

from kivy.uix.widget import Widget
from kivy.app import App
from kivy.lang import Builder
from kivy.properties import StringProperty, ObjectProperty
from kivy.clock import Clock
from kivy.logger import Logger
from kivy.uix.boxlayout import BoxLayout
import websocket
import _thread
import json
from kivy.support import install_twisted_reactor
install_twisted_reactor()

class CNCMainScreenBoxLayout(BoxLayout):
    StatusText = StringProperty("")

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def Set_Status(self,displaytxt):
        self.StatusText = displaytxt

    def Get_URI(self):
        return "ws://localhost:8989/ws"

    def on_Click_Emergency_pressed(self):
        self.ids.EmergencyStop_Image.source = "imagesEmergencyStop_Pressed.png"
        self.StatusText = "EMERGENCY STOP"

    def on_Click_Emergency_released(self):
        self.ids.EmergencyStop_Image.source = "imagesEmergencyStop_Released.png"

class KivyWebSocket(websocket.WebSocketApp):
    def __init__(self, *args, **kwargs):
        super(KivyWebSocket, self).__init__(*args, **kwargs)
        self.logger = Logger
        self.logger.info('WebSocket: logger initialized')

class WebSocketTest(App):
    ws = None
    MainScreen = None
    url = None
    layout = ObjectProperty(None)
    StoredSettingsStore = None

    def __init__(self, **kwargs):
        super(WebSocketTest, self).__init__(**kwargs)
        self.MainScreen = Builder.load_file("CNCViewSO.kv")
        self.layout = CNCMainScreenBoxLayout()
        self.url = "ws://localhost:8989/ws"
        self.ws = KivyWebSocket(self.url,
                           on_message=self.on_ws_message,
                           on_error=self.on_ws_error,
                           on_open=self.on_ws_open,
                           on_close=self.on_ws_close, )

        app = App.get_running_app()
        Clock.schedule_once(app.ws_connection)
        self.logger = Logger
        self.logger.info('App: initiallzed')

    def build(self):
        ##self.layout = CNCMainScreenBoxLayout()
        self.logger.info('Build: initiallzed')
        return self.layout

    def on_ws_message(self, ws, message):
        #self.layout.the_btn.text = message
        decoded = json.loads(message)
        #print(decoded)
        for keys in decoded:
            #print(keys)
            if keys == "Version":
                print("Version is the version of the SPJS
	" + decoded[keys])
                self.layout.StatusText = "Connected to SPJS Version " + decoded[keys] + " at " + str(self.url)
            elif keys == "Hostname":
                print("Hostname is the hostname of the computer the SPJS is running on
	" + decoded[keys])
            elif keys == "Commands":
                print("Commands you can send to the server")
                for cmd in decoded[keys]:
                    print("	command --> 	", str(cmd))
                self.ws_send_ComConnect("Com4")
            elif keys == "Cmd":
                print(keys)

            else:
                print("Unhandled key: " + keys)


    def on_ws_error(self, ws, error):
        self.logger.info('WebSocket: [ERROR]  {}'.format(error))

    def ws_connection(self, dt, **kwargs):
        # start a new thread connected to the web socket

        _thread.start_new_thread(self.ws.run_forever, ())

    def ws_send_ComConnect(self,comPort):
        Comstring = "Open " + comPort + " 115200 tinyg"
        self.ws.send(Comstring)

    def on_ws_open(self, ws):
        pass
        """def run(*args):
            for i in range(1, 13):
                time.sleep(1)
                ws.send('version')
            time.sleep(10)
            ws.close()
        _thread.start_new_thread(run, ())"""

    def on_ws_close(self, ws):
        self.layout.StatusText = '### closed ###'

# Press the green button in the gutter to run the script.
if __name__ == '__main__':

    WebSocketTest().run()

要运行,您需要在 ./images 中为按钮创建一个图像文件。以下是按钮文件:

emergencystop_released.png emergencystop_pressed.png

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-08-09
    • 1970-01-01
    • 1970-01-01
    • 2020-12-12
    相关资源
    最近更新 更多