帮助文档
专业提供香港服务器、香港云服务器、香港高防服务器租用、香港云主机、台湾服务器、美国服务器、美国云服务器vps租用、韩国高防服务器租用、新加坡服务器、日本服务器租用 一站式全球网络解决方案提供商!专业运营维护IDC数据中心,提供高质量的服务器托管,服务器机房租用,服务器机柜租用,IDC机房机柜租用等服务,稳定、安全、高性能的云端计算服务,实时满足您的多样性业务需求。 香港大带宽稳定可靠,高级工程师提供基于服务器硬件、操作系统、网络、应用环境、安全的免费技术支持。
服务器资讯 / 香港服务器租用 / 香港VPS租用 / 香港云服务器 / 美国服务器租用 / 台湾服务器租用 / 日本服务器租用 / 官方公告 / 帮助文档
Django服务器通过AMQP获取阿里云物联网平台数据的方法
发布时间:2024-03-06 02:28:10   分类:帮助文档
Django服务器通过AMQP获取阿里云物联网平台数据的方法

毕业设计需要使用到阿里云物联网平台,最近正好正在开发django框架的服务端,记录一下
一、准备工作
在使用AMQP获取阿里云物联网平台的数据之前,我们需要安装几个python包:stomp.py和schedule
pip install stomp.py
pip install schedule
pip install django-environ
这些包在后面的部分需要用到
二、编写连接阿里云物联网平台的.py文件
这里我们直接使用阿里云提供的代码,但是我们做了一些小修改,而import的environ就是我们刚才安装的django-environ包。当然下面的文件你可以放在项目的任何地方,但是我推荐在根目录新建一个common文件夹来专门存放类似这样的服务的文件。
# encoding=utf-8
import time
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading
# 阿里云使用os包,为了方便管理这里和setting一样使用django-environ
# import os
import environ
from graduation_project_web_end.settings import BASE_DIR

# 设置.env路径
env_file = BASE_DIR / '.env'
# 初始化环境变量
env = environ.Env()
# 读取.env文件
environ.Env.read_env(env_file=str(env_file))


def connect_and_subscribe(conn):
# 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
accessKey = env('accessKey')
accessSecret = env('accessSecret')
consumerGroupId = env('consumerGroupId')
# iotInstanceId:实例ID。
iotInstanceId = env('iotInstanceId')
clientId = env('clientId')
# 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
signMethod = "hmacsha1"
timestamp = current_time_millis()
# userName组装方法,请参见AMQP客户端接入说明文档。
# 若使用二进制传输,则userName需要添加encode=base64参数,服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节“二进制消息体说明”。
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))

conn.set_listener('', MyListener(conn))
conn.connect(username, password, wait=True)
# 清除历史连接检查任务,新建连接检查任务
schedule.clear('conn-check')
schedule.every(1).seconds.do(do_check, conn).tag('conn-check')


class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn

def on_error(self, frame):
print('received an error "%s"' % frame.body)

def on_message(self, frame):
print('received a message "%s"' % frame.body)

def on_heartbeat_timeout(self):
print('on_heartbeat_timeout')

def on_connected(self, headers):
print("successfully connected")
conn.subscribe(destination='/topic/#', id=1, ack='auto')
print("successfully subscribe")

def on_disconnected(self):
print('disconnected')
connect_and_subscribe(self.conn)


def current_time_millis():
return str(int(round(time.time() * 1000)))


def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest()).decode("utf-8")


# 检查连接,如果未连接则重新建连
def do_check(conn):
print('check connection, is_connected: %s', conn.is_connected())
if (not conn.is_connected()):
try:
connect_and_subscribe(conn)
except Exception as e:
print('disconnected, ', e)


# 定时任务方法,检查连接状态
def connection_check_timer():
while 1:
schedule.run_pending()
time.sleep(10)


# 接入域名,请参见AMQP客户端接入说明文档。这里直接填入域名,不需要带amqps://前缀
conn = stomp.Connection([(env('conn'), 61614)], heartbeats=(0, 300))
conn.set_ssl(for_hosts=[(env('conn'), 61614)], ssl_version=ssl.PROTOCOL_TLS)

try:
connect_and_subscribe(conn)
except Exception as e:
print('connecting failed')
raise e

# 异步线程运行定时任务,检查连接状态
thread = threading.Thread(target=connection_check_timer)
thread.start()

现在我们来解释一下这部分代码(不感兴趣的可以跳到下一个标题)
import environ
from graduation_project_web_end.settings import BASE_DIR

