OpenSCENARIO API
본 페이지는 MORAI OpenSCENARIO API Example 사용 방법을 소개한다.
MORAI OpenSCENARIO API는 Scenario Runner에서 사용하고 있는 OpenSCENARIO 파일 Import 기능 및 OpenSCENARIO 파일 Simulation 기능을 API로 제공하여 사용자가 Python Script를 통해서 해당 기능을 이용할 수 있게 한다.
환경설정
Example code 다운로드
22.R2.0 버전과 호환되는 예제 코드
window
linux
Python 환경설정
python 버전
pyconcrete를 사용하기 위해서는 .pye 파일을 컴파일하고 실행하는 python 버전이 동일해야 함
Please make sure the Python version for compiled .pye & execution .pye should be the same
window
python 3.7.3 버전으로 테스트 진행
linux
ubuntu 18.04 운영체제에서 python 3.7.13 버전으로 테스트 진행
설치 해야하는 python 모듈
numpy (1.19.1)
PyQt5
matplotlib
scipy
eventhandler
pyproj
grpcio(1.39.0)
grpcio-tools (1.39.0)
pyconcrete
패키지 설치 전 아래 명령어로 암호문 설정 필요
set PYCONCRETE_PASSPHRASE={your passphrase}
실행 방법 및 결과
실행 방법
Morai Launcher 실행 후 로그인
Morai Launcher에서 특정 시뮬레이터 버전 설치 및 실행
시뮬레이터의 맵 선택 및 차량 선택 화면에 위치한 상태에서 main.py 스크립트 실행
실행 결과
5개의 시나리오가 연속적으로 실행되는 것을 확인
상세 설명
파일 구조
example 코드
main.py
open_scenario_client_wrapper.py
open_scenario_importer_wrapper.py
data 폴더
OpenSCENARIO 예제 파일
lib 폴더
암호화하여 제공된 MORAI OpenSCENARIO 라이브러리
예제 코드 동작 방식 설명
main 함수에서 시나리오 파일에 대해 차례대로 OpenScenarioImporterWrapper
클래스를 사용하여 시나리오 파일을 Import 하고, OpenScenarioClientWrapper
클래스를 통해 시나리오 파일 Simulation을 시작한다.
OpenScenarioImporterWrapper
, OpenScenarioClientWrapper
클래스는 MORAI OpenSCENARIO 라이브러리 사용 예시를 보여주기 위한 클래스로 사용자가 임의로 수정 가능하다.
현재 시뮬레이터와 gRPC 통신을 하기 위해서는 아래와 같은 IP, Port를 사용
localhost:7789
main.py
import os
import sys
import numpy as np
import math
import time
current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.normpath(os.path.join(current_path, '../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../../')))
from open_scenario_importer_wrapper import OpenScenarioImporterWrapper
from open_scenario_client_wrapper import OpenScenarioClientWrapper
if __name__ == '__main__':
# 실행할 시나리오 리스트
scenario_list = [
'.\data\openscenario\R_KR_PG_K-City\Scenario_CCRB_1.xosc',
'.\data\openscenario\R_KR_PG_K-City\Scenario_CCRB_2.xosc',
'.\data\openscenario\R_KR_PG_K-City\Scenario_CCRB_3.xosc',
'.\data\openscenario\V_RHT_Suburb_03\Scenario_Animal_Deer.xosc',
'.\data\openscenario\V_RHT_Suburb_03\Scenario_UTurn.xosc'
]
client_wrapper = OpenScenarioClientWrapper('127.0.0.1', '7789')
for scenario_file_path in scenario_list:
# OpenSCENARIO 파일 Import
importer_wrapper = OpenScenarioImporterWrapper(scenario_file_path)
importer_wrapper.import_open_scenario()
client_wrapper.set_open_scenario_importer(importer_wrapper.scenario_importer)
# 시나리오 시작
client_wrapper.start_scenario()
# 시나리오가 종료될 때까지 대기
# 시나리오가 종료되면 자동으로 OpenScenarioClientWrapper에서 Stop Scenario 호출
while client_wrapper.is_stopped == False:
time.sleep(1)
pass
open_scenario_client_wrapper.py
import os
import sys
current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.normpath(os.path.join(current_path, '../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../../')))
sys.path.append(current_path)
from lib.openscenario.client.open_scenario_client import OpenScenarioClient
class OpenScenarioClientWrapper():
def __init__(self, ip, port):
self.client = OpenScenarioClient(ip, port, self)
self.is_stopped = False
self.open_scenario_importer = None
def set_open_scenario_importer(self, open_scenario_importer):
""" open_scenario_importer instance 설정 """
self.client.set_open_scenario_importer(open_scenario_importer)
def start_scenario(self):
self.is_stopped = False
self.client.start_scenario()
def stop_scenario(self):
self.is_stopped = True
self.client.stop_scenario()
def start_scenario_callback(self, result, desc):
if result is False:
print('Failed to start scenario. Please see the log for details.')
open_scenario_importer_wrapper.py
import os
import sys
current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.normpath(os.path.join(current_path, '../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../')))
sys.path.append(os.path.normpath(os.path.join(current_path, '../../../')))
sys.path.append(current_path)
from lib.openscenario.open_scenario_importer import OpenScenarioImporter
from lib.mgeo.class_defs.mgeo import MGeo
class OpenScenarioImporterWrapper():
""" OpenScenario 파일 Import 클래스 """
def __init__(self, file_path):
self.scenario_importer = OpenScenarioImporter(file_path)
self.scenario_data = None
self.mgeo_folder_path = None
def import_open_scenario(self):
self.scenario_importer.import_open_scenario()
self.scenario_data = self.scenario_importer.scenario_definition
self.mgeo_folder_path = self.scenario_data.mgeo_folder_path
self.mgeo_planner_map = MGeo.create_instance_from_json(self.mgeo_folder_path)
self.scenario_importer.set_mgeo(self.mgeo_planner_map)
def clear(self):
self.scenario_importer.clear()