# run_in_ci.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.
# Built-in
import argparse
import os
import subprocess
import pathlib
import sys
import json
# Needs to be installed via pip
import boto3
# Absolute directory that contains this script, always ending in a path
# separator so file names can be appended with plain string concatenation.
current_folder = str(pathlib.Path(__file__).resolve().parent)
current_folder += "\\" if sys.platform in ("win32", "cygwin") else "/"

# Parsed JSON configuration; assigned by setup_runnable()
config_json = None
# Flat list of command line arguments built from the configuration
config_json_arguments_list = []

pfx_certificate_store_location = "CurrentUser\\My"
pfx_password = "" # Setting a password causes issues, but an empty string is valid so we use that
def setup_json_arguments_list(file, input_uuid=None):
    """Turn every entry of config_json['arguments'] into command line arguments.

    The results are appended to the module-level config_json_arguments_list.
    For each argument the optional 'name' is appended first, followed by a
    value taken from exactly one of these sources:

    * 'secret': the secret string fetched from AWS Secrets Manager. When
      'filename' is also present the secret is written to that file (relative
      to current_folder) and the file path is appended instead. When
      'pkcs11_key' is present the file is additionally imported into SoftHSM.
    * the five 'windows_cert_*' keys: certificate and key are fetched, written
      to disk, merged into a PFX file and imported into the Windows
      certificate store; the resulting store path is appended.
    * 'data': the raw value, with "$INPUT_UUID" replaced by input_uuid when
      both are strings. None and "" are skipped.

    Args:
        file: Unused; kept so existing callers keep working.
        input_uuid: Optional replacement for "$INPUT_UUID" in 'data' values.

    Returns:
        0 on success, -1 when processing an argument raised an exception, or
        the non-zero result of the SoftHSM / Windows certificate helpers.
    """
    print("Attempting to get credentials from secrets using Boto3...")
    secrets_client = boto3.client("secretsmanager", region_name=config_json['runnable_region'])
    print("Processing arguments...")

    windows_cert_keys = (
        'windows_cert_certificate',
        'windows_cert_certificate_path',
        'windows_cert_key',
        'windows_cert_key_path',
        'windows_cert_pfx_key_path',
    )

    for argument in config_json['arguments']:
        # Resolved up front so the error message below cannot fail on unnamed arguments
        argument_name = argument['name'] if 'name' in argument else "<unnamed>"

        # Add the name of the argument
        if 'name' in argument:
            config_json_arguments_list.append(argument['name'])

        # Based on the data present, we need to process and add the data differently
        try:
            # Is there a secret? If so, decode it!
            if 'secret' in argument:
                secret_data = secrets_client.get_secret_value(SecretId=argument['secret'])["SecretString"]

                # Is this supposed to be stored in a file?
                if 'filename' in argument:
                    secret_path = str(current_folder) + argument['filename']
                    with open(secret_path, "w") as secret_file:
                        # lgtm [py/clear-text-storage-sensitive-data]
                        secret_file.write(secret_data)
                    config_json_arguments_list.append(secret_path)
                else:
                    config_json_arguments_list.append(secret_data)

                if 'pkcs11_key' in argument:
                    pkcs11_result = make_softhsm_key(str(current_folder) + argument['filename'])
                    if pkcs11_result != 0:
                        print("ERROR with PKCS11!")
                        return pkcs11_result

            # Windows 10 certificate store data?
            elif all(key in argument for key in windows_cert_keys):
                certificate_file = str(current_folder) + argument['windows_cert_certificate_path']
                key_file = str(current_folder) + argument['windows_cert_key_path']
                pfx_file = str(current_folder) + argument['windows_cert_pfx_key_path']

                windows_cert_data = secrets_client.get_secret_value(
                    SecretId=argument['windows_cert_certificate'])["SecretString"]
                with open(certificate_file, "w") as certificate_handle:
                    # lgtm [py/clear-text-storage-sensitive-data]
                    certificate_handle.write(windows_cert_data)

                windows_key_data = secrets_client.get_secret_value(
                    SecretId=argument['windows_cert_key'])["SecretString"]
                with open(key_file, "w") as key_handle:
                    # lgtm [py/clear-text-storage-sensitive-data]
                    key_handle.write(windows_key_data)

                certificate_path = make_windows_pfx_file(certificate_file, key_file, pfx_file)
                # The helper returns the store path as a string on success and
                # an integer error code on failure; never pass the latter on
                # to the runnable as if it were a certificate path.
                if not isinstance(certificate_path, str):
                    print("ERROR with Windows certificate store!")
                    return certificate_path
                config_json_arguments_list.append(certificate_path)

            # Raw data? just add it directly!
            elif 'data' in argument:
                tmp_value = argument['data']
                if isinstance(tmp_value, str) and input_uuid is not None:
                    tmp_value = tmp_value.replace("$INPUT_UUID", input_uuid)
                if tmp_value is not None and tmp_value != "":
                    config_json_arguments_list.append(tmp_value)

            # None of the above? Just print an error
            else:
                print("ERROR - unknown or missing argument value!")

        except Exception as e:
            print(f"Something went wrong processing {argument_name}: {e}!")
            return -1
    return 0
