본 페이지는 MORAI OpenSCENARIO API Example 사용 방법을 소개한다.

MORAI OpenSCENARIO API는 Scenario Runner에서 사용하고 있는 OpenSCENARIO 파일 Import 기능 및 OpenSCENARIO 파일 Simulation 기능을 API로 제공하여 사용자가 Python Script를 통해서 해당 기능을 이용할 수 있게 한다.


환경설정

Example code 다운로드

22.R2.0 버전과 호환되는 예제 코드

  • window

OpenSCENARIO_API.zip

  • linux

OpenSCENARIO_API_Linux.zip

Python 환경설정

  • python 버전

    • pyconcrete를 사용하기 위해서는 .pye 파일을 컴파일하고 실행하는 python 버전이 동일해야 함

    • window

      • python 3.7.3 버전으로 테스트 진행

    • linux

      • ubuntu 18.04 운영체제에서 python 3.7.13 버전으로 테스트 진행

  • 설치 해야하는 python 모듈

    • numpy (1.19.1)

    • PyQt5

    • matplotlib

    • scipy

    • eventhandler

    • pyproj

    • grpcio(1.39.0)

    • grpcio-tools (1.39.0)

    • pyconcrete

      • 패키지 설치 전 아래 명령어로 암호문 설정 필요

        • set PYCONCRETE_PASSPHRASE={your passphrase}

실행 방법 및 결과

실행 방법

  • Morai Launcher 실행 후 로그인

  • Morai Launcher에서 특정 시뮬레이터 버전 설치 및 실행

  • 시뮬레이터의 맵 선택 및 차량 선택 화면에 위치한 상태에서 main.py 스크립트 실행

실행 결과

  • 5개의 시나리오가 연속적으로 실행되는 것을 확인

상세 설명

파일 구조

  • example 코드

    • main.py

    • open_scenario_client_wrapper.py

    • open_scenario_importer_wrapper.py

    • data 폴더

      • OpenSCENARIO 예제 파일

    • lib 폴더

      • 암호화하여 제공된 MORAI OpenSCENARIO 라이브러리

예제 코드 동작 방식 설명

main 함수에서 시나리오 파일에 대해 차례대로 OpenScenarioImporterWrapper 클래스를 사용하여 시나리오 파일을 Import 하고, OpenScenarioClientWrapper 클래스를 통해 시나리오 파일 Simulation을 시작한다.

OpenScenarioImporterWrapper , OpenScenarioClientWrapper 클래스는 MORAI OpenSCENARIO 라이브러리 사용 예시를 보여주기 위한 클래스로 사용자가 임의로 수정 가능하다.

  • 현재 시뮬레이터와 gRPC 통신을 하기 위해서는 아래와 같은 IP, Port를 사용

    • localhost:7789

main.py

import os
import sys
import numpy as np
import math
import time

current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.normpath(os.path.join(current_path, '../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../../')))

from open_scenario_importer_wrapper import OpenScenarioImporterWrapper
from open_scenario_client_wrapper import OpenScenarioClientWrapper

if __name__ == '__main__':
    # 실행할 시나리오 리스트
    scenario_list = [
        '.\data\openscenario\R_KR_PG_K-City\Scenario_CCRB_1.xosc',
        '.\data\openscenario\R_KR_PG_K-City\Scenario_CCRB_2.xosc',
        '.\data\openscenario\R_KR_PG_K-City\Scenario_CCRB_3.xosc',
        '.\data\openscenario\V_RHT_Suburb_03\Scenario_Animal_Deer.xosc',
        '.\data\openscenario\V_RHT_Suburb_03\Scenario_UTurn.xosc'
    ]

    client_wrapper = OpenScenarioClientWrapper('127.0.0.1', '7789')
    for scenario_file_path in scenario_list:
        # OpenSCENARIO 파일 Import
        importer_wrapper = OpenScenarioImporterWrapper(scenario_file_path)
        importer_wrapper.import_open_scenario()
        
        client_wrapper.set_open_scenario_importer(importer_wrapper.scenario_importer)
        
        # 시나리오 시작
        client_wrapper.start_scenario()

        # 시나리오가 종료될 때까지 대기
        # 시나리오가 종료되면 자동으로 OpenScenarioClientWrapper에서 Stop Scenario 호출
        while client_wrapper.is_stopped == False:
            time.sleep(1)
            pass
PY

open_scenario_client_wrapper.py

import os
import sys

current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.normpath(os.path.join(current_path, '../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../../')))
sys.path.append(current_path)

from lib.openscenario.client.open_scenario_client import OpenScenarioClient

class OpenScenarioClientWrapper():   
    def __init__(self, ip, port):
        self.client = OpenScenarioClient(ip, port, self) 
        self.is_stopped = False
        self.open_scenario_importer = None

    def set_open_scenario_importer(self, open_scenario_importer):
        """ open_scenario_importer instance 설정 """
        self.client.set_open_scenario_importer(open_scenario_importer)
        
    def start_scenario(self):
        self.is_stopped = False
        self.client.start_scenario()

    def stop_scenario(self):
        self.is_stopped = True
        self.client.stop_scenario()

    def start_scenario_callback(self, result, desc):
        if result is False:
            print('Failed to start scenario. Please see the log for details.')
CODE

open_scenario_importer_wrapper.py

import os
import sys

current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.normpath(os.path.join(current_path, '../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../../')))
sys.path.append(current_path)

from lib.openscenario.open_scenario_importer import OpenScenarioImporter
from lib.mgeo.class_defs.mgeo import MGeo

class OpenScenarioImporterWrapper(): 
    """ OpenScenario 파일 Import 클래스 """   
    def __init__(self, file_path):       
        self.scenario_importer = OpenScenarioImporter(file_path)
        self.scenario_data = None
        self.mgeo_folder_path = None
    
    def import_open_scenario(self):
        self.scenario_importer.import_open_scenario()

        self.scenario_data = self.scenario_importer.scenario_definition
        self.mgeo_folder_path = self.scenario_data.mgeo_folder_path
        
        self.mgeo_planner_map = MGeo.create_instance_from_json(self.mgeo_folder_path)

        self.scenario_importer.set_mgeo(self.mgeo_planner_map)

    def clear(self):
        self.scenario_importer.clear()
CODE