본 페이지는 MORAI Simulator Control(MSC) Quick Start 사용 방법을 소개한다.

MSC는 UI를 직접 이용하지않고 통신 API를 이용하여 시뮬레이터를 제어 할 수 있는 기능이고, 이를 이용한 Unit Test Example Manual이다.


환경설정

Example code 다운로드

Python version

  • Example code는 python 3.7.5에서 작성 및 테스트 함.

Morai Launcher 실행

  • MSC를 사용하기 위해 Morai Launcher를 실행 후 좌상단의 ‘MSC is connected’를 확인한다.
    * ‘MSC is disconnect’ 일땐 다른 런쳐가 실행 중 인지 확인 필요.

Code 실행

  • 시뮬레이터 및 Code 실행이 다른 환경(IP 가 다른 환경)에서 진행 되면 params.txt , params.json 의 ip를 수정해주어야 한다.

  • 다운로드 받은 파일 중 params.txt를 열어 사용할 id, pw, version ,차량, 맵을 대소문자 구분 하여 채워준다.
    e.g)
    receive_user_ip : 127.0.0.1
    receive_user_port : 10329
    request_dst_ip : 127.0.0.1
    request_dst_port : 10509
    user_id : id
    user_pw : pw
    version : v.3.9.210226.3
    map/vehicle : K-City/Niro

  • api.py를 실행하면 parmas.txt에 입력 한 id, pw ,version, map/vehicle에 맞춰 시뮬레이터를 실행 하고 알고리즘 테스트를 확인할 수 있다.

Simulator Network setting

  • Network Setting은 Scenario를 저장할때 할때의 네트워크 정보를 적용.

  • 네트워크 setting은 시뮬레이터 상단 메뉴 Network → Network Settings에서 다음과 같이 한다.

    • Host IP : 127.0.0.1

    • Destination IP : 127.0.0.1

    • Ego Ctrl Cmd

      • Host PORT : 7601

      • Destination PORT : 7600

    • Ego Vehicle Status

      • Host PORT : 8001

      • Destination PORT : 8000

    • Object Info

      • Host PORT : 8101

      • Destination PORT : 8100

IP / Ego Ctrl Cmd

Ego Vehicle Status / Object Info


코드

  • class launcher_startparmas.txt에 입력한 정보를 읽어 MSC를 통해 시뮬레이션을 시작해주는 class

  • class gen_planner unit test를 할 알고리즘 class.

    • 해당 class 에서 msc를 사용하고자 할 때 .

      • commander() : define.py의 Command_list에 정의된 기능을 사용 하고자 할 때

      • custom_command() : gen_planner class안에서 사용자가 option을 변경하며
        MSC명령을 사용하고자 할 때

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time,os,json,threading
from lib.utils import *

from lib.define import *


from lib.controller import *


class launcher_start(controller):
        
    def __init__(self):                
        self.controller = controller()

    def launcher_start(self):
        
        while True:           
            
            if self.controller.update():             
                self.controller.is_waitting() #status_wait 
                self.controller.is_downloading() #check_is_download_status                                      

                if self.controller.is_befor_login():
                                        
                    self.controller.commander(Command.LOGIN)#Login명령
                    
                
                if self.controller.is_after_login() or self.controller.is_after_sim_quit_to_launcher():  # is_after_sim_quit_to_launcher : Simulator에서 quit 명령 후 Launcher 복귀 상태 확인

                    self.controller.commander(Command.SELECT_VER)#version select명령

                if self.controller.is_not_find_version(): #version_error         
                    break
                    
                if self.controller.is_can_execute_sim(): 
                                                                    
                    self.controller.commander(Command.EXECUTE_SIM) #Simulator 실행 명령                        

                    self.controller.watting_execute()
                                            
                if self.controller.is_sim_not_install():                                
                    self.controller.commander(Command.INSTALL_SIM) #Simulator 설치 명령                     


                if self.controller.is_sim_lobby():                    

                    self.controller.commander(Command.MAP_VEHICLE_SELECT)#시뮬레이션/옵션 변경 실행 명령

                    self.controller.watting_loading()

                if self.controller.is_sim_playing():          

                    self.controller.commander(Command.NET_SETTING) #Network setting
                    
                    self.controller.commander(Command.SEN_SETTING) #Sensor setting

                    self.controller.commander(Command.SCEN_SETTING) #Scenraio setting
                    planner(self.controller)
                    break

            else :
                print("[NO Simulator Control Data]")
                time.sleep(1)            



