PR

某テーマパークの魔法の杖で家電を操作してみた

[自動生成]無理やり短縮リンク: 短縮URLを生成中...
スポンサーリンク

背景

皆さんこんにちは。
皆さんは某魔法学校のアトラクションがあるテーマパークに行ったことがあるでしょうか。
そこで、魔法の杖が売られていますよね。
私はそんなに興味ないし普通に高いので買う気はなかったのですが、弟がそういうのにとことん弱いので買ったんですよ。

ただ、案の定帰ってきて3日で置き場所に困っていまして。
それから適当な場所に放置されていたのでなにかに使えないかなーって思っていたところ、特許サーフィンしてたらこんな記事を見つけまして。
それで作ってみたという感じです。

使ったもの

tapo c200
カメラ。本当は赤外線カメラとかのほうがいいけど家にあったものがこれぐらいしかなかった。
RTSP対応のものが楽かも。

マイコン
ESP32を紛失したためarduinoで今回は作成。魔法っぽいからESP32推奨

サーボモーター
スイッチボット的な感じのものが欲しかったため。やりたい動作のものを使用可能

仕組み

杖に赤外線を反射するやつが付いてるので、それを検知して追跡します。

作ってみよう

https://item.rakuten.co.jp/tplinkdirect/6935364053239-new/

うちのtapoが対応してたので流用。
ただし、見ての通り部屋が暗いときにしか使えない。

ちなみにこんな感じ↓

明らかに先っぽが光ってると思います。
それを頼りに検出していきます。
実際に追跡した結果↓

カメラを起動して、以下のスクリプトを実行。
URLやシリアル通信のためのポートは各自設定してください。
今回は、arduinoのためシリアル通信でonと送ってarduino側で処理できるようにした。

import cv2
import numpy as np
import serial
import time

RTSP_URL = "rtsp://ユーザー名:パスワード@アドレス/stream1" #私のtapoでの例
SERIAL_PORT = "COM4"
BAUD_RATE = 9600
ENABLE_SERIAL = True  # 本番動作

# 感度調整
BRIGHTNESS_THRESHOLD = 250 # 255に近いほど真っ白なものだけ検知
MATCH_THRESHOLD = 0.3      # ジェスチャ判定の厳しさ(低いほど厳密)

