JOURNALについて

データアナリティクスラボ株式会社では、ITやデータサイエンスに関する技術の研究活動を行っています。このブログでは、研究活動で得られた知見や検証結果についての情報を発信します。

本ブログで提供される情報は、可能な限り正確かつ最新の情報であるように努めますが、必ずしもその正確性を保証することはできません。場合によっては誤情報が含まれたり、最新の情報ではない可能性もあります。予めご了承いただけますようお願い申し上げます。

journal

空飛ぶコード – Pythonでドローンを自在に操る!

はじめに

データソリューション事業部の大山です。

前回の記事では、ドローンとデータサイエンスの意外な関係についての紹介とドローンの活用事例についてまとめました。その中で「IoT」や「ストリーム処理」がドローンとも密接に関わっていることをお伝えしました。

そして、今回の記事では、PCとドローン間の通信接続の仕組みと、それを実現するための手順について深く掘り下げていきます。PCからドローンに接続するプロセスは、単に二つのデバイスを繋げる以上の意味を持っています。これは、インターネットを通じて「モノ」(今回はドローン)と通信する技術の実践的な応用例です。本記事ではまず、この通信の基本的な仕組みを解説し、その後、具体的な接続プロセスを紹介します。また、実際にプログラミング(Python)でドローンの操作やセンサーデータを取得していきます。

使用したドローン:DJI Tello EDU
本ドローンは、プログラミングとドローン操作を学ぶために特化したモデルで、多言語サポート、先進的な飛行技術、耐久性、安全設計を備えています。また、高品質カメラとセンサーからのデータ取得が可能で、実践的な技術教育に最適です。

PCからドローンへの通信接続

はじめに、PCとドローンが繋がる仕組みを知っていますか??それはつまり、「モノ」(ここではドローン)がインターネットを介してPCと繋がる仕組みのことです。そこで今回は、一旦その仕組みについて説明した後、PCを使ってドローンに通信接続・操作する方法を紹介します。

「モノ」とインターネットがつながる仕組み

「モノ」がインターネットを介してデータを交換するためには、以下のような要素が必要です。聞いたことがある方も多いのではないでしょうか?いわゆるIoT(あらゆるモノをインターネットに接続する)技術のことです!

図1. IoTに必要な要素
表1. 各要素の役割と例

「ドローン」と「IoT」の関係

ここまでの説明からドローンとIoTはとても密接な関係があることが分かります。文脈によってはドローンはIoTデバイスの一部と言えますね。
例えば、多くのドローンにはカメラやGPS、気圧センサーなどのセンサーが搭載されており、これらのセンサーデータはインターネットを介して他のネットワークにリアルタイムで接続が可能です。

ドローンとIoTデバイスの違いについても触れておきます。

  • 機能が複雑:たくさんのセンサーを制御しながらカメラを動かしたり、GPSで位置を追跡したりと、たくさんの機能を有するという点で一般的なIoTデバイスとは異なる。
  • 多くのバッテリーが必要:機能が複雑な分、多くのバッテリーが必要。
  • 動的な環境:ドローンは外部の環境(風、雨、障害物など)に影響されやすく、これに適応するための技術やソフトウェアが必要。

いよいよプログラミングでドローン操作!

ここまで「モノ」(ドローン)がインターネットを介してPCと接続する仕組みについて見てきました。それでは実際に操作してみましょう。

アプリでの操作

以下は、使用機器Tello EDUの専用アプリをつかった動画です。アプリですと直感的で操作が分かりやすいです。ただ自在に操作できるまでには少し時間がかかりそうですね。

ここでは、アプリで操作可能な飛行を全種類紹介しています。

プログラミング(Python)での操作

それではいよいよ、アプリで操作した動きをPythonで再現していきたいと思います!

実装には、Tello社が出しているユーザーガイドを参考に実装しました。

出典元: https://www.ryzerobotics.com/jp/tello-edu/downloads

SDK(Software Development Kit)というのは、「ソフトウェア開発キット」のことで、ソフトウェアを開発する際に必要なプログラムやAPI・文書・サンプルなどをまとめてパッケージ化したもののことです。Telloなど自社製品を扱う会社がサードパーティー製を含む他のアプリケーションを簡単に連携できるよう用意してくれていることが多いです。

