Django服务器通过AMQP获取阿里云物联网平台数据的方法
毕业设计需要使用到阿里云物联网平台,最近正好正在开发django框架的服务端,记录一下
一、准备工作
在使用AMQP获取阿里云物联网平台的数据之前,我们需要安装几个python包:stomp.py和schedule
pip install stomp.py
pip install schedule
pip install django-environ
这些包在后面的部分需要用到
二、编写连接阿里云物联网平台的.py文件
这里我们直接使用阿里云提供的代码,但是我们做了一些小修改,而import的environ就是我们刚才安装的django-environ包。当然下面的文件你可以放在项目的任何地方,但是我推荐在根目录新建一个common文件夹来专门存放类似这样的服务的文件。
# encoding=utf-8
import time
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading
# 阿里云使用os包,为了方便管理这里和setting一样使用django-environ
# import os
import environ
from graduation_project_web_end.settings import BASE_DIR
# 设置.env路径
env_file = BASE_DIR / '.env'
# 初始化环境变量
env = environ.Env()
# 读取.env文件
environ.Env.read_env(env_file=str(env_file))
def connect_and_subscribe(conn):
# 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
accessKey = env('accessKey')
accessSecret = env('accessSecret')
consumerGroupId = env('consumerGroupId')
# iotInstanceId:实例ID。
iotInstanceId = env('iotInstanceId')
clientId = env('clientId')
# 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
signMethod = "hmacsha1"
timestamp = current_time_millis()
# userName组装方法,请参见AMQP客户端接入说明文档。
# 若使用二进制传输,则userName需要添加encode=base64参数,服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节“二进制消息体说明”。
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
conn.set_listener('', MyListener(conn))
conn.connect(username, password, wait=True)
# 清除历史连接检查任务,新建连接检查任务
schedule.clear('conn-check')
schedule.every(1).seconds.do(do_check, conn).tag('conn-check')
class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print('received a message "%s"' % frame.body)
def on_heartbeat_timeout(self):
print('on_heartbeat_timeout')
def on_connected(self, headers):
print("successfully connected")
conn.subscribe(destination='/topic/#', id=1, ack='auto')
print("successfully subscribe")
def on_disconnected(self):
print('disconnected')
connect_and_subscribe(self.conn)
def current_time_millis():
return str(int(round(time.time() * 1000)))
def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest()).decode("utf-8")
# 检查连接,如果未连接则重新建连
def do_check(conn):
print('check connection, is_connected: %s', conn.is_connected())
if (not conn.is_connected()):
try:
connect_and_subscribe(conn)
except Exception as e:
print('disconnected, ', e)
# 定时任务方法,检查连接状态
def connection_check_timer():
while 1:
schedule.run_pending()
time.sleep(10)
# 接入域名,请参见AMQP客户端接入说明文档。这里直接填入域名,不需要带amqps://前缀
conn = stomp.Connection([(env('conn'), 61614)], heartbeats=(0, 300))
conn.set_ssl(for_hosts=[(env('conn'), 61614)], ssl_version=ssl.PROTOCOL_TLS)
try:
connect_and_subscribe(conn)
except Exception as e:
print('connecting failed')
raise e
# 异步线程运行定时任务,检查连接状态
thread = threading.Thread(target=connection_check_timer)
thread.start()
现在我们来解释一下这部分代码(不感兴趣的可以跳到下一个标题)
import environ
from graduation_project_web_end.settings import BASE_DIR
# 设置.env路径
env_file = BASE_DIR / '.env'
# 初始化环境变量
env = environ.Env()
# 读取.env文件
environ.Env.read_env(env_file=str(env_file))
首先我们通过设置env_file设置了.env文件的路径,告诉服务器.env文件在根目录下,然后我们初始化一下env,然后使用environ.Env.read_env(env_file=str(env_file))读取根目录下的.env文件。
后面就可以通过以下的形式使用env文件的内容了
accessKey = env('accessKey')
三、在根目录新建.env文件
.env文件的格式是,下面这样的,注意等号两边不能有空格
DEBUG=True
现在放一下.env文件编写完成后的样子,前三项是没有必要的,那个是我对setting.py部分参数的定义。其他项的编写规则我引用一下阿里云的文档,链接在这里:阿里云官方文档
试运行一下
连接物联网平台完成
四、如何将数据存储在模型层
方法及部分原理
注意到在前面的代码中有一个类被叫做MyListener,其中有一个on_message方法
def on_message(self, frame):
print('received a message "%s"' % frame.body)
这里其实就是阿里云将消息转发过来会触发的方法
在消息转发 > 服务端订阅中我们可以编辑订阅,勾选上设备上报消息
在设备管理 > 设备模拟器,我们测试一下属性上报(当然如果想尝试得先在物联网平台注册一个设备,这里只是演示)
我们成功收到了消息
编写models.py
from django.contrib.auth.models import User
from django.db import models
# Create your models here.
class Device(models.Model):
unique_id = models.CharField(max_length=100, unique=True)
name = models.CharField(max_length=100)
is_bind = models.BooleanField(default=False) # 表示设备是否被绑定
def __str__(self):
return self.name
class Meta:
db_table = '设备'
ordering = ['unique_id']
verbose_name = '设备'
verbose_name_plural = '设备'
class BindDevice(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='bind_devices')
device = models.ForeignKey(Device, on_delete=models.CASCADE, related_name='bind_users')
bind_time = models.DateTimeField(auto_now_add=True) # 绑定时间
def __str__(self):
return f"{self.user.username} - {self.device.name}"
class Meta:
db_table = '绑定设备'
unique_together = (('user', 'device'),) # 用户和设备的组合必须是唯一的
verbose_name = '绑定设备'
verbose_name_plural = '绑定设备'
class DeviceData(models.Model):
device = models.ForeignKey(Device, on_delete=models.CASCADE, related_name='data')
timestamp = models.DateTimeField(auto_now_add=True)
heart_rate = models.IntegerField(blank=True, null=True) # 心率
blood_oxygen = models.IntegerField(blank=True, null=True) # 血氧水平
temperature = models.FloatField(blank=True, null=True) # 温度
# 可根据需要添加更多字段
def __str__(self):
return f"{self.device.name} Data at {self.timestamp}"
class Meta:
db_table = '设备数据'
ordering = ['-timestamp'] # 最新的数据首先显示
verbose_name = '设备数据'
verbose_name_plural = '设备数据'
现在开始修改aliyuniot.py的MyListener的on_message方法
def on_message(self, frame):
# print('received a message "%s"' % frame.body)
# 解析消息
data = json.loads(frame.body)
# 从消息中提取数据
unique_id = data['iotId']
blood_oxygen = data['items']['blood_oxygen']['value']
temperature = data['items']['temperature']['value']
heart_rate = data['items']['heart_rate']['value']
blood_pressure = data['items']['blood_pressure']['value']
timestamp = datetime.fromtimestamp(data['items']['heart_rate']['time'] / 1000.0, tz=timezone.utc)
# 查找对应的设备实例
try:
device = Device.objects.get(unique_id=unique_id, is_bind=True)
# 创建DeviceData实例存储数据
DeviceData.objects.create(
device=device,
timestamp=timestamp,
heart_rate=heart_rate,
blood_oxygen=blood_oxygen,
temperature=temperature,
blood_pressure=str(blood_pressure) # 假设血压字段是字符串类型
)
print(f"设备 {unique_id} 的数据存储成功")
except Device.DoesNotExist:
print(f"设备 {unique_id} 未被绑定或不存在")
这个文件被我用作脚本运行,在这种情况下为了避免项目出错,我们在import模型层的时候应该注意到,必须首先设置django环境才能导入模型层,注意把setting的路径修改为自己的路径
# 设置django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'graduation_project_web_end.settings')
django.setup()
# 必须设置django环境之后才可以引入APP的models文件
from index.models import *
解释一下是怎么设置环境的
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'graduation_project_web_end.settings')
这行代码通过设置环境变量DJANGO_SETTING_MODULE来指定Django项目的配置文件(settings.py)。'graduation_project_web_end.settings'应该替换为你自己项目的设置模块路径。它的作用是告诉Django,当初始化框架时,应该使用哪个设置文件。setup则是按路径加载上面的环境。
五、完整代码
aliyuniot.py如下
# encoding=utf-8
import json
import os
import django
import time
import hashlib
import hmac
import base64
from datetime import datetime, timezone
import stomp
import ssl
import schedule
import threading
# 阿里云使用os包,为了方便管理这里和setting一样使用django-environ
# import os
import environ
from graduation_project_web_end.settings import BASE_DIR
# 设置django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'graduation_project_web_end.settings')
django.setup()
# 必须设置django环境之后才可以引入APP的models文件
from index.models import *
# 设置.env路径
env_file = BASE_DIR / '.env'
# 初始化环境变量
env = environ.Env()
# 读取.env文件
environ.Env.read_env(env_file=str(env_file))
def connect_and_subscribe(conn):
# 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
accessKey = env('accessKey')
accessSecret = env('accessSecret')
consumerGroupId = env('consumerGroupId')
# iotInstanceId:实例ID。
iotInstanceId = env('iotInstanceId')
clientId = env('clientId')
# 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
signMethod = "hmacsha1"
timestamp = current_time_millis()
# userName组装方法,请参见AMQP客户端接入说明文档。
# 若使用二进制传输,则userName需要添加encode=base64参数,服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节“二进制消息体说明”。
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
conn.set_listener('', MyListener(conn))
conn.connect(username, password, wait=True)
# 清除历史连接检查任务,新建连接检查任务
schedule.clear('conn-check')
schedule.every(1).seconds.do(do_check, conn).tag('conn-check')
class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
# print('received a message "%s"' % frame.body)
# 解析消息
data = json.loads(frame.body)
# 从消息中提取数据
unique_id = data['iotId']
blood_oxygen = data['items']['blood_oxygen']['value']
temperature = data['items']['temperature']['value']
heart_rate = data['items']['heart_rate']['value']
blood_pressure = data['items']['blood_pressure']['value']
timestamp = datetime.fromtimestamp(data['items']['heart_rate']['time'] / 1000.0, tz=timezone.utc)
# 查找对应的设备实例
try:
device = Device.objects.get(unique_id=unique_id, is_bind=True)
# 创建DeviceData实例存储数据
DeviceData.objects.create(
device=device,
timestamp=timestamp,
heart_rate=heart_rate,
blood_oxygen=blood_oxygen,
temperature=temperature,
blood_pressure=str(blood_pressure) # 假设血压字段是字符串类型
)
print(f"设备 {unique_id} 的数据存储成功")
except Device.DoesNotExist:
print(f"设备 {unique_id} 未被绑定或不存在")
def on_heartbeat_timeout(self):
print('on_heartbeat_timeout')
def on_connected(self, headers):
print("successfully connected")
conn.subscribe(destination='/topic/#', id=1, ack='auto')
print("successfully subscribe")
def on_disconnected(self):
print('disconnected')
connect_and_subscribe(self.conn)
def current_time_millis():
return str(int(round(time.time() * 1000)))
def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest()).decode("utf-8")
# 检查连接,如果未连接则重新建连
def do_check(conn):
print('check connection, is_connected: %s', conn.is_connected())
if not conn.is_connected():
try:
connect_and_subscribe(conn)
except Exception as e:
print('disconnected, ', e)
# 定时任务方法,检查连接状态
def connection_check_timer():
while 1:
schedule.run_pending()
time.sleep(60)
# 接入域名,请参见AMQP客户端接入说明文档。这里直接填入域名,不需要带amqps://前缀
conn = stomp.Connection([(env('conn'), 61614)], heartbeats=(0, 300))
conn.set_ssl(for_hosts=[(env('conn'), 61614)], ssl_version=ssl.PROTOCOL_TLS)
try:
connect_and_subscribe(conn)
except Exception as e:
print('connecting failed')
raise e
# 异步线程运行定时任务,检查连接状态
thread = threading.Thread(target=connection_check_timer)
thread.start()
运行测试
大功告成