def make_softhsm_key(private_key_path):
    """Create a SoftHSM token and import a private key into it.

    Runs `softhsm2-util` twice: first to initialize a free token labelled
    "my-token" (PIN and SO PIN "0000"), then to import private_key_path into
    that token with label "my-key" and id BEEFCAFE.

    Args:
        private_key_path: Path of the private key file to import.

    Returns:
        0 when both steps succeed, otherwise the non-zero return code of the
        failing step (1 when softhsm2-util could not be started at all).
    """
    print("Setting up private key via SoftHSM")

    # Argument lists instead of shell strings, so a key path containing
    # spaces or shell metacharacters is passed through unchanged.
    steps = (
        (
            ["softhsm2-util", "--init-token", "--free", "--label", "my-token",
             "--pin", "0000", "--so-pin", "0000"],
            "ERROR: SoftHSM could not initialize a new token",
        ),
        (
            ["softhsm2-util", "--import", str(private_key_path), "--token", "my-token",
             "--label", "my-key", "--id", "BEEFCAFE", "--pin", "0000"],
            "ERROR: SoftHSM could not import token",
        ),
    )

    for command, failure_message in steps:
        try:
            softhsm_run = subprocess.run(command)
        except OSError as error:
            # Raised when the softhsm2-util executable cannot be started
            print(failure_message)
            print(f"ERROR: Could not run softhsm2-util: {error}")
            return 1
        if softhsm_run.returncode != 0:
            print(failure_message)
            # Report the failure of either step; a failed import must not be
            # reported as success.
            return softhsm_run.returncode

    print("Finished setting up private key in SoftHSM")
    return 0
def _find_certificate_thumbprint(output_text):
    """Return the first 40-character hexadecimal token in output_text, or ""."""
    hex_digits = set("0123456789abcdefABCDEF")
    for token in output_text.split():
        if len(token) == 40 and all(character in hex_digits for character in token):
            return token
    return ""