SDK User GuideのIntroductionArchitectureには以下のようなことが書かれています。

  • Wi-Fi UDPポートを介してドローンに接続する
  • PythonでUPD通信ポートを確立してドローンに接続する
  • Tello IPアドレス:192.168.10.1
  • UDPポート:8889
  • 接続Step1: PCでUDPクライアントを設定し、同じポートを経由してTelloとメッセージを送受信する
  • 接続Step2:他のコマンドを送信する前に、UDPポート8889経由でTelloに「command」を送信する

つまり絵にすると以下のようになります。

図2. PCとTelloの通信イメージ

ちなみに、Tello EDUはコネクションレス型のUDP通信なので下図のようなコネクションレス型のソケット通信になります。TCP通信の場合はコネクション型なので、その下の図のようにコネクションの確立が必要です。(ソケットとは各ネットワーク機器が外部とやりとりする際の窓口のようなものです)

図3. UDP通信とTCP通信


長々と説明してきましたが、Pythonでドローンに接続してみます。
まず初めにドローンに接続するだけのシンプルなコードです。

import logging # ログの表示
import socket # socket通信
import sys
import time

# ログの設定
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)


class TelloOperator():
    def __init__(self, host_ip='192.168.10.2', host_port=8889,
                 tello_ip='192.168.10.1', tello_port=8889):
        self.host_ip = host_ip # host側(PC)のIPアドレス
        self.host_port = host_port # host側(PC)のポート番号
        self.tello_ip = tello_ip # tello側のIPアドレス
        self.tello_port = tello_port # tello側のポート番号
        self.tello_address = (tello_ip, tello_port)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDOのソケット通信を指定
        self.socket.bind((self.host_ip, self.host_port)) # バインドを実施
        
    def __del__(self):
        self.stop() # クラスのインスタンスが削除される際に呼ばれ、stopメソッドを呼び出してソケットを閉じる。

    def stop(self):
        self.socket.close() # ソケットを閉じる

    '''
		ここにドローンを動かすコマンドを定義していきます。
		'''


if __name__ == '__main__':
    tello_operator = TelloOperator() # TelloOperatorインスタンスの作成
    '''
		上で定義したコマンドを書いて実際に動作させます。
		'''

ドローンと通信するには、ホストを同じWi-Fiネットワークに接続する必要があります。そのために、ホスト側のIPアドレスとポートの設定が必要なります。

  • IPアドレス: ホストのIPアドレスは、ドローンと同じサブネット上のIPアドレスに設定されます。たとえば、ドローンのIPアドレスが192.168.10.1の場合、ホストのIPアドレスは192.168.10.xxは2から255の任意の値)に設定が可能です。
  • ポート: ポートは、利用可能な任意のポートに設定することができます。今回は、8889が使用されていますが、空いていれば他の利用可能なポートを選択することも可能です。

次に離陸や着陸といった動作をコードに加えていきます。
User Guideの3ページ~6ページ上段にTelloへ命令または受け取るためのコマンドの記載があります。そこには、大きく以下の3つのコマンド群が用意されていることがわかります。

表2. Telloコマンド

Control Commands

初めに基本となるコマンドを例に関数を定義してみます

表3. Control Commandsの一例

「コマンドモード開始」と「離陸」、「着陸」をコードに追加したのが以下になります。

コードが実行されると始めに__init__が呼び出されるので、その中でcommandコマンドを送り、SDKモードにします。そして、いろんなコマンドを送れるようにsend_commandを関数として定義し、続けてtakeoffとlandコマンドを定義しています。

class TelloOperator():	
		def __init__(self, host_ip='192.168.10.2', host_port=8889,
                 tello_ip='192.168.10.1', tello_port=8889):
				# ... 既存のメソッドは省略 ...
        self.socket.sendto(b'command', self.tello_address) # ドローンにコマンドモードを開始するよう指示

		def send_command(self, command):
		    self.socket.sendto(command.encode('utf-8'), self.tello_address) # 指定されたコマンドをドローンに送信

		def takeoff(self):
		    self.send_command('takeoff') # 離陸のコマンドを定義

		def land(self):
		    self.send_command('land')  # 着陸のコマンドを定義

