試運転ブログ

技術的なあれこれ

AWSIoTPythonSDK のshadowGetで subscribeTimeoutException がおきた話

Greengrass上で実行されるアプリケーションの開発中に目にした、Device Shadowを取得の挙動について備忘録として記録に残しておきます。 AWS IoT CoreのMQTT topicsやDevice Shadowのことをある程度知っていると理解しやすいかと思います。

何かしら間違ったことを書いてしまっていたところがあればご指摘いただけると幸いです。

GreengrassとAWSIoTPythonについて

Greengrassは正式名称には「AWS IoT Greengrass」で、AWSが提供するIoT向けのサービスです。IoT機器上に AWS Lambdaをデプロイしたり、実行してりできます。

aws.amazon.com

Greengrassでは証明書を使ってAWSリソースの権限を制御することができます。 AWSIoTPythonというSDKを使うことで、Greengrassの証明書を使ってMQTTやMQTT over the WebSocket protocolでAWSのリソースにアクセスすることができます。

github.com

DeviceShadowの値を取得してみる

以下のコードではGreengrassの Device Shadow というデバイスと他のシステム(バックエンドサーバなど)でデータを共有するためのデータストアの値を取得しています。 データの取得はshadowGetという関数を用いていて、受け取った値をsrcCallback関数に引き渡すことでDeviceShadowの値を取得できます。 以下のコードは抜粋なので、コード全体を見たい場合は GISTの方 をご確認ください。

def shadowGet_callback(payload: str, response_status: str, token: str):
    print("Got shadow:")
    print(json.dumps(json.loads(payload), indent=2))


def main():
    CLIENT_ID = "test1_Core"
    client = create_mqtt_connection(client_id=CLIENT_ID)
    shadow_handler = build_shadow_handler(
        clientID=CLIENT_ID, awsIoTMQTTClient=client
    )

    shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10)


if __name__ == "__main__":
    main()

実行するとこのような結果になります。

> python shadowGet.py
Got shadow:
{
  "state": {
    "desired": {
      "welcome": "aws-iot"
    },
    "reported": {
      "welcome": "aws-iot"
    }
  },
  "metadata": {
    "desired": {
      "welcome": {
        "timestamp": 1614564291
      }
    },
    "reported": {
      "welcome": {
        "timestamp": 1614564291
      }
    }
  },
  "version": 1,
  "timestamp": 1615016122,
  "clientToken": "95a4a071-1e17-4d2d-b3f8-4ebfa7f2d1a1"
}

DeviceShadowは、AWS Consoleで確認すると以下のようになっています。

f:id:otameshi61:20210306180118p:plain
DeviceShadowの画面

subscribeしてみる

AWS IoTではMQTTを利用してメッセージのパブリッシュとサブスクライブをすることができます。 docs.aws.amazon.com

以下のコードでは hello/world というトピックをサブスクライブして、メッセージを受け取ったら出力しています(全体は こちら )。

def main():
    CLIENT_ID = "test1_Core"
    client = create_mqtt_connection(client_id=CLIENT_ID)

    def subscribe_callback(client, userdata, message):
        print(f"Received a new message: {message.payload.decode('utf-8')}")

    client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback)

    while True:
        time.sleep(5)


if __name__ == "__main__":
    main()

ブラウザではAWS ConsoleにMQTTのテストするクライアントが提供されていて、 hello/worldのトピックにメッセージを発行しています。ターミナルの方でプログラムを実行し、メッセージを受け取れていることが確認できます。

f:id:otameshi61:20210307170529g:plain
MQTTのパブリッシュとサブスクライブの確認

実行に失敗するコード

ここからが本記事の本題なのですが、MQTTのサブスクライブのcallback関数の中で、shadowを取得しようとするとなぜか例外があがってしまうという問題がおきました。 具体的な問題が起きるコードは抜粋すると以下のような感じです。

def shadowGet_callback(payload: str, response_status: str, token: str):
    print(f"shadow: {json.loads(payload)}")


