Skip to content

Commit

Permalink
Add support for Broadlink A2
Browse files Browse the repository at this point in the history
  • Loading branch information
felipediel committed Apr 11, 2024
1 parent 24b9d30 commit 0090d6e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 26 deletions.
52 changes: 26 additions & 26 deletions broadlink/cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class dooya2(Device):

TYPE = "DT360E-2"

def _send(self, operation: int, data: bytes):
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(14)
packet[0x02] = 0xA5
Expand All @@ -72,22 +72,22 @@ def _send(self, operation: int, data: bytes):
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B

data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8

packet += bytes(data)
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(data)

checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8

packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8

resp = self.send_packet(0x6a, packet)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
Expand Down Expand Up @@ -119,7 +119,7 @@ class wser(Device):

TYPE = "WSER"

def _send(self, operation: int, data: bytes):
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(14)
packet[0x02] = 0xA5
Expand All @@ -128,22 +128,22 @@ def _send(self, operation: int, data: bytes):
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B

data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8

packet += bytes(data)
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(data)

checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8

packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8

resp = self.send_packet(0x6a, packet)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
Expand All @@ -156,24 +156,24 @@ def get_position(self) -> int:

def open(self) -> int:
"""Open the curtain."""
resp = self._send(2, [0x4a, 0x31, 0xa0])
resp = self._send(2, [0x4A, 0x31, 0xA0])
position = resp[0x0E]
return position

def close(self) -> int:
"""Close the curtain."""
resp = self._send(2, [0x61, 0x32, 0xa0])
resp = self._send(2, [0x61, 0x32, 0xA0])
position = resp[0x0E]
return position

def stop(self) -> int:
"""Stop the curtain."""
resp = self._send(2, [0x4c, 0x73, 0xa0])
resp = self._send(2, [0x4C, 0x73, 0xA0])
position = resp[0x0E]
return position

def set_position(self, position: int) -> int:
"""Set the position of the curtain."""
resp = self._send(2, [position, 0x70, 0xa0])
resp = self._send(2, [position, 0x70, 0xA0])
position = resp[0x0E]
return position
47 changes: 47 additions & 0 deletions broadlink/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,50 @@ def check_sensors_raw(self) -> dict:
"air_quality": data[0x6],
"noise": data[0x8],
}


class a2(Device):
"""Controls a Broadlink A2."""

TYPE = "A2"

def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(14)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B

if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(data)

checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8

packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8

resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload

def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
resp = self._send(1)
data = resp[0x6:]
return {
"temperature": data[0x0D] * 255 + data[0x0E],
"humidity": data[0x0F] * 255 + data[0x10],
"pm100": data[0x07] * 255 + data[0x08],
"pm25": data[0x09] * 255 + data[0x0A],
"pm10": data[0x0B] * 255 + data[0x0C],
}

0 comments on commit 0090d6e

Please sign in to comment.