それでは他のコマンドを一つピックアップしてそれを関数化してみます。今回は指定したポイントに移動する「go」コマンドです。

表4. Control Commandsの別例

注意文がありますが、おそらくドローンが非常に小さな移動を行うことを避けるためのもので、安定性や正確性を確保するための措置と思われます。
注意文を考慮してコードにすると以下のようになります。

# ... 既存のライブラリとロギングの設定は省略 ...

class TelloOperator():
    def __init__(self, host_ip='192.168.10.2', host_port=8889,
                 tello_ip='192.168.10.1', tello_port=8889):
        self.host_ip = host_ip # host側(PC)のIPアドレス
        self.host_port = host_port # host側(PC)のポート番号
        self.tello_ip = tello_ip # tello側のIPアドレス
        self.tello_port = tello_port # tello側のポート番号
        self.tello_address = (tello_ip, tello_port)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDOのソケット通信を指定
        self.socket.bind((self.host_ip, self.host_port)) # バインドを実施
        self.socket.sendto(b'command', self.tello_address) # ドローンにコマンドモードを開始するよう指示

    def __del__(self):
        self.stop() # クラスのインスタンスが削除される際に呼ばれ、stopメソッドを呼び出してソケットを閉じる。

    def stop(self):
        self.socket.close() # ソケットを閉じる

    def send_command(self, command):
        logger.info({'実行操作': 'コマンド送信', 'コマンド': command}) # ログに記録したい情報
        self.socket.sendto(command.encode('utf-8'), self.tello_address) # 指定されたコマンドをドローンに送信

    def takeoff(self):
        self.send_command('takeoff') # 離陸のコマンドを定義

    def land(self):
        self.send_command('land')  # 着陸のコマンドを定義

		def go(self, x, y, z, speed):
				if -20 <= x <= 20 and -20 <= y <= 20 and -20 <= z <= 20:
		        logger.error("x, y, zの値は同時に-20から20の間に設定することはできません。")
		        return  # コマンドを送信せずにメソッドを終了
		    # x, y, zの座標とspeedの速度で飛行するコマンドを構築
		    command = f'go {x} {y} {z} {speed}'
		    self.send_command(command)  # 構築したコマンドを送信


if __name__ == '__main__':
    tello_operator = TelloOperator() # ドローンのインスタンスを作成
    tello_operator.takeoff() # 離陸!

    time.sleep(10)
    # x=100, y=100, z=100, speed=50でgoコマンドを実行
    tello_operator.go(100, 100, 100, 50)

		tello_operator.land() # 着陸!

Set Commands

次にドローンの特定の設定を変更するためのセットコマンドですが、ドローンの移動速度を上げるspeedコマンドに加えてリモートコントローラーの設定もしてみたいと思います。これらは通常のアプリで動かす際には自由に変更できない設定なので、プログラミングならでは特徴ですね!

出典:Tello User Manual(Telloの飛行速度はデフォルトでは2種類のみ)

表5. Set Commandsの一例


(補足)rcコマンドは、イメージしづらいかと思うので、具体例をあげておきます。

  • rc 50 0 0 0: 最大速度の50%で右に移動する。前後や上下の移動はせず、回転もしない
  • rc 0 -30 0 0: 最大速度の20%で上昇する。左右の移動や前後の移動、回転はしない
  • rc 0 0 0 -60: 最大速度の60%で反時計回りに回転する。左右の移動や前後の移動、上下の移動はしない
  • rc -100 100 50 25: 最大速度で左に移動し、同時に最大速度で前に移動し、最大速度の50%で上昇し、最大速度の25%で時計回りに回転する。

それでは、「最初にtelloの移動速度を20に設定し、その後ドローンは最大速度で左と前方に移動し、50%速度で上昇し、最大速度の25%で時計回りに回転する。(コマンドがアクティブである秒数は3秒)」を命令するコードを書いてみます。

