百木园-与人分享,
就是让自己快乐。

基于树莓派+传感器+阿里云IoT的智能家居管理(代码实现)

视频教程已经放在B站
请大家狠狠地三连我
虽然我没有稚晖君那么强

[video(video-1y2sFBXw-1623142522346)(type-bilibili)(url-https://player.bilibili.com/player.html?aid=716086471)(image-https://ss.csdn.net/p?http://i0.hdslb.com/bfs/archive/c292aeda52dff181453ea0f9da809fd5d38fece8.jpg)(title-教程!基于树莓派+传感器+阿里云IoT的智能家居管理(2))]

主文件

#!/usr/bin/python3

import aliLink,mqttd,rpi
import time,json
import Adafruit_DHT
import time
import LCD1602
import flame_sensor
import buzzer_1
import rain_detector
import gas_sensor
import relay
from threading import Thread

pin = 19 # DHT11 温湿度传感器管脚定义
Buzzer = 20 # 有源蜂鸣器管脚定义

# GPIO口定义
sensor = Adafruit_DHT.DHT11

# 三元素(iot后台获取)
ProductKey = \'a11lzCDSgZP\'
DeviceName = \'IU6aSETyiImFPSkpcywm\'
DeviceSecret = \"2551eb5f630c372743c538e9b87bfe6d\"
# topic (iot后台获取)
POST = \'/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post\' # 上报消息到云
POST_REPLY = \'/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post_reply\'
SET = \'/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/service/property/set\' # 订阅云端指令

#窗户开关
window = 0
window_status = 0
Thread(target=relay.close).start()

# 消息回调(云端下发消息的回调函数)
def on_message(client, userdata, msg):
#print(msg.payload)
Msg = json.loads(msg.payload)

global window,window_status
window = Msg[\'params\'][\'window\']
print(msg.payload) # 开关值
if window_status != window:
window_status = window
if window == 1:
Thread(target=relay.open).start()
else:
Thread(target=relay.close).start()

#连接回调(与阿里云建立链接后的回调函数)
def on_connect(client, userdata, flags, rc):
pass

# 链接信息
Server,ClientId,userNmae,Password = aliLink.linkiot(DeviceName,ProductKey,DeviceSecret)

# mqtt链接
mqtt = mqttd.MQTT(Server,ClientId,userNmae,Password)
mqtt.subscribe(SET) # 订阅服务器下发消息topic
mqtt.begin(on_message,on_connect)

# 信息获取上报,每2秒钟上报一次系统参数
while True:
#获取指示灯状态
power_stats=int(rpi.getLed())
if(power_stats == 0):
power_LED = 0
else:
power_LED = 1

# CPU 信息
CPU_temp = float(rpi.getCPUtemperature()) # 温度 ℃
CPU_usage = float(rpi.getCPUuse()) # 占用率 %

# RAM 信息
RAM_stats =rpi.getRAMinfo()
RAM_total =round(int(RAM_stats[0]) /1000,1) #
RAM_used =round(int(RAM_stats[1]) /1000,1)
RAM_free =round(int(RAM_stats[2]) /1000,1)

# Disk 信息
DISK_stats =rpi.getDiskSpace()
DISK_total = float(DISK_stats[0][:-1])
DISK_used = float(DISK_stats[1][:-1])
DISK_perc = float(DISK_stats[3][:-1])

#温度,湿度
humidity, temperature = Adafruit_DHT.read_retry(sensor, pin)

# LCD显示
LCD = 0
try:
LCD1602.init(0x27, 1) # 初始化显示屏
LCD1602.write(0, 0, \'humidity: \' + str(int(humidity)) + \'%\')
LCD1602.write(0, 1, \'temperature: \' + str(int(temperature)) + \'\\\'\') # 在第二行显示world!
LCD = 1
except:
print(\"显示屏连接不稳定,请检查\")
LCD = 0

# 蜂鸣器
buzzer = 0

# 火焰传感器
flame_sensor.setup()
if flame_sensor.fire() == 0:
flame = 1
buzzer_1.buzzer_on() #让铃声叫
buzzer = 1
else:
flame = 0
buzzer_1.buzzer_off() #铃声不叫
buzzer = 0

# 烟雾传感器
gas = gas_sensor.gas()

# 雨滴传感器
rain_detector.setup()
if rain_detector.rain() == 0:
rain = 1
else:
rain = 0

# 构建与云端模型一致的消息结构
updateMsn = {
\'cpu_temperature\':CPU_temp,
\'cpu_usage\':CPU_usage,
\'RAM_total\':RAM_total,
\'RAM_used\':RAM_used,
\'RAM_free\':RAM_free,
\'DISK_total\':DISK_total,
\'DISK_used_space\':DISK_used,
\'DISK_used_percentage\':DISK_perc,
\'PowerLed\':power_LED,
\'temperature\':temperature,
\'humidity\':humidity,
\'window\':window,
\'LCD\':LCD,
\'buzzer\':buzzer,
\'flame\':flame,
\'rain\':rain,
\'gas\':gas
}
JsonUpdataMsn = aliLink.Alink(updateMsn)
print(JsonUpdataMsn)

mqtt.push(POST,JsonUpdataMsn) # 定时向阿里云IOT推送我们构建好的Alink协议数据

time.sleep(3)

rpi

# 树莓派数据与控制

import os
# Return CPU temperature as a character string

def getCPUtemperature():
res =os.popen(\'vcgencmd measure_temp\').readline()
return(res.replace(\"temp=\",\"\").replace(\"\'C\\n\",\"\"))

# Return RAM information (unit=kb) in a list
# Index 0: total RAM
# Index 1: used RAM
# Index 2: free RAM
def getRAMinfo():
p =os.popen(\'free\')
i =0
while 1:
i =i +1
line =p.readline()
if i==2:
return(line.split()[1:4])

# Return % of CPU used by user as a character string
def getCPUuse():
data = os.popen(\"top -n1 | awk \'/Cpu\\(s\\):/ {print $2}\'\").readline().strip()
return(data)

# Return information about disk space as a list (unit included)
# Index 0: total disk space
# Index 1: used disk space
# Index 2: remaining disk space
# Index 3: percentage of disk used
def getDiskSpace():
p =os.popen(\"df -h /\")
i =0
while True:
i =i +1
line =p.readline()
if i==2:
return(line.split()[1:5])
def powerLed(swatch):
led = open(\'/sys/class/leds/led1/brightness\', \'w\', 1)
led.write(str(swatch))
led.close()

# LED灯状态检测
def getLed():
led = open(\'/sys/class/leds/led1/brightness\', \'r\', 1)
state=led.read()
led.close()
return state

if __name__ == \"__main__\":

# CPU informatiom
CPU_temp =getCPUtemperature()
CPU_usage =getCPUuse()
print(CPU_usage)
# RAM information
# Output is in kb, here I convert it in Mb for readability
RAM_stats =getRAMinfo()

RAM_total = round(int(RAM_stats[0]) /1000,1)
RAM_used = round(int(RAM_stats[1]) /1000,1)
RAM_free = round(int(RAM_stats[2]) /1000,1)
print(RAM_total,RAM_used,RAM_free)
# Disk information
DISK_stats =getDiskSpace()

DISK_total = DISK_stats[0][:-1]
DISK_used = DISK_stats[1][:-1]
DISK_perc = DISK_stats[3][:-1]
print(DISK_total,DISK_used,DISK_perc)

继电器

import RPi.GPIO as GPIO
import time
import asyncio

GPIO.setmode(GPIO.BCM) # 管脚映射,采用BCM编码
GPIO.setwarnings(False) # 忽略GPIO 警告

CH1 = 17
CH2 = 16 #继电器输入信号管脚

GPIO.setup(CH1,GPIO.OUT)
GPIO.setup(CH2,GPIO.OUT)

def open():
GPIO.output(CH1, GPIO.LOW)
time.sleep(10)
GPIO.output(CH1, GPIO.HIGH)

def close():
GPIO.output(CH2, GPIO.LOW)
time.sleep(10)
GPIO.output(CH2, GPIO.HIGH)

阿里云连接

import time,json,random
import hmac,hashlib

def linkiot(DeviceName,ProductKey,DeviceSecret,server = \'iot-as-mqtt.cn-shanghai.aliyuncs.com\'):
serverUrl = server
ClientIdSuffix = \"|securemode=3,signmethod=hmacsha256,timestamp=\"

# 拼合
Times = str(int(time.time())) # 获取登录时间戳
Server = ProductKey+\'.\'+serverUrl # 服务器地址
ClientId = DeviceName + ClientIdSuffix + Times +\'|\' # ClientId
userNmae = DeviceName + \"&\" + ProductKey
PasswdClear = \"clientId\" + DeviceName + \"deviceName\" + DeviceName +\"productKey\"+ProductKey + \"timestamp\" + Times # 明文密码

# 加密
h = hmac.new(bytes(DeviceSecret,encoding= \'UTF-8\'),digestmod=hashlib.sha256) # 使用密钥
h.update(bytes(PasswdClear,encoding = \'UTF-8\'))
Passwd = h.hexdigest()
return Server,ClientId,userNmae,Passwd

# 阿里Alink协议实现(字典传入,json str返回)
def Alink(params):
AlinkJson = {}
AlinkJson[\"id\"] = random.randint(0,999999)
AlinkJson[\"version\"] = \"1.0\"
AlinkJson[\"params\"] = params
AlinkJson[\"method\"] = \"thing.event.property.post\"
return json.dumps(AlinkJson)

if __name__ == \"__main__\":
pass

蜂鸣器

import RPi.GPIO as GPIO
import time

BuzzerPin = 20 # 有源蜂鸣器管脚定义

# GPIO设置函数
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False) # 关闭GPIO警告提示
GPIO.setup(BuzzerPin, GPIO.OUT) # 设置有源蜂鸣器管脚为输出模式
GPIO.output(BuzzerPin, GPIO.HIGH) # 蜂鸣器设置为高电平,关闭蜂鸟器

# 打开蜂鸣器
def buzzer_on():
GPIO.output(BuzzerPin, GPIO.LOW) # 蜂鸣器为低电平触发,所以使能蜂鸣器让其发声
# 关闭蜂鸣器
def buzzer_off():
GPIO.output(BuzzerPin, GPIO.HIGH) # 蜂鸣器设置为高电平,关闭蜂鸟器

# 控制蜂鸣器鸣叫
def beep(x):
buzzer_on() # 打开蜂鸣器控制
time.sleep(x) # 延时时间
buzzer_off() # 关闭蜂鸣器控制
time.sleep(x) # 延时时间

# 循环函数
def loop():
while True:
beep(1) # 控制蜂鸣器鸣叫,延时时间为500mm

def destroy():
GPIO.output(BuzzerPin, GPIO.HIGH) # 关闭蜂鸣器鸣叫
GPIO.cleanup() # 释放资源

# 程序入口
if __name__ == \'__main__\':
try: # 检测异常
loop() # 调用循环函数
except KeyboardInterrupt: # 当按下Ctrl+C时,将执行destroy()子程序。
destroy() # 释放资源

火焰传感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math

DO = 26 # 火焰传感器数字IO口
GPIO.setmode(GPIO.BCM) # 管脚映射,采用BCM编码

# 初始化工作
def setup():
ADC.setup(0x48) # 设置PCF8591模块地址
GPIO.setup(DO, GPIO.IN) # 设置火焰传感器数字IO口为输入模式

# 打印信息,打印出火焰传感器的状态值
def Print(x):
if x == 1: # 安全
print (\'\')
print (\' *******************\')
print (\' * Makerobo Safe~ *\')
print (\' *******************\')
print (\'\')
if x == 0: # 有火焰
print (\'\')
print (\' ******************\')
print (\' * Makerobo Fire! *\')
print (\' ******************\')
print (\'\')

# 功能函数
def fire():
status = 1 # 状态值
# 读取火焰传感器数字IO口
return GPIO.input(DO)

# 程序入口
if __name__ == \'__main__\':
try:
setup() # 初始化
fire()
except KeyboardInterrupt:
pass

烟雾传感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math

DO = 18 # 烟雾传感器数字IO口
GPIO.setmode(GPIO.BCM) # 管脚映射,采用BCM编码
GPIO.setwarnings(False) # 忽略GPIO 警告

# 初始化工作
def setup():
ADC.setup(0x48) # 设置PCF8591模块地址
GPIO.setup (DO,GPIO.IN) # 烟雾传感器数字IO口,设置为输入模式

# 打印信息,打印出是否检测到烟雾信息
def Print(x):
if x == 1: # 安全
print (\'\')
print (\' ******************\')
print (\' * Makerobo Safe~ *\')
print (\' ******************\')
print (\'\')
if x == 0: # 检测到烟雾
print (\'\')
print (\' ************************\')
print (\' * Makerobo Danger Gas! *\')
print (\' ************************\')
print (\'\')

# 循环函数
def gas():
setup()
return GPIO.input(DO) # 读取GAS烟雾传感器数字IO口值

# 程序入口
if __name__ == \'__main__\':
setup() # 初始化函数
loop() # 循环函数

LCD1602

import time
import smbus

BUS = smbus.SMBus(1)

# IIC LCD1602 液晶模块写入字
def write_word(addr, data):
global BLEN
temp = data
if BLEN == 1:
temp |= 0x08
else:
temp &= 0xF7
BUS.write_byte(addr ,temp) # 设置IIC LCD1602 液晶模块地址

# IIC LCD1602 发送命令
def send_command(comm):
# 首先发送 bit7-4 位
lcd_buf = comm & 0xF0
lcd_buf |= 0x04 # RS = 0, RW = 0, EN = 1
write_word(LCD_ADDR ,lcd_buf)
time.sleep(0.002)
lcd_buf &= 0xFB # Make EN = 0
write_word(LCD_ADDR ,lcd_buf)

# 其次发送 bit3-0 位
lcd_buf = (comm & 0x0F) << 4
lcd_buf |= 0x04 # RS = 0, RW = 0, EN = 1
write_word(LCD_ADDR ,lcd_buf)
time.sleep(0.002)
lcd_buf &= 0xFB # Make EN = 0
write_word(LCD_ADDR ,lcd_buf)

def send_data(data):
# 首先发送 bit7-4 位
lcd_buf = data & 0xF0
lcd_buf |= 0x05 # RS = 1, RW = 0, EN = 1
write_word(LCD_ADDR ,lcd_buf)
time.sleep(0.002)
lcd_buf &= 0xFB # Make EN = 0
write_word(LCD_ADDR ,lcd_buf)

# 其次发送 bit3-0 位
lcd_buf = (data & 0x0F) << 4
lcd_buf |= 0x05 # RS = 1, RW = 0, EN = 1
write_word(LCD_ADDR ,lcd_buf)
time.sleep(0.002)
lcd_buf &= 0xFB # Make EN = 0
write_word(LCD_ADDR ,lcd_buf)

# IIC LCD1602 初始化
def init(addr, bl):
global LCD_ADDR
global BLEN
LCD_ADDR = addr
BLEN = bl
try:
send_command(0x33) # 必须先初始化到8线模式
time.sleep(0.005)
send_command(0x32) # 然后初始化为4行模式
time.sleep(0.005)
send_command(0x28) # 2 行 & 5*7 点位
time.sleep(0.005)
send_command(0x0C) # 启用无光标显示
time.sleep(0.005)
send_command(0x01) # 清除显示
BUS.write_byte(LCD_ADDR, 0x08)
except:
return False
else:
return True

# LCD 1602 清空显示函数
def clear():
send_command(0x01) # 清除显示

# LCD 1602 使能背光显示
def openlight():
BUS.write_byte(0x27,0x08) # 使能背光显示命令
BUS.close() # 关闭总线

# LCD 1602 显示函数
def write(lcd_x, lcd_y, lcd_str):
# 选择行与列
if lcd_x < 0:
lcd_x = 0
if lcd_x > 15:
lcd_x = 15
if lcd_y <0:
lcd_y = 0
if lcd_y > 1:
lcd_y = 1

# 移动光标
lcd_addr = 0x80 + 0x40 * lcd_y + lcd_x
send_command(lcd_addr) # 发送地址

for chr in lcd_str: # 获取字符串长度
send_data(ord(chr)) # 发送显示

# 程序入口
if __name__ == \'__main__\':
init(0x27, 1) # 初始化显示屏
write(0, 0, \'Hello\') # 在第一行显示Hello
write(0, 1, \'world!\') # 在第二行显示world!

mqtt

#!/usr/bin/python3

# pip install paho-mqtt
import paho.mqtt.client

# =====初始化======
class MQTT():
def __init__(self,host,CcientID,username=None,password=None,port=1883,timeOut=60):
self.Host = host
self.Port = port
self.timeOut = timeOut
self.username =username
self.password = password
self.CcientID = CcientID

self.mqttc = paho.mqtt.client.Client(self.CcientID) #配置ID
if self.username is not None: #判断用户名密码是否为空
self.mqttc.username_pw_set(self.username, self.password) #不为空则配置账号密码

self.mqttc.connect(self.Host, self.Port, self.timeOut) #初始化服务器 IP 端口 超时时间

# 初始化
def begin(self,message,connect):
self.mqttc.on_connect = connect
self.mqttc.on_message = message
self.mqttc.loop_start() # 后台新进程循环监听

# =====发送消息==========
def push(self,tag,date,_Qos = 0):
self.mqttc.publish(tag,date,_Qos)
#print(\'OK\',date)

# =======订阅tips=====
def subscribe(self,_tag):
self.mqttc.subscribe(_tag) #监听标签

PCF8591数模转化模块

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 说明:这是一个PCF8591模块的程序。
# 警告:模拟输入不能超过3.3V!
# 在这个程序中,我们使用电位计进行模拟输入和控制一个模拟电压
# 的LED灯,你可以导入这个程序到另一个程序中使用:
# import PCF8591 as ADC
# ADC.Setup(Address) # 通过 sudo i2cdetect -y -1 可以获取到IIC的地址
# ADC.read(channal) # 通道选择范围为0-3
# ADC.write(Value) # 值的范围为:0-255
#####################################################
import smbus
import time

# 对应比较旧的版本如RPI V1 版本,则 \"bus = smbus.SMBus(0)\"
bus = smbus.SMBus(1)

#通过 sudo i2cdetect -y -1 可以获取到IIC的地址
def setup(Addr):
global address
address = Addr

# 读取模拟量信息
def read(chn): #通道选择,范围是0-3之间
try:
if chn == 0:
bus.write_byte(address,0x40)
if chn == 1:
bus.write_byte(address,0x41)
if chn == 2:
bus.write_byte(address,0x42)
if chn == 3:
bus.write_byte(address,0x43)
bus.read_byte(address) # 开始进行读取转换
except Exception as e:
print (\"Address: %s\" % address)
print (e)
return bus.read_byte(address)

# 模块输出模拟量控制,范围为0-255
def write(val):
try:
temp = val # 将数值赋给temmp 变量
temp = int(temp) # 将字符串转换为整型
# 在终端上打印temp以查看,否则将注释掉
bus.write_byte_data(address, 0x40, temp)
except Exception as e:
print (\"Error: Device address: 0x%2X\" % address)
print (e)

if __name__ == \"__main__\":
setup(0x48)
while True:
print (\'AIN0 = \', read(0))
print (\'AIN1 = \', read(1))
tmp = read(0)
tmp = tmp*(255-125)/255+125 # 低于125时LED不会亮,所以请将“0-255”转换为“125-255”
write(tmp)
# time.sleep(0.3)

雨滴传感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math

DO = 22 # 雨滴传感器数字管脚

GPIO.setmode(GPIO.BCM) # 采用BCM管脚给GPIO口

# GPIO口定义
def setup():
ADC.setup(0x48) # 设置PCF8591模块地址
GPIO.setup(DO, GPIO.IN) # 设置雨滴传感器管脚为输入模式

# 打印出雨滴传感器提示信息
def Print(x):
if x == 1: # 没有雨滴
print (\'\')
print (\' ************************\')
print (\' * Not raining *\')
print (\' ************************\')
print (\'\')
if x == 0: # 有雨滴
print (\'\')
print (\' **********************\')
print (\' * Raining!! *\')
print (\' **********************\')
print (\'\')
# 循环函数
def loop():
status = 1 # 雨滴传感器状态
while True:
print (ADC.read(2)) # 打印出AIN3的模拟量数值

tmp = GPIO.input(DO) # 读取数字IO口电平,读取数字雨滴传感器DO端口
if tmp != status: # 状态发生改变
Print(tmp) # 打印出雨滴传感器检测信息
status = tmp # 状态值重新赋值
time.sleep(0.2) # 延时200ms

# 功能函数
def rain():
status = 1 # 雨滴传感器状态
# 读取数字IO口电平,读取数字雨滴传感器DO端口
return GPIO.input(DO)

# 程序入口
if __name__ == \'__main__\':
try:
setup() # GPIO定义
loop() # 调用循环函数
except KeyboardInterrupt:
pass

来源:https://www.cnblogs.com/kemuling/p/15179033.html
图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » 基于树莓派+传感器+阿里云IoT的智能家居管理(代码实现)

相关推荐

  • 暂无文章