# 设置.env路径
env_file = BASE_DIR / '.env'
# 初始化环境变量
env = environ.Env()
# 读取.env文件
environ.Env.read_env(env_file=str(env_file))
首先我们通过设置env_file设置了.env文件的路径,告诉服务器.env文件在根目录下,然后我们初始化一下env,然后使用environ.Env.read_env(env_file=str(env_file))读取根目录下的.env文件。
后面就可以通过以下的形式使用env文件的内容了
accessKey = env('accessKey')
三、在根目录新建.env文件
.env文件的格式是,下面这样的,注意等号两边不能有空格
DEBUG=True
现在放一下.env文件编写完成后的样子,前三项是没有必要的,那个是我对setting.py部分参数的定义。其他项的编写规则我引用一下阿里云的文档,链接在这里:阿里云官方文档

试运行一下

连接物联网平台完成
四、如何将数据存储在模型层
方法及部分原理
注意到在前面的代码中有一个类被叫做MyListener,其中有一个on_message方法
def on_message(self, frame):
print('received a message "%s"' % frame.body)
这里其实就是阿里云将消息转发过来会触发的方法
在消息转发 > 服务端订阅中我们可以编辑订阅,勾选上设备上报消息
在设备管理 > 设备模拟器,我们测试一下属性上报(当然如果想尝试得先在物联网平台注册一个设备,这里只是演示)

我们成功收到了消息

编写models.py
from django.contrib.auth.models import User
from django.db import models


# Create your models here.
class Device(models.Model):
unique_id = models.CharField(max_length=100, unique=True)
name = models.CharField(max_length=100)
is_bind = models.BooleanField(default=False) # 表示设备是否被绑定

def __str__(self):
return self.name

class Meta:
db_table = '设备'
ordering = ['unique_id']
verbose_name = '设备'
verbose_name_plural = '设备'


class BindDevice(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='bind_devices')
device = models.ForeignKey(Device, on_delete=models.CASCADE, related_name='bind_users')
bind_time = models.DateTimeField(auto_now_add=True) # 绑定时间

def __str__(self):
return f"{self.user.username} - {self.device.name}"

class Meta:
db_table = '绑定设备'
unique_together = (('user', 'device'),) # 用户和设备的组合必须是唯一的
verbose_name = '绑定设备'
verbose_name_plural = '绑定设备'


class DeviceData(models.Model):
device = models.ForeignKey(Device, on_delete=models.CASCADE, related_name='data')
timestamp = models.DateTimeField(auto_now_add=True)
heart_rate = models.IntegerField(blank=True, null=True) # 心率
blood_oxygen = models.IntegerField(blank=True, null=True) # 血氧水平
temperature = models.FloatField(blank=True, null=True) # 温度
# 可根据需要添加更多字段

def __str__(self):
return f"{self.device.name} Data at {self.timestamp}"

class Meta:
db_table = '设备数据'
ordering = ['-timestamp'] # 最新的数据首先显示
verbose_name = '设备数据'
verbose_name_plural = '设备数据'


现在开始修改aliyuniot.py的MyListener的on_message方法
def on_message(self, frame):
# print('received a message "%s"' % frame.body)
# 解析消息
data = json.loads(frame.body)

# 从消息中提取数据
unique_id = data['iotId']
blood_oxygen = data['items']['blood_oxygen']['value']
temperature = data['items']['temperature']['value']
heart_rate = data['items']['heart_rate']['value']
blood_pressure = data['items']['blood_pressure']['value']
timestamp = datetime.fromtimestamp(data['items']['heart_rate']['time'] / 1000.0, tz=timezone.utc)

# 查找对应的设备实例
try:
device = Device.objects.get(unique_id=unique_id, is_bind=True)
# 创建DeviceData实例存储数据
DeviceData.objects.create(
device=device,
timestamp=timestamp,
heart_rate=heart_rate,
blood_oxygen=blood_oxygen,
temperature=temperature,
blood_pressure=str(blood_pressure) # 假设血压字段是字符串类型
)
print(f"设备 {unique_id} 的数据存储成功")
except Device.DoesNotExist:
print(f"设备 {unique_id} 未被绑定或不存在")
这个文件被我用作脚本运行,在这种情况下为了避免项目出错,我们在import模型层的时候应该注意到,必须首先设置django环境才能导入模型层,注意把setting的路径修改为自己的路径
# 设置django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'graduation_project_web_end.settings')
django.setup()