# ... 既存のライブラリとロギングの設定は省略 ...

class TelloOperator():
    # ... 既存のメソッドは省略 ...

    def set_speed(self, speed):
        # 移動速度を設定するコマンド
        if 10 <= speed <= 100:
            self.send_command(f'speed {speed}')
        else:
            logger.error("速度は10から100の間で設定してください。")

    def set_rc_control(self, a, b, c, d):
        # リモートコントロールの設定コマンド
        if all(-100 <= param <= 100 for param in [a, b, c, d]):
            self.send_command(f'rc {a} {b} {c} {d}')
        else:
            logger.error("各パラメータは-100から100の間で設定してください。")

tello_operator = TelloOperator() # ドローンのインスタンスを作成
tello_operator.takeoff() # 離陸
time.sleep(5)  # 離陸完了まで5秒間待機

# 移動速度を10に設定
tello_operator.set_speed(10)

# 3秒間、指定された方向に移動
tello_operator.set_rc_control(-100, 100, 50, 25)
time.sleep(3)  # 3秒間待機
tello_operator.set_rc_control(0, 0, 0, 0)  # 動きを停止

tello_operator.land() # 着陸

Read Commands

最後に、ドローンの現在の状態や設定値を確認するためのコマンドを実装していきます。

表6. Read Commandsの一例

早速実装していこうと思いますが、ここで考慮したいことがあります。
それは「プロセス」「スレッド」「非同期通信」の3つです。


以下で、それぞれについて簡単な説明をします。

  • プロセス: コンピュータ上で実行されている個々のプログラムです。例えば、ウェブブラウザやテキストエディタがそれぞれ独立したプロセスです。それぞれのプロセスは独自のメモリ領域を持ち、他のプロセスとは独立して動作します。
  • スレッド: プロセス内で実行されるより小さな単位です。一つのプロセスは複数のスレッドを持つことができ、これらのスレッドはプロセスのメモリやリソースを共有します。スレッドを使うと、一つのプログラムが同時に複数の作業を行うことができます。
  • 非同期通信: 非同期処理は、プログラムが一つのタスクを完了させるのを待たずに、別のタスクに進むことを可能にします。これにより、特に長時間かかる作業(例えばファイルのダウンロードやデータベースへのクエリ)を行う際に、プログラムが他の作業を続けることができます。


通常ドローンはコマンドを受け取り、動作するというのを一つのスレッド上で実行されます。今回、ドローンの状態やセンサーデータを取得するにあたって、これらのコマンドを実行している間、ドローンを動かせないということになります。ドローンを操作しながら、ドローンの状態やセンサーデータを取得するのは別で実行できたらいいですよね。

そこで、メインスレッドとは別に、バックグラウンドで独立したタスクを実行するために追加のスレッドを作成します。そうすることで、メインとなるスレッドのブロッキング(停止)を避け、プログラムが応答性を保ちながら複数の操作を同時に行うことが可能になります。Pythonではthreadというライブラリが用意されていますのでそれを活用しましょう。

以上を踏まえてバッテリー残量と飛行時間を確認するコードは以下のようになります。

import threading
# ... 既存のライブラリとロギングの設定は省略 ...

