#include #include #include "freertos/FreeRTOS.h" #include "freertos/semphr.h" #include "esp_check.h" #include "esp_err.h" #include "esp_event.h" #include "esp_log.h" #include "mqtt_client.h" #include "cJSON.h" #include "relay_ctrl.h" #include "agri_env.h" static const char *TAG = "agri_env"; /** * @brief 农业环境模块上下文结构体 * 包含 MQTT 客户端句柄、连接状态以及用于线程安全的互斥锁 */ typedef struct { SemaphoreHandle_t lock; /*!< 互斥锁,保护共享资源 */ esp_mqtt_client_handle_t mqtt_client; /*!< ESP MQTT 客户端句柄 */ bool mqtt_connected; /*!< MQTT 连接状态标识 */ } agri_env_ctx_t; static agri_env_ctx_t s_ctx; static agri_env_mqtt_cmd_cb_t s_mqtt_cmd_cb = NULL; void agri_env_set_mqtt_cmd_cb(agri_env_mqtt_cmd_cb_t cb) { s_mqtt_cmd_cb = cb; } /** * @brief 规范化 MQTT 代理 URI * * 将 menuconfig 中的原始字符串处理为标准形式(例如加上 mqtt:// 前缀或默认端口 1883) * * @param buffer 存储结果的缓冲区 * @param buffer_size 缓冲区大小 * @param was_prefixed [out] 如果进行了修整或添加前缀,则设为 true * @return const char* 返回规范化后的字符串指针,失败返回 NULL */ static const char *agri_env_get_normalized_mqtt_uri(char *buffer, size_t buffer_size, bool *was_prefixed) { const char *uri = CONFIG_AGRI_ENV_MQTT_BROKER_URI; if (was_prefixed != NULL) { *was_prefixed = false; } if (uri == NULL || uri[0] == '\0') { return NULL; } // 去除前导空白字符 while (*uri == ' ' || *uri == '\t' || *uri == '\r' || *uri == '\n') { ++uri; } size_t uri_len = strlen(uri); // 去除尾部空白字符 while (uri_len > 0 && (uri[uri_len - 1] == ' ' || uri[uri_len - 1] == '\t' || uri[uri_len - 1] == '\r' || uri[uri_len - 1] == '\n')) { --uri_len; } if (uri_len == 0) { return NULL; } // 如果已经包含协议前缀 (如 mqtt://), 直接复制并返回 if (strstr(uri, "://") != NULL) { if (uri_len + 1 >= buffer_size) { return NULL; } memcpy(buffer, uri, uri_len); buffer[uri_len] = '\0'; return buffer; } // 自动补全协议前缀和默认端口 const bool has_port = memchr(uri, ':', uri_len) != NULL; int written; if (has_port) { written = snprintf(buffer, buffer_size, "mqtt://%.*s", (int)uri_len, uri); } else { written = snprintf(buffer, buffer_size, "mqtt://%.*s:1883", (int)uri_len, uri); } if (written < 0 || written >= (int)buffer_size) { return NULL; } if (was_prefixed != NULL) { *was_prefixed = true; } return buffer; } /** * @brief MQTT 事件处理回调函数 * * 处理连接、断开、接收数据等事件 */ static void agri_env_mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { (void)handler_args; (void)base; esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t)event_data; if (event == NULL) { return; } if (event_id == MQTT_EVENT_CONNECTED) { xSemaphoreTake(s_ctx.lock, portMAX_DELAY); s_ctx.mqtt_connected = true; xSemaphoreGive(s_ctx.lock); ESP_LOGI(TAG, "MQTT 已连接"); // 连接成功后订阅配置的主题 if (strlen(CONFIG_AGRI_ENV_MQTT_SUBSCRIBE_TOPIC) > 0) { esp_mqtt_client_subscribe(s_ctx.mqtt_client, CONFIG_AGRI_ENV_MQTT_SUBSCRIBE_TOPIC, 0); ESP_LOGI(TAG, "已订阅主题: %s", CONFIG_AGRI_ENV_MQTT_SUBSCRIBE_TOPIC); } } else if (event_id == MQTT_EVENT_DISCONNECTED) { xSemaphoreTake(s_ctx.lock, portMAX_DELAY); s_ctx.mqtt_connected = false; xSemaphoreGive(s_ctx.lock); ESP_LOGW(TAG, "MQTT 已断开连接"); } else if (event_id == MQTT_EVENT_DATA) { ESP_LOGI(TAG, "收到 MQTT 消息: topic=%.*s payload=%.*s", event->topic_len, event->topic, event->data_len, event->data); if (s_mqtt_cmd_cb != NULL) { s_mqtt_cmd_cb(event->topic, event->data, event->data_len); } } } /** * @brief 启动 MQTT 客户端 */ esp_err_t agri_env_mqtt_start(void) { if (s_ctx.lock == NULL) { s_ctx.lock = xSemaphoreCreateMutex(); ESP_RETURN_ON_FALSE(s_ctx.lock != NULL, ESP_ERR_NO_MEM, TAG, "互斥锁创建失败"); } if (s_ctx.mqtt_client != NULL) { return ESP_OK; } char mqtt_uri[256]; bool mqtt_uri_prefixed = false; const char *normalized_uri = agri_env_get_normalized_mqtt_uri(mqtt_uri, sizeof(mqtt_uri), &mqtt_uri_prefixed); if (normalized_uri == NULL) { ESP_LOGW(TAG, "MQTT Broker URI 为空,请在 menuconfig 中填写"); return ESP_ERR_INVALID_STATE; } if (mqtt_uri_prefixed) { ESP_LOGW(TAG, "MQTT Broker URI 已规范化为: %s", normalized_uri); } // MQTT 客户端配置 const esp_mqtt_client_config_t mqtt_cfg = { .broker.address.uri = normalized_uri, .credentials.client_id = CONFIG_AGRI_ENV_MQTT_CLIENT_ID, .credentials.username = CONFIG_AGRI_ENV_MQTT_USERNAME, .credentials.authentication.password = CONFIG_AGRI_ENV_MQTT_PASSWORD, .session.protocol_ver = MQTT_PROTOCOL_V_3_1, // 使用 MQTT v3.1 协议 }; s_ctx.mqtt_client = esp_mqtt_client_init(&mqtt_cfg); ESP_RETURN_ON_FALSE(s_ctx.mqtt_client != NULL, ESP_FAIL, TAG, "MQTT 客户端初始化失败"); // 注册所有 MQTT 事件的回调 ESP_RETURN_ON_ERROR( esp_mqtt_client_register_event(s_ctx.mqtt_client, MQTT_EVENT_ANY, agri_env_mqtt_event_handler, NULL), TAG, "MQTT 注册事件失败"); // 启动 MQTT 客户端任务 ESP_RETURN_ON_ERROR(esp_mqtt_client_start(s_ctx.mqtt_client), TAG, "MQTT 启动失败"); return ESP_OK; } /** * @brief 停止并销毁 MQTT 客户端 */ esp_err_t agri_env_mqtt_stop(void) { if (s_ctx.mqtt_client == NULL) { return ESP_OK; } esp_err_t err = esp_mqtt_client_stop(s_ctx.mqtt_client); if (err != ESP_OK) { return err; } err = esp_mqtt_client_destroy(s_ctx.mqtt_client); if (err != ESP_OK) { return err; } xSemaphoreTake(s_ctx.lock, portMAX_DELAY); s_ctx.mqtt_client = NULL; s_ctx.mqtt_connected = false; xSemaphoreGive(s_ctx.lock); return ESP_OK; } /** * @brief 检查 MQTT 是否已成功连接 * * @return true 已连接, false 未连接 */ bool agri_env_mqtt_is_connected(void) { bool connected = false; if (s_ctx.lock == NULL) { return false; } xSemaphoreTake(s_ctx.lock, portMAX_DELAY); connected = s_ctx.mqtt_connected; xSemaphoreGive(s_ctx.lock); return connected; } /** * @brief 发布数据到指定主题 * * 当前处于“仅 MQTT”模式,发布内容为固定的 JSON 数据:{"mode":"mqtt_only"} * * @param topic 目标主题 * @param qos 服务质量等级 * @param retain 保留消息标识 * @return esp_err_t 成功返回 ESP_OK,失败返回相应错误码 */ esp_err_t agri_env_mqtt_publish(const char *topic, const char *payload, int qos, int retain) { ESP_RETURN_ON_FALSE(topic != NULL && topic[0] != '\0', ESP_ERR_INVALID_ARG, TAG, "主题为空"); ESP_RETURN_ON_FALSE(payload != NULL, ESP_ERR_INVALID_ARG, TAG, "内容为空"); ESP_RETURN_ON_FALSE(s_ctx.mqtt_client != NULL, ESP_ERR_INVALID_STATE, TAG, "MQTT 客户端未启动"); ESP_RETURN_ON_FALSE(agri_env_mqtt_is_connected(), ESP_ERR_INVALID_STATE, TAG, "MQTT 未连接"); int msg_id = esp_mqtt_client_publish(s_ctx.mqtt_client, topic, payload, 0, qos, retain); ESP_RETURN_ON_FALSE(msg_id >= 0, ESP_FAIL, TAG, "MQTT 发布失败"); return ESP_OK; } esp_err_t agri_env_mqtt_publish_latest(const char *topic, int qos, int retain) { static const char *payload = "{\"mode\":\"mqtt_only\"}"; return agri_env_mqtt_publish(topic, payload, qos, retain); }