def main():
    CLIENT_ID = "test1_Core"
    client = create_mqtt_connection(client_id=CLIENT_ID)
    shadow_handler = build_shadow_handler(
        clientID=CLIENT_ID, awsIoTMQTTClient=client
    )

    def subscribe_callback(client, userdata, message):
        print(f"Received a new message: {message.payload.decode('utf-8')}")
        shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10)

    client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback)

    while True:
        time.sleep(5)


if __name__ == "__main__":
    main()

このコードでhello/worldトピックにパブリッシュすると、callbackの最初のprintは実行された後に subscribeTimeoutException の例外があがります。

$ python fail_shadowGet.py
Received a new message: {
  "message": "Hello from AWS IoT console"
}
Subscribe timed out
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 147, in _dispatch
    self._dispatch_one()
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 154, in _dispatch_one
    self._dispatch_methods[event_type](mid, data)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 237, in _dispatch_message
    message_callback(None, None, message)  # message_callback(client, userdata, message)
  File "fail_shadowGet.py", line 47, in subscribe_callback
    shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/deviceShadow.py", line 243, in shadowGet
    self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/shadowManager.py", line 70, in basicShadowSubscribe
    self._mqttCoreHandler.subscribe(currentShadowAction.getTopicAccept(), 0, srcCallback)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/mqtt_core.py", line 306, in subscribe
    raise subscribeTimeoutException()
AWSIoTPythonSDK.exception.AWSIoTExceptions.subscribeTimeoutException

subscribeTimeoutExceptionが発生してしまう原因について調査

以降では、subscribeTimeoutExceptionが発生してしまう原因について調査をしていきます。

調査1. subscribeTimeoutExceptionという例外クラスについて

subscribeTimeoutExceptionの名前からサブスクライブ時にタイムアウトが発生しているんやろうなぁと予想できますが、なぜタイムアウトが発生してしまうのか、どのような条件だと例外が発生するかなど、何かしら情報がないか探してみます。

まずこのクラスの定義を確認してみると、クラス定義ではoperationTimeoutExceptionを継承していることやmsgが設定されていることはわかりますが、原因の手がかりにつながりそうな情報はなさそうでした。

class subscribeTimeoutException(operationTimeoutException.operationTimeoutException):
    def __init__(self, msg="Subscribe Timeout"):
        self.message = msg

github.com

APIドキュメントのサイトが公開されており、検索ボックスもあったのでsubscribeTimeoutExceptionで検索をしてみましたが、残念ながら検索ヒット数は0件でした。

s3.amazonaws.com

subscribeTimeoutExceptionでGoogle検索をした場合に出てくる情報にもいくつか目をとおしてみましたが、権限周りの設定が失敗している場合など、今回調査しているようなケースではなさそうでした。

調査2. shadowGetについて

例外をあげている shadowGet関数の説明 をみてみます。

shadowGet(srcCallback, srcTimeout)
Description

Retrieve the device shadow JSON document from AWS IoT by publishing an empty JSON document to the corresponding shadow topics. 
Shadow response topics will be subscribed to receive responses from AWS IoT regarding the result of the get operation. 
Retrieved shadow JSON document will be available in the registered callback. 
If no response is received within the provided timeout, a timeout notification will be passed into the registered callback.

shadowGetではAWS IoTのトピックにパブリッシュとサブスクライブすることで値を取得していることがわかります。 MQTT経由でDevice Shadowの値を取得する方法は AWS側で用意されているトピックを利用します。 AWS側が用意しているトピックについては、詳細についてはドキュメントに記載されています。

docs.aws.amazon.com

今回の場合は $aws/things/test1_Core/shadow/get のトピックにパブリッシュすることで、 $aws/things/test1_Core/shadow/get/accepted$aws/things/test1_Core/shadow/get/rejected のトピックに値が流れてきます。

AWS ConsoleでMQTTのテストツールを使うことで、AWS IoTへのトピックのパブリッシュとサブスクライブを確認することができます。