class TelloOperator():
    def __init__(self, host_ip='192.168.10.2', host_port=8889,
                 tello_ip='192.168.10.1', tello_port=8889):
        # ... 既存の初期化処理は省略 ...

        # データ受信用にスレッドを追加
        self.response = None

        # データ受信用にスレッドを追加
        self.monitoring_event = threading.Event()
        self.monitoring_thread = threading.Thread(target=self.monitoring_task, args=(5,1))
        self.monitoring_thread.start()

    # ... 既存のメソッドは省略 ...

    def get_response(self):
        """
        最後に受け取った応答を返します。
        """
        return self.response

    def monitoring_task(self, initial_interval, regular_interval):
        """
        初回のループでは長い待機時間を設け、その後は通常の間隔でバッテリー残量と飛行時間を取得するタスク。
        """
        first_loop = True
        """
        定期的にバッテリー残量と飛行時間を取得するタスク。
        """
        while not self.monitoring_event.is_set():
            try:
                if first_loop:
                    time.sleep(initial_interval)
                    first_loop = False
                else:
                    time.sleep(regular_interval)
                self.send_command('battery?')
                # time.sleep(0.5)  # 応答待ちのための遅延
                # ソケットを介してドローンからの応答(1024バイトのデータ)を受け取る、送信元アドレスは重要でないため"_"とする。
                battery_response, _ = self.socket.recvfrom(1024)
                # 受け取ったデータはバイナリなのでUTF-8でデコードして文字列に変換
                battery_level = battery_response.decode('utf-8')
                logger.info(f"バッテリー残量: {battery_level}%")

                self.send_command('time?')
                # time.sleep(0.5)  # 応答待ちのための遅延
                time_response, _ = self.socket.recvfrom(1024)
                flight_time = time_response.decode('utf-8')
                logger.info(f"飛行時間: {flight_time}")

            except socket.error as err:
                logger.error(f"ソケットエラー: {err}")

    def stop_monitoring(self):
        """
        モニタリングスレッドを停止します。
        """
        self.monitoring_event.set()
        self.monitoring_thread.join()

if __name__ == '__main__':
    # ドローンのインスタンスを作成
    tello_operator = TelloOperator()
    tello_operator.takeoff() # 離陸
    
    time.sleep(20) # 20秒間の飛行時間

    # 応答受信スレッドを停止
    tello_operator.land() # 着陸
    tello_operator.stop_monitoring()

画像認識機能の実装

今回は既存のドローンの操作機能をプログラミングで再現するだけでなく、画像認識も取り入れています!その画像認識には画像処理で有名なオープンソースのライブラリ「OpenCV」を使用しました。(OpenCVには「画像認識」だけでなく、「文字認識」や「画像のパターンマッチング」といったカスケード分類器が用意されているので簡単に実装することができます。)

さらに今回は画像認識だけでなく、応用として人の顔をドローンが認識して追跡するようなロジックを組み込んでいます。コードを実行すると、フレームの中心に顔が来るようにドローンが追跡してくれます。

ドローンが人物を追跡するコードと環境構築の方法は、以下になります。

▼Mac環境の方

  1. ターミナルで「pip install opencv-python」と打って、OpenCVをインストールします。その際、FFmpeg*も一緒にインストールされます。
  2. ターミナルでpythonを起動し、「import cv2」を実行してみてエラーが出なければインストール成功です。
  3. 以下より、”haarcascade_frontalface_default.xml”(顔認識カスケード分類器)をダウンロードして、ドローンのプログラミングコードがあるディレクトリにcascadeディレクトリを作成し、そこに格納します。
    https://github.com/opencv/opencv/tree/4.x/data/haarcascades

▼Windows環境の方

  1. 以下の公式サイトからOpenCVのインストーラーをダウンロードし、インストールしていきます。(保存先はCドライブ直下がおすすめです。)
    [OpenCV公式サイト] https://opencv.org/
  2. ユーザー環境変数を設定します。
  3. コマンドスクリプト等でPythonのOpenCVをインストールします。
  4. FFmpeg*を以下のサイトからパッケージをダウンロードして、OpenCV同様に環境変数を設定します。
    [FFmpeg公式サイト] https://ffmpeg.org/
  5. 以下より、”haarcascade_frontalface_default.xml”(顔認識カスケード分類器)をダウンロードして、ドローンのプログラミングコードがあるディレクトリにcascadeディレクトリを作成し、そこに格納します。
    https://github.com/opencv/opencv/tree/4.x/data/haarcascades

import subprocess
import cv2
import numpy as np
# ... 既存のライブラリとロギングの設定は省略 ...

# 定数の設定
TELLO_SPEED = 15
TELLO_ROTATION_DEGREE = 20

# ビデオフレームサイズの設定
FRAME_WIDTH = 640
FRAME_HEIGHT = 480
FRAME_AREA = FRAME_WIDTH * FRAME_HEIGHT # フレーム面積

