From 66dd3865c3519ffd2a132e53eb86bca4603b52a7 Mon Sep 17 00:00:00 2001 From: Pascal <engelerp@phys.ethz.ch> Date: Tue, 13 Jun 2023 16:58:40 +0200 Subject: [PATCH] Copied files --- drivers/pid_controller/pqwsw2.py | 67 ++++++++++++++++++++++++++++++ drivers/pid_controller/rl20001.py | 69 +++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 drivers/pid_controller/pqwsw2.py create mode 100644 drivers/pid_controller/rl20001.py diff --git a/drivers/pid_controller/pqwsw2.py b/drivers/pid_controller/pqwsw2.py new file mode 100644 index 0000000..e39a0ee --- /dev/null +++ b/drivers/pid_controller/pqwsw2.py @@ -0,0 +1,67 @@ +import requests +import json + +class PQWSW2: + def __init__(self, device_ip): + self.device_ip = device_ip + #test if device actually responds + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + print("Smart Switch Connected Successfully") + self.connected = True + except: + print(f"Device at {device_ip} is not available. Are you connected to the PQWSW2 Wlan?") + self.connected = False + + def connect(self, device_ip): + self.device_ip = device_ip + #test if device actually responds + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + except: + print(f"Device at {device_ip} is not available. Are you connected to the PQWSW2 Wlan?") + + def getPower(self): + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + except: + return -1. + json_response = json.loads(response.text) + return json_response['power'] + + def getTemperature(self): + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + except: + return -1. + json_response = json.loads(response.text) + return json_response['temperature'] + + def getState(self): + url = f'http://{self.device_ip}/report' + try: + response = requests.get(url) + except: + return None + json_response = json.loads(response.text) + return json_response['relay'] + + def turnOn(self): + url = f"http://{self.device_ip}/relay?state=1" + try: + response = requests.get(url) + except: + print("Communication Error") + + def turnOff(self): + url = f"http://{self.device_ip}/relay?state=0" + try: + response = requests.get(url) + except: + print("Communication Error") + + diff --git a/drivers/pid_controller/rl20001.py b/drivers/pid_controller/rl20001.py new file mode 100644 index 0000000..d47393d --- /dev/null +++ b/drivers/pid_controller/rl20001.py @@ -0,0 +1,69 @@ +import serial +import time + +class RL20001: + def __init__(self, port): + self.port = port + try: + self.ser = serial.Serial(self.port, baudrate=9600, timeout=2) + print("Numato USB Relay Connected Successfully") + self.connected = True + except: + self.ser = None + print(f"Failed to open Serial Port {self.port}.") + self.connected = False + + def connect(self, port): + self.port = port + try: + self.ser = serial.Serial(self.port, baudrate=9600, timeout=2) + except: + self.ser = None + print(f"Failed to open Serial Port {self.port}.") + + def turnOn(self): + self.ser.write(f"relay on 0\x0D".encode()) + time.sleep(0.1) + self.ser.write(f"relay on 1\x0D".encode()) + time.sleep(0.5) + while(self.ser.in_waiting): + self.ser.read().decode() + + + def turnOff(self): + self.ser.write(f"relay off 0\x0D".encode()) + time.sleep(0.1) + self.ser.write(f"relay off 1\x0D".encode()) + while(self.ser.in_waiting): + self.ser.read().decode() + + def getState(self): + #read relay 0 + self.ser.write(f"relay read 0\x0D".encode()) + time.sleep(0.5) + text = "" + while(self.ser.in_waiting): + text += self.ser.read().decode() + if(len(text.split('\n\r')) < 2): + print("RL20001: Communication Error Detected") + return None + res1 = text.split('\n\r')[1] + #read relay1 + self.ser.write(f"relay read 1\x0D".encode()) + time.sleep(0.5) + text = "" + while(self.ser.in_waiting): + text += self.ser.read().decode() + if(len(text.split('\n\r')) < 2): + print("RL20001: Communication Error Detected") + return None + res2 = text.split('\n\r')[1] + if(res2 == 'on' and res1 == 'on'): + return True + if(res2 == 'off' and res1 == 'off'): + return False + print("RL20001: Inconclusive State Detected") + return None + + def close(self): + self.ser.close() \ No newline at end of file -- GitLab