< >
Home » Farmbot开发入门教程 » farmbot开发入门教程-消息代理示例

farmbot开发入门教程-消息代理示例

说明:

  • 介绍使用 Python 直接与 FarmBot 通信

  • 这些示例使用了CeleryScript和Message Broker中的概念。

显示 FarmBot 通信

  • 通过 Python 或 FarmBot Web App 前端监听 FarmBot 的活动和请求。在浏览其余示例时,运行此程序会很有帮助。FarmBot 和 Web App 之间的所有通信都包含 FarmBot 状态更新;查看此示例的输出将显示下一个示例中可供跟踪的许多数据字段。

  • 通过Python

import json
from datetime import datetime
import paho.mqtt.client as mqtt

# TOKEN = ...

def on_connect(client, *_args):
    # subscribe to all channels
    client.subscribe(f"bot/{TOKEN['token']['unencoded']['bot']}/#")
    print('connected')

def on_message(_client, _userdata, msg):
    print('-' * 100)
    # print channel
    print(f'{msg.topic} ({datetime.now().strftime("%Y-%m-%d %H:%M:%S")})\n')
    # print message
    print(json.dumps(json.loads(msg.payload), indent=4))

client = mqtt.Client()
client.username_pw_set(
    TOKEN['token']['unencoded']['bot'],
    password=TOKEN['token']['encoded'])
client.connect(TOKEN['token']['unencoded']['mqtt'])
client.on_connect = on_connect
client.on_message = on_message
client.loop_forever()
  • 如果您的 FarmBot 在线,您应该会看到状态更新流。输入 <Ctrl + C> 停止。

跟踪特定数据更新

  • 通过 FarmBot Python 库
import json
from farmbot import Farmbot

# TOKEN = ...

fb = Farmbot(json.dumps(TOKEN))

class MyHandler():
    def on_connect(self, bot, _mqtt_client): print('connected')
    def on_change(self, _bot, state):
        x = state['location_data']['position']['x'] or 0
        y = state['location_data']['position']['y'] or 0
        z = state['location_data']['position']['z'] or 0
        print(f'Current position: ({x:.2f}, {y:.2f}, {z:.2f})')
    def on_log(*args): pass
    def on_error(*args): pass
    def on_response(*args): pass

fb.connect(MyHandler())

  • 如果您的 FarmBot 在线,您应该会看到位置报告。输入 <Ctrl + C> 停止。

  • 通过Python

import json
from datetime import datetime
import paho.mqtt.client as mqtt

# TOKEN = ...

def on_connect(client, *_args):
    # subscribe to the status channel
    client.subscribe(f"bot/{TOKEN['token']['unencoded']['bot']}/status")
    print('connected')

def on_message(_client, _userdata, msg):
    state = json.loads(msg.payload)
    x = state['location_data']['position']['x'] or 0
    y = state['location_data']['position']['y'] or 0
    z = state['location_data']['position']['z'] or 0
    print(f'Current position: ({x:.2f}, {y:.2f}, {z:.2f})')

client = mqtt.Client()
client.username_pw_set(
    TOKEN['token']['unencoded']['bot'],
    password=TOKEN['token']['encoded'])
client.connect(TOKEN['token']['unencoded']['mqtt'])
client.on_connect = on_connect
client.on_message = on_message
client.loop_forever()
  • 如果您的 FarmBot 在线,您应该会看到位置报告。输入 <Ctrl + C> 停止。

  • 实施速率限制。如果您计划发送多条消息(5 分钟内发送超过 10 条消息),请参阅标题为发送多条消息的部分。

发送一条消息

  • 通过 FarmBot Python 库
import json
from farmbot import Farmbot

# TOKEN = ...

fb = Farmbot(json.dumps(TOKEN))

class MyHandler():
    def on_connect(self, bot, _mqtt_client):
        print('connected')
        bot.send_message('Hello World!')
        print('message sent')

    def on_change(*args): pass
    def on_log(*args): pass
    def on_error(*args): pass
    def on_response(*args): pass

fb.connect(MyHandler())
  • 您应该在输出中看到“已连接”和“已发送消息”。输入 <Ctrl + C> 停止。

  • 有关 FarmBot Python 库可用的 FarmBot 命令列表,请参阅支持的命令列表。

  • 通过Python

import json
import paho.mqtt.publish as publish

# TOKEN = ...

# prepare the Celery Script command
message = {
    "kind": "rpc_request",
    "args": {
        "label": "abcdef"
    },
    "body": [{
        'kind': 'send_message',
        'args': {
            'message': 'Hello World!',
            'message_type': 'success'
        }
    }]
}

# send the command to the device
publish.single(
    f'bot/{TOKEN['token']['unencoded']['bot']}/from_clients',
    payload=json.dumps(message),
    hostname=TOKEN['token']['unencoded']['mqtt'],
    auth={
        'username': TOKEN['token']['unencoded']['bot'],
        'password': TOKEN['token']['encoded']
        }
    )
print('message sent')
  • 您应该在输出中看到“已连接”和“消息已发送”。

  • 有关 Celery Script 命令的列表,请参阅可用节点列表。

发送多条消息

  • 通过 FarmBot Python 库
import json
from time import sleep
from threading import Thread
from farmbot import Farmbot

# TOKEN = ...

fb = Farmbot(json.dumps(TOKEN))

class MyHandler():
    def __init__(self, bot):
        self.bot = bot
        self.connected = False
    def on_connect(self, bot, _mqtt_client):
        print('connected')
        self.connected = True
    def on_change(*args): pass
    def on_log(*args): pass
    def on_error(*args): pass
    def on_response(*args): pass
    def disconnect(self):
        self.bot._connection.mqtt.disconnect()
        print('disconnected')
        print()

handler = MyHandler(fb)
Thread(target=fb.connect, name="thread", args=[handler]).start()
while not handler.connected:
    print('.', end='', flush=True)
    sleep(0.1)
handler.bot.send_message('Hello World!')
handler.bot.send_message('second message')
print('messages sent')
handler.disconnect()
  • 您应该在输出中看到“已连接”、“已发送消息”和“已断开连接”。

  • 通过Python

import json
from time import sleep
import paho.mqtt.client as mqtt

# TOKEN = ...

# prepare the Celery Script command
def prepare_message(message):
  return {
      "kind": "rpc_request",
      "args": {
          "label": "abcdef"
      },
      "body": [{
          'kind': 'send_message',
          'args': {
              'message': message,
              'message_type': 'success'
          }
      }]
  }

client = mqtt.Client()
client.username_pw_set(
    TOKEN['token']['unencoded']['bot'],
    password=TOKEN['token']['encoded'])
client.connect(TOKEN['token']['unencoded']['mqtt'])
print('connected')
channel = f'bot/{TOKEN['token']['unencoded']['bot']}/from_clients'
client.publish(channel, json.dumps(prepare_message('Hello World!')))
client.publish(channel, json.dumps(prepare_message('second message')))
print('messages sent')
sleep(1)
client.disconnect()
print('disconnected')
  • 您应该在输出中看到“已连接”、“已发送消息”和“已断开连接”。

其他 FarmBot Python 库示例:

  • 移至坐标并发送消息
  • 使用键盘输入移动 FarmBot

纠错,疑问,交流: 请进入讨论区点击加入Q群

获取最新文章: 扫一扫右上角的二维码加入“创客智造”公众号


标签: none