摘要
本文簡單介紹如何移植MQTT適合群體
適用于潤和Hi3861開發(fā)板的開發(fā)人員。1、MQTT介紹
MQTT 是當前最主流的物聯(lián)網通信協(xié)議,需要物聯(lián)網云平臺,例如華為云、阿里云、移動OneNET都支持mqtt。而Hi3861則是一款專為IoT應用場景打造的芯片。本節(jié)主要講如何在鴻蒙系統(tǒng)中通過移植第3方軟件包 paho mqtt去實現(xiàn)MQTT協(xié)議功能,最后會給出測試驗證。為后續(xù)的物聯(lián)網項目打好基礎。友情預告,本節(jié)內容較多,源碼也貼出來了,大家最好先看一遍,然后再操作一次。
已經移植好的MQTT源碼:
2、MQTT移植
如果不想要自己移植的,可以跳過本節(jié)。MQTT 全稱為 Message Queuing Telemetry Transport(消息隊列遙測傳輸)是一種基于發(fā)布/訂閱范式的二進制“輕量級”消息協(xié)議,由IB公司發(fā)布。針對于網絡受限和嵌入式設備而設計的一種數(shù)據(jù)傳輸協(xié)議。MQTT最大優(yōu)點在于,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協(xié)議,使其在物聯(lián)網、小型設備、移動應用等方面有較廣泛的應用。MQTT模型如圖所示。更多MQTT協(xié)議的介紹見這篇文章:MQTT 協(xié)議開發(fā)入門
paho mqtt-c 是基于C語言實現(xiàn)的MQTT客戶端,非常適合用在嵌入式設備上。首先下載源碼:
下載之后解壓,會得到這么一個文件夾:
如何在鴻蒙系統(tǒng)中移植 Paho-MQTT 實現(xiàn)MQTT協(xié)議-鴻蒙HarmonyOS技術社區(qū)
我們在鴻蒙系統(tǒng)源碼的 third_party 文件夾下創(chuàng)建一個 pahomqtt 文件夾,然后把解壓后的所有文件都拷貝到 pahomqtt 文件夾下。
下一步,我們在pahomqtt 文件夾下面新建BUILD.gn文件,用來構建編譯。其內容如下:
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# You may obtain a copy of the License at
#
#
# Unless required by applicable law or agreed to in writing, software
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
import("http://build/lite/config/component/lite_component.gni")
import("http://build/lite/ndk/ndk.gni")
config("pahomqtt_config") {
include_dirs = [
"MQTTPacket/src",
"MQTTClient-C/src",
"MQTTClient-C/src/liteOS",
"http://kernel/liteos_m/components/cmsis/2.0",
]
}
pahomqtt_sources = [
"MQTTClient-C/src/liteOS/MQTTLiteOS.c",
"MQTTClient-C/src/MQTTClient.c",
"MQTTPacket/src/MQTTConnectClient.c",
"MQTTPacket/src/MQTTConnectServer.c",
"MQTTPacket/src/MQTTDeserializePublish.c",
"MQTTPacket/src/MQTTFormat.c",
"MQTTPacket/src/MQTTPacket.c",
"MQTTPacket/src/MQTTSerializePublish.c",
"MQTTPacket/src/MQTTSubscribeClient.c",
"MQTTPacket/src/MQTTSubscribeServer.c",
"MQTTPacket/src/MQTTUnsubscribeClient.c",
"MQTTPacket/src/MQTTUnsubscribeServer.c",
]
lite_library("pahomqtt_static") {
target_type = "static_library"
sources = pahomqtt_sources
public_configs = [ ":pahomqtt_config" ]
}
lite_library("pahomqtt_shared") {
target_type = "shared_library"
sources = pahomqtt_sources
public_configs = [ ":pahomqtt_config" ]
}
ndk_lib("pahomqtt_ndk") {
if (board_name != "hi3861v100") {
lib_extension = ".so"
deps = [
":pahomqtt_shared"
]
} else {
deps = [
":pahomqtt_static"
]
}
head_files = [
"http://third_party/pahomqtt"
]
}
向右滑動查看完整代碼
2)移植
我們使用到的是MQTTClient-C的代碼,該代碼支持多線程。
(1)創(chuàng)建LiteOS文件夾
MQTT已經提供了Linux和freertos的移植,這里我們參考,新建文件夾:third_partypahomqttMQTTClient-CsrcliteOS,里面存放兩個文件:MQTTLiteOS.c 和 MQTTLiteOS.h,內容如下:
//用來創(chuàng)建線程
int ThreadStart(Thread* thread, void (*fn)(void*), void* arg)
{
int rc = 0;
thread = thread;
osThreadAttr_t attr;
attr.name = "MQTTTask";
attr.attr_bits = 0U;
attr.cb_mem = NULL;
attr.cb_size = 0U;
attr.stack_mem = NULL;
attr.stack_size = 2048;
attr.priority = osThreadGetPriority(osThreadGetId());
rc = (int)osThreadNew((osThreadFunc_t)fn, arg, &attr);
return rc;
}
//定時器初始化
void TimerInit(Timer* timer)
{
timer->end_time = (struct timeval){0, 0};
}
char TimerIsExpired(Timer* timer)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&timer->end_time, &now, &res);
return res.tv_sec < 0 || (res.tv_sec == 0 && res.tv_usec <= 0);
}
void TimerCountdownMS(Timer* timer, unsigned int timeout)
{
struct timeval now;
gettimeofday(&now, NULL);
struct timeval interval = {timeout / 1000, (timeout % 1000) * 1000};
timeradd(&now, &interval, &timer->end_time);
}
void TimerCountdown(Timer* timer, unsigned int timeout)
{
struct timeval now;
gettimeofday(&now, NULL);
struct timeval interval = {timeout, 0};
timeradd(&now, &interval, &timer->end_time);
}
int TimerLeftMS(Timer* timer)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&timer->end_time, &now, &res);
//printf("left %d ms ", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000);
return (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000;
}
void MutexInit(Mutex* mutex)
{
mutex->sem = osSemaphoreNew(1, 1, NULL);
}
int MutexLock(Mutex* mutex)
{
return osSemaphoreAcquire(mutex->sem, LOS_WAIT_FOREVER);
}
int MutexUnlock(Mutex* mutex)
{
return osSemaphoreRelease(mutex->sem);
}
//接受數(shù)據(jù)
int ohos_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
struct timeval interval = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec <= 0))
{
interval.tv_sec = 0;
interval.tv_usec = 100;
}
setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));
int bytes = 0;
while (bytes < len)
{
int rc = recv(n->my_socket, &buffer[bytes], (size_t)(len - bytes), 0);
if (rc == -1)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
bytes = -1;
break;
}
else if (rc == 0)
{
bytes = 0;
break;
}
else
bytes += rc;
}
return bytes;
}
//寫數(shù)據(jù)
int ohos_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
struct timeval tv;
tv.tv_sec = 0; /* 30 Secs Timeout */
tv.tv_usec = timeout_ms * 1000; // Not init'ing this can cause strange errors
setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,sizeof(struct timeval));
int rc = send(n->my_socket, buffer, len, 0);
return rc;
}
//網絡初始化
void NetworkInit(Network* n)
{
n->my_socket = 0;
n->mqttread = ohos_read;
n->mqttwrite = ohos_write;
}
//網絡連接
int NetworkConnect(Network* n, char* addr, int port)
{
int type = SOCK_STREAM;
struct sockaddr_in address;
int rc = -1;
sa_family_t family = AF_INET;
struct addrinfo *result = NULL;
struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
if ((rc = getaddrinfo(addr, NULL, &hints, &result)) == 0)
{
struct addrinfo* res = result;
/* prefer ip4 addresses */
while (res)
{
if (res->ai_family == AF_INET)
{
result = res;
break;
}
res = res->ai_next;
}
if (result->ai_family == AF_INET)
{
address.sin_port = htons(port);
address.sin_family = family = AF_INET;
address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
}
else
rc = -1;
freeaddrinfo(result);
}
if (rc == 0)
{
n->my_socket = socket(family, type, 0);
if (n->my_socket != -1)
rc = connect(n->my_socket, (struct sockaddr*)&address, sizeof(address));
else
rc = -1;
}
return rc;
}
void NetworkDisconnect(Network* n)
{
close(n->my_socket);
}
向右滑動查看完整代碼
至此我們移植基本結束。
3、代碼測試
測試代碼比較好寫。主要是3個文件,內容我都貼出來了:(1)BUILD.gn文件內容:
static_library("mqtt_test") {
sources = [
"mqtt_test.c",
"mqtt_entry.c"
]
include_dirs = [
"http://utils/native/lite/include",
"http://kernel/liteos_m/components/cmsis/2.0",
"http://base/iot_hardware/interfaces/kits/wifiiot_lite",
"http://vendor/hisi/hi3861/hi3861/third_party/lwip_sack/include",
"http://foundation/communication/interfaces/kits/wifi_lite/wifiservice",
"http://third_party/pahomqtt/MQTTPacket/src",
"http://third_party/pahomqtt/MQTTClient-C/src",
"http://third_party/pahomqtt/MQTTClient-C/src/liteOS",
]
#表示需要a_myparty 軟件包
deps = [
"http://third_party/pahomqtt:pahomqtt_static",
]
}
向右滑動查看完整代碼
(2)mqtt_entry.c文件
主要是進行熱點連接,因為我們要使用MQTT需要用到網絡。熱點連接的代碼之前在第9章已經講說,這里就不完全貼了,代碼倉庫也有,主要的代碼部分:
void wifi_sta_task(void *arg)
{
arg = arg;
//連接熱點
hi_wifi_start_sta();
while(wifi_ok_flg == 0)
{
usleep(30000);
}
usleep(2000000);
//開始進入MQTT測試
mqtt_test();
}
向右滑動查看完整代碼
(3)mqtt_test.c 文件則是編寫了一個簡單的MQTT測試代碼
其中測試用的mqtt服務器是我自己的服務器:5.196.95.208,大家也可以改成自己的。
static MQTTClient mq_client;
unsigned char *onenet_mqtt_buf;
unsigned char *onenet_mqtt_readbuf;
int buf_size;
Network n;
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
//消息回調函數(shù)
void mqtt_callback(MessageData *msg_data)
{
size_t res_len = 0;
uint8_t *response_buf = NULL;
char topicname[45] = { "$crsp/" };
LOS_ASSERT(msg_data);
printf("topic %.*s receive a message ", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data);
printf("message is %.*s ", msg_data->message->payloadlen, msg_data->message->payload);
}
int mqtt_connect(void)
{
int rc = 0;
NetworkInit(&n);
NetworkConnect(&n, "5.196.95.208", 1883);
buf_size = 2048;
onenet_mqtt_buf = (unsigned char *) malloc(buf_size);
onenet_mqtt_readbuf = (unsigned char *) malloc(buf_size);
if (!(onenet_mqtt_buf && onenet_mqtt_readbuf))
{
printf("No memory for MQTT client buffer!");
return -2;
}
MQTTClientInit(&mq_client, &n, 1000, onenet_mqtt_buf, buf_size, onenet_mqtt_readbuf, buf_size);
MQTTStartTask(&mq_client);
data.keepAliveInterval = 30;
data.cleansession = 1;
data.clientID.cstring = "ohos_hi3861";
data.username.cstring = "123456";
data.password.cstring = "222222";
data.keepAliveInterval = 10;
data.cleansession = 1;
mq_client.defaultMessageHandler = mqtt_callback;
//連接服務器
rc = MQTTConnect(&mq_client, &data);
//訂閱消息,并設置回調函數(shù)
MQTTSubscribe(&mq_client, "ohossub", 0, mqtt_callback);
while(1)
{
MQTTMessage message;
message.qos = QOS1;
message.retained = 0;
message.payload = (void *)"openharmony";
message.payloadlen = strlen("openharmony");
//發(fā)送消息
if (MQTTPublish(&mq_client, "ohospub", &message) < 0)
{
return -1;
}
}
return 0;
}
void mqtt_test(void)
{
mqtt_connect();
}
向右滑動查看完整代碼
到這里就完成了代碼部分,可以開始編譯了。
4、編譯
這里我們需要先下載一個 Windows電腦端的 MQTT客戶端,這樣我們就可以用電腦訂閱開發(fā)板的MQTT主題信息了。我們選擇這一個:
弄完后打開軟件,按圖操作:
此時我們去查看 我們電腦端的MQTT客戶端軟件,可以看到右邊已經有接收MQTT信息了,主題未 ohospub,消息內容為 openharmony,說明實驗成功。
電腦發(fā)送主題為ohossub,內容為123456,查看串口打印,可以看到也收到了數(shù)據(jù)。
本節(jié)移植MQTT的教程就到這里了,下一篇我們給大家分享:OneNET云接入,歡迎大家持續(xù)關注哦~
原文標題:OpenHarmony輕量系統(tǒng)開發(fā)【11】移植MQTT
文章出處:【微信公眾號:HarmonyOS官方合作社區(qū)】歡迎添加關注!文章轉載請注明出處。
審核編輯:湯梓紅
-
通信
+關注
關注
18文章
6070瀏覽量
136419 -
物聯(lián)網
+關注
關注
2913文章
44923瀏覽量
376984 -
MQTT
+關注
關注
5文章
653瀏覽量
22690
原文標題:OpenHarmony輕量系統(tǒng)開發(fā)【11】移植MQTT
文章出處:【微信號:HarmonyOS_Community,微信公眾號:電子發(fā)燒友開源社區(qū)】歡迎添加關注!文章轉載請注明出處。
發(fā)布評論請先 登錄
相關推薦
評論