Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve race condition for UNSUBACK #225

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

ch4nsuk3
Copy link

@ch4nsuk3 ch4nsuk3 commented Oct 9, 2024

When attempting to unsubscribe from a topic it was possible to receive the following error:

Traceback (most recent call last): File "code.py", line 79, in <module> File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 869, in unsubscribe MMQTTException: ('invalid message received as response to UNSUBSCRIBE: 0x30', None)

This is due to the unsubscribe function assuming the next packet received will either be the UNSUBACK or an error code, rather than accounting for possible PUBLISH packets coming from the MQTT broker that may have already been in flight. Looking at some packet captures taken from the broker side its possible to get this error before the broker even receives the UNSUB request.

This fix essentially sets the unsubscribe function to match the existing behavior in the subscribe function, which ignores publish messages while waiting for the SUBACK.

Additionally I added named constant values for the SUBACK and UNSUBACK opcodes, replacing the 'magic number' values for comparing returned opcodes to increase clarity.

Corrects the behavior of erroring out while waiting for an UNSUBACK when a publish message from the server arrives before the UNSUBACK does. Also changed op comparisons from using magic numbers to named constants for clarity.
f"invalid message received as response to UNSUBSCRIBE: {hex(op)}"
)
if op != MQTT_PUBLISH:
raise MMQTTException(
Copy link
Contributor

@vladak vladak Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might want to add a comment similar to the one in subscribe(), explain the rationale.

@vladak
Copy link
Contributor

vladak commented Nov 4, 2024

Looks good and make sense as The MQTT 3.1.1 spec says in section 3.10.4 Response:

If a Server deletes a Subscription:

  • It MUST stop adding any new messages for delivery to the Client [MQTT-3.10.4-2].
  • It MUST complete the delivery of any QoS 1 or QoS 2 messages which it has started to send to the Client [MQTT-3.10.4-3].
  • It MAY continue to deliver any existing messages buffered for delivery to the Client.

So the last bullet applies to this case.

Added a comment referencing the MQTT specification for why the server may not immediately respond to an UNSUBACK
Adjusted formatting to resolve the ruff E501 Error.
@ch4nsuk3
Copy link
Author

ch4nsuk3 commented Nov 8, 2024

Good call on the comment. Its been added.

@vladak
Copy link
Contributor

vladak commented Nov 15, 2024

It would be nice if the testdata packet sequences in https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/blob/main/tests/test_unsubscribe.py were augmented by the PUBLISH+UNSUBACK case like it is done for the SUBSCRIBE case:

# SUBSCRIBE responded to by PUBLISH followed by SUBACK
(
"foo/bar",
bytearray(
[
0x30, # PUBLISH
0x0C,
0x00,
0x07,
0x66,
0x6F,
0x6F,
0x2F,
0x62,
0x61,
0x72,
0x66,
0x6F,
0x6F,
0x90, # SUBACK
0x03,
0x00,
0x01,
0x00,
]
),
bytearray(
[
0x82, # fixed header
0x0C, # remaining length
0x00,
0x01, # message ID
0x00,
0x07, # topic length
0x66, # topic
0x6F,
0x6F,
0x2F,
0x62,
0x61,
0x72,
0x00, # QoS
]
),
),

@ch4nsuk3
Copy link
Author

I should be able to make these changes soon.

@vladak
Copy link
Contributor

vladak commented Jan 9, 2025

This should do it:

diff --git a/tests/test_unsubscribe.py b/tests/test_unsubscribe.py
index f7bbb21..1dfbb85 100644
--- a/tests/test_unsubscribe.py
+++ b/tests/test_unsubscribe.py
@@ -68,6 +68,49 @@ testdata = [
             + [0x6F] * 257
         ),
     ),
+    # UNSUBSCRIBE responded to by PUBLISH followed by UNSUBACK
+    (
+        "foo/bar",
+        bytearray(
+            [
+                0x30,  # PUBLISH
+                0x0C,
+                0x00,
+                0x07,
+                0x66,
+                0x6F,
+                0x6F,
+                0x2F,
+                0x62,
+                0x61,
+                0x72,
+                0x66,
+                0x6F,
+                0x6F,
+                0xB0,  # UNSUBACK
+                0x02,
+                0x00,
+                0x01,
+            ]
+        ),
+        bytearray(
+            [
+                0xA2,  # fixed header
+                0x0B,  # remaining length
+                0x00,
+                0x01,  # message ID
+                0x00,
+                0x07,  # topic length
+                0x66,  # topic
+                0x6F,
+                0x6F,
+                0x2F,
+                0x62,
+                0x61,
+                0x72,
+            ]
+        ),
+    ),
     # use list of topics for more coverage. If the range was (1, 10000), that would be
     # long enough to use 3 bytes for remaining length, however that would make the test
     # run for many minutes even on modern systems, so 1000 is used instead.
@@ -95,7 +138,7 @@ testdata = [
 @pytest.mark.parametrize(
     "topic,to_send,exp_recv",
     testdata,
-    ids=["short_topic", "long_topic", "topic_list_long"],
+    ids=["short_topic", "long_topic", "publish_first", "topic_list_long"],
 )
 def test_unsubscribe(topic, to_send, exp_recv) -> None:
     """

It passes the test_unpublish test on a CPython system.

Feel free to grab it from vladak@2047753

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants