From 8e320e23ab5edc78efd81034c25c0a9c220c6756 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Wed, 2 Dec 2020 17:26:16 +0800 Subject: [PATCH 01/18] Add UnPackV5::connect --- src/Packet/UnPackV5.php | 168 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 src/Packet/UnPackV5.php diff --git a/src/Packet/UnPackV5.php b/src/Packet/UnPackV5.php new file mode 100644 index 0000000..4037310 --- /dev/null +++ b/src/Packet/UnPackV5.php @@ -0,0 +1,168 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code + */ + +declare(strict_types=1); + +namespace Simps\MQTT\Packet; + +use Simps\MQTT\Exception\LengthException; +use Simps\MQTT\Types; + +class UnPackV5 +{ + public static function connect(string $remaining): array + { + $protocolName = static::string($remaining); + $protocolLevel = ord($remaining[0]); + $cleanSession = ord($remaining[1]) >> 1 & 0x1; + $willFlag = ord($remaining[1]) >> 2 & 0x1; + $willQos = ord($remaining[1]) >> 3 & 0x3; + $willRetain = ord($remaining[1]) >> 5 & 0x1; + $passwordFlag = ord($remaining[1]) >> 6 & 0x1; + $userNameFlag = ord($remaining[1]) >> 7 & 0x1; + $remaining = substr($remaining, 2); + $keepAlive = static::shortInt($remaining); + $propertiesTotalLength = ord($remaining[0]); + $remaining = substr($remaining, 1); + if ($propertiesTotalLength) { + $sessionExpiryIntervalFlag = ord($remaining[0]) & ~0x11; + if ($sessionExpiryIntervalFlag === 0) { + $remaining = substr($remaining, 1); + $sessionExpiryInterval = static::longInt($remaining); + } + $receiveMaximumFlag = ord($remaining[0]) & ~0x21; + if ($receiveMaximumFlag === 0) { + $remaining = substr($remaining, 1); + $receiveMaximum = static::shortInt($remaining); + } + $receiveMaximumFlag = ord($remaining[0]) & ~0x22; + if ($receiveMaximumFlag === 0) { + $remaining = substr($remaining, 1); + $topicAliasMaximum = static::shortInt($remaining); + } + } + $clientId = static::string($remaining); + if ($willFlag) { + $willPropertiesTotalLength = ord($remaining[0]); + $remaining = substr($remaining, 1); + if ($willPropertiesTotalLength) { + $willDelayIntervalFlag = ord($remaining[0]) & ~0x18; + if ($willDelayIntervalFlag === 0) { + $remaining = substr($remaining, 1); + $willDelayInterval = static::longInt($remaining); + } + $messageExpiryIntervalFlag = ord($remaining[0]) & ~0x02; + if ($messageExpiryIntervalFlag === 0) { + $remaining = substr($remaining, 1); + $messageExpiryInterval = static::longInt($remaining); + } + $contentTypeFlag = ord($remaining[0]) & ~0x03; + if ($contentTypeFlag === 0) { + $remaining = substr($remaining, 1); + $contentType = static::string($remaining); + } + $payloadFormatIndicatorFlag = ord($remaining[0]) & ~0x01; + if ($payloadFormatIndicatorFlag === 0) { + $payloadFormatIndicator = ord($remaining[1]); + $remaining = substr($remaining, 2); + } + } + $willTopic = static::string($remaining); + $willMessage = static::string($remaining); + } + $userName = $password = ''; + if ($userNameFlag) { + $userName = static::string($remaining); + } + if ($passwordFlag) { + $password = static::string($remaining); + } + $package = [ + 'type' => Types::CONNECT, + 'protocol_name' => $protocolName, + 'protocol_level' => $protocolLevel, + 'clean_session' => $cleanSession, + 'will' => [], + 'user_name' => $userName, + 'password' => $password, + 'keep_alive' => $keepAlive, + ]; + + if ($propertiesTotalLength) { + if ($sessionExpiryIntervalFlag === 0) { + $package['session_expiry_interval'] = $sessionExpiryInterval; + } + if ($receiveMaximumFlag === 0) { + $package['receive_maximum'] = $receiveMaximum; + } + if ($receiveMaximumFlag === 0) { + $package['topic_alias_aximum'] = $topicAliasMaximum; + } + } + + $package['client_id'] = $clientId; + + if ($willFlag) { + if ($willPropertiesTotalLength) { + if ($willDelayIntervalFlag === 0) { + $package['will']['will_delay_interval'] = $willDelayInterval; + } + if ($messageExpiryIntervalFlag === 0) { + $package['will']['message_expiry_interval'] = $messageExpiryInterval; + } + if ($contentTypeFlag === 0) { + $package['will']['content_type'] = $contentType; + } + if ($payloadFormatIndicatorFlag === 0) { + $package['will']['payload_format_indicator'] = $payloadFormatIndicator; + } + } + $package['will'] += [ + 'qos' => $willQos, + 'retain' => $willRetain, + 'topic' => $willTopic, + 'message' => $willMessage, + ]; + } else { + unset($package['will']); + } + + return $package; + } + + private static function string(&$remaining) + { + $length = unpack('n', $remaining)[1]; + if ($length + 2 > strlen($remaining)) { + throw new LengthException("unpack remaining length error, get {$length}"); + } + $string = substr($remaining, 2, $length); + $remaining = substr($remaining, $length + 2); + + return $string; + } + + private static function shortInt(&$remaining) + { + $tmp = unpack('n', $remaining); + $remaining = substr($remaining, 2); + + return $tmp[1]; + } + + private static function longInt(&$remaining) + { + $tmp = unpack('N', $remaining); + $remaining = substr($remaining, 4); + + return $tmp[1]; + } +} From eca5a82cbda1ccf1af6a3183c1f0a5e5ce66b205 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Thu, 3 Dec 2020 10:22:47 +0800 Subject: [PATCH 02/18] Add AUTH type --- src/Types.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Types.php b/src/Types.php index 9828f17..bd558b0 100644 --- a/src/Types.php +++ b/src/Types.php @@ -42,4 +42,6 @@ class Types const PINGRESP = 13; // PING response const DISCONNECT = 14; // Client is disconnecting + + const AUTH = 15; // Authentication exchange } From 7c45ca2ff2f501be6e29340474e6abf54e1de817 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Thu, 3 Dec 2020 10:23:14 +0800 Subject: [PATCH 03/18] Add Pack connAck --- src/Packet/PackV5.php | 95 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 src/Packet/PackV5.php diff --git a/src/Packet/PackV5.php b/src/Packet/PackV5.php new file mode 100644 index 0000000..b06f039 --- /dev/null +++ b/src/Packet/PackV5.php @@ -0,0 +1,95 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code + */ + +declare(strict_types=1); + +namespace Simps\MQTT\Packet; + +use Simps\MQTT\Types; + +class PackV5 +{ + public static function connAck(array $array): string + { + $body = !empty($array['session_present']) ? chr(1) : chr(0); + $code = !empty($array['code']) ? $array['code'] : 0; + $body .= chr($code); + if (!empty($array['maximum_packet_size'])) { + $body .= pack('N', $array['maximum_packet_size']); + } + $retain_available = 0; + if (!isset($array['retain_available']) || !empty($array['retain_available'])) { + $retain_available = 1; + } + $body .= chr($retain_available); + $shared_subscription_available = 0; + if (!isset($array['shared_subscription_available']) || !empty($array['shared_subscription_available'])) { + $shared_subscription_available = 1; + } + $body .= chr($shared_subscription_available); + $topic_alias_maximum = 0; + if (isset($array['topic_alias_maximum'])) { + $topic_alias_maximum = $array['topic_alias_maximum']; + } + $body .= pack('n', $topic_alias_maximum); + $subscription_identifier_available = 0; + if (!isset($array['subscription_identifier_available']) || !empty($array['subscription_identifier_available'])) { + $subscription_identifier_available = 1; + } + $body .= chr($subscription_identifier_available); + $wildcard_subscription_available = 0; + if (!isset($array['wildcard_subscription_available']) || !empty($array['wildcard_subscription_available'])) { + $wildcard_subscription_available = 1; + } + $body .= chr($wildcard_subscription_available); + $head = static::packHeader(Types::CONNACK, strlen($body)); + + return $head . $body; + } + + private static function string(string $str) + { + $len = strlen($str); + + return pack('n', $len) . $str; + } + + public static function packHeader(int $type, int $bodyLength, int $dup = 0, int $qos = 0, int $retain = 0): string + { + $type = $type << 4; + if ($dup) { + $type |= 1 << 3; + } + if ($qos) { + $type |= $qos << 1; + } + if ($retain) { + $type |= 1; + } + + return chr($type) . static::packRemainingLength($bodyLength); + } + + private static function packRemainingLength(int $bodyLength) + { + $string = ''; + do { + $digit = $bodyLength % 128; + $bodyLength = $bodyLength >> 7; + if ($bodyLength > 0) { + $digit = ($digit | 0x80); + } + $string .= chr($digit); + } while ($bodyLength > 0); + + return $string; + } +} From 0d3de13cbb43fa3320411b86d95eefda6cb36c52 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Thu, 3 Dec 2020 10:23:36 +0800 Subject: [PATCH 04/18] Add ProtocolV5 --- src/ProtocolV5.php | 173 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 src/ProtocolV5.php diff --git a/src/ProtocolV5.php b/src/ProtocolV5.php new file mode 100644 index 0000000..333990a --- /dev/null +++ b/src/ProtocolV5.php @@ -0,0 +1,173 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code + */ + +declare(strict_types=1); + +namespace Simps\MQTT; + +use Simps\MQTT\Exception\InvalidArgumentException; +use Simps\MQTT\Exception\LengthException; +use Simps\MQTT\Exception\RuntimeException; +use Simps\MQTT\Packet\PackV5; +use Simps\MQTT\Packet\UnPackV5; +use Throwable; +use TypeError; + +class ProtocolV5 +{ + public static function pack(array $array) + { + try { + $type = $array['type']; + switch ($type) { + case Types::CONNECT: + $package = PackV5::connect($array); + break; + case Types::CONNACK: + $package = PackV5::connAck($array); + break; + case Types::PUBLISH: + $package = PackV5::publish($array); + break; + case Types::PUBACK: + case Types::PUBREC: + case Types::PUBREL: + case Types::PUBCOMP: + case Types::UNSUBACK: + $body = pack('n', $array['message_id']); + if ($type === Types::PUBREL) { + $head = PackV5::packHeader($type, strlen($body), 0, 1); + } else { + $head = PackV5::packHeader($type, strlen($body)); + } + $package = $head . $body; + break; + case Types::SUBSCRIBE: + $package = PackV5::subscribe($array); + break; + case Types::SUBACK: + $package = PackV5::subAck($array); + break; + case Types::UNSUBSCRIBE: + $package = PackV5::unSubscribe($array); + break; + case Types::PINGREQ: + case Types::PINGRESP: + case Types::DISCONNECT: + $package = PackV5::packHeader($type, 0); + break; + default: + throw new InvalidArgumentException('MQTT Type not exist'); + } + } catch (TypeError $e) { + throw new RuntimeException($e->getMessage(), $e->getCode()); + } catch (Throwable $e) { + throw new RuntimeException($e->getMessage(), $e->getCode()); + } + + return $package; + } + + public static function unpack(string $data) + { + try { + $type = static::getType($data); + $remaining = static::getRemaining($data); + switch ($type) { + case Types::CONNECT: + $package = UnPackV5::connect($remaining); + break; + case Types::CONNACK: + $package = UnPackV5::connAck($remaining); + break; + case Types::PUBLISH: + $dup = ord($data[0]) >> 3 & 0x1; + $qos = ord($data[0]) >> 1 & 0x3; + $retain = ord($data[0]) & 0x1; + $package = UnPackV5::publish($dup, $qos, $retain, $remaining); + break; + case Types::PUBACK: + case Types::PUBREC: + case Types::PUBREL: + case Types::PUBCOMP: + case Types::UNSUBACK: + $package = ['type' => $type, 'message_id' => unpack('n', $remaining)[1]]; + break; + case Types::PINGREQ: + case Types::PINGRESP: + case Types::DISCONNECT: + $package = ['type' => $type]; + break; + case Types::SUBSCRIBE: + $package = UnPackV5::subscribe($remaining); + break; + case Types::SUBACK: + $package = UnPackV5::subAck($remaining); + break; + case Types::UNSUBSCRIBE: + $package = UnPackV5::unSubscribe($remaining); + break; + default: + $package = []; + } + } catch (TypeError $e) { + throw new RuntimeException($e->getMessage(), $e->getCode()); + } catch (Throwable $e) { + throw new RuntimeException($e->getMessage(), $e->getCode()); + } + + return $package; + } + + public static function getType(string $data) + { + return ord($data[0]) >> 4; + } + + public static function getRemainingLength(string $data, &$headBytes) + { + $headBytes = $multiplier = 1; + $value = 0; + do { + if (!isset($data[$headBytes])) { + throw new LengthException('Malformed Remaining Length'); + } + $digit = ord($data[$headBytes]); + $value += ($digit & 127) * $multiplier; + $multiplier *= 128; + ++$headBytes; + } while (($digit & 128) != 0); + + return $value; + } + + public static function getRemaining(string $data) + { + $remainingLength = static::getRemainingLength($data, $headBytes); + + return substr($data, $headBytes, $remainingLength); + } + + public static function printf(string $data) + { + echo "\033[36m"; + for ($i = 0; $i < strlen($data); $i++) { + $ascii = ord($data[$i]); + if ($ascii > 31) { + $chr = $data[$i]; + } else { + $chr = ' '; + } + printf("%4d: %08b : 0x%02x : %d : %s\n", $i, $ascii, $ascii, $ascii, $chr); + } + echo "\033[0m"; + } +} From c5224168e03a050653356e3d0582c00ee33353ed Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Tue, 8 Dec 2020 14:32:38 +0800 Subject: [PATCH 05/18] Fix typo --- src/Packet/UnPackV5.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Packet/UnPackV5.php b/src/Packet/UnPackV5.php index 4037310..401a864 100644 --- a/src/Packet/UnPackV5.php +++ b/src/Packet/UnPackV5.php @@ -43,8 +43,8 @@ public static function connect(string $remaining): array $remaining = substr($remaining, 1); $receiveMaximum = static::shortInt($remaining); } - $receiveMaximumFlag = ord($remaining[0]) & ~0x22; - if ($receiveMaximumFlag === 0) { + $topicAliasMaximumFlag = ord($remaining[0]) & ~0x22; + if ($topicAliasMaximumFlag === 0) { $remaining = substr($remaining, 1); $topicAliasMaximum = static::shortInt($remaining); } From 91dbc1a5aa2e10c762140356ffa65641b7ef1706 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Sun, 13 Dec 2020 12:18:00 +0800 Subject: [PATCH 06/18] Add Hex/Property and Hex/ReasonCode --- src/Hex/Property.php | 74 +++++++++++++++++++++++++++ src/Hex/ReasonCode.php | 110 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 src/Hex/Property.php create mode 100644 src/Hex/ReasonCode.php diff --git a/src/Hex/Property.php b/src/Hex/Property.php new file mode 100644 index 0000000..e9c3a74 --- /dev/null +++ b/src/Hex/Property.php @@ -0,0 +1,74 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code + */ + +declare(strict_types=1); + +namespace Simps\MQTT\Hex; + +/** + * @see https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901029 + */ +class Property +{ + const PAYLOAD_FORMAT_INDICATOR = 0x01; + + const MESSAGE_EXPIRY_INTERVAL = 0x02; + + const CONTENT_TYPE = 0x03; + + const RESPONSE_TOPIC = 0x08; + + const CORRELATION_DATA = 0x09; + + const SUBSCRIPTION_IDENTIFIER = 0x0B; + + const SESSION_EXPIRY_INTERVAL = 0x11; + + const ASSIGNED_CLIENT_IDENTIFIER = 0x12; + + const SERVER_KEEP_ALIVE = 0x13; + + const AUTHENTICATION_METHOD = 0x15; + + const AUTHENTICATION_DATA = 0x16; + + const REQUEST_PROBLEM_INFORMATION = 0x17; + + const WILL_DELAY_INTERVAL = 0x18; + + const REQUEST_RESPONSE_INFORMATION = 0x19; + + const RESPONSE_INFORMATION = 0x1A; + + const SERVER_REFERENCE = 0x1C; + + const REASON_STRING = 0x1F; + + const RECEIVE_MAXIMUM = 0x21; + + const TOPIC_ALIAS_MAXIMUM = 0x22; + + const TOPIC_ALIAS = 0x23; + + const MAXIMUM_QOS = 0x24; + + const RETAIN_AVAILABLE = 0x25; + + const USER_PROPERTY = 0x26; + + const MAXIMUM_PACKET_SIZE = 0x27; + + const WILDCARD_SUBSCRIPTION_AVAILABLE = 0x28; + + const SUBSCRIPTION_IDENTIFIER_AVAILABLE = 0x29; + + const SHARED_SUBSCRIPTION_AVAILABLE = 0x2A; +} diff --git a/src/Hex/ReasonCode.php b/src/Hex/ReasonCode.php new file mode 100644 index 0000000..8cd63fb --- /dev/null +++ b/src/Hex/ReasonCode.php @@ -0,0 +1,110 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code + */ + +declare(strict_types=1); + +namespace Simps\MQTT\Hex; + +/** + * @see https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901031 + */ +class ReasonCode +{ + const SUCCESS = 0x00; + + const NORMAL_DISCONNECTION = 0x00; + + const GRANTED_QOS_0 = 0x00; + + const GRANTED_QOS_1 = 0x01; + + const GRANTED_QOS_2 = 0x02; + + const DISCONNECT_WITH_WILL_MESSAGE = 0x04; + + const NO_MATCHING_SUBSCRIBERS = 0x10; + + const NO_SUBSCRIPTION_EXISTED = 0x11; + + const CONTINUE_AUTHENTICATION = 0x18; + + const RE_AUTHENTICATE = 0x19; + + const UNSPECIFIED_ERROR = 0x80; + + const MALFORMED_PACKET = 0x81; + + const PROTOCOL_ERROR = 0x82; + + const IMPLEMENTATION_SPECIFIC_ERROR = 0x83; + + const UNSUPPORTED_PROTOCOL_VERSION = 0x84; + + const CLIENT_IDENTIFIER_NOT_VALID = 0x85; + + const BAD_USER_NAME_OR_PASSWORD = 0x86; + + const NOT_AUTHORIZED = 0x87; + + const SERVER_UNAVAILABLE = 0x88; + + const SERVER_BUSY = 0x89; + + const BANNED = 0x8A; + + const SERVER_SHUTTING_DOWN = 0x8B; + + const BAD_AUTHENTICATION_METHOD = 0x8C; + + const KEEP_ALIVE_TIMEOUT = 0x8D; + + const SESSION_TAKEN_OVER = 0x8E; + + const TOPIC_FILTER_INVALID = 0x8F; + + const TOPIC_NAME_INVALID = 0x90; + + const PACKET_IDENTIFIER_IN_USE = 0x91; + + const PACKET_IDENTIFIER_NOT_FOUND = 0x92; + + const RECEIVE_MAXIMUM_EXCEEDED = 0x93; + + const TOPIC_ALIAS_INVALID = 0x94; + + const PACKET_TOO_LARGE = 0x95; + + const MESSAGE_RATE_TOO_HIGH = 0x96; + + const QUOTA_EXCEEDED = 0x97; + + const ADMINISTRATIVE_ACTION = 0x98; + + const PAYLOAD_FORMAT_INVALID = 0x99; + + const RETAIN_NOT_SUPPORTED = 0x9A; + + const QOS_NOT_SUPPORTED = 0x9B; + + const USE_ANOTHER_SERVER = 0x9C; + + const SERVER_MOVED = 0x9D; + + const SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = 0x9E; + + const CONNECTION_RATE_EXCEEDED = 0x9F; + + const MAXIMUM_CONNECT_TIME = 0xA0; + + const SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 0xA1; + + const WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = 0xA2; +} From 4568a89f847c4c62ab9eb82492f890930130db22 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Sun, 13 Dec 2020 17:21:08 +0800 Subject: [PATCH 07/18] Add getReasonPhrases and getReasonPhrase to ReasonCode --- src/Hex/ReasonCode.php | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/src/Hex/ReasonCode.php b/src/Hex/ReasonCode.php index 8cd63fb..c19774e 100644 --- a/src/Hex/ReasonCode.php +++ b/src/Hex/ReasonCode.php @@ -16,7 +16,7 @@ /** * @see https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901031 */ -class ReasonCode +abstract class ReasonCode { const SUCCESS = 0x00; @@ -107,4 +107,42 @@ class ReasonCode const SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 0xA1; const WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = 0xA2; + + /** + * @see https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901079 + */ + protected static $reasonPhrases = [ + self::SUCCESS => 'Success', + self::UNSPECIFIED_ERROR => 'Unspecified error', + self::MALFORMED_PACKET => 'Malformed Packet', + self::PROTOCOL_ERROR => 'Protocol Error', + self::IMPLEMENTATION_SPECIFIC_ERROR => 'Implementation specific error', + self::UNSUPPORTED_PROTOCOL_VERSION => 'Unsupported Protocol Version', + self::CLIENT_IDENTIFIER_NOT_VALID => 'Client Identifier not valid', + self::BAD_USER_NAME_OR_PASSWORD => 'Bad User Name or Password', + self::NOT_AUTHORIZED => 'Not authorized', + self::SERVER_UNAVAILABLE => 'Server unavailable', + self::SERVER_BUSY => 'Server busy', + self::BANNED => 'Banned', + self::BAD_AUTHENTICATION_METHOD => 'Bad authentication method', + self::TOPIC_NAME_INVALID => 'Topic Name invalid', + self::PACKET_TOO_LARGE => 'Packet too large', + self::QUOTA_EXCEEDED => 'Quota exceeded', + self::PAYLOAD_FORMAT_INVALID => 'Payload format invalid', + self::RETAIN_NOT_SUPPORTED => 'Retain not supported', + self::QOS_NOT_SUPPORTED => 'QoS not supported', + self::USE_ANOTHER_SERVER => 'Use another server', + self::SERVER_MOVED => 'Server moved', + self::CONNECTION_RATE_EXCEEDED => 'Connection rate exceeded', + ]; + + public static function getReasonPhrases(): array + { + return static::$reasonPhrases; + } + + public static function getReasonPhrase(int $value): string + { + return static::$reasonPhrases[$value] ?? 'Unknown'; + } } From ab61e2701288fe8680db03e61ebd4fe78054d024 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Sun, 13 Dec 2020 18:36:51 +0800 Subject: [PATCH 08/18] Add connect and connAck --- src/Packet/PackV5.php | 200 ++++++++++++++++++++++++++++++++++------ src/Packet/UnPackV5.php | 104 ++++++++++++++++++--- 2 files changed, 262 insertions(+), 42 deletions(-) diff --git a/src/Packet/PackV5.php b/src/Packet/PackV5.php index b06f039..fa769d9 100644 --- a/src/Packet/PackV5.php +++ b/src/Packet/PackV5.php @@ -13,43 +13,177 @@ namespace Simps\MQTT\Packet; +use Simps\MQTT\Hex\Property; use Simps\MQTT\Types; class PackV5 { + public static function connect(array $array): string + { + $body = static::string($array['protocol_name']) . chr($array['protocol_level']); + $connectFlags = 0; + if (!empty($array['clean_session'])) { + $connectFlags |= 1 << 1; + } + if (!empty($array['will'])) { + $connectFlags |= 1 << 2; + $connectFlags |= $array['will']['qos'] << 3; + if ($array['will']['retain']) { + $connectFlags |= 1 << 5; + } + } + if (!empty($array['password'])) { + $connectFlags |= 1 << 6; + } + if (!empty($array['user_name'])) { + $connectFlags |= 1 << 7; + } + $body .= chr($connectFlags); + + $keepAlive = !empty($array['keep_alive']) && (int) $array['keep_alive'] >= 0 ? (int) $array['keep_alive'] : 0; + $body .= pack('n', $keepAlive); + + $propertiesTotalLength = 0; + if (!empty($array['properties']['session_expiry_interval'])) { + $propertiesTotalLength += 5; + } + if (!empty($array['properties']['receive_maximum'])) { + $propertiesTotalLength += 3; + } + if (!empty($array['properties']['topic_alias_maximum'])) { + $propertiesTotalLength += 3; + } + $body .= chr($propertiesTotalLength); + + if (!empty($array['properties']['session_expiry_interval'])) { + $body .= chr(Property::SESSION_EXPIRY_INTERVAL); + $body .= static::longInt($array['properties']['session_expiry_interval']); + } + if (!empty($array['properties']['receive_maximum'])) { + $body .= chr(Property::RECEIVE_MAXIMUM); + $body .= static::shortInt($array['properties']['receive_maximum']); + } + if (!empty($array['properties']['topic_alias_maximum'])) { + $body .= chr(Property::TOPIC_ALIAS_MAXIMUM); + $body .= static::shortInt($array['properties']['topic_alias_maximum']); + } + + $body .= static::string($array['client_id']); + if (!empty($array['will'])) { + $willPropertiesTotalLength = 0; + if (!empty($array['will']['properties']['will_delay_interval'])) { + $willPropertiesTotalLength += 5; + } + if (!empty($array['will']['properties']['message_expiry_interval'])) { + $willPropertiesTotalLength += 5; + } + if (!empty($array['will']['properties']['content_type'])) { + $willPropertiesTotalLength += 3; + $willPropertiesTotalLength += strlen($array['will']['properties']['content_type']); + } + if (isset($array['will']['properties']['payload_format_indicator'])) { + $willPropertiesTotalLength += 2; + } + $body .= chr($willPropertiesTotalLength); + + if (!empty($array['will']['properties']['will_delay_interval'])) { + $body .= chr(Property::WILL_DELAY_INTERVAL); + $body .= static::longInt($array['will']['properties']['will_delay_interval']); + } + if (!empty($array['will']['properties']['message_expiry_interval'])) { + $body .= chr(Property::MESSAGE_EXPIRY_INTERVAL); + $body .= static::longInt($array['will']['properties']['message_expiry_interval']); + } + if (!empty($array['will']['properties']['content_type'])) { + $body .= chr(Property::CONTENT_TYPE); + $body .= static::string($array['will']['properties']['content_type']); + } + if (isset($array['will']['properties']['payload_format_indicator'])) { + $body .= chr(Property::PAYLOAD_FORMAT_INDICATOR); + $body .= chr((int) $array['will']['properties']['payload_format_indicator']); + } + + $body .= static::string($array['will']['topic']); + $body .= static::string($array['will']['message']); + } + if (!empty($array['user_name'])) { + $body .= static::string($array['user_name']); + } + if (!empty($array['password'])) { + $body .= static::string($array['password']); + } + $head = static::packHeader(Types::CONNECT, strlen($body)); + + return $head . $body; + } + public static function connAck(array $array): string { $body = !empty($array['session_present']) ? chr(1) : chr(0); $code = !empty($array['code']) ? $array['code'] : 0; $body .= chr($code); - if (!empty($array['maximum_packet_size'])) { - $body .= pack('N', $array['maximum_packet_size']); - } - $retain_available = 0; - if (!isset($array['retain_available']) || !empty($array['retain_available'])) { - $retain_available = 1; - } - $body .= chr($retain_available); - $shared_subscription_available = 0; - if (!isset($array['shared_subscription_available']) || !empty($array['shared_subscription_available'])) { - $shared_subscription_available = 1; - } - $body .= chr($shared_subscription_available); - $topic_alias_maximum = 0; - if (isset($array['topic_alias_maximum'])) { - $topic_alias_maximum = $array['topic_alias_maximum']; - } - $body .= pack('n', $topic_alias_maximum); - $subscription_identifier_available = 0; - if (!isset($array['subscription_identifier_available']) || !empty($array['subscription_identifier_available'])) { - $subscription_identifier_available = 1; - } - $body .= chr($subscription_identifier_available); - $wildcard_subscription_available = 0; - if (!isset($array['wildcard_subscription_available']) || !empty($array['wildcard_subscription_available'])) { - $wildcard_subscription_available = 1; - } - $body .= chr($wildcard_subscription_available); + + $propertiesTotalLength = 0; + if (!empty($array['properties']['maximum_packet_size'])) { + $propertiesTotalLength += 5; + } + if (!isset($array['properties']['retain_available']) || !empty($array['properties']['retain_available'])) { + $propertiesTotalLength += 2; + } + if (!isset($array['properties']['shared_subscription_available']) || !empty($array['properties']['shared_subscription_available'])) { + $propertiesTotalLength += 2; + } + if (!isset($array['properties']['subscription_identifier_available']) || !empty($array['properties']['subscription_identifier_available'])) { + $propertiesTotalLength += 2; + } + if (isset($array['properties']['topic_alias_maximum'])) { + $propertiesTotalLength += 3; + } + if (!isset($array['properties']['wildcard_subscription_available']) || !empty($array['properties']['wildcard_subscription_available'])) { + $propertiesTotalLength += 2; + } + $body .= chr($propertiesTotalLength); + + if (!empty($array['properties']['maximum_packet_size'])) { + $body .= chr(Property::MAXIMUM_PACKET_SIZE); + $body .= pack('N', $array['properties']['maximum_packet_size']); + } + + $retainAvailable = 0; + if (!isset($array['properties']['retain_available']) || !empty($array['properties']['retain_available'])) { + $retainAvailable = 1; + } + $body .= chr(Property::RETAIN_AVAILABLE); + $body .= chr($retainAvailable); + + $sharedSubscriptionAvailable = 0; + if (!isset($array['properties']['shared_subscription_available']) || !empty($array['properties']['shared_subscription_available'])) { + $sharedSubscriptionAvailable = 1; + } + $body .= chr(Property::SHARED_SUBSCRIPTION_AVAILABLE); + $body .= chr($sharedSubscriptionAvailable); + + $subscriptionIdentifierAvailable = 0; + if (!isset($array['properties']['subscription_identifier_available']) || !empty($array['properties']['subscription_identifier_available'])) { + $subscriptionIdentifierAvailable = 1; + } + $body .= chr(Property::SUBSCRIPTION_IDENTIFIER_AVAILABLE); + $body .= chr($subscriptionIdentifierAvailable); + + $topicAliasMaximum = 0; + if (isset($array['properties']['topic_alias_maximum'])) { + $topicAliasMaximum = $array['properties']['topic_alias_maximum']; + } + $body .= chr(Property::TOPIC_ALIAS_MAXIMUM); + $body .= pack('n', $topicAliasMaximum); + + $wildcardSubscriptionAvailable = 0; + if (!isset($array['properties']['wildcard_subscription_available']) || !empty($array['properties']['wildcard_subscription_available'])) { + $wildcardSubscriptionAvailable = 1; + } + $body .= chr(Property::WILDCARD_SUBSCRIPTION_AVAILABLE); + $body .= chr($wildcardSubscriptionAvailable); + $head = static::packHeader(Types::CONNACK, strlen($body)); return $head . $body; @@ -62,6 +196,16 @@ private static function string(string $str) return pack('n', $len) . $str; } + private static function longInt($int) + { + return pack('N', $int); + } + + private static function shortInt($int) + { + return pack('n', $int); + } + public static function packHeader(int $type, int $bodyLength, int $dup = 0, int $qos = 0, int $retain = 0): string { $type = $type << 4; diff --git a/src/Packet/UnPackV5.php b/src/Packet/UnPackV5.php index 401a864..7bc077b 100644 --- a/src/Packet/UnPackV5.php +++ b/src/Packet/UnPackV5.php @@ -14,6 +14,7 @@ namespace Simps\MQTT\Packet; use Simps\MQTT\Exception\LengthException; +use Simps\MQTT\Hex\Property; use Simps\MQTT\Types; class UnPackV5 @@ -33,17 +34,17 @@ public static function connect(string $remaining): array $propertiesTotalLength = ord($remaining[0]); $remaining = substr($remaining, 1); if ($propertiesTotalLength) { - $sessionExpiryIntervalFlag = ord($remaining[0]) & ~0x11; + $sessionExpiryIntervalFlag = ord($remaining[0]) & ~Property::SESSION_EXPIRY_INTERVAL; if ($sessionExpiryIntervalFlag === 0) { $remaining = substr($remaining, 1); $sessionExpiryInterval = static::longInt($remaining); } - $receiveMaximumFlag = ord($remaining[0]) & ~0x21; + $receiveMaximumFlag = ord($remaining[0]) & ~Property::RECEIVE_MAXIMUM; if ($receiveMaximumFlag === 0) { $remaining = substr($remaining, 1); $receiveMaximum = static::shortInt($remaining); } - $topicAliasMaximumFlag = ord($remaining[0]) & ~0x22; + $topicAliasMaximumFlag = ord($remaining[0]) & ~Property::TOPIC_ALIAS_MAXIMUM; if ($topicAliasMaximumFlag === 0) { $remaining = substr($remaining, 1); $topicAliasMaximum = static::shortInt($remaining); @@ -54,22 +55,22 @@ public static function connect(string $remaining): array $willPropertiesTotalLength = ord($remaining[0]); $remaining = substr($remaining, 1); if ($willPropertiesTotalLength) { - $willDelayIntervalFlag = ord($remaining[0]) & ~0x18; + $willDelayIntervalFlag = ord($remaining[0]) & ~Property::WILL_DELAY_INTERVAL; if ($willDelayIntervalFlag === 0) { $remaining = substr($remaining, 1); $willDelayInterval = static::longInt($remaining); } - $messageExpiryIntervalFlag = ord($remaining[0]) & ~0x02; + $messageExpiryIntervalFlag = ord($remaining[0]) & ~Property::MESSAGE_EXPIRY_INTERVAL; if ($messageExpiryIntervalFlag === 0) { $remaining = substr($remaining, 1); $messageExpiryInterval = static::longInt($remaining); } - $contentTypeFlag = ord($remaining[0]) & ~0x03; + $contentTypeFlag = ord($remaining[0]) & ~Property::CONTENT_TYPE; if ($contentTypeFlag === 0) { $remaining = substr($remaining, 1); $contentType = static::string($remaining); } - $payloadFormatIndicatorFlag = ord($remaining[0]) & ~0x01; + $payloadFormatIndicatorFlag = ord($remaining[0]) & ~Property::PAYLOAD_FORMAT_INDICATOR; if ($payloadFormatIndicatorFlag === 0) { $payloadFormatIndicator = ord($remaining[1]); $remaining = substr($remaining, 2); @@ -90,6 +91,7 @@ public static function connect(string $remaining): array 'protocol_name' => $protocolName, 'protocol_level' => $protocolLevel, 'clean_session' => $cleanSession, + 'properties' => [], 'will' => [], 'user_name' => $userName, 'password' => $password, @@ -98,14 +100,16 @@ public static function connect(string $remaining): array if ($propertiesTotalLength) { if ($sessionExpiryIntervalFlag === 0) { - $package['session_expiry_interval'] = $sessionExpiryInterval; + $package['properties']['session_expiry_interval'] = $sessionExpiryInterval; } if ($receiveMaximumFlag === 0) { - $package['receive_maximum'] = $receiveMaximum; + $package['properties']['receive_maximum'] = $receiveMaximum; } if ($receiveMaximumFlag === 0) { - $package['topic_alias_aximum'] = $topicAliasMaximum; + $package['properties']['topic_alias_aximum'] = $topicAliasMaximum; } + } else { + unset($package['properties']); } $package['client_id'] = $clientId; @@ -113,16 +117,16 @@ public static function connect(string $remaining): array if ($willFlag) { if ($willPropertiesTotalLength) { if ($willDelayIntervalFlag === 0) { - $package['will']['will_delay_interval'] = $willDelayInterval; + $package['will']['properties']['will_delay_interval'] = $willDelayInterval; } if ($messageExpiryIntervalFlag === 0) { - $package['will']['message_expiry_interval'] = $messageExpiryInterval; + $package['will']['properties']['message_expiry_interval'] = $messageExpiryInterval; } if ($contentTypeFlag === 0) { - $package['will']['content_type'] = $contentType; + $package['will']['properties']['content_type'] = $contentType; } if ($payloadFormatIndicatorFlag === 0) { - $package['will']['payload_format_indicator'] = $payloadFormatIndicator; + $package['will']['properties']['payload_format_indicator'] = $payloadFormatIndicator; } } $package['will'] += [ @@ -138,6 +142,78 @@ public static function connect(string $remaining): array return $package; } + public static function connAck(string $remaining): array + { + $sessionPresent = ord($remaining[0]) & 0x01; + $code = ord($remaining[1]); + $remaining = substr($remaining, 2); + $propertiesTotalLength = ord($remaining[0]); + $remaining = substr($remaining, 1); + if ($propertiesTotalLength) { + $maximumPacketSizeFlag = ord($remaining[0]) & ~Property::MAXIMUM_PACKET_SIZE; + if ($maximumPacketSizeFlag === 0) { + $remaining = substr($remaining, 1); + $maximumPacketSize = static::longInt($remaining); + } + $retainAvailableFlag = ord($remaining[0]) & ~Property::RETAIN_AVAILABLE; + if ($retainAvailableFlag === 0) { + $retainAvailable = ord($remaining[1]); + $remaining = substr($remaining, 2); + } + $sharedSubscriptionAvailableFlag = ord($remaining[0]) & ~Property::SHARED_SUBSCRIPTION_AVAILABLE; + if ($sharedSubscriptionAvailableFlag === 0) { + $sharedSubscriptionAvailable = ord($remaining[1]); + $remaining = substr($remaining, 2); + } + $subscriptionIdentifierAvailableFlag = ord($remaining[0]) & ~Property::SUBSCRIPTION_IDENTIFIER_AVAILABLE; + if ($subscriptionIdentifierAvailableFlag === 0) { + $subscriptionIdentifierAvailable = ord($remaining[1]); + $remaining = substr($remaining, 2); + } + $topicAliasMaximumFlag = ord($remaining[0]) & ~Property::TOPIC_ALIAS_MAXIMUM; + if ($topicAliasMaximumFlag === 0) { + $remaining = substr($remaining, 1); + $topicAliasMaximum = static::shortInt($remaining); + } + $wildcardSubscriptionAvailableFlag = ord($remaining[0]) & ~Property::WILDCARD_SUBSCRIPTION_AVAILABLE; + if ($wildcardSubscriptionAvailableFlag === 0) { + $wildcardSubscriptionAvailable = ord($remaining[1]); + $remaining = substr($remaining, 2); + } + } + + $package = [ + 'type' => Types::CONNACK, + 'session_present' => $sessionPresent, + 'code' => $code, + 'properties' => [], + ]; + if ($propertiesTotalLength) { + if ($maximumPacketSizeFlag === 0) { + $package['properties']['maximum_packet_size'] = $maximumPacketSize; + } + if ($retainAvailableFlag === 0) { + $package['properties']['retain_available'] = $retainAvailable; + } + if ($sharedSubscriptionAvailableFlag === 0) { + $package['properties']['shared_subscription_available'] = $sharedSubscriptionAvailable; + } + if ($subscriptionIdentifierAvailableFlag === 0) { + $package['properties']['subscription_identifier_available'] = $subscriptionIdentifierAvailable; + } + if ($topicAliasMaximumFlag === 0) { + $package['properties']['topic_alias_maximum'] = $topicAliasMaximum; + } + if ($wildcardSubscriptionAvailableFlag === 0) { + $package['properties']['wildcard_subscription_available'] = $wildcardSubscriptionAvailable; + } + } else { + unset($package['properties']); + } + + return $package; + } + private static function string(&$remaining) { $length = unpack('n', $remaining)[1]; From 85a00ff809a1ccd4be036d728b6f041fe108a11a Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Mon, 14 Dec 2020 11:00:32 +0800 Subject: [PATCH 09/18] Update ReasonCode --- src/Hex/ReasonCode.php | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/Hex/ReasonCode.php b/src/Hex/ReasonCode.php index c19774e..323cb26 100644 --- a/src/Hex/ReasonCode.php +++ b/src/Hex/ReasonCode.php @@ -134,6 +134,25 @@ abstract class ReasonCode self::USE_ANOTHER_SERVER => 'Use another server', self::SERVER_MOVED => 'Server moved', self::CONNECTION_RATE_EXCEEDED => 'Connection rate exceeded', + self::DISCONNECT_WITH_WILL_MESSAGE => 'Disconnect with Will Message', + self::SERVER_SHUTTING_DOWN => 'Server shutting down', + self::KEEP_ALIVE_TIMEOUT => 'Keep Alive timeout', + self::SESSION_TAKEN_OVER => 'Session taken over', + self::TOPIC_FILTER_INVALID => 'Topic Filter invalid', + self::RECEIVE_MAXIMUM_EXCEEDED => 'Receive Maximum exceeded', + self::TOPIC_ALIAS_INVALID => 'Topic Alias invalid', + self::MESSAGE_RATE_TOO_HIGH => 'MESSAGE_RATE_TOO_HIGH', + self::ADMINISTRATIVE_ACTION => 'Administrative action', + self::SHARED_SUBSCRIPTIONS_NOT_SUPPORTED => 'Shared Subscriptions not supported', + self::MAXIMUM_CONNECT_TIME => 'Maximum connect time', + self::SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED => 'Subscription Identifiers not supported', + self::WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED => 'Wildcard Subscriptions not supported', + self::NO_MATCHING_SUBSCRIBERS => 'No matching subscribers', + self::NO_SUBSCRIPTION_EXISTED => 'No subscription existed', + self::CONTINUE_AUTHENTICATION => 'Continue authentication', + self::RE_AUTHENTICATE => 'Re-authenticate', + self::PACKET_IDENTIFIER_IN_USE => 'Packet Identifier in use', + self::PACKET_IDENTIFIER_NOT_FOUND => 'Packet Identifier not found', ]; public static function getReasonPhrases(): array From 84a7ba40389adba20c5452568d09c3dc0b2ba73b Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Mon, 14 Dec 2020 13:43:37 +0800 Subject: [PATCH 10/18] Add subscribe and disconnect --- src/Packet/PackV5.php | 43 ++++++++++++++++++++++++++++++++++++++- src/Packet/UnPackV5.php | 45 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/src/Packet/PackV5.php b/src/Packet/PackV5.php index fa769d9..03ea37d 100644 --- a/src/Packet/PackV5.php +++ b/src/Packet/PackV5.php @@ -14,6 +14,7 @@ namespace Simps\MQTT\Packet; use Simps\MQTT\Hex\Property; +use Simps\MQTT\Hex\ReasonCode; use Simps\MQTT\Types; class PackV5 @@ -189,7 +190,47 @@ public static function connAck(array $array): string return $head . $body; } - private static function string(string $str) + public static function subscribe(array $array): string + { + $body = pack('n', $array['message_id']); + + $propertiesTotalLength = 0; + $body .= chr($propertiesTotalLength); + + foreach ($array['topics'] as $topic => $options) { + $body .= static::string($topic); + + $subscribeOptions = 0; + if (isset($options['qos'])) { + $subscribeOptions |= (int) $options['qos']; + } + if (isset($options['no_local'])) { + $subscribeOptions |= (int) $options['no_local'] << 2; + } + if (isset($options['retain_as_published'])) { + $subscribeOptions |= (int) $options['retain_as_published'] << 3; + } + if (isset($options['retain_handling'])) { + $subscribeOptions |= (int) $options['retain_handling'] << 4; + } + $body .= chr($subscribeOptions); + } + + $head = static::packHeader(Types::SUBSCRIBE, strlen($body), 0, 1); + + return $head . $body; + } + + public static function disconnect(array $array): string + { + $code = !empty($array['code']) ? $array['code'] : ReasonCode::NORMAL_DISCONNECTION; + $body = chr($code); + $head = static::packHeader(Types::DISCONNECT, strlen($body)); + + return $head . $body; + } + + private static function string(string $str): string { $len = strlen($str); diff --git a/src/Packet/UnPackV5.php b/src/Packet/UnPackV5.php index 7bc077b..1ca10c0 100644 --- a/src/Packet/UnPackV5.php +++ b/src/Packet/UnPackV5.php @@ -15,6 +15,7 @@ use Simps\MQTT\Exception\LengthException; use Simps\MQTT\Hex\Property; +use Simps\MQTT\Hex\ReasonCode; use Simps\MQTT\Types; class UnPackV5 @@ -214,6 +215,50 @@ public static function connAck(string $remaining): array return $package; } + public static function subscribe(string $remaining): array + { + $messageId = static::shortInt($remaining); + $propertiesTotalLength = ord($remaining[0]); + $remaining = substr($remaining, 1); + if ($propertiesTotalLength) { + // TODO SUBSCRIBE Properties + } + $topics = []; + while ($remaining) { + $topic = static::string($remaining); + $topics[$topic] = [ + 'qos' => ord($remaining[0]) & 0x3, + 'no_local' => (bool) (ord($remaining[0]) >> 2 & 0x1), + 'retain_as_published' => (bool) (ord($remaining[0]) >> 3 & 0x1), + 'retain_handling' => ord($remaining[0]) >> 4, + ]; + $remaining = substr($remaining, 1); + } + + return [ + 'type' => Types::SUBSCRIBE, + 'message_id' => $messageId, + 'topics' => $topics, + ]; + } + + public static function disconnect(string $remaining): array + { + if ($remaining[0]) { + $code = ord($remaining[0]); + $msg = ReasonCode::getReasonPhrase($code); + } else { + $code = ReasonCode::NORMAL_DISCONNECTION; + $msg = 'Normal disconnection'; + } + + return [ + 'type' => Types::DISCONNECT, + 'code' => $code, + 'message' => $msg, + ]; + } + private static function string(&$remaining) { $length = unpack('n', $remaining)[1]; From 91964405c5a1131c9d899d8874c74c5f0ff997d4 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Mon, 14 Dec 2020 15:48:07 +0800 Subject: [PATCH 11/18] Add subAck --- src/Packet/PackV5.php | 15 +++++++++++++++ src/Packet/UnPackV5.php | 14 ++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/Packet/PackV5.php b/src/Packet/PackV5.php index 03ea37d..98972e7 100644 --- a/src/Packet/PackV5.php +++ b/src/Packet/PackV5.php @@ -221,6 +221,21 @@ public static function subscribe(array $array): string return $head . $body; } + public static function subAck(array $array): string + { + $body = pack('n', $array['message_id']); + $propertiesTotalLength = 0; + $body .= chr($propertiesTotalLength); + + $body .= call_user_func_array( + 'pack', + array_merge(['C*'], $array['payload']) + ); + $head = static::packHeader(Types::SUBACK, strlen($body)); + + return $head . $body; + } + public static function disconnect(array $array): string { $code = !empty($array['code']) ? $array['code'] : ReasonCode::NORMAL_DISCONNECTION; diff --git a/src/Packet/UnPackV5.php b/src/Packet/UnPackV5.php index 1ca10c0..b042216 100644 --- a/src/Packet/UnPackV5.php +++ b/src/Packet/UnPackV5.php @@ -242,6 +242,20 @@ public static function subscribe(string $remaining): array ]; } + public static function subAck(string $remaining): array + { + $messageId = static::shortInt($remaining); + $propertiesTotalLength = ord($remaining[0]); + $remaining = substr($remaining, 1); + if ($propertiesTotalLength) { + // TODO SUBACK Properties + } + + $tmp = unpack('C*', $remaining); + + return ['type' => Types::SUBACK, 'message_id' => $messageId, 'codes' => array_values($tmp)]; + } + public static function disconnect(string $remaining): array { if ($remaining[0]) { From 40a59a31001a1c21da4c11e5feabc1ac9767f0a3 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Mon, 14 Dec 2020 16:43:41 +0800 Subject: [PATCH 12/18] Add unSubscribe and unSubAck --- src/Packet/PackV5.php | 27 +++++++++++++++++++++++++++ src/Packet/UnPackV5.php | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/src/Packet/PackV5.php b/src/Packet/PackV5.php index 98972e7..5ab75e2 100644 --- a/src/Packet/PackV5.php +++ b/src/Packet/PackV5.php @@ -236,6 +236,33 @@ public static function subAck(array $array): string return $head . $body; } + public static function unSubscribe(array $array): string + { + $body = pack('n', $array['message_id']); + $propertiesTotalLength = 0; + $body .= chr($propertiesTotalLength); + + foreach ($array['topics'] as $topic) { + $body .= static::string($topic); + } + $head = static::packHeader(Types::UNSUBSCRIBE, strlen($body), 0, 1); + + return $head . $body; + } + + public static function unSubAck(array $array): string + { + $body = pack('n', $array['message_id']); + $propertiesTotalLength = 0; + $body .= chr($propertiesTotalLength); + + $code = !empty($array['code']) ? $array['code'] : ReasonCode::SUCCESS; + $body .= chr($code); + $head = PackV5::packHeader(Types::UNSUBACK, strlen($body)); + + return $head . $body; + } + public static function disconnect(array $array): string { $code = !empty($array['code']) ? $array['code'] : ReasonCode::NORMAL_DISCONNECTION; diff --git a/src/Packet/UnPackV5.php b/src/Packet/UnPackV5.php index b042216..f06686d 100644 --- a/src/Packet/UnPackV5.php +++ b/src/Packet/UnPackV5.php @@ -256,6 +256,43 @@ public static function subAck(string $remaining): array return ['type' => Types::SUBACK, 'message_id' => $messageId, 'codes' => array_values($tmp)]; } + public static function unSubscribe(string $remaining): array + { + $messageId = static::shortInt($remaining); + $propertiesTotalLength = ord($remaining[0]); + $remaining = substr($remaining, 1); + if ($propertiesTotalLength) { + // TODO UNSUBSCRIBE Properties + } + $topics = []; + while ($remaining) { + $topic = static::string($remaining); + $topics[] = $topic; + } + + return ['type' => Types::UNSUBSCRIBE, 'message_id' => $messageId, 'topics' => $topics]; + } + + public static function unSubAck(string $remaining): array + { + $messageId = static::shortInt($remaining); + $propertiesTotalLength = ord($remaining[0]); + $remaining = substr($remaining, 1); + if ($propertiesTotalLength) { + // TODO UNSUBACK Properties + } + + $code = ord($remaining[0]); + $msg = ReasonCode::getReasonPhrase($code); + + return [ + 'type' => Types::UNSUBACK, + 'message_id' => $messageId, + 'code' => $code, + 'message' => $msg, + ]; + } + public static function disconnect(string $remaining): array { if ($remaining[0]) { From 68c4b92ad8d4c8771903b95d018b71d37af25b60 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Mon, 14 Dec 2020 18:03:36 +0800 Subject: [PATCH 13/18] Add Publish and Add genReasonPhrase and Update pack n --- src/Packet/PackV5.php | 59 +++++++++++++++++++++++++++++---- src/Packet/UnPackV5.php | 72 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 123 insertions(+), 8 deletions(-) diff --git a/src/Packet/PackV5.php b/src/Packet/PackV5.php index 5ab75e2..9aad1e4 100644 --- a/src/Packet/PackV5.php +++ b/src/Packet/PackV5.php @@ -42,7 +42,7 @@ public static function connect(array $array): string $body .= chr($connectFlags); $keepAlive = !empty($array['keep_alive']) && (int) $array['keep_alive'] >= 0 ? (int) $array['keep_alive'] : 0; - $body .= pack('n', $keepAlive); + $body .= static::shortInt($keepAlive); $propertiesTotalLength = 0; if (!empty($array['properties']['session_expiry_interval'])) { @@ -147,7 +147,7 @@ public static function connAck(array $array): string if (!empty($array['properties']['maximum_packet_size'])) { $body .= chr(Property::MAXIMUM_PACKET_SIZE); - $body .= pack('N', $array['properties']['maximum_packet_size']); + $body .= static::longInt($array['properties']['maximum_packet_size']); } $retainAvailable = 0; @@ -176,7 +176,7 @@ public static function connAck(array $array): string $topicAliasMaximum = $array['properties']['topic_alias_maximum']; } $body .= chr(Property::TOPIC_ALIAS_MAXIMUM); - $body .= pack('n', $topicAliasMaximum); + $body .= static::shortInt($topicAliasMaximum); $wildcardSubscriptionAvailable = 0; if (!isset($array['properties']['wildcard_subscription_available']) || !empty($array['properties']['wildcard_subscription_available'])) { @@ -190,9 +190,36 @@ public static function connAck(array $array): string return $head . $body; } + public static function publish(array $array): string + { + $body = static::string($array['topic']); + $qos = $array['qos'] ?? 0; + if ($qos) { + $body .= static::shortInt($array['message_id']); + } + + $propertiesTotalLength = 0; + if (!empty($array['properties']['topic_alias'])) { + $propertiesTotalLength += 3; + } + $body .= chr($propertiesTotalLength); + + if (!empty($array['properties']['topic_alias'])) { + $body .= chr(Property::TOPIC_ALIAS); + $body .= static::shortInt($array['properties']['topic_alias']); + } + + $body .= $array['message']; + $dup = $array['dup'] ?? 0; + $retain = $array['retain'] ?? 0; + $head = static::packHeader(Types::PUBLISH, strlen($body), $dup, $qos, $retain); + + return $head . $body; + } + public static function subscribe(array $array): string { - $body = pack('n', $array['message_id']); + $body = static::shortInt($array['message_id']); $propertiesTotalLength = 0; $body .= chr($propertiesTotalLength); @@ -223,7 +250,7 @@ public static function subscribe(array $array): string public static function subAck(array $array): string { - $body = pack('n', $array['message_id']); + $body = static::shortInt($array['message_id']); $propertiesTotalLength = 0; $body .= chr($propertiesTotalLength); @@ -238,7 +265,7 @@ public static function subAck(array $array): string public static function unSubscribe(array $array): string { - $body = pack('n', $array['message_id']); + $body = static::shortInt($array['message_id']); $propertiesTotalLength = 0; $body .= chr($propertiesTotalLength); @@ -252,7 +279,7 @@ public static function unSubscribe(array $array): string public static function unSubAck(array $array): string { - $body = pack('n', $array['message_id']); + $body = static::shortInt($array['message_id']); $propertiesTotalLength = 0; $body .= chr($propertiesTotalLength); @@ -272,6 +299,24 @@ public static function disconnect(array $array): string return $head . $body; } + public static function genReasonPhrase(array $array): string + { + $body = static::shortInt($array['message_id']); + $code = !empty($array['code']) ? $array['code'] : ReasonCode::SUCCESS; + $body .= chr($code); + + $propertiesTotalLength = 0; + $body .= chr($propertiesTotalLength); + + if ($array['type'] === Types::PUBREL) { + $head = PackV5::packHeader($array['type'], strlen($body), 0, 1); + } else { + $head = PackV5::packHeader($array['type'], strlen($body)); + } + + return $head . $body; + } + private static function string(string $str): string { $len = strlen($str); diff --git a/src/Packet/UnPackV5.php b/src/Packet/UnPackV5.php index f06686d..d4f3f46 100644 --- a/src/Packet/UnPackV5.php +++ b/src/Packet/UnPackV5.php @@ -215,6 +215,46 @@ public static function connAck(string $remaining): array return $package; } + public static function publish(int $dup, int $qos, int $retain, string $remaining): array + { + $topic = static::string($remaining); + if ($qos) { + $messageId = static::shortInt($remaining); + } + + $propertiesTotalLength = ord($remaining[0]); + $remaining = substr($remaining, 1); + if ($propertiesTotalLength) { + // TODO PUBLISH Properties + $topicAliasFlag = ord($remaining[0]) & ~Property::TOPIC_ALIAS; + if ($topicAliasFlag === 0) { + $remaining = substr($remaining, 1); + $topicAlias = static::shortInt($remaining); + } + } + + $package = [ + 'type' => Types::PUBLISH, + 'topic' => $topic, + 'message' => $remaining, + 'dup' => $dup, + 'qos' => $qos, + 'retain' => $retain, + ]; + + if ($qos) { + $package['message_id'] = $messageId; + } + + if ($propertiesTotalLength) { + if ($topicAliasFlag === 0) { + $package['properties']['topic_alias'] = $topicAlias; + } + } + + return $package; + } + public static function subscribe(string $remaining): array { $messageId = static::shortInt($remaining); @@ -282,7 +322,11 @@ public static function unSubAck(string $remaining): array // TODO UNSUBACK Properties } - $code = ord($remaining[0]); + if ($remaining[0]) { + $code = ord($remaining[0]); + } else { + $code = ReasonCode::SUCCESS; + } $msg = ReasonCode::getReasonPhrase($code); return [ @@ -310,6 +354,32 @@ public static function disconnect(string $remaining): array ]; } + public static function getReasonCode(int $type, string $remaining): array + { + $messageId = static::shortInt($remaining); + + if ($remaining[0]) { + $code = ord($remaining[0]); + } else { + $code = ReasonCode::SUCCESS; + } + $msg = ReasonCode::getReasonPhrase($code); + $remaining = substr($remaining, 1); + + $propertiesTotalLength = ord($remaining[0]); + $remaining = substr($remaining, 1); + if ($propertiesTotalLength) { + // TODO Properties + } + + return [ + 'type' => $type, + 'message_id' => $messageId, + 'code' => $code, + 'message' => $msg, + ]; + } + private static function string(&$remaining) { $length = unpack('n', $remaining)[1]; From 310aa65f819b73febb30e67a4291db26bd0114d1 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Tue, 15 Dec 2020 10:48:35 +0800 Subject: [PATCH 14/18] Update client and Add ProtocolV5 --- src/Client.php | 30 +++++++++++++++++++++++++----- src/ProtocolV5.php | 26 ++++++++++++++------------ 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/Client.php b/src/Client.php index 128dfb7..c1eda35 100644 --- a/src/Client.php +++ b/src/Client.php @@ -14,6 +14,7 @@ namespace Simps\MQTT; use Simps\MQTT\Exception\RuntimeException; +use Simps\MQTT\Hex\ReasonCode; use Swoole\Coroutine; class Client @@ -31,6 +32,11 @@ class Client 'keep_alive' => 0, 'protocol_name' => 'MQTT', 'protocol_level' => 4, + 'properties' => [ + 'session_expiry_interval' => 0, + 'receive_maximum' => 0, + 'topic_alias_maximum' => 0, + ], ]; private $messageId = 0; @@ -52,10 +58,15 @@ public function connect(bool $clean = true, array $will = []) $data = [ 'type' => Types::CONNECT, 'protocol_name' => $this->config['protocol_name'], - 'protocol_level' => $this->config['protocol_level'], + 'protocol_level' => (int) $this->config['protocol_level'], 'clean_session' => $clean ? 0 : 1, 'client_id' => $this->config['client_id'], 'keep_alive' => $this->config['keep_alive'], + 'properties' => [ + 'session_expiry_interval' => $this->config['properties']['session_expiry_interval'], + 'receive_maximum' => $this->config['properties']['receive_maximum'], + 'topic_alias_maximum' => $this->config['properties']['topic_alias_maximum'], + ], 'user_name' => $this->config['user_name'], 'password' => $this->config['password'], ]; @@ -88,7 +99,7 @@ public function unSubscribe(array $topics) return $this->send($data); } - public function publish($topic, $content, $qos = 0, $dup = 0, $retain = 0) + public function publish($topic, $content, $qos = 0, $dup = 0, $retain = 0, array $properties = []) { $response = ($qos > 0) ? true : false; @@ -101,6 +112,7 @@ public function publish($topic, $content, $qos = 0, $dup = 0, $retain = 0) 'qos' => $qos, 'dup' => $dup, 'retain' => $retain, + 'properties' => $properties, ], $response ); @@ -111,9 +123,9 @@ public function ping() return $this->send(['type' => Types::PINGREQ]); } - public function close() + public function close(int $code = ReasonCode::NORMAL_DISCONNECTION) { - $this->send(['type' => Types::DISCONNECT], false); + $this->send(['type' => Types::DISCONNECT, 'code' => $code], false); return $this->client->close(); } @@ -133,7 +145,11 @@ private function reConnect() public function send(array $data, $response = true) { - $package = Protocol::pack($data); + if ($this->config['protocol_level'] === 5) { + $package = ProtocolV5::pack($data); + } else { + $package = Protocol::pack($data); + } $this->client->send($package); if ($response) { return $this->recv(); @@ -153,6 +169,10 @@ public function recv() throw new RuntimeException($this->client->errMsg, $this->client->errCode); } } elseif (strlen($response) > 0) { + if ($this->config['protocol_level'] === 5) { + return ProtocolV5::unpack($response); + } + return Protocol::unpack($response); } diff --git a/src/ProtocolV5.php b/src/ProtocolV5.php index 333990a..d1f240c 100644 --- a/src/ProtocolV5.php +++ b/src/ProtocolV5.php @@ -41,14 +41,7 @@ public static function pack(array $array) case Types::PUBREC: case Types::PUBREL: case Types::PUBCOMP: - case Types::UNSUBACK: - $body = pack('n', $array['message_id']); - if ($type === Types::PUBREL) { - $head = PackV5::packHeader($type, strlen($body), 0, 1); - } else { - $head = PackV5::packHeader($type, strlen($body)); - } - $package = $head . $body; + $package = PackV5::genReasonPhrase($array); break; case Types::SUBSCRIBE: $package = PackV5::subscribe($array); @@ -59,11 +52,16 @@ public static function pack(array $array) case Types::UNSUBSCRIBE: $package = PackV5::unSubscribe($array); break; + case Types::UNSUBACK: + $package = PackV5::unSubAck($array); + break; case Types::PINGREQ: case Types::PINGRESP: - case Types::DISCONNECT: $package = PackV5::packHeader($type, 0); break; + case Types::DISCONNECT: + $package = PackV5::disconnect($array); + break; default: throw new InvalidArgumentException('MQTT Type not exist'); } @@ -98,14 +96,15 @@ public static function unpack(string $data) case Types::PUBREC: case Types::PUBREL: case Types::PUBCOMP: - case Types::UNSUBACK: - $package = ['type' => $type, 'message_id' => unpack('n', $remaining)[1]]; + $package = UnPackV5::getReasonCode($type, $remaining); break; case Types::PINGREQ: case Types::PINGRESP: - case Types::DISCONNECT: $package = ['type' => $type]; break; + case Types::DISCONNECT: + $package = UnPackV5::disconnect($remaining); + break; case Types::SUBSCRIBE: $package = UnPackV5::subscribe($remaining); break; @@ -115,6 +114,9 @@ public static function unpack(string $data) case Types::UNSUBSCRIBE: $package = UnPackV5::unSubscribe($remaining); break; + case Types::UNSUBACK: + $package = UnPackV5::unSubAck($remaining); + break; default: $package = []; } From 42fdf848f65b8d48c5d5b8ec5e44b2c15bb083e9 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Tue, 15 Dec 2020 11:06:13 +0800 Subject: [PATCH 15/18] Add v5 examples --- examples/v5/publish.php | 54 +++++++++++++ examples/v5/server.php | 150 ++++++++++++++++++++++++++++++++++++ examples/v5/subscribe.php | 86 +++++++++++++++++++++ examples/v5/unsubscribe.php | 49 ++++++++++++ 4 files changed, 339 insertions(+) create mode 100644 examples/v5/publish.php create mode 100644 examples/v5/server.php create mode 100644 examples/v5/subscribe.php create mode 100644 examples/v5/unsubscribe.php diff --git a/examples/v5/publish.php b/examples/v5/publish.php new file mode 100644 index 0000000..e79791f --- /dev/null +++ b/examples/v5/publish.php @@ -0,0 +1,54 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code + */ + +include __DIR__ . '/../../vendor/autoload.php'; + +use Swoole\Coroutine; +use Simps\MQTT\Client; + +$config = [ + 'host' => '127.0.0.1', +// 'host' => 'broker.emqx.io', + 'port' => 1883, + 'time_out' => 5, + 'user_name' => 'user001', + 'password' => 'hLXQ9ubnZGzkzf', + 'client_id' => Client::genClientID(), + 'keep_alive' => 20, + 'properties' => [ + 'session_expiry_interval' => 213, + 'receive_maximum' => 221, + 'topic_alias_maximum' => 313, + ], + 'protocol_level' => 5, +]; + +Coroutine\run( + function () use ($config) { + $client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]); + while (!$client->connect()) { + Coroutine::sleep(3); + $client->connect(); + } + while (true) { + $response = $client->publish( + 'simps-mqtt/user001/update', + '{"time":' . time() . '}', + 1, + 0, + 0, + ['topic_alias' => 1] + ); + var_dump($response); + Coroutine::sleep(3); + } + } +); diff --git a/examples/v5/server.php b/examples/v5/server.php new file mode 100644 index 0000000..24dbbaa --- /dev/null +++ b/examples/v5/server.php @@ -0,0 +1,150 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code + */ + +include __DIR__ . '/../../vendor/autoload.php'; + +use Simps\MQTT\ProtocolV5; +use Simps\MQTT\Types; + +$server = new Swoole\Server('127.0.0.1', 1883, SWOOLE_BASE); + +$server->set( + [ + 'open_mqtt_protocol' => true, + 'worker_num' => 2, + 'package_max_length' => 2 * 1024 * 1024 + ] +); + +$server->on('connect', function ($server, $fd) { + echo "Client #{$fd}: Connect.\n"; +}); + +$server->on('receive', function (Swoole\Server $server, $fd, $from_id, $data) { + try { + ProtocolV5::printf($data); + $data = ProtocolV5::unpack($data); + if (is_array($data) && isset($data['type'])) { + switch ($data['type']) { + case Types::CONNECT: + // Check protocol_name + if ($data['protocol_name'] != 'MQTT') { + $server->close($fd); + return false; + } + + // Check connection information, etc. + + $server->send( + $fd, + ProtocolV5::pack( + [ + 'type' => Types::CONNACK, + 'code' => 0, + 'session_present' => 0, + 'properties' => [ + 'maximum_packet_size' => 1048576, + 'retain_available' => true, + 'shared_subscription_available' => true, + 'subscription_identifier_available' => true, + 'topic_alias_maximum' => 65535, //0 + 'wildcard_subscription_available' => true, + ] + ] + ) + ); + break; + case Types::PINGREQ: + $server->send($fd, ProtocolV5::pack(['type' => Types::PINGRESP])); + break; + case Types::DISCONNECT: + if ($server->exist($fd)) { + $server->close($fd); + } + break; + case Types::PUBLISH: + // Send to subscribers + $server->send( + 1, + ProtocolV5::pack( + [ + 'type' => $data['type'], + 'topic' => $data['topic'], + 'message' => $data['message'], + 'dup' => $data['dup'], + 'qos' => $data['qos'], + 'retain' => $data['retain'], + 'message_id' => $data['message_id'] ?? '' + ] + ) + ); + + if ($data['qos'] === 1) { + $server->send( + $fd, + ProtocolV5::pack( + [ + 'type' => Types::PUBACK, + 'message_id' => $data['message_id'] ?? '', + ] + ) + ); + } + + break; + case Types::SUBSCRIBE: + $payload = []; + foreach ($data['topics'] as $k => $option) { + $qos = $option['qos']; + if (is_numeric($qos) && $qos < 3) { + $payload[] = $qos; + } else { + $payload[] = \Simps\MQTT\Hex\ReasonCode::QOS_NOT_SUPPORTED; + } + } + $server->send( + $fd, + ProtocolV5::pack( + [ + 'type' => Types::SUBACK, + 'message_id' => $data['message_id'] ?? '', + 'payload' => $payload + ] + ) + ); + break; + case Types::UNSUBSCRIBE: + $server->send( + $fd, + ProtocolV5::pack( + [ + 'type' => Types::UNSUBACK, + 'message_id' => $data['message_id'] ?? '', + ] + ) + ); + break; + } + } else { + $server->close($fd); + } + } catch (\Throwable $e) { + echo "\033[0;31mError: {$e->getMessage()}\033[0m\r\n"; + echo $e->getTraceAsString() . PHP_EOL; +// $server->close($fd); + } +}); + +$server->on('close', function ($server, $fd) { + echo "Client #{$fd}: Close.\n"; +}); + +$server->start(); diff --git a/examples/v5/subscribe.php b/examples/v5/subscribe.php new file mode 100644 index 0000000..0439003 --- /dev/null +++ b/examples/v5/subscribe.php @@ -0,0 +1,86 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code + */ + +include __DIR__ . '/../../vendor/autoload.php'; + +use Swoole\Coroutine; +use Simps\MQTT\Client; + +$config = [ + 'host' => '127.0.0.1', +// 'host' => 'broker.emqx.io', + 'port' => 1883, + 'time_out' => 5, + 'user_name' => 'user001', + 'password' => 'hLXQ9ubnZGzkzf', + 'client_id' => 'd812edc1-18da-2085-0edf-a4a588c296d1', + 'keep_alive' => 20, + 'properties' => [ + 'session_expiry_interval' => 213, + 'receive_maximum' => 221, + 'topic_alias_maximum' => 313, + ], + 'protocol_level' => 5, +]; + +Coroutine\run(function () use ($config) { + $client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]); + $will = [ + 'topic' => 'simps-mqtt/user001/update', + 'qos' => 1, + 'retain' => 0, + 'message' => 'byebye', + 'properties' => [ + 'will_delay_interval' => 60, + 'message_expiry_interval' => 60, + 'content_type' => 'test', + 'payload_format_indicator' => true, // false 0 1 + ], + ]; + while (!$data = $client->connect(false, $will)) { + \Swoole\Coroutine::sleep(3); + $client->connect(true, $will); + } +// $topics['simps-mqtt/user001/get'] = 0; +// $topics['simps-mqtt/user001/update'] = 2; + $topics['simps-mqtt/user001/get'] = [ + 'qos' => 1, + 'no_local' => true, + 'retain_as_published' => true, + 'retain_handling' => 2, + ]; + $topics['simps-mqtt/user001/update'] = [ + 'qos' => 2, + 'no_local' => false, + 'retain_as_published' => true, + 'retain_handling' => 2, + ]; + $timeSincePing = time(); + $res = $client->subscribe($topics); + var_dump($res); + while (true) { + $buffer = $client->recv(); + var_dump($buffer); + if ($buffer && $buffer !== true) { + $timeSincePing = time(); + } + if (isset($config['keep_alive']) && $timeSincePing < (time() - $config['keep_alive'])) { + $buffer = $client->ping(); + if ($buffer) { + echo 'send ping success' . PHP_EOL; + $timeSincePing = time(); + } else { + $client->close(); + break; + } + } + } +}); diff --git a/examples/v5/unsubscribe.php b/examples/v5/unsubscribe.php new file mode 100644 index 0000000..77355b2 --- /dev/null +++ b/examples/v5/unsubscribe.php @@ -0,0 +1,49 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code + */ + +include __DIR__ . '/../../vendor/autoload.php'; + +use Swoole\Coroutine; +use Simps\MQTT\Client; + +$config = [ + 'host' => '127.0.0.1', +// 'host' => 'broker.emqx.io', + 'port' => 1883, + 'time_out' => 5, + 'user_name' => 'user001', + 'password' => 'hLXQ9ubnZGzkzf', + 'client_id' => Client::genClientID(), + 'keep_alive' => 20, + 'properties' => [ + 'session_expiry_interval' => 213, + 'receive_maximum' => 221, + 'topic_alias_maximum' => 313, + ], + 'protocol_level' => 5, +]; + +Coroutine\run(function () use ($config) { + $client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]); + $will = [ + 'topic' => 'simps-mqtt/user001/update', + 'qos' => 1, + 'retain' => 0, + 'message' => '' . time(), + ]; + while (! $client->connect(false, $will)) { + \Swoole\Coroutine::sleep(3); + $client->connect(true, $will); + } + $topics = ['simps-mqtt/user001/get']; + $res = $client->unsubscribe($topics); + var_dump($res); +}); From 3183c14525d965fa9d392bb4a3367448a0c5ccfd Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Tue, 15 Dec 2020 11:09:22 +0800 Subject: [PATCH 16/18] Update composer.json --- composer.json | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/composer.json b/composer.json index e522400..97779da 100644 --- a/composer.json +++ b/composer.json @@ -1,6 +1,6 @@ { "name": "simps/mqtt", - "description": "MQTT Coroutine Client for PHP", + "description": "MQTT Protocol Analysis and Coroutine Client for PHP", "keywords": [ "php", "simps", @@ -9,7 +9,9 @@ "client", "coroutine", "mqtt_client", - "mqtt_protocol" + "mqtt_protocol", + "mqtt3.1", + "mqtt5.0" ], "license": "Apache-2.0", "authors": [ @@ -21,7 +23,7 @@ "require": { "php": ">=7.0", "ext-mbstring": "*", - "ext-swoole": ">=4.0" + "ext-swoole": ">=4.4.19" }, "require-dev": { "friendsofphp/php-cs-fixer": "^2.16" From d01c7f062f65ec2436f253c028af1289475ac180 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Tue, 15 Dec 2020 11:31:32 +0800 Subject: [PATCH 17/18] Update README --- README-CN.md | 191 +++++++++------------------------------------------ README.md | 179 +++++++++-------------------------------------- 2 files changed, 68 insertions(+), 302 deletions(-) diff --git a/README-CN.md b/README-CN.md index 10600b8..e106315 100644 --- a/README-CN.md +++ b/README-CN.md @@ -4,10 +4,14 @@ 适用于 PHP 的 MQTT 协议解析和协程客户端。 +支持 MQTT 协议 `3.1`、`3.1.1` 和 `5.0` 版本,支持`QoS 0`、`QoS 1`、`QoS 2`。 + [![Latest Stable Version](https://poser.pugx.org/simps/mqtt/v)](//packagist.org/packages/simps/mqtt) [![Total Downloads](https://poser.pugx.org/simps/mqtt/downloads)](//packagist.org/packages/simps/mqtt) [![Latest Unstable Version](https://poser.pugx.org/simps/mqtt/v/unstable)](//packagist.org/packages/simps/mqtt) [![License](https://poser.pugx.org/simps/mqtt/license)](LICENSE) +[![PHP Version](https://img.shields.io/badge/php-%3E=7.2-blue.svg)](https://www.php.net) +[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.4.19-blue.svg)](https://www.php.net) ## 安装 @@ -15,171 +19,44 @@ composer require simps/mqtt ``` -## 示例 - -参考 [examples](./examples) 目录 - -## Client API - -### __construct() - -创建一个MQTT客户端实例 - -```php -Simps\MQTT\Client::__construct(array $config, array $swConfig = [], int $type = SWOOLE_SOCK_TCP) -``` - -* 参数`array $config` - -客户端选项数组,可以设置以下选项: - -```php -$config = [ - 'host' => '127.0.0.1', // MQTT服务端IP - 'port' => 1883, // MQTT服务端端口 - 'time_out' => 5, // 连接MQTT服务端超时时间,默认0.5秒 - 'user_name' => '', // 用户名 - 'password' => '', // 密码 - 'client_id' => '', // 客户端id - 'keep_alive' => 10, // 默认0秒,设置成0代表禁用 - 'protocol_name' => 'MQTT', // 协议名,默认为MQTT(3.1.1版本),也可为MQIsdp(3.1版本) - 'protocol_level' => 4, // 协议等级,MQTT为4,MQIsdp为3 -]; -``` - -* 参数`array $swConfig` - -用于设置`Swoole\Coroutine\Client`的配置,请参考Swoole文档:[set()](https://wiki.swoole.com/#/coroutine_client/client?id=set) - -### connect() - -连接Broker - -```php -Simps\MQTT\Client->connect(bool $clean = true, array $will = []) -``` - -* 参数`bool $clean` - -清理会话,默认为`true` - -具体描述请参考[清理会话 Clean Session](https://mcxiaoke.gitbook.io/mqtt/03-controlpackets/0301-connect#qing-li-hui-hua-clean-session) - -* 参数`array $will` - -遗嘱消息,当客户端断线后Broker会自动发送遗嘱消息给其它客户端 - -需要设置的内容如下 - -```php -$will = [ - 'topic' => '', // 主题 - 'qos' => 1, // QoS等级 - 'retain' => 0, // retain标记 - 'content' => '', // content -]; -``` - -### publish() - -向某个主题发布一条消息 - -```php -Simps\MQTT\Client->publish($topic, $content, $qos = 0, $dup = 0, $retain = 0) -``` - -* 参数`$topic` 主题 -* 参数`$content` 内容 -* 参数`$qos` QoS等级,默认0 -* 参数`$dup` 重发标志,默认0 -* 参数`$retain` retain标记,默认0 - -### subscribe() - -订阅一个主题或者多个主题 - -```php -Simps\MQTT\Client->subscribe(array $topics) -``` - -* 参数`array $topics` - -`$topics`的`key`是主题,值为`QoS`的数组,例如 - -```php -$topics = [ - // 主题 => Qos - 'topic1' => 0, - 'topic2' => 1, -]; -``` - -### unSubscribe() - -取消订阅一个主题或者多个主题 - -```php -Simps\MQTT\Client->unSubscribe(array $topics) -``` - -* 参数`array $topics` - -```php -$topics = ['topic1', 'topic2']; -``` - -### close() - -正常断开与Broker的连接,`DISCONNECT(14)`报文会被发送到Broker - -```php -Simps\MQTT\Client->close() -``` - -### recv() - -接收消息 - -```php -Simps\MQTT\Client->recv(): bool|arary|string -``` - -### send() - -发送消息 - -```php -Simps\MQTT\Client->send(array $data, $response = true) -``` - -* 参数`array $data` +## 文档 -`$data`是需要发送的数据,必须包含`type`等信息 +https://mqtt.simps.io -* 参数`bool $response` - -是否需要回执。如果为`true`,会调用一次`recv()` +## 示例 -### ping() +参考 [examples](./examples) 目录 -发送心跳包 +## 支持 -```php -Simps\MQTT\Client->ping() -``` +### Version -### buildMessageId() +- [x] `3.1` +- [x] `3.1.1` +- [x] `5.0` -生成MessageId +> 也许这是第一个支持 MQTT `v5.0` 协议的 PHP library。 -```php -Simps\MQTT\Client->buildMessageId() -``` +### QoS -### genClientId() +- [x] `QoS 0` +- [x] `QoS 1` +- [x] `QoS 2` -生成ClientId +### Type -```php -Simps\MQTT\Client->genClientID() -``` +- [x] CONNECT +- [x] CONNACK +- [x] PUBLISH +- [x] PUBACK +- [x] PUBREC +- [x] PUBREL +- [x] PUBCOMP +- [x] SUBSCRIBE +- [x] SUBACK +- [x] UNSUBSCRIBE +- [x] UNSUBACK +- [x] PINGREQ +- [x] PINGRESP +- [x] DISCONNECT +- [ ] AUTH diff --git a/README.md b/README.md index 85b24be..c6a87b2 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,14 @@ English | [中文](./README-CN.md) MQTT Protocol Analysis and Coroutine Client for PHP. +Support for MQTT protocol versions `3.1`, `3.1.1` and `5.0` and support for `QoS 0`, `QoS 1`, `QoS 2`. + [![Latest Stable Version](https://poser.pugx.org/simps/mqtt/v)](//packagist.org/packages/simps/mqtt) [![Total Downloads](https://poser.pugx.org/simps/mqtt/downloads)](//packagist.org/packages/simps/mqtt) [![Latest Unstable Version](https://poser.pugx.org/simps/mqtt/v/unstable)](//packagist.org/packages/simps/mqtt) [![License](https://poser.pugx.org/simps/mqtt/license)](LICENSE) +[![PHP Version](https://img.shields.io/badge/php-%3E=7.2-blue.svg)](https://www.php.net) +[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.4.19-blue.svg)](https://www.php.net) ## Install @@ -15,159 +19,44 @@ MQTT Protocol Analysis and Coroutine Client for PHP. composer require simps/mqtt ``` -## Examples - -see [examples](./examples) - -## Client API - -### __construct() - -Create a MQTT client instance - -```php -Simps\MQTT\Client::__construct(array $config, array $swConfig = [], int $type = SWOOLE_SOCK_TCP) -``` - -* `array $config` - -An array of client options, you can set the following options: - -```php -$config = [ - 'host' => '127.0.0.1', - 'port' => 1883, - 'time_out' => 5, - 'user_name' => '', - 'password' => '', - 'client_id' => '', - 'keep_alive' => 10, - 'protocol_name' => 'MQTT', // or MQIsdp - 'protocol_level' => 4, // or 3 -]; -``` - -* `array $swConfig` - -To set the configuration of `Swoole\Coroutine\Client`, please see Swoole document: [set()](https://www.swoole.co.uk/docs/modules/swoole-coroutine-client-set) - -### connect() - -Connect Broker - -```php -Simps\MQTT\Client->connect(bool $clean = true, array $will = []) -``` - -* `bool $clean` - -Clean session. default is `true`. see [Clean Session](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180843) - -* `array $will` - -When a client is disconnected, Broker will automatically send a will message to other clients - -```php -$will = [ - 'topic' => '', - 'qos' => 1, - 'retain' => 0, - 'content' => '', -]; -``` - -### publish() - -push a message to a topic - -```php -Simps\MQTT\Client->publish($topic, $content, $qos = 0, $dup = 0, $retain = 0) -``` - -### subscribe() - -Subscribe to one topic or multiple topics - -```php -Simps\MQTT\Client->subscribe(array $topics) -``` - -* `array $topics` - -```php -$topics = [ - // topic => Qos - 'topic1' => 0, - 'topic2' => 1, -]; -``` - -### unSubscribe() - -Unsubscribe from a topic or multiple topics - -```php -Simps\MQTT\Client->unSubscribe(array $topics) -``` - -* `array $topics` - -```php -$topics = ['topic1', 'topic2']; -``` - -### close() - -Disconnect from Broker connect. The `DISCONNECT(14)` message is send to Broker - -```php -Simps\MQTT\Client->close() -``` - -### recv() - -Receive messages - -```php -Simps\MQTT\Client->recv(): bool|arary|string -``` - -### send() - -Send messages - -```php -Simps\MQTT\Client->send(array $data, $response = true) -``` +## Documentation -* `array $data` +https://mqtt.simps.io -`$data` is the data to be sent and must contain information such as `type` +## Examples -* `bool $response` +see [examples](./examples) -Are acknowledgements required. If `true`, `recv()` is called once +## Supports -### ping() +### Version -Send a heartbeat +- [x] `3.1` +- [x] `3.1.1` +- [x] `5.0` -```php -Simps\MQTT\Client->ping() -``` +> Perhaps the first PHP library to support the MQTT `v5.0` protocol. -### buildMessageId() +### QoS -Generate MessageId +- [x] `QoS 0` +- [x] `QoS 1` +- [x] `QoS 2` -```php -Simps\MQTT\Client->buildMessageId() -``` +### Type -### genClientId() - -Generate ClientId - -```php -Simps\MQTT\Client->genClientID() -``` +- [x] CONNECT +- [x] CONNACK +- [x] PUBLISH +- [x] PUBACK +- [x] PUBREC +- [x] PUBREL +- [x] PUBCOMP +- [x] SUBSCRIBE +- [x] SUBACK +- [x] UNSUBSCRIBE +- [x] UNSUBACK +- [x] PINGREQ +- [x] PINGRESP +- [x] DISCONNECT +- [ ] AUTH From e604085d3778b57e46e89fd7a027b08b659b10b1 Mon Sep 17 00:00:00 2001 From: sy-records <52o@qq52o.cn> Date: Tue, 15 Dec 2020 11:48:07 +0800 Subject: [PATCH 18/18] Update swoole link --- README-CN.md | 2 +- README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README-CN.md b/README-CN.md index e106315..25774ee 100644 --- a/README-CN.md +++ b/README-CN.md @@ -11,7 +11,7 @@ [![Latest Unstable Version](https://poser.pugx.org/simps/mqtt/v/unstable)](//packagist.org/packages/simps/mqtt) [![License](https://poser.pugx.org/simps/mqtt/license)](LICENSE) [![PHP Version](https://img.shields.io/badge/php-%3E=7.2-blue.svg)](https://www.php.net) -[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.4.19-blue.svg)](https://www.php.net) +[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.4.19-blue.svg)](https://github.com/swoole/swoole-src) ## 安装 diff --git a/README.md b/README.md index c6a87b2..6d6b4fd 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Support for MQTT protocol versions `3.1`, `3.1.1` and `5.0` and support for `QoS [![Latest Unstable Version](https://poser.pugx.org/simps/mqtt/v/unstable)](//packagist.org/packages/simps/mqtt) [![License](https://poser.pugx.org/simps/mqtt/license)](LICENSE) [![PHP Version](https://img.shields.io/badge/php-%3E=7.2-blue.svg)](https://www.php.net) -[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.4.19-blue.svg)](https://www.php.net) +[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.4.19-blue.svg)](https://github.com/swoole/swoole-src) ## Install