mirror of
https://github.com/gunu3371/m1s_ups_control.git
synced 2024-09-24 21:25:40 +00:00
Compare commits
7 Commits
0399a87f20
...
7a5a74fe33
Author | SHA1 | Date | |
---|---|---|---|
|
7a5a74fe33 | ||
|
7855f04eed | ||
|
f5bbfec17f | ||
|
f168bae171 | ||
|
a889e64c8e | ||
|
6f190062eb | ||
|
39ab6db52f |
@ -2,7 +2,7 @@
|
||||
apt update
|
||||
apt install python3-pip
|
||||
python3 -m pip install -r requirements.txt
|
||||
mkdir -p /etc/m1s_ups/
|
||||
mkdir -p /etc/m1s_ups/log
|
||||
|
||||
cp kill.sh /etc/m1s_ups/
|
||||
cp service.py /etc/m1s_ups/
|
||||
|
166
service.py
166
service.py
@ -4,48 +4,134 @@ import time
|
||||
import subprocess
|
||||
import pwmio
|
||||
import board
|
||||
import logging
|
||||
import threading
|
||||
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
|
||||
import signal
|
||||
|
||||
piezo = pwmio.PWMOut(board.D15, variable_frequency=True)
|
||||
piezo.duty_cycle = 0
|
||||
piezo.frequency = 523
|
||||
class GracefulKiller:
|
||||
kill_now = False
|
||||
def __init__(self):
|
||||
signal.signal(signal.SIGINT, self.exit_gracefully)
|
||||
signal.signal(signal.SIGTERM, self.exit_gracefully)
|
||||
|
||||
ups_loc = subprocess.check_output(['sudo','/bin/bash','kill.sh'])
|
||||
ups_loc = ups_loc.decode("UTF-8")
|
||||
ups_loc = ups_loc.replace("\n", "")
|
||||
ser = serial.Serial(ups_loc, 9600)
|
||||
def exit_gracefully(self, signum, frame):
|
||||
self.kill_now = True
|
||||
log.info('UPS service is being stopped')
|
||||
|
||||
def swr(a):
|
||||
a = bytes(a, 'utf-8')
|
||||
ser.write(a)
|
||||
a = ser.readline().decode("UTF-8")
|
||||
a = a.replace("\n", "")
|
||||
a = a.replace("\r", "")
|
||||
a = a.replace("#", "")
|
||||
a = a.replace("@", "")
|
||||
return(a)
|
||||
class UPS:
|
||||
def __init__(self):
|
||||
ups_loc = subprocess.check_output(['sudo','/bin/bash','kill.sh'])
|
||||
ups_loc = ups_loc.decode("UTF-8")
|
||||
ups_loc = ups_loc.replace("\n", "")
|
||||
self.ser = serial.Serial(ups_loc, 9600)
|
||||
|
||||
def get_firmver(self):
|
||||
return self.__get('@Fx#').replace("F","").replace("-",".")
|
||||
|
||||
def get_curvol(self):
|
||||
return self.__get('@V0#').replace("V","")
|
||||
|
||||
def get_chargestat(self):
|
||||
a = self.__get('@C0#').replace("","")
|
||||
if a == "CF0C0":
|
||||
return "err"
|
||||
elif a == "CF1C0":
|
||||
return "chg"
|
||||
elif a == "CF0C1":
|
||||
return "full"
|
||||
elif a == "CF1C1":
|
||||
return "dch"
|
||||
|
||||
def shutdown(self):
|
||||
self.__get('@Px#')
|
||||
|
||||
def __get(self,a):
|
||||
a = bytes(a, 'utf-8')
|
||||
self.ser.write(a)
|
||||
a = self.ser.readline().decode("UTF-8")
|
||||
a = a.replace("\n", "")
|
||||
a = a.replace("\r", "")
|
||||
a = a.replace("#", "")
|
||||
a = a.replace("@", "")
|
||||
return(a)
|
||||
|
||||
class Logger:
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger()
|
||||
self.logger.setLevel(logging.INFO)
|
||||
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s', datefmt='%Z %Y:%m:%d %H:%M:%S')
|
||||
file_handler = RotatingFileHandler('./log/ups.log', maxBytes=1024*1024*10, backupCount=10)
|
||||
file_handler.setFormatter(formatter)
|
||||
consoleHandler = logging.StreamHandler()
|
||||
consoleHandler.setFormatter(formatter)
|
||||
self.logger.addHandler(file_handler)
|
||||
self.logger.addHandler(consoleHandler)
|
||||
|
||||
prt = str()
|
||||
while True:
|
||||
print("------------------------------------------------------")
|
||||
print("------------------------------------------------------")
|
||||
a = swr('@Fx#').replace("F","")
|
||||
print("firmware ver ",a)
|
||||
vol = swr('@V0#').replace("V","")
|
||||
print("current voltage "+vol+"mV")
|
||||
a = swr('@C0#').replace("","")
|
||||
if a != "CF1C1":
|
||||
piezo.duty_cycle = 0
|
||||
print("Power Status Normal")
|
||||
elif a == "CF0C0":
|
||||
print("UPS !ERROR!")
|
||||
else:
|
||||
print("Power Loss Detect")
|
||||
piezo.duty_cycle = 45000
|
||||
if int(vol) <= 3600:
|
||||
ser.write(b'@Px#')
|
||||
def debug(self,message):
|
||||
self.logger.debug(message)
|
||||
|
||||
def info(self,message):
|
||||
self.logger.info(message)
|
||||
|
||||
def warn(self,message):
|
||||
self.logger.warning(message)
|
||||
|
||||
def error(self,message):
|
||||
self.logger.error(message)
|
||||
|
||||
def crit(self,message):
|
||||
self.logger.critical(message)
|
||||
|
||||
class Alarm:
|
||||
def __init__(self):
|
||||
self.piezo = pwmio.PWMOut(board.D15, variable_frequency=True)
|
||||
self.piezo.duty_cycle = 0
|
||||
self.piezo.frequency = 523
|
||||
|
||||
def on(self):
|
||||
self.piezo.duty_cycle = 50000
|
||||
|
||||
def off(self):
|
||||
self.piezo.duty_cycle = 0
|
||||
|
||||
killer = GracefulKiller()
|
||||
log = Logger()
|
||||
log.info('UPS service is starting')
|
||||
ups = UPS()
|
||||
buz = Alarm()
|
||||
|
||||
buz.on()
|
||||
log.info('UPS service started successfully')
|
||||
log.info(f"UPS Firmware Ver {ups.get_firmver()}")
|
||||
time.sleep(0.3)
|
||||
buz.off()
|
||||
|
||||
while not killer.kill_now:
|
||||
ps = ups.get_chargestat()
|
||||
|
||||
if ps == "full":
|
||||
log.info("AC power active")
|
||||
elif ps == "chg":
|
||||
log.info('AC power restored Charging')
|
||||
elif ps == "dch":
|
||||
buz.on()
|
||||
log.warn('AC power loss')
|
||||
pw = int(ups.get_curvol())
|
||||
if pw <= 3500:
|
||||
log.crit('battery is too low Shut down system')
|
||||
ups.shutdown()
|
||||
os.system("shutdown now")
|
||||
print("------------------------------------------------------")
|
||||
print("------------------------------------------------------")
|
||||
time.sleep(1)
|
||||
piezo.duty_cycle = 0
|
||||
time.sleep(1)
|
||||
elif pw <= 3650:
|
||||
log.crit('battery is too low')
|
||||
elif pw <= 3800:
|
||||
log.warn('battery is low')
|
||||
elif ps == "err":
|
||||
log.crit('UPS hardware error detect')
|
||||
|
||||
log.info("Current Voltage "+ups.get_curvol()+"mV")
|
||||
time.sleep(0.5)
|
||||
buz.off()
|
||||
time.sleep(0.5)
|
||||
buz.off()
|
||||
log.info('UPS service stopped successfully')
|
||||
|
Loading…
Reference in New Issue
Block a user