FRAME_DATA_SIZE = FRAME_AREA * 3 # 各ピクセルには3つのカラーチャネル(赤、緑、青)が含まれる
FRAME_CENTER_X, FRAME_CENTER_Y= FRAME_WIDTH / 2, FRAME_HEIGHT / 2

# FFmpegコマンドの設定
'''
-hwaccel auto: 自動でハードウェアアクセラレーションを使用するよう設定。処理速度を向上させる。
-hwaccel_device opencl: ハードウェアアクセラレーションのためにOpenCLデバイスを使用。
-i pipe:0: 標準入力(stdin)からビデオデータを読み込むよう設定。
-pix_fmt bgr24: OpenCVで使用できるようピクセルフォーマットをBGR24(青、緑、赤の各8ビット)に設定。
-s {FRAME_WIDTH}x{FRAME_HEIGHT}: ビデオの解像度を設定。
-f rawvideo pipe:1: 出力フォーマットをrawビデオとし、標準出力(stdout)へのパイプを使用するよう設定。
'''
FFMPEG_CMD = (f'ffmpeg -hwaccel auto -hwaccel_device opencl -i pipe:0 '
              f'-pix_fmt bgr24 -s {FRAME_WIDTH}x{FRAME_HEIGHT} -f rawvideo pipe:1')

# 顔認識用XMLファイルのパス設定
current_dir = os.path.dirname(os.path.abspath(__file__))
CASCADE_XML_FILE = os.path.join(current_dir, 'cascade/haarcascade_frontalface_default.xml')

class TelloOperator():
    def __init__(self, host_ip='192.168.10.2', host_port=8889,
                 tello_ip='192.168.10.1', tello_port=8889,
                 tello_video_port=11111,
                 is_imperial=False, speed=TELLO_SPEED):
        self.host_ip = host_ip
        self.host_port = host_port
        self.tello_ip = tello_ip
        self.tello_port = tello_port
        self.tello_address = (tello_ip, tello_port)
        self.speed = speed
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind((self.host_ip, self.host_port))

        # Telloからのステータス受信の設定
        self.response = None
        self.stop_event = threading.Event() # デフォルトはFalse
        self._response_thread = threading.Thread(target=self.receive_response,
                                           args=(self.stop_event, ))
        self._response_thread.start()

        # FFmpegプロセスの設定
        self.proc = subprocess.Popen(FFMPEG_CMD.split(' '), 
                                     stdin=subprocess.PIPE, 
                                     stdout=subprocess.PIPE)
        self.proc_stdin = self.proc.stdin
        self.proc_stdout = self.proc.stdout

        # 受信映像の設定
        self.tello_video_port = tello_video_port

        self._receive_video_thread = threading.Thread(
            target=self.receive_video,
            args=(self.stop_event, self.proc_stdin,
                  self.host_ip, self.tello_video_port,))
        self._receive_video_thread.start()

        # 顔認識Cascadeの設定
        self.face_cascade = cv2.CascadeClassifier(CASCADE_XML_FILE)

        # その他の設定
        self._command_thread = None

        self.send_command('command') # コマンドモード開始
        self.send_command('streamon') # ビデオストリーム有効
        self.set_speed(self.speed)

    def receive_response(self, stop_event):
        while not stop_event.is_set():
            try:
                self.response, _ = self.socket.recvfrom(3000)
                logger.info({'実行操作': 'データ受信', 'レスポンス': self.response})
            except socket.error as exception:
                logger.error({'実行操作': 'データ受信','異常発生': exception})
                break

    def __del__(self):
        self.stop()

    def stop(self):
        self.stop_event.set()
        self.socket.close()
        os.kill(self.proc.pid, 9)
    
    # コマンド送信メソッド
    def send_command(self, command):
        logger.info({'実行操作': 'コマンド送信', 'command': command})
        self.socket.sendto(command.encode('utf-8'), self.tello_address)

    def takeoff(self):
        self._face_tracking = True
        return self.send_command('takeoff')

    def land(self):
        self._face_tracking = False
        return self.send_command('land')

    def set_speed(self, speed):
        return self.send_command(f'speed {speed}')
    
    # FFmpegプロセスの停止
    def stop_ffmpeg(self):
        self.proc.terminate()

    def receive_video(self, stop_event, pipe_in, host_ip, tello_video_port):
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock_video:
            sock_video.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock_video.settimeout(.5)
            sock_video.bind((host_ip, tello_video_port))
            data = bytearray(2048)
            while not stop_event.is_set():
                try:
                    size, _ = sock_video.recvfrom_into(data)
                except socket.timeout as exception:
                    logger.warning({'実行操作': '映像受信', '異常発生': exception })
                    time.sleep(0.5)
                    continue
                except socket.error as exception:
                    logger.error({'実行操作': '映像受信', '異常発生': exception})
                    break

                try:
                    pipe_in.write(data[:size])
                    pipe_in.flush()
                except Exception as exception:
                    logger.error({'実行操作': '映像受信', '異常発生': exception})
                    break

    def video_binary_gen(self):
        while True:
            try:
                frame = self.proc_stdout.read(FRAME_DATA_SIZE)
            except Exception as exception:
                logger.error({'実行操作': '映像バイナリ生成', '異常発生': exception})
                continue

            if not frame:
                continue
            frame = np.frombuffer(frame, np.uint8).reshape(FRAME_HEIGHT, FRAME_WIDTH, 3)
            yield frame

    def video_jpg_gen(self): # 出力映像画面の中央に人の顔が来るようにドローンが追跡するロジック
        for frame in self.video_binary_gen():
            if self._face_tracking:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
                for (x, y, w, h) in faces:
                    cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)

                    face_center_x = x + (w/2)
                    face_center_y = y + (h/2)
                    diff_x = FRAME_CENTER_X - face_center_x
                    diff_y = FRAME_CENTER_Y - face_center_y
                    face_area = w * h
                    percent_face = face_area / FRAME_AREA

                    drone_x, drone_y, drone_z, speed = 0, 0, 0, self.speed
                    if diff_x < -30:
                        drone_y = -30
                    if diff_x > 30:
                        drone_y = 30
                    if diff_y < -15:
                        drone_z = -30
                    if diff_y > 15:
                        drone_z = 30
                    if percent_face > 0.30:
                        drone_x = -30
                    if percent_face < 0.02:
                        drone_x = 30
                    self.send_command(f'go {drone_x} {drone_y} {drone_z} {speed}')
                    break

            _, jpeg = cv2.imencode('.jpg', frame)
            jpeg_binary = jpeg.tobytes()
            yield jpeg_binary

