12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- import unittest
- import settings
- import time
- import mosquitto
- import serial
def on_message(mosq, obj, msg):
    """Record an incoming message on the user object's queue.

    Installed as the client's ``on_message`` callback.

    Args:
        mosq: The client that invoked the callback (unused here).
        obj: The user object registered with the client; it must expose a
            list-like ``message_queue`` attribute.
        msg: The received message; appended to ``obj.message_queue`` as-is.
    """
    queue = obj.message_queue
    queue.append(msg)
class mqtt_basic(unittest.TestCase):
    """Verify the first message received on the ``outTopic`` topic.

    One client is created for the whole class in ``setUpClass``. The class
    itself is registered as the client's user object, so the module-level
    ``on_message`` callback appends received messages to the class-level
    ``message_queue``.
    """

    # Filled by the on_message callback; lives on the class because the
    # class (not an instance) is handed to the client as ``obj``.
    message_queue = []

    # Polling budget used by test_one: up to POLL_ATTEMPTS network loop
    # iterations, sleeping POLL_INTERVAL seconds after each one.
    POLL_ATTEMPTS = 30
    POLL_INTERVAL = 0.5

    @classmethod
    def setUpClass(cls):
        """Create the client, connect to the broker and subscribe."""
        cls.client = mosquitto.Mosquitto(
            "pubsubclient_ut", clean_session=True, obj=cls
        )
        cls.client.connect(settings.server_ip)
        cls.client.on_message = on_message
        cls.client.subscribe("outTopic", 0)

    @classmethod
    def tearDownClass(cls):
        """Disconnect the shared client."""
        cls.client.disconnect()

    def test_one(self):
        """Wait for one message on ``outTopic`` and check its fields."""
        attempts_left = self.POLL_ATTEMPTS
        while not self.message_queue and attempts_left > 0:
            self.client.loop()
            time.sleep(self.POLL_INTERVAL)
            attempts_left -= 1

        # Decide the timeout from the queue, not the counter: a message that
        # arrives during the final poll leaves the counter at 0 and must not
        # be reported as a timeout.
        self.assertTrue(
            len(self.message_queue) > 0, "message receive timed-out"
        )
        self.assertEqual(
            len(self.message_queue), 1,
            "unexpected number of messages received",
        )

        msg = self.message_queue[0]
        self.assertEqual(msg.mid, 0, "message id not 0")
        self.assertEqual(msg.topic, "outTopic", "message topic incorrect")
        self.assertEqual(msg.payload, "hello world")
        self.assertEqual(msg.qos, 0, "message qos not 0")
        self.assertEqual(msg.retain, False, "message retain flag incorrect")
-
-
|