class MagicWandSystem:
    def __init__(self):
        self.path = []
        self.is_drawing = False
        self.last_point_time = 0
        self.template_path = [] 
        self.ser = None
        self.bg_mask = None # 背景マスク(静止している光を除外用)

        # デフォルトの呪文(起動後に 'r' キーで再登録可能)
        self.template_path = self.normalize_path([
            (0, 50), (20, 80), (50, 50), (80, 20), (80, 0)
        ])

        if ENABLE_SERIAL:
            try:
                self.ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
                print(f"Serial connected: {SERIAL_PORT}")
            except Exception as e:
                print(f"Serial Error: {e}")

    def send_command(self, command):
        print(f"✨ CASTING SPELL: {command} ✨")
        if self.ser:
            try:
                self.ser.write(f"{command}\n".encode('utf-8'))
            except Exception as e:
                print(f"Serial Send Error: {e}")

    def normalize_path(self, points, target_count=30):
        if len(points) < 2: return []
        
        # 1. パス長計算
        path_length = sum(np.linalg.norm(np.array(points[i]) - np.array(points[i-1])) for i in range(1, len(points)))
        if path_length == 0: return []

        # 2. リサンプリング
        new_points = [points[0]]
        interval = path_length / (target_count - 1)
        D = 0.0
        src_idx = 1
        while src_idx < len(points):
            p1, p2 = np.array(points[src_idx-1]), np.array(points[src_idx])
            dist = np.linalg.norm(p2 - p1)
            if D + dist >= interval:
                t = (interval - D) / dist
                new_p = p1 + t * (p2 - p1)
                new_points.append(tuple(new_p))
                points.insert(src_idx, tuple(new_p))
                D = 0.0
            else:
                D += dist
            src_idx += 1
        while len(new_points) < target_count: new_points.append(points[-1])

        # 3. 0-1正規化
        np_points = np.array(new_points)
        min_xy = np.min(np_points, axis=0)
        max_xy = np.max(np_points, axis=0)
        scale = max(max_xy[0] - min_xy[0], max_xy[1] - min_xy[1])
        if scale == 0: scale = 1
        return ((np_points - min_xy) / scale).tolist()

    def compare_paths(self, input_path, template_path):
        if not template_path or not input_path: return float('inf')
        norm_input = self.normalize_path(input_path)
        if not norm_input: return float('inf')
        dist_sum = sum(np.linalg.norm(np.array(p1) - np.array(p2)) for p1, p2 in zip(norm_input, template_path))
        return dist_sum / len(norm_input)

    def run(self):
        cap = cv2.VideoCapture(RTSP_URL)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        print("Connecting to camera...")
        print("Initial Calibration: DO NOT MOVE THE WAND yet.")
        
        recording_mode = False
        calibration_frames = 0
        calibration_limit = 30 # 最初の30フレームで背景を覚える
        
        # 背景(静止光)の蓄積用
        accumulated_bg = None 

        while True:
            ret, frame = cap.read()
            if not ret:
                print("Failed to grab frame (Retrying...)")
                time.sleep(1)
                cap = cv2.VideoCapture(RTSP_URL) # 再接続トライ
                continue

            # リサイズ処理(座標エラー修正済み)
            raw_h, raw_w = frame.shape[:2]
            scale_ratio = 0.5
            small_frame = cv2.resize(frame, (int(raw_w * scale_ratio), int(raw_h * scale_ratio)))
            h, w = small_frame.shape[:2]

            gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)

            # === 背景学習フェーズ(起動直後) ===
            if calibration_frames < calibration_limit:
                # 明るい場所だけ抽出
                _, bright_spots = cv2.threshold(gray, BRIGHTNESS_THRESHOLD, 255, cv2.THRESH_BINARY)
                
                if accumulated_bg is None:
                    accumulated_bg = bright_spots.astype(float)
                else:
                    cv2.accumulateWeighted(bright_spots, accumulated_bg, 0.1)
                
                calibration_frames += 1
                cv2.putText(small_frame, f"Calibrating... {int((calibration_frames/calibration_limit)*100)}%", 
                           (10, h//2), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
                cv2.imshow("Wand View", small_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'): break
                continue
            
            # 学習完了時に一度だけマスク生成
            elif self.bg_mask is None:
                # 蓄積された「いつもの光」をマスク化
                bg_result = cv2.convertScaleAbs(accumulated_bg)
                # 少し膨張させて、光のゆらぎもカバーする
                kernel = np.ones((5,5), np.uint8)
                self.bg_mask = cv2.dilate(bg_result, kernel, iterations=2)
                # マスクを反転(光ってるところ=0、暗いところ=255)
                self.bg_mask = cv2.bitwise_not(self.bg_mask)
                print("Background calibration done.")

            # === メイン処理 ===
            
            # 1. 現在のフレームから高輝度部を抽出
            _, curr_thresh = cv2.threshold(gray, BRIGHTNESS_THRESHOLD, 255, cv2.THRESH_BINARY)
            
            # 2. 背景マスクを適用(最初から光っていた場所を黒く塗りつぶす)
            masked_thresh = cv2.bitwise_and(curr_thresh, curr_thresh, mask=self.bg_mask)

            # 3. ノイズ除去
            masked_thresh = cv2.erode(masked_thresh, None, iterations=1)
            masked_thresh = cv2.dilate(masked_thresh, None, iterations=2)

            # 4. 輪郭抽出
            contours, _ = cv2.findContours(masked_thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            
            wand_point = None
            if contours:
                c = max(contours, key=cv2.contourArea)
                if cv2.contourArea(c) > 5:
                    ((x, y), radius) = cv2.minEnclosingCircle(c)
                    wand_point = (int(x), int(y))
                    cv2.circle(small_frame, wand_point, int(radius), (0, 255, 255), 2)

            # 軌跡追跡ロジック
            curr_time = time.time()
            if wand_point:
                self.path.append(wand_point)
                self.last_point_time = curr_time
                self.is_drawing = True
            else:
                if self.is_drawing and (curr_time - self.last_point_time > 0.5):
                    self.is_drawing = False
                    if len(self.path) > 10:
                        if recording_mode:
                            self.template_path = self.normalize_path(self.path)
                            print(">>> New Template Recorded! <<<")
                            recording_mode = False
                        else:
                            score = self.compare_paths(self.path, self.template_path)
                            print(f"Score: {score:.4f}")
                            if score < MATCH_THRESHOLD:
                                cv2.putText(small_frame, "WINGARDIUM LEVIOSA!", (10, 50), 
                                           cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                                self.send_command("on")
                    self.path = []

            # 描画処理
            if len(self.path) > 1:
                cv2.polylines(small_frame, [np.array(self.path, dtype=np.int32)], False, (0, 0, 255), 2)

            # テンプレート表示
            if self.template_path:
                preview_size = 100
                if w > preview_size and h > preview_size:
                    preview_img = np.zeros((preview_size, preview_size, 3), dtype=np.uint8)
                    tp_points = [(int(p[0]*(preview_size-10)+5), int(p[1]*(preview_size-10)+5)) for p in self.template_path]
                    if len(tp_points) > 1:
                        cv2.polylines(preview_img, [np.array(tp_points)], False, (0, 255, 0), 1)
                    small_frame[0:preview_size, w-preview_size:w] = preview_img

            if recording_mode:
                cv2.putText(small_frame, "REC MODE", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            cv2.imshow("Wand View", small_frame)
            cv2.imshow("Masked Input", masked_thresh) # デバッグ用:何が見えているか確認

            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'): break
            elif key == ord('r'):
                print("Recording mode...")
                self.path = []
                recording_mode = True
            elif key == ord('c'): # 再キャリブレーション(照明が変わった時など)
                print("Recalibrating background...")
                calibration_frames = 0
                self.bg_mask = None
                accumulated_bg = None

        cap.release()
        cv2.destroyAllWindows()
        if self.ser: self.ser.close()

if __name__ == "__main__":
    app = MagicWandSystem()
    app.run()


これでいい感じに表示されます。

rを押したあとに杖を振って、コマンドを登録。
その後同じように杖を降ったら認識してonと指定したポートに送ります。

コメント

タイトルとURLをコピーしました