Skip to content
Snippets Groups Projects
Commit 66dd3865 authored by Pascal Engeler's avatar Pascal Engeler
Browse files

Copied files

parent 4d2c99d5
No related branches found
No related tags found
No related merge requests found
import requests
import json
class PQWSW2:
def __init__(self, device_ip):
self.device_ip = device_ip
#test if device actually responds
url = f'http://{self.device_ip}/report'
try:
response = requests.get(url)
print("Smart Switch Connected Successfully")
self.connected = True
except:
print(f"Device at {device_ip} is not available. Are you connected to the PQWSW2 Wlan?")
self.connected = False
def connect(self, device_ip):
self.device_ip = device_ip
#test if device actually responds
url = f'http://{self.device_ip}/report'
try:
response = requests.get(url)
except:
print(f"Device at {device_ip} is not available. Are you connected to the PQWSW2 Wlan?")
def getPower(self):
url = f'http://{self.device_ip}/report'
try:
response = requests.get(url)
except:
return -1.
json_response = json.loads(response.text)
return json_response['power']
def getTemperature(self):
url = f'http://{self.device_ip}/report'
try:
response = requests.get(url)
except:
return -1.
json_response = json.loads(response.text)
return json_response['temperature']
def getState(self):
url = f'http://{self.device_ip}/report'
try:
response = requests.get(url)
except:
return None
json_response = json.loads(response.text)
return json_response['relay']
def turnOn(self):
url = f"http://{self.device_ip}/relay?state=1"
try:
response = requests.get(url)
except:
print("Communication Error")
def turnOff(self):
url = f"http://{self.device_ip}/relay?state=0"
try:
response = requests.get(url)
except:
print("Communication Error")
import serial
import time
class RL20001:
def __init__(self, port):
self.port = port
try:
self.ser = serial.Serial(self.port, baudrate=9600, timeout=2)
print("Numato USB Relay Connected Successfully")
self.connected = True
except:
self.ser = None
print(f"Failed to open Serial Port {self.port}.")
self.connected = False
def connect(self, port):
self.port = port
try:
self.ser = serial.Serial(self.port, baudrate=9600, timeout=2)
except:
self.ser = None
print(f"Failed to open Serial Port {self.port}.")
def turnOn(self):
self.ser.write(f"relay on 0\x0D".encode())
time.sleep(0.1)
self.ser.write(f"relay on 1\x0D".encode())
time.sleep(0.5)
while(self.ser.in_waiting):
self.ser.read().decode()
def turnOff(self):
self.ser.write(f"relay off 0\x0D".encode())
time.sleep(0.1)
self.ser.write(f"relay off 1\x0D".encode())
while(self.ser.in_waiting):
self.ser.read().decode()
def getState(self):
#read relay 0
self.ser.write(f"relay read 0\x0D".encode())
time.sleep(0.5)
text = ""
while(self.ser.in_waiting):
text += self.ser.read().decode()
if(len(text.split('\n\r')) < 2):
print("RL20001: Communication Error Detected")
return None
res1 = text.split('\n\r')[1]
#read relay1
self.ser.write(f"relay read 1\x0D".encode())
time.sleep(0.5)
text = ""
while(self.ser.in_waiting):
text += self.ser.read().decode()
if(len(text.split('\n\r')) < 2):
print("RL20001: Communication Error Detected")
return None
res2 = text.split('\n\r')[1]
if(res2 == 'on' and res1 == 'on'):
return True
if(res2 == 'off' and res1 == 'off'):
return False
print("RL20001: Inconclusive State Detected")
return None
def close(self):
self.ser.close()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment