diff --git a/drivers/pid_controller/pqwsw2.py b/drivers/pid_controller/pqwsw2.py new file mode 100644 index 0000000000000000000000000000000000000000..e39a0ee653ba58a21b23dc8c10d121c617c5539f --- /dev/null +++ b/drivers/pid_controller/pqwsw2.py @@ -0,0 +1,67 @@ +import requests +import json + +class PQWSW2: + def __init__(self, device_ip): + self.device_ip = device_ip + #test if device actually responds + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + print("Smart Switch Connected Successfully") + self.connected = True + except: + print(f"Device at {device_ip} is not available. Are you connected to the PQWSW2 Wlan?") + self.connected = False + + def connect(self, device_ip): + self.device_ip = device_ip + #test if device actually responds + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + except: + print(f"Device at {device_ip} is not available. Are you connected to the PQWSW2 Wlan?") + + def getPower(self): + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + except: + return -1. + json_response = json.loads(response.text) + return json_response['power'] + + def getTemperature(self): + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + except: + return -1. + json_response = json.loads(response.text) + return json_response['temperature'] + + def getState(self): + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + except: + return None + json_response = json.loads(response.text) + return json_response['relay'] + + def turnOn(self): + url = f"http://{self.device_ip}/relay?state=1" + try: + response = requests.get(url) + except: + print("Communication Error") + + def turnOff(self): + url = f"http://{self.device_ip}/relay?state=0" + try: + response = requests.get(url) + except: + print("Communication Error") + + diff --git a/drivers/pid_controller/rl20001.py b/drivers/pid_controller/rl20001.py new file mode 100644 index 0000000000000000000000000000000000000000..d47393d376d2cd3161b734c9bf45b77c29d11d0e --- /dev/null +++ b/drivers/pid_controller/rl20001.py @@ -0,0 +1,69 @@ +import serial +import time + +class RL20001: + def __init__(self, port): + self.port = port + try: + self.ser = serial.Serial(self.port, baudrate=9600, timeout=2) + print("Numato USB Relay Connected Successfully") + self.connected = True + except: + self.ser = None + print(f"Failed to open Serial Port {self.port}.") + self.connected = False + + def connect(self, port): + self.port = port + try: + self.ser = serial.Serial(self.port, baudrate=9600, timeout=2) + except: + self.ser = None + print(f"Failed to open Serial Port {self.port}.") + + def turnOn(self): + self.ser.write(f"relay on 0\x0D".encode()) + time.sleep(0.1) + self.ser.write(f"relay on 1\x0D".encode()) + time.sleep(0.5) + while(self.ser.in_waiting): + self.ser.read().decode() + + + def turnOff(self): + self.ser.write(f"relay off 0\x0D".encode()) + time.sleep(0.1) + self.ser.write(f"relay off 1\x0D".encode()) + while(self.ser.in_waiting): + self.ser.read().decode() + + def getState(self): + #read relay 0 + self.ser.write(f"relay read 0\x0D".encode()) + time.sleep(0.5) + text = "" + while(self.ser.in_waiting): + text += self.ser.read().decode() + if(len(text.split('\n\r')) < 2): + print("RL20001: Communication Error Detected") + return None + res1 = text.split('\n\r')[1] + #read relay1 + self.ser.write(f"relay read 1\x0D".encode()) + time.sleep(0.5) + text = "" + while(self.ser.in_waiting): + text += self.ser.read().decode() + if(len(text.split('\n\r')) < 2): + print("RL20001: Communication Error Detected") + return None + res2 = text.split('\n\r')[1] + if(res2 == 'on' and res1 == 'on'): + return True + if(res2 == 'off' and res1 == 'off'): + return False + print("RL20001: Inconclusive State Detected") + return None + + def close(self): + self.ser.close() \ No newline at end of file