# メイン関数
if __name__ == '__main__':
    tello_operator = TelloOperator()
    tello_operator.takeoff()

    try:
        for frame in tello_operator.video_jpg_gen():
            image = cv2.imdecode(np.frombuffer(frame, np.uint8), cv2.IMREAD_COLOR)
            cv2.imshow('Tello Cam', image)

            if cv2.waitKey(1) & 0xFF == ord('q'): # キーボードのqキーを押下で終了
                break
    finally:
        tello_operator.land()
        cv2.destroyAllWindows()
        tello_operator.stop()

まとめ

今回の記事では、ドローンをプログラミングで実際に操作する方法について紹介しました。今回使用したTello EDUは教育用のDroneキットということで、すでにコマンドなどは用意されていますが、うまく活用することで、専用アプリではできない動きを実装したりセンサーデータを取得したりすることがご理解いただけたと思います。

私の研究では、紹介したようなモノが持つ情報をリアルタイムで受け取り処理・分析する部分に注目し、市場の急成長が見込まれているドローンを掛け合わせ、次世代のデータサイエンティストが活躍できる市場開拓とともに最新動向の調査と実践をテーマとしています。

現在は、今回紹介したセンサーデータをクラウドに保存・グラフ化し、それを基に自動制御できるような古典的なアルゴリズムを作成中です。今後はこれらを駆使し、強化学習などを駆使し、応用していく予定です。

最後に学生の方々へ紹介になりますが、今回作成したコードやドローンはDALインターンでも紹介しています。興味がある方は是非コンタクトを取ってみてください!


オウンドメディアも運営しています