以下の動画では、$aws/things/test1_Core/shadow/get/acceptedというトピックをサブスクライブした状態で $aws/things/test1_Core/shadow/get というトピックにパブリッシュをしています。 重要なことなのでもう一度書きますが、$aws/things/test1_Core/shadow/get/acceptedというトピックをまずサブスクライブした後に、$aws/things/test1_Core/shadow/get というトピックにパブリッシュをしています。この順番は大事で、後ほども触れます。 サブスクライブした状態でパブリッシュすると値は取得できますが、パブリッシュした後にサブスクライブしても値は取得することはできません。

$aws/things/test1_Core/shadow/get にパブリッシュした後に $aws/things/test1_Core/shadow/get/accepted に値がながれてきていることがわかります。

f:id:otameshi61:20210301193550g:plain
MQTT経由でdevice shadowを取得する

補足ですが、shadowGetの第二引数のsrcTimeoutは、$aws/things/test1_Core/shadow/get にトピックを発行して $aws/things/test1_Core/shadow/get/accepted が取得できるまでのタイムアウトです。この場合のタイムアウトでは、タイムアウトが起きたことを伝える情報を伴ってcallbackが呼ばれるため subscribeTimeoutException は発生しないので、今回調査しているタイムアウトとは関係ありません。

shadowGetでは、AWS IoTのトピックにパブリッシュとサブスクライブをすることでDevice Shadowの値を取得していることがわかりました。

調査3. shadowGetの実装について

shadowGetの実装をみてみます。 抜粋すると以下のような実装になっています。

    def shadowGet(self, srcCallback, srcTimeout):
        # (省略)

        # Two subscriptions
        if not self._isPersistentSubscribe or not self._isGetSubscribed:
            self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback)
            self._isGetSubscribed = True
            self._logger.info("Subscribed to get accepted/rejected topics for deviceShadow: " + self._shadowName)
        # One publish
        self._shadowManagerHandler.basicShadowPublish(self._shadowName, "get", currentPayload)

        # (省略)

関数の実装をすべてみたい場合はソースをご確認ください。 github.com

コメントに「Two subscriptions」と「One publish」と書かれています。「 調査2. shadowGetについて」で確認した $aws/things/test1_Core/shadow/get/accepted$aws/things/test1_Core/shadow/get/rejected のサブスクライブと $aws/things/test1_Core/shadow/get のトピックにパブリッシュすることでDevice Shadowの値を取得していることと合致していることがわかります。

実装で気になる点としては、サブスクライブする時に not self._isPersistentSubscribe or not self._isGetSubscribed の条件が真の場合のみ実行されています。サブスクライブを永続化するオプションでクライアントを生成し、一度shadowGetを行うと最初の実行時のみサブスクライブがされ、以降では使いまわしているようです。

コードをみても調査2のAWS IoTのトピックにパブリッシュとサブスクライブをすることでDevice Shadowの値を取得しているということが確認できました。

調査4. subscribeTimeoutExceptionの例外を上げている箇所について

次にPythonのTracebackをみていきます。

Traceback (most recent call last):
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 147, in _dispatch
    self._dispatch_one()
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 154, in _dispatch_one
    self._dispatch_methods[event_type](mid, data)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 237, in _dispatch_message
    message_callback(None, None, message)  # message_callback(client, userdata, message)
  File "fail_shadowGet.py", line 47, in subscribe_callback
    shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/deviceShadow.py", line 243, in shadowGet
    self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/shadowManager.py", line 70, in basicShadowSubscribe
    self._mqttCoreHandler.subscribe(currentShadowAction.getTopicAccept(), 0, srcCallback)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/mqtt_core.py", line 306, in subscribe
    raise subscribeTimeoutException()
AWSIoTPythonSDK.exception.AWSIoTExceptions.subscribeTimeoutException

以下の部分から「調査3. shadowGetの実装について」でみたshadowGet関数の中のサブスクライブをしようとしている箇所で例外があがっていることがわかります。

  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/shadow/deviceShadow.py", line 243, in shadowGet
    self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback)

実際に例外があげられているのはAWSIoTPythonSDK/core/protocol/mqtt_core.py の中のsubscribe関数の中となっています。