def make_windows_pfx_file(certificate_file_path, private_key_path, pfx_file_path):
    """Merge a certificate and key into a PFX file and import it into the Windows store.

    Steps: validate the input files, delete any old PFX file, run
    `certutil -mergePFX`, import the PFX with PowerShell's
    `Import-PfxCertificate` into pfx_certificate_store_location and read the
    certificate thumbprint from the PowerShell output.

    Args:
        certificate_file_path: Path of the certificate file.
        private_key_path: Path of the private key file.
        pfx_file_path: Path of the PFX file to create (replaced if present).

    Returns:
        The certificate store path "<store location>\\<thumbprint>" as a
        string on success, or the integer 1 on any failure (including being
        run on a non-Windows platform).
    """
    if sys.platform != "win32" and sys.platform != "cygwin":
        print("ERROR - Windows PFX file can only be created on a Windows platform!")
        return 1

    if not os.path.isfile(certificate_file_path):
        print(certificate_file_path)
        print("ERROR: Certificate file not found!")
        return 1
    if not os.path.isfile(private_key_path):
        print("ERROR: Private key file not found!")
        return 1

    # Delete old PFX file if it exists
    if os.path.isfile(pfx_file_path):
        os.remove(pfx_file_path)

    # certutil is only given the certificate path, so the key is copied next
    # to the certificate as "<certificate base name>.key" (presumably where
    # certutil looks for it - confirm against the certutil documentation).
    key_copy_path = os.path.splitext(certificate_file_path)[0] + ".key"
    # When the key already lives at that location there is nothing to copy,
    # and the file must neither be truncated nor deleted afterwards.
    key_copy_is_temporary = (
        os.path.normcase(os.path.abspath(key_copy_path))
        != os.path.normcase(os.path.abspath(private_key_path)))

    try:
        # Make a key copy (read fully before opening the destination)
        if key_copy_is_temporary:
            with open(private_key_path) as key_source:
                key_data = key_source.read()
            with open(key_copy_path, 'w') as key_copy:
                key_copy.write(key_data)

        # Make a PFX file
        arguments = ["certutil", "-mergePFX", certificate_file_path, pfx_file_path]
        certutil_run = subprocess.run(
            args=arguments, shell=True, input=f"{pfx_password}\n{pfx_password}", encoding='ascii')
    finally:
        # Remove the temporary key copy, also when certutil failed or raised
        if key_copy_is_temporary and os.path.isfile(key_copy_path):
            os.remove(key_copy_path)

    if certutil_run.returncode != 0:
        print("ERROR: Could not make PFX file")
        return 1
    print("PFX file created successfully")

    # Import the PFX into the Windows Certificate Store
    # (Passing '$mypwd' is required even though it is empty and our certificate has no password. It fails CI otherwise)
    import_pfx_arguments = [
        "powershell.exe",
        # Powershell 7.3 introduced an issue where launching powershell from cmd would not set PSModulePath correctly.
        # As a workaround, we set `PSModulePath` to empty so powershell would automatically reset the PSModulePath to default.
        # More details: https://github.com/PowerShell/PowerShell/issues/18530
        "$env:PSModulePath = '';",
        "Import-PfxCertificate",
        "-FilePath", pfx_file_path,
        "-CertStoreLocation",
        "Cert:\\" + pfx_certificate_store_location,
        "-Password",
        "$mypwd"]
    import_pfx_run = subprocess.run(args=import_pfx_arguments, shell=True, stdout=subprocess.PIPE)
    if import_pfx_run.returncode != 0:
        print("ERROR: Could not import PFX certificate into Windows store!")
        return 1
    print("Certificate imported to Windows Certificate Store successfully")

    # Get the certificate thumbprint from the output. It is the only
    # whitespace-separated token expected to be 40 hexadecimal characters.
    import_pfx_output = import_pfx_run.stdout.decode(errors="replace")
    thumbprint = _find_certificate_thumbprint(import_pfx_output)

    # Did we get a thumbprint?
    if thumbprint == "":
        print("ERROR: Could not find certificate thumbprint")
        return 1

    # Construct the certificate path
    print("PFX certificate created and imported successfully!")
    return pfx_certificate_store_location + "\\" + thumbprint
def setup_runnable(file, input_uuid=None):
    """Load the JSON configuration file and prepare the runnable's arguments.

    The parsed configuration is stored in the module-level config_json. The
    fields 'language', 'runnable_file', 'runnable_region' and
    'runnable_main_class' are required. A missing 'arguments' field is
    treated as an empty argument list.

    Args:
        file: Path of the JSON configuration file.
        input_uuid: Optional value forwarded to setup_json_arguments_list().

    Returns:
        0 on success, -1 when the configuration is not a JSON object or lacks
        a required field, otherwise the non-zero result of
        setup_json_arguments_list().
    """
    global config_json

    file_absolute = pathlib.Path(file).resolve()
    # Load the JSON data
    with open(file_absolute, "r") as json_file:
        config_json = json.load(json_file)

    if not isinstance(config_json, dict):
        print(f"ERROR: Configuration file {file_absolute} does not contain a JSON object")
        return -1

    # Make sure required parameters are all there
    required_fields = ('language', 'runnable_file', 'runnable_region', 'runnable_main_class')
    missing_fields = [field for field in required_fields if field not in config_json]
    if missing_fields:
        print(f"ERROR: Configuration file {file_absolute} is missing required field(s): "
              + ", ".join(missing_fields))
        return -1

    # Later stages index config_json['arguments'] directly
    config_json.setdefault('arguments', [])

    # Preprocess runnable arguments (get secret data, etc)
    setup_result = setup_json_arguments_list(file, input_uuid)
    if setup_result != 0:
        return setup_result

    print("JSON config file loaded!")
    return 0
