Update service.py

매세지 브로드캐스팅 기능 추가
This commit is contained in:
gunu3371 2024-07-16 00:31:47 +09:00 committed by GitHub
parent 6e2e523570
commit d588719d44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3,9 +3,11 @@ import os
import time
import subprocess
import logging
import threading
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
import signal
import socket
import json
import datetime
class GracefulKiller:
kill_now = False
@ -58,7 +60,7 @@ class Logger:
def __init__(self):
self.logger = logging.getLogger()
self.logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s', datefmt='%Z %Y:%m:%d %H:%M:%S')
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s', datefmt='%Z %Y/%m/%d %H:%M:%S')
file_handler = RotatingFileHandler('./log/ups.log', maxBytes=1024*1024*10, backupCount=10)
file_handler.setFormatter(formatter)
consoleHandler = logging.StreamHandler()
@ -81,10 +83,108 @@ class Logger:
def crit(self,message):
self.logger.critical(message)
class PowerDB:
def __init__(self):
self.dbname = './powerdb.json'
if not 'data' in json.load(self.dbname):
json.dump({'data':[]},self.dbname)
def offline(self):
db = json.load(self.dbname)
db['data'].append({'time':datetime.datetime.now(datetime.UTC).strftime("%Y/%m/%d %H:%M:%S.%f %Z"),'status':'offline','ACstat':False})
json.dump({'data':db},self.dbname)
def online(self):
db = json.load(self.dbname)
db['data'].append({'time':datetime.datetime.now(datetime.UTC).strftime("%Y/%m/%d %H:%M:%S.%f %Z"),'status':'online','ACstat':True})
json.dump({'data':db},self.dbname)
def lastoff(self):
db = json.load(self.dbname)
for i in reversed(db['data']):
if i['ACstat'] == True:
lof = i['time']
lof = time.strptime(lof, '%Y/%m/%d %H:%M:%S.%f %Z')
ntime = datetime.datetime.now(datetime.UTC)
class Broadcast:
def __init__(self,broadcast_ip='255.255.255.255',port=51547):
self.broadcast_ip = broadcast_ip
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
def send(self,message:dict):
message = json.dumps(message)
encoded_message = self.__hamming_encode(''.join(format(ord(c), '08b') for c in message))
self.sock.sendto(encoded_message.encode('utf-8'), (self.broadcast_ip, self.port))
@staticmethod
def __hamming_encode(data):
data_bits = [int(bit) for bit in data]
m = len(data_bits)
r = 1
while (2 ** r) < (m + r + 1):
r += 1
hamming_code = [0] * (m + r)
j = 0
for i in range(1, m + r + 1):
if i == 2 ** j:
j += 1
else:
hamming_code[i - 1] = data_bits.pop(0)
for i in range(r):
position = 2 ** i
value = 0
for j in range(1, m + r + 1):
if j & position and j != position:
value ^= hamming_code[j - 1]
hamming_code[position - 1] = value
return ''.join(map(str, hamming_code))
@staticmethod
def __hamming_decode(data):
data_bits = [int(bit) for bit in data]
m = len(data_bits)
r = 0
while (2 ** r) < m:
r += 1
error_pos = 0
for i in range(r):
position = 2 ** i
value = 0
for j in range(1, m + 1):
if j & position:
value ^= data_bits[j - 1]
if value:
error_pos += position
if error_pos:
data_bits[error_pos - 1] ^= 1
decoded_data = []
j = 0
for i in range(1, m + 1):
if i != 2 ** j:
decoded_data.append(data_bits[i - 1])
else:
j += 1
return ''.join(map(str, decoded_data))
killer = GracefulKiller()
log = Logger()
log.info('UPS service is starting')
ups = UPS()
broad = Broadcast()
# ac = PowerDB()
try:
import pwmio
@ -164,5 +264,6 @@ while not killer.kill_now:
time.sleep(0.5)
buz.off()
time.sleep(0.5)
broad.send({'msg':'test'})
buz.off()
log.info('UPS service stopped successfully')