さて、ドキュメントの関数一覧をみてみると、AWSIoTPythonSDKでは、subscribesubscribeAsync の二通りのサブスクライブするインターフェイスが提供されていることがわかります。名前からある程度察することができますが、AWSIoTPythonではサブスクライブする時に同期的行うか、非同期的に行うかを選ぶことができます。

同期的なsubscribe関数では、関数の実行が完了した時にサブスクライブしていることが保証されますが、非同期的なsubscribeAsync関数ではサブスクライブしていることは保証されていません。

調査2と調査3で確認した通り、shadowGetではまずはトピックをサブスクライブした後に、パブリッシュをして値を取得しています。そのため、サブスクライブが完了していることが保証されるsubscribe関数を使用する必要があります。

getShadowは、トピックのサブスクライブをする時に、特にsbscribe関数を使っているということがわかります。

s3.amazonaws.com

調査5. READMEに記載されている同期的な呼び出しと非同期的な呼び出しについて

さて、次はREADMEに記載されている「Synchronous APIs and Asynchronous APIs」という部分をみてみます。

Synchronous APIs and Asynchronous APIs
Beginning with Release v1.2.0, SDK provides asynchronous APIs and enforces synchronous API behaviors for MQTT operations, which includes: - connect/connectAsync - disconnect/disconnectAsync - publish/publishAsync - subscribe/subscribeAsync - unsubscribe/unsubscribeAsync

github.com

関数の一覧の中には subscribe/subscirbeAsync などサブスクライブに関して同期的なAPIと非同期的なAPIがあることが示されています。

もう少し読み進めると、以下のような説明があります。

Since callbacks are sequentially dispatched and invoked, calling synchronous APIs within callbacks will deadlock the user application. 
If users are inclined to utilize the asynchronous mode and perform MQTT operations within callbacks, asynchronous APIs should be used. 

DeepLでの翻訳も載せておきます。

コールバックは順次ディスパッチされて呼び出されるので、コールバック内で同期APIを呼び出すと、ユーザアプリケーションはデッドロックしてしまいます。
ユーザーが非同期モードを利用してコールバック内でMQTT操作を行いたい場合は、非同期APIを使用する必要があります。

callback内で同期的なAPI呼び出しするとデッドロックを引き起こすため、非同期的なAPIを呼び出すように注意されています。

ここで、わざとcallback内で同期的なAPIを呼び出してみた場合の挙動を確認してみます。

def main():
    CLIENT_ID = "test1_Core"
    client = create_mqtt_connection(client_id=CLIENT_ID)
    shadow_handler = AWSIoTMQTTShadowClient(
        clientID=CLIENT_ID, awsIoTMQTTClient=client
    ).createShadowHandlerWithName(shadowName=CLIENT_ID, isPersistentSubscribe=True)

    def subscribe_callback(_client, _userdata, message):
        print(f"Received a new message: {message.payload.decode('utf-8')}")
        def subscribe_callback2(_client, _userdata, message):
            print("subscribe_callback2 is called")

        client.subscribe(topic="hello/world2", QoS=1, callback=subscribe_callback2)

    client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback)

    while True:
        time.sleep(5)


if __name__ =

実行してみると、この場合でもsubscribeTimeoutExceptionがでることがわかりました。

> python fail_shadowGet2.py
Received a new message: {
  "message": "Hello from AWS IoT console"
}
Subscribe timed out
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 147, in _dispatch
    self._dispatch_one()
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 154, in _dispatch_one
    self._dispatch_methods[event_type](mid, data)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/internal/workers.py", line 237, in _dispatch_message
    message_callback(None, None, message)  # message_callback(client, userdata, message)
  File "fail_shadowGet2.py", line 54, in subscribe_callback
    client.subscribe(topic="hello/world2", QoS=1, callback=subscribe_callback2)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/MQTTLib.py", line 696, in subscribe
    return self._mqtt_core.subscribe(topic, QoS, callback)
  File "/Users/saso/.pyenv/versions/3.7.5/lib/python3.7/site-packages/AWSIoTPythonSDK/core/protocol/mqtt_core.py", line 306, in subscribe
    raise subscribeTimeoutException()
