-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
107 lines (87 loc) · 3.1 KB
/
main.py
File metadata and controls
107 lines (87 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""
Description:
- This module assumes that the `certs/` directory is located in the same
directory as this module (i.e. `./certs/`, see `CERTS_DIR`).
- The purpose of this module is to provide a mechanism for developers
to use for having their X509 Certificate Signing Requests from
crypto coprocessors such as the ATECC608 signed by AWS IoT Core. If
not using a custom Private Key and CSR, this module can create them.
"""
from OpenSSL import crypto
import os
import sys
import datetime
import requests
# Variables
# Key-type aliases re-exported from pyOpenSSL.
TYPE_RSA = crypto.TYPE_RSA
TYPE_DSA = crypto.TYPE_DSA
# Directory containing this module; key and CSR files live in HERE/certs.
HERE = os.path.dirname(os.path.abspath(__file__))
CERTS_DIR = os.path.join(HERE, 'certs')
# Timestamp (and its date part) captured once, at import time.
now = datetime.datetime.now()
d = now.date()
# Private key object shared through module state: generatekey() rebinds it.
# A bare `global key` statement at module scope has no effect and leaves the
# name unbound, so it is bound explicitly here instead.
key = None
def download_root_CA(timeout=30):
    """Download Amazon Root CA 1 and save it as ``AmazonRootCA1.pem``.

    The certificate is written into ``CERTS_DIR`` (the same directory the
    key and CSR helpers in this module use) instead of a path relative to
    the current working directory. The directory is created if missing.

    The download is completed and its HTTP status checked *before* the
    target file is opened, so a failed request neither truncates an
    existing certificate nor stores an error page as a PEM file.

    Args:
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` may block indefinitely.

    Raises:
        requests.exceptions.RequestException: on connection errors,
            timeouts, or a non-2xx HTTP response.
    """
    response = requests.get(
        'https://www.amazontrust.com/repository/AmazonRootCA1.pem',
        timeout=timeout,
    )
    response.raise_for_status()

    os.makedirs(CERTS_DIR, exist_ok=True)
    with open(os.path.join(CERTS_DIR, 'AmazonRootCA1.pem'), 'w') as f:
        f.write(response.content.decode())
def generatekey(cn, bitlength=4096):
    """Load or create the RSA private key for ``cn`` and return its path.

    The key lives at ``<CERTS_DIR>/<cn>.key.pem``. If that file exists it is
    loaded and reused; otherwise a new RSA key of ``bitlength`` bits is
    generated and written there in PEM format. Either way the key object is
    also stored in the module-level ``key`` global, which ``generatecsr``
    can read.

    Args:
        cn: Common name (thing name) used to build the key file name.
        bitlength: RSA key size in bits, used only when a key is generated.

    Returns:
        Path of the PEM-encoded private key file.
    """
    global key
    keypath = os.path.join(CERTS_DIR, cn + '.key.pem')

    if os.path.exists(keypath):
        print(f"Using existing Private Key file: {keypath}")
        with open(keypath, 'r') as f:
            key = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
        return keypath

    key = crypto.PKey()
    print("Generating Key...")
    key.generate_key(TYPE_RSA, bitlength)

    # The certs directory may not exist yet on a fresh checkout.
    os.makedirs(CERTS_DIR, exist_ok=True)
    # Create the file with owner-only permissions (0600) so the private key
    # is not readable by other users under a permissive umask.
    fd = os.open(keypath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key).decode())
    print(f"Private Key created: {keypath}")
    return keypath
def generatecsr(keypath, cn):
    """Create a CSR for ``cn`` signed with the key at ``keypath``.

    The CSR is stored at ``<CERTS_DIR>/<cn>.csr.pem``. If that file already
    exists it is left untouched and no request is built or signed.

    The signing key is read from ``keypath`` when that file exists. When it
    does not (or ``keypath`` is falsy), the function falls back to the
    module-level ``key`` that ``generatekey`` sets.

    Args:
        keypath: Path of the PEM-encoded private key used to sign the CSR.
        cn: Common name (thing name) placed in the CSR subject and used to
            build the CSR file name.

    Returns:
        Path of the PEM-encoded CSR file.

    Raises:
        ValueError: if no private key can be obtained from ``keypath`` or
            from the module-level ``key``.
    """
    csrpath = os.path.join(CERTS_DIR, cn + '.csr.pem')

    # Check first: there is no point building and signing a request that
    # would be thrown away.
    if os.path.exists(csrpath):
        print(f"Using existing CSR: {csrpath}")
        return csrpath

    # Honour the keypath argument so the CSR is always signed with the key
    # the caller named, not whatever key happens to be in module state.
    if keypath and os.path.exists(keypath):
        with open(keypath, 'r') as f:
            pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
    else:
        pkey = globals().get('key')
        if pkey is None:
            raise ValueError(
                f"No private key available for CSR: {keypath!r} does not "
                "exist and generatekey() has not been called."
            )

    req = crypto.X509Req()
    subject = req.get_subject()
    # Attribute assignment order determines the order of the subject's
    # name entries, so it is kept as CN, C, ST, L, O, OU.
    subject.CN = cn
    subject.C = "US"
    subject.ST = "MN"
    subject.L = "Lake Elmo"
    subject.O = "Amazon Web Services"
    subject.OU = "Partner Solution Architecture"
    req.set_pubkey(pkey)
    req.sign(pkey, "sha256")

    os.makedirs(CERTS_DIR, exist_ok=True)
    with open(csrpath, "w") as f:
        f.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, req).decode())
    print(f"Created CSR: {csrpath}")
    return csrpath
def generate_key_and_csr(thing_name, bitlength=4096):
    """Ensure a CSR exists for ``thing_name``, creating key and CSR if needed.

    If ``<CERTS_DIR>/<thing_name>.csr.pem`` is already present, nothing is
    generated and the key path in the result is ``None``. Otherwise the
    private key is obtained via ``generatekey`` and a CSR is produced via
    ``generatecsr``.

    Args:
        thing_name: Name used for the key/CSR file names and the CSR subject.
        bitlength: RSA key size in bits, forwarded to ``generatekey``.

    Returns:
        Tuple ``(keypath, csrpath)``; ``keypath`` is ``None`` when an
        existing CSR was found.
    """
    existing_csr = os.path.join(CERTS_DIR, thing_name + '.csr.pem')

    # Guard clause: an existing CSR short-circuits all generation.
    if os.path.exists(existing_csr):
        print(f"CSR already exists: {existing_csr}")
        return None, existing_csr

    new_keypath = generatekey(thing_name, bitlength=bitlength)
    new_csrpath = generatecsr(new_keypath, thing_name)
    return new_keypath, new_csrpath
def resolve(csrpath=None):
    """Load a PEM-encoded CSR from disk, print it, and return it.

    Prints the loaded request object and the common name (CN) of its
    subject.

    Args:
        csrpath: Path of the CSR file to load. When omitted, the original
            hard-coded developer path is used, so existing ``resolve()``
            calls behave as before.

    Returns:
        The loaded ``crypto.X509Req`` object.
    """
    if csrpath is None:
        # Legacy default kept only for backward compatibility; pass an
        # explicit path on any other machine.
        csrpath = 'C:\\projects\\python\\dev\\crypto\\certs\\temp-sensor.csr.pem'

    # Use a context manager so the file handle is closed deterministically.
    with open(csrpath) as f:
        csr = f.read()

    req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
    print("req", req)
    thing_name = req.get_subject().CN
    print("thing", thing_name)
    return req
if __name__ == '__main__':
    # Usage: python main.py <thing_name>
    print(sys.argv)
    if len(sys.argv) < 2:
        # sys.exit is used instead of the builtin exit(): the latter is
        # injected by the `site` module for interactive use and is not
        # guaranteed to exist (e.g. under `python -S` or in frozen apps).
        sys.exit("Provide thing_name as arg.")
    download_root_CA()
    generate_key_and_csr(sys.argv[1])