farmbot开发入门教程-消息代理示例
说明:
介绍使用 Python 直接与 FarmBot 通信
这些示例使用了CeleryScript和Message Broker中的概念。
显示 FarmBot 通信
通过 Python 或 FarmBot Web App 前端监听 FarmBot 的活动和请求。在浏览其余示例时,运行此程序会很有帮助。FarmBot 和 Web App 之间的所有通信都包含 FarmBot 状态更新;查看此示例的输出将显示下一个示例中可供跟踪的许多数据字段。
通过Python
import json
from datetime import datetime
import paho.mqtt.client as mqtt
# TOKEN = ...
def on_connect(client, *_args):
# subscribe to all channels
client.subscribe(f"bot/{TOKEN['token']['unencoded']['bot']}/#")
print('connected')
def on_message(_client, _userdata, msg):
print('-' * 100)
# print channel
print(f'{msg.topic} ({datetime.now().strftime("%Y-%m-%d %H:%M:%S")})\n')
# print message
print(json.dumps(json.loads(msg.payload), indent=4))
client = mqtt.Client()
client.username_pw_set(
TOKEN['token']['unencoded']['bot'],
password=TOKEN['token']['encoded'])
client.connect(TOKEN['token']['unencoded']['mqtt'])
client.on_connect = on_connect
client.on_message = on_message
client.loop_forever()
- 如果您的 FarmBot 在线,您应该会看到状态更新流。输入 <Ctrl + C> 停止。
跟踪特定数据更新
- 通过 FarmBot Python 库
import json
from farmbot import Farmbot
# TOKEN = ...
fb = Farmbot(json.dumps(TOKEN))
class MyHandler():
def on_connect(self, bot, _mqtt_client): print('connected')
def on_change(self, _bot, state):
x = state['location_data']['position']['x'] or 0
y = state['location_data']['position']['y'] or 0
z = state['location_data']['position']['z'] or 0
print(f'Current position: ({x:.2f}, {y:.2f}, {z:.2f})')
def on_log(*args): pass
def on_error(*args): pass
def on_response(*args): pass
fb.connect(MyHandler())
如果您的 FarmBot 在线,您应该会看到位置报告。输入 <Ctrl + C> 停止。
通过Python
import json
from datetime import datetime
import paho.mqtt.client as mqtt
# TOKEN = ...
def on_connect(client, *_args):
# subscribe to the status channel
client.subscribe(f"bot/{TOKEN['token']['unencoded']['bot']}/status")
print('connected')
def on_message(_client, _userdata, msg):
state = json.loads(msg.payload)
x = state['location_data']['position']['x'] or 0
y = state['location_data']['position']['y'] or 0
z = state['location_data']['position']['z'] or 0
print(f'Current position: ({x:.2f}, {y:.2f}, {z:.2f})')
client = mqtt.Client()
client.username_pw_set(
TOKEN['token']['unencoded']['bot'],
password=TOKEN['token']['encoded'])
client.connect(TOKEN['token']['unencoded']['mqtt'])
client.on_connect = on_connect
client.on_message = on_message
client.loop_forever()
如果您的 FarmBot 在线,您应该会看到位置报告。输入 <Ctrl + C> 停止。
实施速率限制。如果您计划发送多条消息(5 分钟内发送超过 10 条消息),请参阅标题为发送多条消息的部分。
发送一条消息
- 通过 FarmBot Python 库
import json
from farmbot import Farmbot
# TOKEN = ...
fb = Farmbot(json.dumps(TOKEN))
class MyHandler():
def on_connect(self, bot, _mqtt_client):
print('connected')
bot.send_message('Hello World!')
print('message sent')
def on_change(*args): pass
def on_log(*args): pass
def on_error(*args): pass
def on_response(*args): pass
fb.connect(MyHandler())
您应该在输出中看到“已连接”和“已发送消息”。输入 <Ctrl + C> 停止。
有关 FarmBot Python 库可用的 FarmBot 命令列表,请参阅支持的命令列表。
通过Python
import json
import paho.mqtt.publish as publish
# TOKEN = ...
# prepare the Celery Script command
message = {
"kind": "rpc_request",
"args": {
"label": "abcdef"
},
"body": [{
'kind': 'send_message',
'args': {
'message': 'Hello World!',
'message_type': 'success'
}
}]
}
# send the command to the device
publish.single(
f'bot/{TOKEN['token']['unencoded']['bot']}/from_clients',
payload=json.dumps(message),
hostname=TOKEN['token']['unencoded']['mqtt'],
auth={
'username': TOKEN['token']['unencoded']['bot'],
'password': TOKEN['token']['encoded']
}
)
print('message sent')
您应该在输出中看到“已连接”和“消息已发送”。
有关 Celery Script 命令的列表,请参阅可用节点列表。
发送多条消息
- 通过 FarmBot Python 库
import json
from time import sleep
from threading import Thread
from farmbot import Farmbot
# TOKEN = ...
fb = Farmbot(json.dumps(TOKEN))
class MyHandler():
def __init__(self, bot):
self.bot = bot
self.connected = False
def on_connect(self, bot, _mqtt_client):
print('connected')
self.connected = True
def on_change(*args): pass
def on_log(*args): pass
def on_error(*args): pass
def on_response(*args): pass
def disconnect(self):
self.bot._connection.mqtt.disconnect()
print('disconnected')
print()
handler = MyHandler(fb)
Thread(target=fb.connect, name="thread", args=[handler]).start()
while not handler.connected:
print('.', end='', flush=True)
sleep(0.1)
handler.bot.send_message('Hello World!')
handler.bot.send_message('second message')
print('messages sent')
handler.disconnect()
您应该在输出中看到“已连接”、“已发送消息”和“已断开连接”。
通过Python
import json
from time import sleep
import paho.mqtt.client as mqtt
# TOKEN = ...
# prepare the Celery Script command
def prepare_message(message):
return {
"kind": "rpc_request",
"args": {
"label": "abcdef"
},
"body": [{
'kind': 'send_message',
'args': {
'message': message,
'message_type': 'success'
}
}]
}
client = mqtt.Client()
client.username_pw_set(
TOKEN['token']['unencoded']['bot'],
password=TOKEN['token']['encoded'])
client.connect(TOKEN['token']['unencoded']['mqtt'])
print('connected')
channel = f'bot/{TOKEN['token']['unencoded']['bot']}/from_clients'
client.publish(channel, json.dumps(prepare_message('Hello World!')))
client.publish(channel, json.dumps(prepare_message('second message')))
print('messages sent')
sleep(1)
client.disconnect()
print('disconnected')
- 您应该在输出中看到“已连接”、“已发送消息”和“已断开连接”。
其他 FarmBot Python 库示例:
- 移至坐标并发送消息
- 使用键盘输入移动 FarmBot
获取最新文章: 扫一扫右上角的二维码加入“创客智造”公众号