AWSIoTPythonSDK.exception.AWSIoTExceptions.subscribeTimeoutException

callback内で同期的なAPIを呼び出すとsubscribeTimeoutExceptionの例外があがることがわかりました。

調査でわかったことの整理とまとめ

ここまでにわかったことをまとめると以下のようになります。

  • 調査2~4
    • shadowGetは2つのトピックを同期的なAPIのsubscribeを使ってサブスクライブする
  • 調査5
    • callback内で同期的なAPIを使用するとsubscribeTimeoutExceptionが発生する

したがって、今回起きていた問題の原因は、callback内で同期的なAPIのsubscribeを実行するshadowGetを実行してしまうことで、subscribeTimeoutException が発生してしまっていた、ということが原因を特定することができました。めでたし、めでたし。

対応策

いくつか考えられる対応策について説明します。

対応策1. callback前にsubscribeをすませておく

「調査3. shadowGetの実装について」で見たように、shadowGetではサブスクライブするかどうかの条件があります。接続を永続化する設定かつ、すでにサブスクライブ済みであれば、同期的なサブスクライブは行われません。

    def shadowGet(self, srcCallback, srcTimeout):
        # (省略)

        # Two subscriptions
        if not self._isPersistentSubscribe or not self._isGetSubscribed:
            self._shadowManagerHandler.basicShadowSubscribe(self._shadowName, "get", self.generalCallback)
            self._isGetSubscribed = True
            self._logger.info("Subscribed to get accepted/rejected topics for deviceShadow: " + self._shadowName)
        # One publish
        self._shadowManagerHandler.basicShadowPublish(self._shadowName, "get", currentPayload)

        # (省略)

よって、callback外で一度shadowGetを呼ぶことで、それ以降のshadowGetでは同期的なサブスクライブを避けることができます。

def shadowGet_callback(payload: str, response_status: str, token: str):
    print(f"shadow: {json.loads(payload)}")


def main():
    CLIENT_ID = "test1_Core"
    client = create_mqtt_connection(client_id=CLIENT_ID)
    shadow_handler = build_shadow_handler(client_id=CLIENT_ID, client=client)

    def subscribe_callback(client, userdata, message):
        print(f"Received a new message: {message.payload.decode('utf-8')}")
        shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10)

    # 追加:subscribeをする前にshadowGetを実行する
    shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10)

    client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback)

    while True:
        time.sleep(5)


if __name__ == "__main__":
    main()

対応策2. subscribeとshadowの取得のMQTTの接続を分ける

同じMQTTの接続の場合にデッドロックが発生してしまうためサブスクライブするMQTTの接続とcallback中に実行するMQTTの接続を分けることで、デッドロックを回避することが可能です。

def shadowGet_callback(payload: str, response_status: str, token: str):
    print(f"shadow: {json.loads(payload)}")


def main():
    client = create_mqtt_connection(client_id="test1_Core")
    shadow_client = create_mqtt_connection(client_id="test1_Core_shadow")
    shadow_handler = AWSIoTMQTTShadowClient(
        clientID="shadow_client", awsIoTMQTTClient=shadow_client
    ).createShadowHandlerWithName(shadowName=CLIENT_ID, isPersistentSubscribe=True)

    def subscribe_callback(client, userdata, message):
        print(f"Received a new message: {message.payload.decode('utf-8')}")
        shadow_handler.shadowGet(srcCallback=shadowGet_callback, srcTimeout=10)

    client.subscribe(topic="hello/world", QoS=1, callback=subscribe_callback)

    while True:
        time.sleep(5)


if __name__ == "__main__":
    main()

対応策3. boto3を使用してshadowの取得をする

こちらは未確認ですが、DeviceShadowはboto3経由でも取得することが可能なので、boto3を使えばMQTTのデッドロックは回避することは可能かと思います。

boto3.amazonaws.com