class planner:

    def __init__(self,controller):
        path = os.path.dirname( os.path.abspath( __file__ ) )      

        with open(os.path.join(path,("params.json")),'r') as fp :
            params = json.load(fp)

        params=params["params"]
        user_ip = params["user_ip"]
        host_ip = params["host_ip"]


        status_port =params["vehicle_status_dst_port"]
        object_port =params["object_info_dst_port"]
        get_traffic_port=params["get_traffic_dst_port"]

        set_traffic_port=params["set_traffic_host_port"]
        ctrl_cmd_port = params["ctrl_cmd_host_port"]

        planner_path_file_name = params["planner_path_file_name"]

        traffic_greenlight_setting= params["traffic_greenlight_setting"]
        
        
        self.status=udp_parser(user_ip, status_port,'status')
        self.obj=udp_parser(user_ip, object_port,'obj')
        self.traffic=udp_parser(user_ip, get_traffic_port,'get_traffic')


        self.ctrl_cmd=udp_sender(host_ip,ctrl_cmd_port,'ctrl_cmd')
        self.set_traffic=udp_sender(host_ip,set_traffic_port,'set_traffic')
        

        self.txt_reader=pathReader()
        self.global_path=self.txt_reader.read(planner_path_file_name)


        vel_planner=velocityPlanning(60,2) ## 경로기반 속도 계획
        self.vel_profile=vel_planner.curveBasedVelocity(self.global_path,50)        

        self.pure_pursuit=purePursuit()
        self.cc=cruiseControl(0.5,1) ## cruiseControl import (object_vel_gain, object_dis_gain)
        
        self.vo=vaildObject()## 장애물 유무 확인

        self.pid=pidController() ## pidController import        
        self.current_waypoint=0
  

        self._is_status=False
        while not self._is_status :
            # if not self.status.get_data() :
            controller.commander(Command.SCEN_SETTING)
            print('No Status Data Cannot run main_loop')
            time.sleep(0.5)
        # else :
            self._is_status=True                

        self.main_loop()

    
    def main_loop(self):        
        
        self.timer=threading.Timer(0.1,self.main_loop)
        self.timer.start()
        
        status_data=self.status.get_data()
        obj_data=self.obj.get_data()
        traffic_data = self.traffic.get_data()
        position_x=status_data[4]
        position_y=status_data[5]
        position_z=status_data[6]
        heading=status_data[9]# degree
        velocity=status_data[10]

        #set trafficlight (green)
        if not len(traffic_data) == 0 and traffic_greenlight_setting == "True": #set trafficlight (green)                        
            self.set_traffic.send_data([False,traffic_data[1],16])
            traffic_data[3]=16


        #fine_local_path, waypoint
        local_path,current_waypoint =findLocalPath(self.global_path,position_x,position_y)
        
        
        ## 장애물의 숫자와 Type 위치 속도 (object_num, object type, object pose_x, object pose_y, object velocity)
        self.vo.get_object(obj_data)
        global_obj,local_obj=self.vo.calc_vaild_obj([position_x,position_y,(heading)/180*pi])

        if not len(traffic_data) == 0:
            self.cc.checkObject(local_path,global_obj,local_obj,traffic_data[1],traffic_data[3])
        else:
            self.cc.checkObject(local_path,global_obj,local_obj,[],[])         

        #pure_pursuit. get_steering_angle_value                
        self.pure_pursuit.getPath(local_path) 
        self.pure_pursuit.getEgoStatus(position_x,position_y,position_z,velocity,heading)
            
        steering_angle=self.pure_pursuit.steering_angle()


        #ACC                 
        cc_vel = self.cc.acc(local_obj,velocity,self.vel_profile[current_waypoint]) ## advanced cruise control 적용한 속도 계획        
        print(self.vel_profile[current_waypoint])
        target_velocity = cc_vel
              

        control_input=self.pid.pid(target_velocity,velocity) ## 속도 제어를 위한 PID 적용 (target Velocity, Status Velocity)
        if control_input > 0 :
            accel= control_input
            brake= 0
        else :
            accel= 0
            brake= -control_input

        ctrl_mode = 2 # 2 = AutoMode / 1 = KeyBoard
        Gear = 4 # 4 1 : (P / parking ) 2 (R / reverse) 3 (N / Neutral)  4 : (D / Drive) 5 : (L)


        self.ctrl_cmd.send_data([ctrl_mode,Gear,accel,brake,steering_angle])     
        
        print('current_waypoint:{0}\tglobal_path_lenght:{1}'.format(self.current_waypoint,len(self.global_path)))
        print("sterring_angle:{0}\taccel:{1}\tbrake:{2}".format(steering_angle,accel,brake))
        
        
        if self.current_waypoint == len(self.global_path)-1: #Scenario끝나면 다시 Scenraio load           
            self.current_waypoint=0
            self.controller.commander(Command.SCEN_SETTING)
PY