MSC UDP Unit Test Quick Start Guide
본 페이지는 MORAI Simulator Control(MSC) Quick Start 사용 방법을 소개한다.
MSC는 UI를 직접 이용하지않고 통신 API를 이용하여 시뮬레이터를 제어 할 수 있는 기능이고, 이를 이용한 Unit Test Example Manual이다.
환경설정
Example code 다운로드
https://www.dropbox.com/s/hf77j7v3tmu4x1q/MSC_UNIT_TEST_UDP%2821.05.17%29.zip?dl=0
다운로드 받은 파일 구성은 다음과 같다.
msc_udp : msc_udp_exmaple 파일
Python version
Example code는 python 3.7.5에서 작성 및 테스트 함.
Morai Launcher 실행
MSC를 사용하기 위해 Morai Launcher를 실행 후 좌상단의 ‘MSC is connected’를 확인한다.
* ‘MSC is disconnect’ 일땐 다른 런쳐가 실행 중 인지 확인 필요.
Code 실행
시뮬레이터 및 Code 실행이 다른 환경(IP 가 다른 환경)에서 진행 되면 params.txt , params.json 의 ip를 수정해주어야 한다.
다운로드 받은 파일 중 params.txt를 열어 사용할 id, pw, version ,차량, 맵을 대소문자 구분 하여 채워준다.
e.g)
receive_user_ip : 127.0.0.1
receive_user_port : 10329
request_dst_ip : 127.0.0.1
request_dst_port : 10509
user_id : id
user_pw : pw
version : v.3.9.210226.3
map/vehicle : K-City/Niroapi.py를 실행하면 parmas.txt에 입력 한 id, pw ,version, map/vehicle에 맞춰 시뮬레이터를 실행 하고 알고리즘 테스트를 확인할 수 있다.
Simulator Network setting
Network Setting은 Scenario를 저장할때 할때의 네트워크 정보를 적용.
네트워크 setting은 시뮬레이터 상단 메뉴 Network → Network Settings에서 다음과 같이 한다.
Host IP : 127.0.0.1
Destination IP : 127.0.0.1
Ego Ctrl Cmd
Host PORT : 7601
Destination PORT : 7600
Ego Vehicle Status
Host PORT : 8001
Destination PORT : 8000
Object Info
Host PORT : 8101
Destination PORT : 8100

IP / Ego Ctrl Cmd