boto3のトークンにはAWS Security Token Serviceを使えばGreengrassの証明書から作れそうですが、こちらも未確認です。

docs.aws.amazon.com

感想

今回の問題は、READMEに目を通しておいてAWSIoTPythonの仕様を把握しており、shadowGetに同期的なsubscribeが行われているだろうという推理力があればある程度原因の特定まではできたのかもしれないなとは感じました。普段は便利すぎて、あまり内部実装に関心をもてていなかったですが、もう少し自分の使っているSDKやライブラリに関心を持たないといけないなぁと反省しました。

また、例外のクラス名でGoogle検索や、Githubでopenなissueがないかは探していたのですが、closeされたissueにcallback中に同期的なAPIを呼び出すケースで困っている人がいたようです。例外が上がるのが仕様の場合には、issueはcloseされるものかと思うので、closeされたissueも確認した方が良いということ学びました。

github.com

おまけ - デッドロックが起きるフロー

おまけで、callback内で同期的なsubscribeの関数を呼ぶとデッドロックが起きるフローについて挙動をおってみます。

subscribe関数の実装

subscribe関数をまずみていきます。

    def subscribe(self, topic, qos, message_callback=None):
        self._logger.info("Performing sync subscribe...")
        ret = False
        if ClientStatus.STABLE != self._client_status.get_status():
            self._handle_offline_request(RequestTypes.SUBSCRIBE, (topic, qos, message_callback, None))
        else:
            event = Event()
            rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback)
            if not event.wait(self._operation_timeout_sec):
                self._internal_async_client.remove_event_callback(mid)
                self._logger.error("Subscribe timed out")
                raise subscribeTimeoutException()
            ret = True
        return ret

github.com

eventという変数について注目してみます。

            event = Event()
            rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback)
            if not event.wait(self._operation_timeout_sec):

Eventはpythonの標準のthreading.Eventです。 self._create_blocking_ack_callback(event)event.wait(self._operation_timeout_sec) でeventを参照していることがわかります。

_create_blocking_ack_callbackは以下のような実装になっています。

    def _create_blocking_ack_callback(self, event):
        def ack_callback(mid, data=None):
            event.set()
        return ack_callback

github.com

event.set()を呼ぶだけの関数が渡されていることがわかります。

その次の event.wait(self._operation_timeout_sec) は、event.set()が呼ばれるまで待機するというコードです。

pythonのthreading.Eventについては、以下のブログがわかりやすくまとまっているのでおすすめです。

qiita.com

ここまでをまとめると、subscribe関数は、サブスクライブが完了した時に呼ばれるcallback関数でevent.set()を実行し、subscribe関数内ではevent.wait()でcallback関数が呼ばれるのを待機しています。subscribe関数が正常に完了した時には、サブスクライブされていることが保証されています。

MQTTのやりとりの実装について

さらに細かい話を進めていきます。 AWSIoTPythonでは、MQTTのクライアントにはPaho MQTT Python clientが使われています。

www.eclipse.org

このライブラリではMQTTで接続した時(on_connect)やサブスクライブが完了した時(on_subscribe)やメッセージを受け取った時(on_message)にcallback関数を設定することができます。

callback関数はEventProducerクラスに纏まっています。

class EventProducer(object):

    def __init__(self, cv, event_queue):
        self._cv = cv
        self._event_queue = event_queue

    # 略

    def on_subscribe(self, client, user_data, mid, granted_qos):
        self._add_to_queue(mid, EventTypes.SUBACK, granted_qos)
        self._logger.debug("Produced [suback] event")

    # 略

    def _add_to_queue(self, mid, event_type, data):
        with self._cv:
            self._event_queue.put((mid, event_type, data))
            self._cv.notify()

EventProducerクラスでは、MQTTで何かしらデータを受け取ったら_add_to_queue関数で _event_queue に追加をしているだけで、実際の処理を行うのは他のクラスになります。 _cv という変数には、pythonのthreading.Conditionのオブジェクトが渡されていて、with self._cvself._cv.notify() という使われ方をされています。今回デッドロックを引き起こしているのはまさにこの _cv 変数の部分なのですが、これを参照しているもう一か所のところを触れてから説明します。

