-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathingestion_lambda.py
More file actions
52 lines (45 loc) · 1.98 KB
/
Copy pathingestion_lambda.py
File metadata and controls
52 lines (45 loc) · 1.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#Importing libraries
import requests
import json
import boto3
from datetime import datetime
import os
#Get the base date and time
def get_current_datetime():
    """Return the current time formatted as ``YYYY-MM-DD HH:MM``.

    The value comes from a naive ``datetime.now()`` call, so it reflects the
    local clock of the process; no time zone is attached.
    """
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M}"
#Get the response
def get_response(location="Nigeria", timeout=10):
    """Request the current weather for ``location`` from the RapidAPI endpoint.

    Args:
        location: Value sent as the ``q`` query parameter. Defaults to
            ``"Nigeria"``, the value that used to be hard-coded.
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout unless one is passed, so without it a stalled
            connection would block the caller indefinitely.

    Returns:
        The ``requests`` response object, unmodified. The HTTP status code is
        not checked here.

    Raises:
        requests.exceptions.Timeout: If the server does not answer within
            ``timeout`` seconds.
    """
    url = "https://weatherapi-com.p.rapidapi.com/current.json"
    headers = {
        # Taken from the RAPID_API_KEY environment variable (None when unset).
        "X-RapidAPI-Key": os.environ.get("RAPID_API_KEY"),
        "X-RapidAPI-Host": "weatherapi-com.p.rapidapi.com",
    }
    return requests.get(
        url,
        headers=headers,
        params={"q": location},
        timeout=timeout,
    )
#Get events in json format
def get_values(response):
    """Flatten a weather API response into a UTF-8 encoded JSON record.

    Args:
        response: Object whose ``content`` attribute holds the UTF-8 encoded
            JSON body, with ``location`` and ``current`` objects at the top
            level.

    Returns:
        ``bytes`` containing the JSON-serialized flat record.

    Raises:
        KeyError: If an expected field is missing from the payload.
    """
    event = json.loads(response.content.decode('utf8'))
    location = event['location']
    current = event['current']
    push_event = {
        'name': location['name'],
        # Fixed: this used to copy location['name'] into the region field.
        'region': location['region'],
        'country': location['country'],
        'latitude': location['lat'],
        'longitude': location['lon'],
        'local_time': location['localtime'],
        'last_updated_time': current['last_updated'],
        'temp_c': current['temp_c'],
        'condition': current['condition']['text'],
        'wind_mph': current['wind_mph'],
        'wind_dir': current['wind_dir'],
        'humidity': current['humidity'],
        'cloud': current['cloud'],
        'pressure_mb': current['pressure_mb'],
        'precip_mm': current['precip_mm'],
    }
    # str.encode already returns bytes, so no extra bytes() wrapper is needed.
    return json.dumps(push_event).encode('utf8')
#Get the s3 client using Boto3's client api
def get_s3_client():
    """Create a boto3 S3 client for the ``us-east-1`` region.

    The access key pair is read from the ``ACCESS_KEY`` and ``SECRET_KEY``
    environment variables; a variable that is not set is passed on as ``None``.
    """
    credentials = {
        'aws_access_key_id': os.environ.get('ACCESS_KEY'),
        'aws_secret_access_key': os.environ.get('SECRET_KEY'),
    }
    return boto3.client('s3', region_name='us-east-1', **credentials)
#Upload files to s3
def upload_to_s3(client, body=None):
    """Write one weather record to the ``weather-api-raw-s3`` bucket.

    The object key is the current timestamp from ``get_current_datetime()``
    followed by ``.json``.

    Args:
        client: Object exposing ``put_object(Body=..., Bucket=..., Key=...)``.
        body: Bytes to upload. When omitted, a fresh record is fetched with
            ``get_values(get_response())``, matching the original behaviour.
    """
    if body is None:
        body = get_values(get_response())
    client.put_object(
        Body=body,
        Bucket='weather-api-raw-s3',
        Key=f'{get_current_datetime()}.json',
    )


#Run the ingestion pipeline
def main():
    """Fetch the current weather once, upload it, and report what was stored.

    The record is fetched a single time and the same bytes are both uploaded
    and echoed in the return message. Calling the API again just to build the
    message would double the request count and could report data that differs
    from what was actually stored.

    Returns:
        A status string containing the ``str()`` form of the uploaded bytes.
    """
    client = get_s3_client()
    payload = get_values(get_response())
    upload_to_s3(client, body=payload)
    return f'Successfully uploaded {str(payload)}'
def lambda_handler(event, context):
    """Lambda entry point: run the ingestion pipeline, then print the event.

    Args:
        event: Invocation payload; it is only printed, never inspected.
        context: Runtime context object; unused.

    Returns:
        None. The status string returned by ``main()`` is discarded.
    """
    main()
    # Printed after the run, so nothing is emitted here if main() raises.
    print(event)
    return None