# 必须设置django环境之后才可以引入APP的models文件
from index.models import *
解释一下是怎么设置环境的
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'graduation_project_web_end.settings')
这行代码通过设置环境变量DJANGO_SETTING_MODULE来指定Django项目的配置文件(settings.py)。'graduation_project_web_end.settings'应该替换为你自己项目的设置模块路径。它的作用是告诉Django,当初始化框架时,应该使用哪个设置文件。setup则是按路径加载上面的环境。
五、完整代码
aliyuniot.py如下
# encoding=utf-8
import json
import os
import django
import time
import hashlib
import hmac
import base64
from datetime import datetime, timezone
import stomp
import ssl
import schedule
import threading
# 阿里云使用os包,为了方便管理这里和setting一样使用django-environ
# import os
import environ
from graduation_project_web_end.settings import BASE_DIR

# 设置django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'graduation_project_web_end.settings')
django.setup()

# 必须设置django环境之后才可以引入APP的models文件
from index.models import *

# 设置.env路径
env_file = BASE_DIR / '.env'
# 初始化环境变量
env = environ.Env()
# 读取.env文件
environ.Env.read_env(env_file=str(env_file))


def connect_and_subscribe(conn):
# 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
accessKey = env('accessKey')
accessSecret = env('accessSecret')
consumerGroupId = env('consumerGroupId')
# iotInstanceId:实例ID。
iotInstanceId = env('iotInstanceId')
clientId = env('clientId')
# 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
signMethod = "hmacsha1"
timestamp = current_time_millis()
# userName组装方法,请参见AMQP客户端接入说明文档。
# 若使用二进制传输,则userName需要添加encode=base64参数,服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节“二进制消息体说明”。
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))

conn.set_listener('', MyListener(conn))
conn.connect(username, password, wait=True)
# 清除历史连接检查任务,新建连接检查任务
schedule.clear('conn-check')
schedule.every(1).seconds.do(do_check, conn).tag('conn-check')


class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn

def on_error(self, frame):
print('received an error "%s"' % frame.body)

def on_message(self, frame):
# print('received a message "%s"' % frame.body)
# 解析消息
data = json.loads(frame.body)

# 从消息中提取数据
unique_id = data['iotId']
blood_oxygen = data['items']['blood_oxygen']['value']
temperature = data['items']['temperature']['value']
heart_rate = data['items']['heart_rate']['value']
blood_pressure = data['items']['blood_pressure']['value']
timestamp = datetime.fromtimestamp(data['items']['heart_rate']['time'] / 1000.0, tz=timezone.utc)

# 查找对应的设备实例
try:
device = Device.objects.get(unique_id=unique_id, is_bind=True)
# 创建DeviceData实例存储数据
DeviceData.objects.create(
device=device,
timestamp=timestamp,
heart_rate=heart_rate,
blood_oxygen=blood_oxygen,
temperature=temperature,
blood_pressure=str(blood_pressure) # 假设血压字段是字符串类型
)
print(f"设备 {unique_id} 的数据存储成功")
except Device.DoesNotExist:
print(f"设备 {unique_id} 未被绑定或不存在")

def on_heartbeat_timeout(self):
print('on_heartbeat_timeout')

def on_connected(self, headers):
print("successfully connected")
conn.subscribe(destination='/topic/#', id=1, ack='auto')
print("successfully subscribe")

def on_disconnected(self):
print('disconnected')
connect_and_subscribe(self.conn)


def current_time_millis():
return str(int(round(time.time() * 1000)))


def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest()).decode("utf-8")


# 检查连接,如果未连接则重新建连
def do_check(conn):
print('check connection, is_connected: %s', conn.is_connected())
if not conn.is_connected():
try:
connect_and_subscribe(conn)
except Exception as e:
print('disconnected, ', e)


# 定时任务方法,检查连接状态
def connection_check_timer():
while 1:
schedule.run_pending()
time.sleep(60)


# 接入域名,请参见AMQP客户端接入说明文档。这里直接填入域名,不需要带amqps://前缀
conn = stomp.Connection([(env('conn'), 61614)], heartbeats=(0, 300))
conn.set_ssl(for_hosts=[(env('conn'), 61614)], ssl_version=ssl.PROTOCOL_TLS)

try:
connect_and_subscribe(conn)
except Exception as e:
print('connecting failed')
raise e

# 异步线程运行定时任务,检查连接状态
thread = threading.Thread(target=connection_check_timer)
thread.start()

运行测试


大功告成


香港云服务器租用推荐
服务器租用资讯
·广东云服务有限公司怎么样
·广东云服务器怎么样
·广东锐讯网络有限公司怎么样
·广东佛山的蜗牛怎么那么大
·广东单位电话主机号怎么填写
·管家婆 花生壳怎么用
·官网域名过期要怎么办
·官网邮箱一般怎么命名
·官网网站被篡改怎么办
服务器租用推荐
·美国服务器租用
·台湾服务器租用
·香港云服务器租用
·香港裸金属服务器
·香港高防服务器租用
·香港服务器租用特价