def _remove_file_if_present(path):
    """Delete path when it is an existing regular file."""
    if os.path.isfile(path):
        os.remove(path)


def cleanup_runnable():
    """Delete the secret, certificate and key files created during setup.

    For every configured argument this removes the 'filename' file and, when
    all five 'windows_cert_*' keys are present, the Windows certificate, key
    and PFX files. All paths are relative to current_folder. A failure to
    remove one file is reported and does not stop the remaining files from
    being removed.

    Returns:
        0 when everything was cleaned up (or there was nothing to clean),
        -1 when at least one file could not be removed.
    """
    if config_json is None:
        return 0

    windows_cert_keys = (
        'windows_cert_certificate',
        'windows_cert_certificate_path',
        'windows_cert_key',
        'windows_cert_key_path',
        'windows_cert_pfx_key_path',
    )
    windows_cert_path_keys = (
        'windows_cert_certificate_path',
        'windows_cert_key_path',
        'windows_cert_pfx_key_path',
    )

    cleanup_result = 0
    for argument in config_json.get('arguments', []):
        argument_name = argument['name'] if 'name' in argument else "<unnamed>"

        # Based on the data present, different files were created
        path_keys = []
        # Is there a file? If so, clean it!
        if 'filename' in argument:
            path_keys.append('filename')
        # Windows 10 certificate store data?
        if all(key in argument for key in windows_cert_keys):
            path_keys.extend(windows_cert_path_keys)

        for path_key in path_keys:
            try:
                _remove_file_if_present(str(current_folder) + argument[path_key])
            except Exception as e:
                # Best effort: keep going so other secret files are still removed
                print(f"Something went wrong cleaning {argument_name}: {e}!")
                cleanup_result = -1
    return cleanup_result
def _flatten_arguments(arguments):
    """Join arguments into a single space-separated string."""
    return " ".join(str(argument) for argument in arguments)


