添加农业环境模块,集成 MQTT 客户端功能,支持配置参数和数据发布

This commit is contained in:
Wang Beihong
2026-04-22 01:21:26 +08:00
parent ffdb7065e3
commit 811b47d274
8 changed files with 425 additions and 38 deletions

View File

@@ -0,0 +1,5 @@
idf_component_register(
SRCS "agri_env.c"
INCLUDE_DIRS "include"
REQUIRES driver mqtt cjson esp_timer esp_event relay_ctrl
)

View File

@@ -0,0 +1,26 @@
menu "MQTT 配置参数"
config AGRI_ENV_MQTT_BROKER_URI
string "MQTT 服务器地址"
default ""
config AGRI_ENV_MQTT_USERNAME
string "MQTT 用户名"
default ""
config AGRI_ENV_MQTT_PASSWORD
string "MQTT 密码"
default ""
config AGRI_ENV_MQTT_CLIENT_ID
string "MQTT Client ID"
default "agri-env-monitor"
config AGRI_ENV_MQTT_PUBLISH_TOPIC
string "MQTT 发布主题"
default "agri/env/data"
config AGRI_ENV_MQTT_SUBSCRIBE_TOPIC
string "MQTT 订阅主题"
default "agri/env/cmd"
endmenu

View File

@@ -0,0 +1,261 @@
#include <stdio.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/semphr.h"
#include "esp_check.h"
#include "esp_err.h"
#include "esp_event.h"
#include "esp_log.h"
#include "mqtt_client.h"
#include "cJSON.h"
#include "relay_ctrl.h"
#include "agri_env.h"
static const char *TAG = "agri_env";
/**
* @brief 农业环境模块上下文结构体
* 包含 MQTT 客户端句柄、连接状态以及用于线程安全的互斥锁
*/
typedef struct {
SemaphoreHandle_t lock; /*!< 互斥锁,保护共享资源 */
esp_mqtt_client_handle_t mqtt_client; /*!< ESP MQTT 客户端句柄 */
bool mqtt_connected; /*!< MQTT 连接状态标识 */
} agri_env_ctx_t;
static agri_env_ctx_t s_ctx;
/**
* @brief 规范化 MQTT 代理 URI
*
* 将 menuconfig 中的原始字符串处理为标准形式(例如加上 mqtt:// 前缀或默认端口 1883
*
* @param buffer 存储结果的缓冲区
* @param buffer_size 缓冲区大小
* @param was_prefixed [out] 如果进行了修整或添加前缀,则设为 true
* @return const char* 返回规范化后的字符串指针,失败返回 NULL
*/
static const char *agri_env_get_normalized_mqtt_uri(char *buffer, size_t buffer_size, bool *was_prefixed)
{
const char *uri = CONFIG_AGRI_ENV_MQTT_BROKER_URI;
if (was_prefixed != NULL) {
*was_prefixed = false;
}
if (uri == NULL || uri[0] == '\0') {
return NULL;
}
// 去除前导空白字符
while (*uri == ' ' || *uri == '\t' || *uri == '\r' || *uri == '\n') {
++uri;
}
size_t uri_len = strlen(uri);
// 去除尾部空白字符
while (uri_len > 0 && (uri[uri_len - 1] == ' ' || uri[uri_len - 1] == '\t' || uri[uri_len - 1] == '\r' || uri[uri_len - 1] == '\n')) {
--uri_len;
}
if (uri_len == 0) {
return NULL;
}
// 如果已经包含协议前缀 (如 mqtt://), 直接复制并返回
if (strstr(uri, "://") != NULL) {
if (uri_len + 1 >= buffer_size) {
return NULL;
}
memcpy(buffer, uri, uri_len);
buffer[uri_len] = '\0';
return buffer;
}
// 自动补全协议前缀和默认端口
const bool has_port = memchr(uri, ':', uri_len) != NULL;
int written;
if (has_port) {
written = snprintf(buffer, buffer_size, "mqtt://%.*s", (int)uri_len, uri);
} else {
written = snprintf(buffer, buffer_size, "mqtt://%.*s:1883", (int)uri_len, uri);
}
if (written < 0 || written >= (int)buffer_size) {
return NULL;
}
if (was_prefixed != NULL) {
*was_prefixed = true;
}
return buffer;
}
/**
* @brief MQTT 事件处理回调函数
*
* 处理连接、断开、接收数据等事件
*/
static void agri_env_mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
(void)handler_args;
(void)base;
esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t)event_data;
if (event == NULL) {
return;
}
if (event_id == MQTT_EVENT_CONNECTED) {
xSemaphoreTake(s_ctx.lock, portMAX_DELAY);
s_ctx.mqtt_connected = true;
xSemaphoreGive(s_ctx.lock);
ESP_LOGI(TAG, "MQTT 已连接");
// 连接成功后订阅配置的主题
if (strlen(CONFIG_AGRI_ENV_MQTT_SUBSCRIBE_TOPIC) > 0) {
esp_mqtt_client_subscribe(s_ctx.mqtt_client, CONFIG_AGRI_ENV_MQTT_SUBSCRIBE_TOPIC, 0);
ESP_LOGI(TAG, "已订阅主题: %s", CONFIG_AGRI_ENV_MQTT_SUBSCRIBE_TOPIC);
}
} else if (event_id == MQTT_EVENT_DISCONNECTED) {
xSemaphoreTake(s_ctx.lock, portMAX_DELAY);
s_ctx.mqtt_connected = false;
xSemaphoreGive(s_ctx.lock);
ESP_LOGW(TAG, "MQTT 已断开连接");
} else if (event_id == MQTT_EVENT_DATA) {
ESP_LOGI(TAG, "收到 MQTT 消息: topic=%.*s payload=%.*s",
event->topic_len,
event->topic,
event->data_len,
event->data);
}
}
/**
* @brief 启动 MQTT 客户端
*/
esp_err_t agri_env_mqtt_start(void)
{
if (s_ctx.lock == NULL) {
s_ctx.lock = xSemaphoreCreateMutex();
ESP_RETURN_ON_FALSE(s_ctx.lock != NULL, ESP_ERR_NO_MEM, TAG, "互斥锁创建失败");
}
if (s_ctx.mqtt_client != NULL) {
return ESP_OK;
}
char mqtt_uri[256];
bool mqtt_uri_prefixed = false;
const char *normalized_uri = agri_env_get_normalized_mqtt_uri(mqtt_uri, sizeof(mqtt_uri), &mqtt_uri_prefixed);
if (normalized_uri == NULL) {
ESP_LOGW(TAG, "MQTT Broker URI 为空,请在 menuconfig 中填写");
return ESP_ERR_INVALID_STATE;
}
if (mqtt_uri_prefixed) {
ESP_LOGW(TAG, "MQTT Broker URI 已规范化为: %s", normalized_uri);
}
// MQTT 客户端配置
const esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = normalized_uri,
.credentials.client_id = CONFIG_AGRI_ENV_MQTT_CLIENT_ID,
.credentials.username = CONFIG_AGRI_ENV_MQTT_USERNAME,
.credentials.authentication.password = CONFIG_AGRI_ENV_MQTT_PASSWORD,
.session.protocol_ver = MQTT_PROTOCOL_V_3_1, // 使用 MQTT v3.1 协议
};
s_ctx.mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
ESP_RETURN_ON_FALSE(s_ctx.mqtt_client != NULL, ESP_FAIL, TAG, "MQTT 客户端初始化失败");
// 注册所有 MQTT 事件的回调
ESP_RETURN_ON_ERROR(
esp_mqtt_client_register_event(s_ctx.mqtt_client, MQTT_EVENT_ANY, agri_env_mqtt_event_handler, NULL),
TAG,
"MQTT 注册事件失败");
// 启动 MQTT 客户端任务
ESP_RETURN_ON_ERROR(esp_mqtt_client_start(s_ctx.mqtt_client), TAG, "MQTT 启动失败");
return ESP_OK;
}
/**
* @brief 停止并销毁 MQTT 客户端
*/
esp_err_t agri_env_mqtt_stop(void)
{
if (s_ctx.mqtt_client == NULL) {
return ESP_OK;
}
esp_err_t err = esp_mqtt_client_stop(s_ctx.mqtt_client);
if (err != ESP_OK) {
return err;
}
err = esp_mqtt_client_destroy(s_ctx.mqtt_client);
if (err != ESP_OK) {
return err;
}
xSemaphoreTake(s_ctx.lock, portMAX_DELAY);
s_ctx.mqtt_client = NULL;
s_ctx.mqtt_connected = false;
xSemaphoreGive(s_ctx.lock);
return ESP_OK;
}
/**
* @brief 检查 MQTT 是否已成功连接
*
* @return true 已连接, false 未连接
*/
bool agri_env_mqtt_is_connected(void)
{
bool connected = false;
if (s_ctx.lock == NULL) {
return false;
}
xSemaphoreTake(s_ctx.lock, portMAX_DELAY);
connected = s_ctx.mqtt_connected;
xSemaphoreGive(s_ctx.lock);
return connected;
}
/**
* @brief 发布数据到指定主题
*
* 当前处于“仅 MQTT”模式发布内容为固定的 JSON 数据:{"mode":"mqtt_only"}
*
* @param topic 目标主题
* @param qos 服务质量等级
* @param retain 保留消息标识
* @return esp_err_t 成功返回 ESP_OK失败返回相应错误码
*/
esp_err_t agri_env_mqtt_publish(const char *topic, const char *payload, int qos, int retain)
{
ESP_RETURN_ON_FALSE(topic != NULL && topic[0] != '\0', ESP_ERR_INVALID_ARG, TAG, "主题为空");
ESP_RETURN_ON_FALSE(payload != NULL, ESP_ERR_INVALID_ARG, TAG, "内容为空");
ESP_RETURN_ON_FALSE(s_ctx.mqtt_client != NULL, ESP_ERR_INVALID_STATE, TAG, "MQTT 客户端未启动");
ESP_RETURN_ON_FALSE(agri_env_mqtt_is_connected(), ESP_ERR_INVALID_STATE, TAG, "MQTT 未连接");
int msg_id = esp_mqtt_client_publish(s_ctx.mqtt_client, topic, payload, 0, qos, retain);
ESP_RETURN_ON_FALSE(msg_id >= 0, ESP_FAIL, TAG, "MQTT 发布失败");
return ESP_OK;
}
esp_err_t agri_env_mqtt_publish_latest(const char *topic, int qos, int retain)
{
static const char *payload = "{\"mode\":\"mqtt_only\"}";
return agri_env_mqtt_publish(topic, payload, qos, retain);
}

View File

@@ -0,0 +1,25 @@
#pragma once
#include <stdbool.h>
#include <stdint.h>
#include "esp_err.h"
#ifdef __cplusplus
extern "C" {
#endif
/* 启动 MQTT 客户端连接。 */
esp_err_t agri_env_mqtt_start(void);
/* 停止 MQTT 客户端连接并释放资源。 */
esp_err_t agri_env_mqtt_stop(void);
/* 查询 MQTT 当前是否已连接。 */
bool agri_env_mqtt_is_connected(void);
/* 发布指定的 JSON 载荷到指定主题。 */
esp_err_t agri_env_mqtt_publish(const char *topic, const char *payload, int qos, int retain);
/* 发布固定 MQTT-only 心跳载荷到指定主题。 */
esp_err_t agri_env_mqtt_publish_latest(const char *topic, int qos, int retain);
#ifdef __cplusplus
}
#endif