Ego Vehicle Status / Object Info
코드
class launcher_start
parmas.txt에 입력한 정보를 읽어 MSC를 통해 시뮬레이션을 시작해주는 classclass gen_planner
unit test를 할 알고리즘 class.해당 class 에서 msc를 사용하고자 할 때 .
commander() : define.py의
Command_list
에 정의된 기능을 사용 하고자 할 때custom_command() : gen_planner class안에서 사용자가 option을 변경하며
MSC명령을 사용하고자 할 때
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time,os,json,threading
from lib.utils import *
from lib.define import *
from lib.controller import *
class launcher_start(controller):
def __init__(self):
self.controller = controller()
def launcher_start(self):
while True:
if self.controller.update():
self.controller.is_waitting() #status_wait
self.controller.is_downloading() #check_is_download_status
if self.controller.is_befor_login():
self.controller.commander(Command.LOGIN)#Login명령
if self.controller.is_after_login() or self.controller.is_after_sim_quit_to_launcher(): # is_after_sim_quit_to_launcher : Simulator에서 quit 명령 후 Launcher 복귀 상태 확인
self.controller.commander(Command.SELECT_VER)#version select명령
if self.controller.is_not_find_version(): #version_error
break
if self.controller.is_can_execute_sim():
self.controller.commander(Command.EXECUTE_SIM) #Simulator 실행 명령
self.controller.watting_execute()
if self.controller.is_sim_not_install():
self.controller.commander(Command.INSTALL_SIM) #Simulator 설치 명령
if self.controller.is_sim_lobby():
self.controller.commander(Command.MAP_VEHICLE_SELECT)#시뮬레이션/옵션 변경 실행 명령
self.controller.watting_loading()
if self.controller.is_sim_playing():
self.controller.commander(Command.NET_SETTING) #Network setting
self.controller.commander(Command.SEN_SETTING) #Sensor setting
self.controller.commander(Command.SCEN_SETTING) #Scenraio setting
planner(self.controller)
break
else :
print("[NO Simulator Control Data]")
time.sleep(1)
class planner:
def __init__(self,controller):
path = os.path.dirname( os.path.abspath( __file__ ) )
with open(os.path.join(path,("params.json")),'r') as fp :
params = json.load(fp)
params=params["params"]
user_ip = params["user_ip"]
host_ip = params["host_ip"]
status_port =params["vehicle_status_dst_port"]
object_port =params["object_info_dst_port"]
get_traffic_port=params["get_traffic_dst_port"]
set_traffic_port=params["set_traffic_host_port"]
ctrl_cmd_port = params["ctrl_cmd_host_port"]
planner_path_file_name = params["planner_path_file_name"]
traffic_greenlight_setting= params["traffic_greenlight_setting"]
self.status=udp_parser(user_ip, status_port,'status')
self.obj=udp_parser(user_ip, object_port,'obj')
self.traffic=udp_parser(user_ip, get_traffic_port,'get_traffic')
self.ctrl_cmd=udp_sender(host_ip,ctrl_cmd_port,'ctrl_cmd')
self.set_traffic=udp_sender(host_ip,set_traffic_port,'set_traffic')
self.txt_reader=pathReader()
self.global_path=self.txt_reader.read(planner_path_file_name)
vel_planner=velocityPlanning(60,2) ## 경로기반 속도 계획
self.vel_profile=vel_planner.curveBasedVelocity(self.global_path,50)
self.pure_pursuit=purePursuit()
self.cc=cruiseControl(0.5,1) ## cruiseControl import (object_vel_gain, object_dis_gain)
self.vo=vaildObject()## 장애물 유무 확인
self.pid=pidController() ## pidController import
self.current_waypoint=0
self._is_status=False
while not self._is_status :
# if not self.status.get_data() :
controller.commander(Command.SCEN_SETTING)
print('No Status Data Cannot run main_loop')
time.sleep(0.5)
# else :
self._is_status=True
self.main_loop()
def main_loop(self):
self.timer=threading.Timer(0.1,self.main_loop)
self.timer.start()
status_data=self.status.get_data()
obj_data=self.obj.get_data()
traffic_data = self.traffic.get_data()
position_x=status_data[4]
position_y=status_data[5]
position_z=status_data[6]
heading=status_data[9]# degree
velocity=status_data[10]
#set trafficlight (green)
if not len(traffic_data) == 0 and traffic_greenlight_setting == "True": #set trafficlight (green)
self.set_traffic.send_data([False,traffic_data[1],16])
traffic_data[3]=16
#fine_local_path, waypoint
local_path,current_waypoint =findLocalPath(self.global_path,position_x,position_y)
## 장애물의 숫자와 Type 위치 속도 (object_num, object type, object pose_x, object pose_y, object velocity)
self.vo.get_object(obj_data)
global_obj,local_obj=self.vo.calc_vaild_obj([position_x,position_y,(heading)/180*pi])
if not len(traffic_data) == 0:
self.cc.checkObject(local_path,global_obj,local_obj,traffic_data[1],traffic_data[3])
else:
self.cc.checkObject(local_path,global_obj,local_obj,[],[])
#pure_pursuit. get_steering_angle_value
self.pure_pursuit.getPath(local_path)
self.pure_pursuit.getEgoStatus(position_x,position_y,position_z,velocity,heading)
steering_angle=self.pure_pursuit.steering_angle()
#ACC
cc_vel = self.cc.acc(local_obj,velocity,self.vel_profile[current_waypoint]) ## advanced cruise control 적용한 속도 계획
print(self.vel_profile[current_waypoint])
target_velocity = cc_vel
control_input=self.pid.pid(target_velocity,velocity) ## 속도 제어를 위한 PID 적용 (target Velocity, Status Velocity)
if control_input > 0 :
accel= control_input
brake= 0
else :
accel= 0
brake= -control_input
ctrl_mode = 2 # 2 = AutoMode / 1 = KeyBoard
Gear = 4 # 4 1 : (P / parking ) 2 (R / reverse) 3 (N / Neutral) 4 : (D / Drive) 5 : (L)
self.ctrl_cmd.send_data([ctrl_mode,Gear,accel,brake,steering_angle])
print('current_waypoint:{0}\tglobal_path_lenght:{1}'.format(self.current_waypoint,len(self.global_path)))
print("sterring_angle:{0}\taccel:{1}\tbrake:{2}".format(steering_angle,accel,brake))
if self.current_waypoint == len(self.global_path)-1: #Scenario끝나면 다시 Scenraio load
self.current_waypoint=0
self.controller.commander(Command.SCEN_SETTING)