AWSIoTPythonSDK のshadowGetで subscribeTimeoutException がおきた話
Greengrass上で実行されるアプリケーションの開発中に目にした、Device Shadowを取得の挙動について備忘録として記録に残しておきます。 AWS IoT CoreのMQTT topicsやDevice Shadowのことをある程度知っていると理解しやすいかと思います。
何かしら間違ったことを書いてしまっていたところがあればご指摘いただけると幸いです。
GreengrassとAWSIoTPythonについて
Greengrassは正式名称には「AWS IoT Greengrass」で、AWSが提供するIoT向けのサービスです。IoT機器上に AWS Lambdaをデプロイしたり、実行してりできます。
Greengrassでは証明書を使ってAWSリソースの権限を制御することができます。 AWSIoTPythonというSDKを使うことで、Greengrassの証明書を使ってMQTTやMQTT over the WebSocket protocolでAWSのリソースにアクセスすることができます。
DeviceShadowの値を取得してみる
以下のコードではGreengrassの Device Shadow というデバイスと他のシステム(バックエンドサーバなど)でデータを共有するためのデータストアの値を取得しています。 データの取得はshadowGetという関数を用いていて、受け取った値をsrcCallback関数に引き渡すことでDeviceShadowの値を取得できます。 以下のコードは抜粋なので、コード全体を見たい場合は GISTの方 をご確認ください。
def shadowGet_callback(payload: str, response_status: str, token: str): print("Got shadow:") print(json.dumps(json.loads(payload), indent=2)) def main(): CLIENT_ID = "test1_Core" client = create_mqtt_connection(client_id=CLIENT_ID) shadow_handler = build_shadow_handler( clientID=CLIENT_ID, awsIoTMQTTClient=client ) shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10) if __name__ == "__main__": main()
実行するとこのような結果になります。
> python shadowGet.py Got shadow: { "state": { "desired": { "welcome": "aws-iot" }, "reported": { "welcome": "aws-iot" } }, "metadata": { "desired": { "welcome": { "timestamp": 1614564291 } }, "reported": { "welcome": { "timestamp": 1614564291 } } }, "version": 1, "timestamp": 1615016122, "clientToken": "95a4a071-1e17-4d2d-b3f8-4ebfa7f2d1a1" }
DeviceShadowは、AWS Consoleで確認すると以下のようになっています。
subscribeしてみる
AWS IoTではMQTTを利用してメッセージのパブリッシュとサブスクライブをすることができます。 docs.aws.amazon.com
以下のコードでは hello/world
というトピックをサブスクライブして、メッセージを受け取ったら出力しています(全体は こちら )。
def main(): CLIENT_ID = "test1_Core" client = create_mqtt_connection(client_id=CLIENT_ID) def subscribe_callback(client, userdata, message): print(f"Received a new message: {message.payload.decode('utf-8')}") client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback) while True: time.sleep(5) if __name__ == "__main__": main()
ブラウザではAWS ConsoleにMQTTのテストするクライアントが提供されていて、 hello/worldのトピックにメッセージを発行しています。ターミナルの方でプログラムを実行し、メッセージを受け取れていることが確認できます。
実行に失敗するコード
ここからが本記事の本題なのですが、MQTTのサブスクライブのcallback関数の中で、shadowを取得しようとするとなぜか例外があがってしまうという問題がおきました。 具体的な問題が起きるコードは抜粋すると以下のような感じです。
def shadowGet_callback(payload: str, response_status: str, token: str): print(f"shadow: {json.loads(payload)}") def main(): CLIENT_ID = "test1_Core" client = create_mqtt_connection(client_id=CLIENT_ID) shadow_handler = build_shadow_handler( clientID=CLIENT_ID, awsIoTMQTTClient=client ) def subscribe_callback(client, userdata, message): print(f"Received a new message: {message.payload.decode('utf-8')}") shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10) client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback) while True: time.sleep(5) if __name__ == "__main__": main()
このコードでhello/worldトピックにパブリッシュすると、callbackの最初のprintは実行された後に subscribeTimeoutException の例外があがります。
$ python fail_shadowGet.py Received a new message: { "message": "Hello from AWS IoT console" } Subscribe timed out Exception in thread Thread-1: Traceback (most recent call last): File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner self.run() File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 147, in _dispatch self._dispatch_one() File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 154, in _dispatch_one self._dispatch_methods[event_type](mid, data) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 237, in _dispatch_message message_callback(None, None, message) # message_callback(client, userdata, message) File "fail_shadowGet.py", line 47, in subscribe_callback shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/deviceShadow.py", line 243, in shadowGet self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/shadowManager.py", line 70, in basicShadowSubscribe self._mqttCoreHandler.subscribe(currentShadowAction.getTopicAccept(), 0, srcCallback) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/mqtt_core.py", line 306, in subscribe raise subscribeTimeoutException() AWSIoTPythonSDK.exception.AWSIoTExceptions.subscribeTimeoutException
subscribeTimeoutExceptionが発生してしまう原因について調査
以降では、subscribeTimeoutExceptionが発生してしまう原因について調査をしていきます。
調査1. subscribeTimeoutExceptionという例外クラスについて
subscribeTimeoutExceptionの名前からサブスクライブ時にタイムアウトが発生しているんやろうなぁと予想できますが、なぜタイムアウトが発生してしまうのか、どのような条件だと例外が発生するかなど、何かしら情報がないか探してみます。
まずこのクラスの定義を確認してみると、クラス定義ではoperationTimeoutExceptionを継承していることやmsgが設定されていることはわかりますが、原因の手がかりにつながりそうな情報はなさそうでした。
class subscribeTimeoutException(operationTimeoutException.operationTimeoutException): def __init__(self, msg="Subscribe Timeout"): self.message = msg
APIドキュメントのサイトが公開されており、検索ボックスもあったのでsubscribeTimeoutExceptionで検索をしてみましたが、残念ながら検索ヒット数は0件でした。
subscribeTimeoutExceptionでGoogle検索をした場合に出てくる情報にもいくつか目をとおしてみましたが、権限周りの設定が失敗している場合など、今回調査しているようなケースではなさそうでした。
調査2. shadowGetについて
例外をあげている shadowGet関数の説明 をみてみます。
shadowGet(srcCallback, srcTimeout) Description Retrieve the device shadow JSON document from AWS IoT by publishing an empty JSON document to the corresponding shadow topics. Shadow response topics will be subscribed to receive responses from AWS IoT regarding the result of the get operation. Retrieved shadow JSON document will be available in the registered callback. If no response is received within the provided timeout, a timeout notification will be passed into the registered callback.
shadowGetではAWS IoTのトピックにパブリッシュとサブスクライブすることで値を取得していることがわかります。 MQTT経由でDevice Shadowの値を取得する方法は AWS側で用意されているトピックを利用します。 AWS側が用意しているトピックについては、詳細についてはドキュメントに記載されています。
今回の場合は $aws/things/test1_Core/shadow/get
のトピックにパブリッシュすることで、 $aws/things/test1_Core/shadow/get/accepted
か $aws/things/test1_Core/shadow/get/rejected
のトピックに値が流れてきます。
AWS ConsoleでMQTTのテストツールを使うことで、AWS IoTへのトピックのパブリッシュとサブスクライブを確認することができます。
以下の動画では、$aws/things/test1_Core/shadow/get/accepted
というトピックをサブスクライブした状態で $aws/things/test1_Core/shadow/get
というトピックにパブリッシュをしています。
重要なことなのでもう一度書きますが、$aws/things/test1_Core/shadow/get/accepted
というトピックをまずサブスクライブした後に、$aws/things/test1_Core/shadow/get
というトピックにパブリッシュをしています。この順番は大事で、後ほども触れます。
サブスクライブした状態でパブリッシュすると値は取得できますが、パブリッシュした後にサブスクライブしても値は取得することはできません。
$aws/things/test1_Core/shadow/get
にパブリッシュした後に $aws/things/test1_Core/shadow/get/accepted
に値がながれてきていることがわかります。
補足ですが、shadowGetの第二引数のsrcTimeoutは、$aws/things/test1_Core/shadow/get
にトピックを発行して $aws/things/test1_Core/shadow/get/accepted
が取得できるまでのタイムアウトです。この場合のタイムアウトでは、タイムアウトが起きたことを伝える情報を伴ってcallbackが呼ばれるため subscribeTimeoutException は発生しないので、今回調査しているタイムアウトとは関係ありません。
shadowGetでは、AWS IoTのトピックにパブリッシュとサブスクライブをすることでDevice Shadowの値を取得していることがわかりました。
調査3. shadowGetの実装について
shadowGetの実装をみてみます。 抜粋すると以下のような実装になっています。
def shadowGet(self, srcCallback, srcTimeout): # (省略) # Two subscriptions if not self._isPersistentSubscribe or not self._isGetSubscribed: self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback) self._isGetSubscribed = True self._logger.info("Subscribed to get accepted/rejected topics for deviceShadow: " + self._shadowName) # One publish self._shadowManagerHandler.basicShadowPublish(self._shadowName, "get", currentPayload) # (省略)
関数の実装をすべてみたい場合はソースをご確認ください。 github.com
コメントに「Two subscriptions」と「One publish」と書かれています。「 調査2. shadowGetについて」で確認した
$aws/things/test1_Core/shadow/get/accepted
と $aws/things/test1_Core/shadow/get/rejected
のサブスクライブと $aws/things/test1_Core/shadow/get
のトピックにパブリッシュすることでDevice Shadowの値を取得していることと合致していることがわかります。
実装で気になる点としては、サブスクライブする時に not self._isPersistentSubscribe or not self._isGetSubscribed
の条件が真の場合のみ実行されています。サブスクライブを永続化するオプションでクライアントを生成し、一度shadowGetを行うと最初の実行時のみサブスクライブがされ、以降では使いまわしているようです。
コードをみても調査2のAWS IoTのトピックにパブリッシュとサブスクライブをすることでDevice Shadowの値を取得しているということが確認できました。
調査4. subscribeTimeoutExceptionの例外を上げている箇所について
次にPythonのTracebackをみていきます。
Traceback (most recent call last): File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner self.run() File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 147, in _dispatch self._dispatch_one() File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 154, in _dispatch_one self._dispatch_methods[event_type](mid, data) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 237, in _dispatch_message message_callback(None, None, message) # message_callback(client, userdata, message) File "fail_shadowGet.py", line 47, in subscribe_callback shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/deviceShadow.py", line 243, in shadowGet self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/shadowManager.py", line 70, in basicShadowSubscribe self._mqttCoreHandler.subscribe(currentShadowAction.getTopicAccept(), 0, srcCallback) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/mqtt_core.py", line 306, in subscribe raise subscribeTimeoutException() AWSIoTPythonSDK.exception.AWSIoTExceptions.subscribeTimeoutException
以下の部分から「調査3. shadowGetの実装について」でみたshadowGet関数の中のサブスクライブをしようとしている箇所で例外があがっていることがわかります。
File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/deviceShadow.py", line 243, in shadowGet self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback)
実際に例外があげられているのはAWSIoTPythonSDK/core/protocol/mqtt_core.py
の中のsubscribe関数の中となっています。
さて、ドキュメントの関数一覧をみてみると、AWSIoTPythonSDKでは、subscribe と subscribeAsync の二通りのサブスクライブするインターフェイスが提供されていることがわかります。名前からある程度察することができますが、AWSIoTPythonではサブスクライブする時に同期的行うか、非同期的に行うかを選ぶことができます。
同期的なsubscribe関数では、関数の実行が完了した時にサブスクライブしていることが保証されますが、非同期的なsubscribeAsync関数ではサブスクライブしていることは保証されていません。
調査2と調査3で確認した通り、shadowGetではまずはトピックをサブスクライブした後に、パブリッシュをして値を取得しています。そのため、サブスクライブが完了していることが保証されるsubscribe関数を使用する必要があります。
getShadowは、トピックのサブスクライブをする時に、特にsbscribe関数を使っているということがわかります。
調査5. READMEに記載されている同期的な呼び出しと非同期的な呼び出しについて
さて、次はREADMEに記載されている「Synchronous APIs and Asynchronous APIs」という部分をみてみます。
Synchronous APIs and Asynchronous APIs Beginning with Release v1.2.0, SDK provides asynchronous APIs and enforces synchronous API behaviors for MQTT operations, which includes: - connect/connectAsync - disconnect/disconnectAsync - publish/publishAsync - subscribe/subscribeAsync - unsubscribe/unsubscribeAsync
関数の一覧の中には subscribe/subscirbeAsync
などサブスクライブに関して同期的なAPIと非同期的なAPIがあることが示されています。
もう少し読み進めると、以下のような説明があります。
Since callbacks are sequentially dispatched and invoked, calling synchronous APIs within callbacks will deadlock the user application. If users are inclined to utilize the asynchronous mode and perform MQTT operations within callbacks, asynchronous APIs should be used.
DeepLでの翻訳も載せておきます。
コールバックは順次ディスパッチされて呼び出されるので、コールバック内で同期APIを呼び出すと、ユーザアプリケーションはデッドロックしてしまいます。 ユーザーが非同期モードを利用してコールバック内でMQTT操作を行いたい場合は、非同期APIを使用する必要があります。
callback内で同期的なAPI呼び出しするとデッドロックを引き起こすため、非同期的なAPIを呼び出すように注意されています。
ここで、わざとcallback内で同期的なAPIを呼び出してみた場合の挙動を確認してみます。
def main(): CLIENT_ID = "test1_Core" client = create_mqtt_connection(client_id=CLIENT_ID) shadow_handler = AWSIoTMQTTShadowClient( clientID=CLIENT_ID, awsIoTMQTTClient=client ).createShadowHandlerWithName(shadowName=CLIENT_ID, isPersistentSubscribe=True) def subscribe_callback(_client, _userdata, message): print(f"Received a new message: {message.payload.decode('utf-8')}") def subscribe_callback2(_client, _userdata, message): print("subscribe_callback2 is called") client.subscribe(topic="hello/world2", QoS=1, callback=subscribe_callback2) client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback) while True: time.sleep(5) if __name__ =
実行してみると、この場合でもsubscribeTimeoutExceptionがでることがわかりました。
> python fail_shadowGet2.py Received a new message: { "message": "Hello from AWS IoT console" } Subscribe timed out Exception in thread Thread-1: Traceback (most recent call last): File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner self.run() File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 147, in _dispatch self._dispatch_one() File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 154, in _dispatch_one self._dispatch_methods[event_type](mid, data) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 237, in _dispatch_message message_callback(None, None, message) # message_callback(client, userdata, message) File "fail_shadowGet2.py", line 54, in subscribe_callback client.subscribe(topic="hello/world2", QoS=1, callback=subscribe_callback2) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/MQTTLib.py", line 696, in subscribe return self._mqtt_core.subscribe(topic, QoS, callback) File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/mqtt_core.py", line 306, in subscribe raise subscribeTimeoutException() AWSIoTPythonSDK.exception.AWSIoTExceptions.subscribeTimeoutException
callback内で同期的なAPIを呼び出すとsubscribeTimeoutExceptionの例外があがることがわかりました。
調査でわかったことの整理とまとめ
ここまでにわかったことをまとめると以下のようになります。
- 調査2~4
- shadowGetは2つのトピックを同期的なAPIのsubscribeを使ってサブスクライブする
- 調査5
- callback内で同期的なAPIを使用するとsubscribeTimeoutExceptionが発生する
したがって、今回起きていた問題の原因は、callback内で同期的なAPIのsubscribeを実行するshadowGetを実行してしまうことで、subscribeTimeoutException が発生してしまっていた、ということが原因を特定することができました。めでたし、めでたし。
対応策
いくつか考えられる対応策について説明します。
対応策1. callback前にsubscribeをすませておく
「調査3. shadowGetの実装について」で見たように、shadowGetではサブスクライブするかどうかの条件があります。接続を永続化する設定かつ、すでにサブスクライブ済みであれば、同期的なサブスクライブは行われません。
def shadowGet(self, srcCallback, srcTimeout): # (省略) # Two subscriptions if not self._isPersistentSubscribe or not self._isGetSubscribed: self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback) self._isGetSubscribed = True self._logger.info("Subscribed to get accepted/rejected topics for deviceShadow: " + self._shadowName) # One publish self._shadowManagerHandler.basicShadowPublish(self._shadowName, "get", currentPayload) # (省略)
よって、callback外で一度shadowGetを呼ぶことで、それ以降のshadowGetでは同期的なサブスクライブを避けることができます。
def shadowGet_callback(payload: str, response_status: str, token: str): print(f"shadow: {json.loads(payload)}") def main(): CLIENT_ID = "test1_Core" client = create_mqtt_connection(client_id=CLIENT_ID) shadow_handler = build_shadow_handler(client_id=CLIENT_ID, client=client) def subscribe_callback(client, userdata, message): print(f"Received a new message: {message.payload.decode('utf-8')}") shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10) # 追加:subscribeをする前にshadowGetを実行する shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10) client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback) while True: time.sleep(5) if __name__ == "__main__": main()
対応策2. subscribeとshadowの取得のMQTTの接続を分ける
同じMQTTの接続の場合にデッドロックが発生してしまうためサブスクライブするMQTTの接続とcallback中に実行するMQTTの接続を分けることで、デッドロックを回避することが可能です。
def shadowGet_callback(payload: str, response_status: str, token: str): print(f"shadow: {json.loads(payload)}") def main(): client = create_mqtt_connection(client_id="test1_Core") shadow_client = create_mqtt_connection(client_id="test1_Core_shadow") shadow_handler = AWSIoTMQTTShadowClient( clientID="shadow_client", awsIoTMQTTClient=shadow_client ).createShadowHandlerWithName(shadowName=CLIENT_ID, isPersistentSubscribe=True) def subscribe_callback(client, userdata, message): print(f"Received a new message: {message.payload.decode('utf-8')}") shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10) client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback) while True: time.sleep(5) if __name__ == "__main__": main()
対応策3. boto3を使用してshadowの取得をする
こちらは未確認ですが、DeviceShadowはboto3経由でも取得することが可能なので、boto3を使えばMQTTのデッドロックは回避することは可能かと思います。
boto3のトークンにはAWS Security Token Serviceを使えばGreengrassの証明書から作れそうですが、こちらも未確認です。
感想
今回の問題は、READMEに目を通しておいてAWSIoTPythonの仕様を把握しており、shadowGetに同期的なsubscribeが行われているだろうという推理力があればある程度原因の特定まではできたのかもしれないなとは感じました。普段は便利すぎて、あまり内部実装に関心をもてていなかったですが、もう少し自分の使っているSDKやライブラリに関心を持たないといけないなぁと反省しました。
また、例外のクラス名でGoogle検索や、Githubでopenなissueがないかは探していたのですが、closeされたissueにcallback中に同期的なAPIを呼び出すケースで困っている人がいたようです。例外が上がるのが仕様の場合には、issueはcloseされるものかと思うので、closeされたissueも確認した方が良いということ学びました。
おまけ - デッドロックが起きるフロー
おまけで、callback内で同期的なsubscribeの関数を呼ぶとデッドロックが起きるフローについて挙動をおってみます。
subscribe関数の実装
subscribe関数をまずみていきます。
def subscribe(self, topic, qos, message_callback=None): self._logger.info("Performing sync subscribe...") ret = False if ClientStatus.STABLE != self._client_status.get_status(): self._handle_offline_request(RequestTypes.SUBSCRIBE, (topic, qos, message_callback, None)) else: event = Event() rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback) if not event.wait(self._operation_timeout_sec): self._internal_async_client.remove_event_callback(mid) self._logger.error("Subscribe timed out") raise subscribeTimeoutException() ret = True return ret
eventという変数について注目してみます。
event = Event() rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback) if not event.wait(self._operation_timeout_sec):
Eventはpythonの標準のthreading.Eventです。
self._create_blocking_ack_callback(event)
と event.wait(self._operation_timeout_sec)
でeventを参照していることがわかります。
_create_blocking_ack_callbackは以下のような実装になっています。
def _create_blocking_ack_callback(self, event): def ack_callback(mid, data=None): event.set() return ack_callback
event.set()を呼ぶだけの関数が渡されていることがわかります。
その次の event.wait(self._operation_timeout_sec)
は、event.set()が呼ばれるまで待機するというコードです。
pythonのthreading.Eventについては、以下のブログがわかりやすくまとまっているのでおすすめです。
ここまでをまとめると、subscribe関数は、サブスクライブが完了した時に呼ばれるcallback関数でevent.set()を実行し、subscribe関数内ではevent.wait()でcallback関数が呼ばれるのを待機しています。subscribe関数が正常に完了した時には、サブスクライブされていることが保証されています。
MQTTのやりとりの実装について
さらに細かい話を進めていきます。 AWSIoTPythonでは、MQTTのクライアントにはPaho MQTT Python clientが使われています。
このライブラリではMQTTで接続した時(on_connect)やサブスクライブが完了した時(on_subscribe)やメッセージを受け取った時(on_message)にcallback関数を設定することができます。
callback関数はEventProducerクラスに纏まっています。
class EventProducer(object): def __init__(self, cv, event_queue): self._cv = cv self._event_queue = event_queue # 略 def on_subscribe(self, client, user_data, mid, granted_qos): self._add_to_queue(mid, EventTypes.SUBACK, granted_qos) self._logger.debug("Produced [suback] event") # 略 def _add_to_queue(self, mid, event_type, data): with self._cv: self._event_queue.put((mid, event_type, data)) self._cv.notify()
EventProducerクラスでは、MQTTで何かしらデータを受け取ったら_add_to_queue関数で _event_queue
に追加をしているだけで、実際の処理を行うのは他のクラスになります。
_cv
という変数には、pythonのthreading.Conditionのオブジェクトが渡されていて、with self._cv
と self._cv.notify()
という使われ方をされています。今回デッドロックを引き起こしているのはまさにこの _cv
変数の部分なのですが、これを参照しているもう一か所のところを触れてから説明します。
省略した部分以外も確認したい場合はソースをみてください。 github.com
実際にpahoのmqttクライアントにEventProducerの関数が登録されているところは、MqttCoreクラスのコンストラクタの_init_worlersあたりの処理を追うとわかるかと思います。
MQTTを受け取った後の処理について
EventProducerが _event_queue
に追加したeventを処理するのは EventConsumerクラスとなっています。
class EventConsumer(object): def __init__(self, cv, event_queue, internal_async_client, subscription_manager, offline_requests_manager, client_status): self._cv = cv # 略 def start(self): #略 dispatch_events = Thread(target=self._dispatch) #略 def _dispatch(self): while self._is_running: with self._cv: if self._event_queue.empty(): self._cv.wait(self.MAX_DISPATCH_INTERNAL_SEC) else: while not self._event_queue.empty(): self._dispatch_one() # 略
EventConsumerで今回の問題に関係するところだけを抜き出しています。
コンストラクタで渡されている cv
はEventProducerと同じオブジェクトが渡されていて、共有しています。
_event_queue
経由で渡されるイベントを処理しているのは、 dispatch関数となります。
dispatch関数では with self._cv
と self._cv.wait
で _cv
を参照しています。
EventConsumerクラスのソースは以下です。
Pythonのthreading.Conditionについて
threading.Condition は、条件付きのロックでwith文で囲まれたブロック内でロックを獲得することができます。threading.Conditionの面白い(?)ところとしては、wait関数を呼ぶことでConditionオブジェクトのロックを開放して、別のスレッドにConditionオブジェクトでロックをかけて処理をさせることができるようにできます。wait関数はConditionオブジェクトのnotify関数を呼ぶことで再度処理を進めることができます。
今回のEventConsumer._dispatchではeventキューが空の場合には self._cv.wait(self.MAX_DISPATCH_INTERNAL_SEC)
でロックを開放しておきますが、eventを処理している最中にはロックをかけていることがわかります。
def _dispatch(self): while self._is_running: with self._cv: if self._event_queue.empty(): self._cv.wait(self.MAX_DISPATCH_INTERNAL_SEC) else: while not self._event_queue.empty(): self._dispatch_one()
EventProducerクラスのeventキューに追加しているところをもう一度みてみると、ロックがかかっている間は、MQTTからデータを受け取った後にeventキューに追加することはできません。
def _add_to_queue(self, mid, event_type, data): with self._cv: self._event_queue.put((mid, event_type, data)) self._cv.notify()
したがって、 _dispatch
の処理中に新しくMQTTで受け取ったデータは、 _dispatch
が完了するまで取得することができません。
同期的なsubscribeでは、MQTTでサブスクライブが完了したデータ(on_subscribe)を受け取って、dispatch内でcallbackの実行(event.set())を呼ぶことを期待しています。ただし、subscribeのcallback内(dispatch関数で _cv
にロックをかけた状態)で同期的なsubscribeの関数を呼ぶと、EventProducerがon_subscribeでeventキューに新しく追加しようとしても _cv
のロックがかかっているためロックの開放をまってしまいます。subscribeのcallbackも、event.set()が呼ばれるまで _cv
を解放できないという状態になってしまうため、デッドロックが生じてしまっているということがわかりました。
おまけの感想
非同期的な処理がはいると、処理を追うのがかなり大変でした。実際、今回例外が上がる箇所から、どこでデッドロックが起きているかの特定は、ブログを書き始めた後の最後の方でやっと特定できた感じでした。 ただ、SDK内でもログレベルをDEBUGにするといろいろログをはいてくれるのでかなり助かりました。
pythonのthreading周りのドキュメントは何度か読んだことはあるはずなのですが、threading.Conditionはすっかり忘れていたので、具体的なユースケースを目にすることができたのはとてもよかったなぁと感じました。