# mqtt_publish_in_callback.py (1.9 KB)

  1. import unittest
  2. import settings
  3. import time
  4. import mosquitto
  5. import serial
  6. def on_message(mosq, obj, msg):
  7. obj.message_queue.append(msg)
  8. class mqtt_publish_in_callback(unittest.TestCase):
  9. message_queue = []
  10. @classmethod
  11. def setUpClass(self):
  12. self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True,obj=self)
  13. self.client.connect(settings.server_ip)
  14. self.client.on_message = on_message
  15. self.client.subscribe("outTopic",0)
  16. @classmethod
  17. def tearDownClass(self):
  18. self.client.disconnect()
  19. def test_connect(self):
  20. i=30
  21. while len(self.message_queue) == 0 and i > 0:
  22. self.client.loop()
  23. time.sleep(0.5)
  24. i -= 1
  25. self.assertTrue(i>0, "message receive timed-out")
  26. self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
  27. msg = self.message_queue.pop(0)
  28. self.assertEqual(msg.mid,0,"message id not 0")
  29. self.assertEqual(msg.topic,"outTopic","message topic incorrect")
  30. self.assertEqual(msg.payload,"hello world")
  31. self.assertEqual(msg.qos,0,"message qos not 0")
  32. self.assertEqual(msg.retain,False,"message retain flag incorrect")
  33. def test_publish(self):
  34. self.assertEqual(len(self.message_queue), 0, "message queue not empty")
  35. payload = "abcdefghij"
  36. self.client.publish("inTopic",payload)
  37. i=30
  38. while len(self.message_queue) == 0 and i > 0:
  39. self.client.loop()
  40. time.sleep(0.5)
  41. i -= 1
  42. self.assertTrue(i>0, "message receive timed-out")
  43. self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
  44. msg = self.message_queue.pop(0)
  45. self.assertEqual(msg.mid,0,"message id not 0")
  46. self.assertEqual(msg.topic,"outTopic","message topic incorrect")
  47. self.assertEqual(msg.payload,payload)
  48. self.assertEqual(msg.qos,0,"message qos not 0")
  49. self.assertEqual(msg.retain,False,"message retain flag incorrect")