def launch_runnable(runnable_dir):
    """Run the configured runnable and clean up the files created for it.

    The way the runnable is started depends on config_json['language']:
    "Java" (via mvn exec:java), "Java JAR", "CPP", "Python" or "Javascript"
    (optional `npm install` followed by node). The optional 'stdin_file' is
    fed to the runnable's standard input and the optional 'timeout' limits
    each subprocess. cleanup_runnable() is always called once a configuration
    is loaded, even when launching raises.

    Args:
        runnable_dir: Directory that 'runnable_file' and 'stdin_file' are
            resolved against for the Java JAR, CPP and Python runnables.

    Returns:
        The runnable's exit code, -1 when no configuration has been loaded,
        or 1 when the runnable timed out, a checked subprocess failed or the
        language is not supported.
    """
    if config_json is None:
        print("No configuration JSON file data found!")
        return -1

    exit_code = 0
    on_windows = sys.platform == "win32" or sys.platform == "cygwin"

    try:
        # Prepare data for runnable's STDIN
        subprocess_stdin = None
        if "stdin_file" in config_json:
            stdin_file = os.path.join(runnable_dir, config_json['stdin_file'])
            with open(stdin_file, "rb") as stdin_handle:
                subprocess_stdin = stdin_handle.read()

        runnable_timeout = config_json.get('timeout')
        language = config_json['language']

        print("Launching runnable...")

        # Java
        if language == "Java":
            arguments = [
                "mvn", "compile", "exec:java",
                "-pl", config_json['runnable_file'],
                "-Dexec.mainClass=" + config_json['runnable_main_class'],
                "-Daws.crt.ci=True",
            ]
            # We have to do this as a string, unfortunately, due to how -Dexec.args= works...
            argument_string = (subprocess.list2cmdline(arguments)
                               + " -Dexec.args=\"" + _flatten_arguments(config_json_arguments_list) + "\"")
            # NOTE(review): this echoes every argument, including secret values
            # that were not written to a file - confirm the CI log masks them.
            print(f"Running cmd: {argument_string}")
            runnable_return = subprocess.run(
                argument_string, input=subprocess_stdin, timeout=runnable_timeout, shell=True)
            exit_code = runnable_return.returncode

        elif language == "Java JAR":
            runnable_file = os.path.join(runnable_dir, config_json['runnable_file'])
            arguments = ["java", "-Daws.crt.ci=True", "-jar", runnable_file]
            argument_string = (subprocess.list2cmdline(arguments)
                               + " " + _flatten_arguments(config_json_arguments_list))
            print(f"Running cmd: {argument_string}")
            runnable_return = subprocess.run(
                argument_string, input=subprocess_stdin, timeout=runnable_timeout, shell=True)
            exit_code = runnable_return.returncode

        # C++
        elif language == "CPP":
            runnable_file = os.path.join(runnable_dir, config_json['runnable_file'])
            # The program path goes first so the runnable receives a regular
            # argv[0] and every configured argument keeps its position.
            runnable_return = subprocess.run(
                args=[runnable_file] + config_json_arguments_list,
                input=subprocess_stdin, timeout=runnable_timeout)
            exit_code = runnable_return.returncode

        elif language == "Python":
            runnable_file = os.path.join(runnable_dir, config_json['runnable_file'])
            runnable_return = subprocess.run(
                args=[sys.executable, runnable_file] + config_json_arguments_list,
                input=subprocess_stdin, timeout=runnable_timeout)
            exit_code = runnable_return.returncode

        elif language == "Javascript":
            os.chdir(config_json['runnable_file'])

            config_json_arguments_list.append("--is_ci")
            config_json_arguments_list.append("true")

            if 'skip_install' not in config_json:
                # Windows starts npm through the shell
                install_return = subprocess.run(
                    args=["npm", "install"], shell=on_windows, timeout=runnable_timeout)
                exit_code = install_return.returncode

            # Only start node when the install step was skipped or succeeded
            if exit_code == 0:
                if 'node_cmd' in config_json:
                    arguments = config_json['node_cmd'].split(" ")
                else:
                    arguments = ["node", "dist/index.js"]

                if on_windows:
                    # check=True turns a failing run into CalledProcessError (handled below)
                    node_return = subprocess.run(
                        args=arguments + config_json_arguments_list,
                        shell=True, check=True, timeout=runnable_timeout)
                else:
                    node_return = subprocess.run(
                        args=arguments + config_json_arguments_list,
                        input=subprocess_stdin, timeout=runnable_timeout)
                exit_code = node_return.returncode

        else:
            # Never report success for a runnable that was not started
            print(f"ERROR - unsupported language: {language}!")
            exit_code = 1

    except subprocess.CalledProcessError as e:
        print(e.output)
        exit_code = 1
    except subprocess.TimeoutExpired as e:
        print(f"ERROR: Runnable timed out after {e.timeout} seconds")
        exit_code = 1
    finally:
        # Always remove secret files, also when launching raised
        cleanup_runnable()

    return exit_code
def setup_and_launch(file, input_uuid=None, runnable_dir=''):
    """Prepare the runnable described by file and, if that succeeds, run it.

    Returns the non-zero result of setup_runnable() when setup fails,
    otherwise the value returned by launch_runnable().
    """
    status = setup_runnable(file, input_uuid)
    if status == 0:
        print("About to launch runnable...")
        return launch_runnable(runnable_dir)

    print("Setting up runnable failed")
    return status
def main():
    """Parse the command line, run the configured runnable and exit with its result."""
    parser = argparse.ArgumentParser(description="Run runnable in CI")
    parser.add_argument(
        "--file",
        required=True,
        help="Configuration file to pull CI data from",
    )
    parser.add_argument(
        "--input_uuid",
        required=False,
        help="UUID data to replace '$INPUT_UUID' with. Only works in Data field",
    )
    parser.add_argument(
        "--runnable_dir",
        required=False,
        default='',
        help="Directory where runnable_file is located",
    )
    options = parser.parse_args()

    print(f"Starting to launch runnable: config {options.file}; input UUID: {options.input_uuid}")
    # The runnable's result becomes this process's exit status
    sys.exit(setup_and_launch(options.file, options.input_uuid, options.runnable_dir))
# Run main() only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()