-
Notifications
You must be signed in to change notification settings - Fork 110
/
Copy pathci_iot_thing.py
134 lines (104 loc) · 4.51 KB
/
ci_iot_thing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.
import sys
import boto3, time
from botocore.exceptions import ClientError, WaiterError
class ThingDetachedWaiter:
"""
Wait until principal (cert or Cognito identity) is detached from a thing.
Raise WaiterError after timeout seconds.
"""
def __init__(self, client, delay=2.0, max_delay=10.0, timeout=60.0):
self._client = client
self._delay = delay
self._max_delay = max_delay
self._timeout = timeout
def wait(self, thing_name):
start = time.monotonic()
sleep = self._delay
while True:
try:
resp = self._client.list_thing_principals(thingName=thing_name)
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
return
raise
if not resp.get("principals"):
# No principals, we can move on.
return
if time.monotonic() - start > self._timeout:
raise WaiterError(
name="ThingDetached",
reason="timeout",
last_response=resp,
)
time.sleep(sleep)
sleep = min(sleep * 1.6, self._max_delay) # exponential backoff on retrys
def create_iot_thing(thing_name, region, policy_name, certificate_path, key_path, thing_group=None):
""" Create IoT thing along with policy and credentials. """
iot_client = boto3.client('iot', region_name=region)
print(f"Creating thing '{thing_name}'", file=sys.stderr)
iot_client.create_thing(thingName=thing_name)
if thing_group:
iot_client.add_thing_to_thing_group(thingGroupName=thing_group, thingName=thing_name)
try:
print("Creating certificate", file=sys.stderr)
create_cert_response = iot_client.create_keys_and_certificate(
setAsActive=True
)
f = open(certificate_path, "w")
f.write(create_cert_response['certificatePem'])
f.close()
f = open(key_path, "w")
f.write(create_cert_response['keyPair']['PrivateKey'])
f.close()
certificate_arn = create_cert_response['certificateArn']
print("Attaching policy to certificate", file=sys.stderr)
iot_client.attach_policy(policyName=policy_name, target=certificate_arn)
print("Attaching certificate to thing", file=sys.stderr)
iot_client.attach_thing_principal(thingName=thing_name, principal=certificate_arn)
except Exception:
try:
iot_client.delete_thing(thingName=thing_name)
except Exception:
print("ERROR: Could not delete thing", file=sys.stderr)
raise
print("IoT thing created successfully", file=sys.stderr)
def delete_iot_thing(thing_name, region):
""" Delete IoT thing and all its principals. """
try:
iot_client = boto3.client('iot', region_name=region)
except Exception as e:
print(f"ERROR: Could not make Boto3 client. Credentials likely could not be sourced", file=sys.stderr)
raise
cert_ids = []
# Detach and delete thing's principals.
try:
thing_principals = iot_client.list_thing_principals(thingName=thing_name)
print(f"Detaching and deleting principals: {thing_principals}", file=sys.stderr)
for principal in thing_principals["principals"]:
certificate_id = principal.split("/")[1]
iot_client.detach_thing_principal(thingName=thing_name, principal=principal)
iot_client.update_certificate(certificateId=certificate_id, newStatus='INACTIVE')
cert_ids.append(certificate_id)
except Exception:
print(f"ERROR: Could not detatch principal or set its certificate to INACTIVE for {thing_name}, probably thing does not exist",
file=sys.stderr)
raise
# Wait for thing to be free of principals
ThingDetachedWaiter(iot_client, timeout=10).wait(thing_name)
# Delete all the certificates
for cert in cert_ids:
try:
iot_client.delete_certificate(certificateId=cert, forceDelete=True)
except Exception:
print(f"ERROR: Could not delete certificate for IoT thing {thing_name}.",
file=sys.stderr)
raise
# Delete thing.
try:
iot_client.delete_thing(thingName=thing_name)
except Exception:
raise
print("IoT thing deleted successfully", file=sys.stderr)
return 0