Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
4 changes: 4 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import pylontech

p = pylontech.Pylontech(serial_port='COM3', baudrate=9600)
print(p.get_values())
35 changes: 28 additions & 7 deletions pylontech/pylontech.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
import logging
import serial
import construct
import logging
import time

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s:%(funcName)s:%(lineno)d - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -171,17 +178,21 @@ def get_info_length(info: bytes) -> int:

def send_cmd(self, address: int, cmd, info: bytes = b''):
raw_frame = self._encode_cmd(address, cmd, info)
logger.debug(f"{raw_frame=}")
self.s.write(raw_frame)


def _encode_cmd(self, address: int, cid2: int, info: bytes = b''):
ver = 0x20
cid1 = 0x46

info_length = Pylontech.get_info_length(info)

frame = "{:02X}{:02X}{:02X}{:02X}{:04X}".format(0x20, address, cid1, cid2, info_length).encode()
frame = "{:02X}{:02X}{:02X}{:02X}{:04X}".format(ver, address, cid1, cid2, info_length).encode()
frame += info

logger.info(f"VER=%s; ADDR=%s; CID1={cid1:02X}H; CID2={cid2:02X}H; frame.INFO=%s", ver, address, info.decode())

frame_chksum = Pylontech.get_frame_checksum(frame)
whole_frame = (b"~" + frame + "{:04X}".format(frame_chksum).encode() + b"\r")
return whole_frame
Expand Down Expand Up @@ -212,9 +223,15 @@ def _decode_frame(self, frame):

def read_frame(self):
raw_frame = self.s.readline()
f = self._decode_hw_frame(raw_frame=raw_frame)
parsed = self._decode_frame(f)
return parsed
logger.debug(f"{raw_frame=}")
chk_frame = self._decode_hw_frame(raw_frame=raw_frame)
logger.debug(f"{chk_frame=}")
dec_frame = self._decode_frame(chk_frame)
data = b'\x05\xff\x0a';
logger.info(f"VER=%s; ADDR=%s; CID1={dec_frame.cid1[0]:02X}H; CID2={dec_frame.cid2[0]:02X}H; frame.INFO=%s", ord(dec_frame.ver), ord(dec_frame.adr), dec_frame.info.hex(" ").upper())

logger.debug(f"{dec_frame=}")
return dec_frame


def scan_for_batteries(self, start=0, end=255) -> Dict[int, str]:
Expand Down Expand Up @@ -284,9 +301,13 @@ def get_values(self):
self.send_cmd(2, 0x42, b'FF')
f = self.read_frame()

# infoflag = f.info[0]
d = self.get_values_fmt.parse(f.info[1:])
return d
if f.cid2 == b'\x00':
# infoflag = f.info[0]
d = self.get_values_fmt.parse(f.info[1:])
return d
else:
return -1
return -2

def get_values_single(self, dev_id):
bdevid = "{:02X}".format(dev_id).encode()
Expand Down