省略した部分以外も確認したい場合はソースをみてください。 github.com

実際にpahoのmqttクライアントにEventProducerの関数が登録されているところは、MqttCoreクラスのコンストラクタの_init_worlersあたりの処理を追うとわかるかと思います。

github.com

MQTTを受け取った後の処理について

EventProducerが _event_queue に追加したeventを処理するのは EventConsumerクラスとなっています。

class EventConsumer(object):

    def __init__(self, cv, event_queue, internal_async_client,
                 subscription_manager, offline_requests_manager, client_status):
        self._cv = cv
        # 略

    def start(self):
        #略
        dispatch_events = Thread(target=self._dispatch)
        #略

    def _dispatch(self):
        while self._is_running:
            with self._cv:
                if self._event_queue.empty():
                    self._cv.wait(self.MAX_DISPATCH_INTERNAL_SEC)
                else:
                    while not self._event_queue.empty():
                        self._dispatch_one()
      # 略

EventConsumerで今回の問題に関係するところだけを抜き出しています。 コンストラクタで渡されている cv はEventProducerと同じオブジェクトが渡されていて、共有しています。

_event_queue 経由で渡されるイベントを処理しているのは、 dispatch関数となります。 dispatch関数では with self._cvself._cv.wait_cv を参照しています。

EventConsumerクラスのソースは以下です。

github.com

Pythonのthreading.Conditionについて

threading.Condition は、条件付きのロックでwith文で囲まれたブロック内でロックを獲得することができます。threading.Conditionの面白い(?)ところとしては、wait関数を呼ぶことでConditionオブジェクトのロックを開放して、別のスレッドにConditionオブジェクトでロックをかけて処理をさせることができるようにできます。wait関数はConditionオブジェクトのnotify関数を呼ぶことで再度処理を進めることができます。

docs.python.org

今回のEventConsumer._dispatchではeventキューが空の場合には self._cv.wait(self.MAX_DISPATCH_INTERNAL_SEC) でロックを開放しておきますが、eventを処理している最中にはロックをかけていることがわかります。

    def _dispatch(self):
        while self._is_running:
            with self._cv:
                if self._event_queue.empty():
                    self._cv.wait(self.MAX_DISPATCH_INTERNAL_SEC)
                else:
                    while not self._event_queue.empty():
                        self._dispatch_one()

EventProducerクラスのeventキューに追加しているところをもう一度みてみると、ロックがかかっている間は、MQTTからデータを受け取った後にeventキューに追加することはできません。

    def _add_to_queue(self, mid, event_type, data):
        with self._cv:
            self._event_queue.put((mid, event_type, data))
            self._cv.notify()

したがって、 _dispatch の処理中に新しくMQTTで受け取ったデータは、 _dispatch が完了するまで取得することができません。

同期的なsubscribeでは、MQTTでサブスクライブが完了したデータ(on_subscribe)を受け取って、dispatch内でcallbackの実行(event.set())を呼ぶことを期待しています。ただし、subscribeのcallback内(dispatch関数で _cv にロックをかけた状態)で同期的なsubscribeの関数を呼ぶと、EventProducerがon_subscribeでeventキューに新しく追加しようとしても _cv のロックがかかっているためロックの開放をまってしまいます。subscribeのcallbackも、event.set()が呼ばれるまで _cv を解放できないという状態になってしまうため、デッドロックが生じてしまっているということがわかりました。

おまけの感想

非同期的な処理がはいると、処理を追うのがかなり大変でした。実際、今回例外が上がる箇所から、どこでデッドロックが起きているかの特定は、ブログを書き始めた後の最後の方でやっと特定できた感じでした。 ただ、SDK内でもログレベルをDEBUGにするといろいろログをはいてくれるのでかなり助かりました。

pythonのthreading周りのドキュメントは何度か読んだことはあるはずなのですが、threading.Conditionはすっかり忘れていたので、具体的なユースケースを目にすることができたのはとてもよかったなぁと感じました。