-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgenerate.py
106 lines (88 loc) · 3.43 KB
/
generate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# -*- coding: utf-8 -*-
import os
import glob
import time
import socket
import simplejson as json
import argparse
from recognize import test_with_data, test_with_kinect, test_with_webcam, load_model
from recognize import load_kinect_module, load_openpose_module
from robot.adapter import adapt_behavior, JOINT_NAMES
from robot.selector import select_behavior
from setting import LSTM_MODEL_PATH
# gather all existing models
model_files = glob.glob(os.path.join(LSTM_MODEL_PATH, "*.pth"))
model_numbers = list()
for model_file in model_files:
model_name, _ = os.path.splitext(os.path.basename(model_file))
model_numbers.append(int(model_name[6:10]))
# argument parser
parser = argparse.ArgumentParser()
parser.add_argument('--use_robot', help='use robot', action='store_true')
parser.add_argument('-m', '--mode', type=str, help='mode to run', choices=['data', 'kinect', 'webcam'], required=True)
parser.add_argument('-l', '--model', type=int, help='model number', choices=model_numbers, default=26)
args = parser.parse_args()
MODEL_FILE = os.path.join(LSTM_MODEL_PATH, f"model_{args.model:04d}.pth")
model = load_model(MODEL_FILE)
def main():
# create socket connection
if args.use_robot:
HOST = "127.0.0.1"
CMD_PORT = 10240
cmd_sock = init_socket(HOST, CMD_PORT)
stand_pose = {JOINT_NAMES[idx]: float(adapt_behavior('stand')[idx]) for idx in range(len(JOINT_NAMES))}
send_behavior(cmd_sock, stand_pose)
else:
cmd_sock = None
# user behavior recognition
if args.mode == "data":
input_generator = test_with_data(model)
if args.mode == "kinect":
load_kinect_module()
input_generator = test_with_kinect(model)
if args.mode == "webcam":
load_openpose_module()
input_generator = test_with_webcam(model)
# select and adapt behavior
user_behaviors = list()
last_change_time = time.time()
prev_behavior = ""
for user_pose, user_behavior in input_generator:
if user_behavior is None:
continue
user_behaviors.append(user_behavior)
user_behaviors = user_behaviors[-5:]
if len(user_behaviors) < 5:
continue
robot_behavior = select_behavior(user_behaviors)
if robot_behavior is None:
continue
print("robot:", robot_behavior)
if args.use_robot:
if prev_behavior != robot_behavior:
if time.time() - last_change_time < 1.0:
continue
else:
last_change_time = time.time()
robot_pose = adapt_behavior(robot_behavior, user_pose)
pose = {JOINT_NAMES[idx]: float(robot_pose[idx]) for idx in range(len(JOINT_NAMES))}
send_behavior(cmd_sock, pose)
prev_behavior = robot_behavior
# wait for connection to server
def init_socket(HOST, CMD_PORT):
while True:
try:
cmd_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cmd_sock.connect((HOST, CMD_PORT))
print("Connect to %s" % str(HOST))
return cmd_sock
except socket.error:
print("Connection failed, retrying...")
time.sleep(3)
# send pose to server
def send_behavior(cmd_sock, pose):
json_string = json.dumps({'target_angles': pose})
cmd_sock.send(str(len(json_string)).ljust(16).encode('utf-8'))
cmd_sock.sendall(json_string.encode('utf-8'))
if __name__ == '__main__':
main()