diff --git a/conf/config.json b/conf/config.json index 6dbc4ef..023fe64 100644 --- a/conf/config.json +++ b/conf/config.json @@ -3,5 +3,6 @@ "topics_config": "s3:///topics.json", "token_provider_url": "https://", "token_public_key_url": "https://", - "kafka_bootstrap_server": "localhost:9092" + "kafka_bootstrap_server": "localhost:9092", + "event_bus_arn": "arn:aws:events:" } \ No newline at end of file diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index fc43dee..ffdf1f8 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -41,6 +41,7 @@ aws_session = boto3.Session() aws_s3 = aws_session.resource('s3', verify=False) +aws_eventbridge = boto3.client('events') if CONFIG["topics_config"].startswith("s3://"): name_parts = CONFIG["topics_config"].split('/') @@ -61,6 +62,12 @@ ACCESS = json.load(file) TOKEN_PROVIDER_URL = CONFIG["token_provider_url"] + +if "event_bus_arn" in CONFIG: + EVENT_BUS_ARN = CONFIG["event_bus_arn"] +else: + EVENT_BUS_ARN = "" + logger.info("Loaded configs") token_public_key_encoded = requests.get(CONFIG["token_public_key_url"], verify=False).json()["key"] @@ -85,7 +92,7 @@ kafka_producer = Producer(producer_config) logger.info("Initialized kafka producer") -def kafkaWrite(topicName, message): +def kafka_write(topicName, message): logger.info(f"Sending to kafka {topicName}") error = [] kafka_producer.produce(topicName, @@ -94,26 +101,41 @@ def kafkaWrite(topicName, message): callback = lambda err, msg: error.append(err) if err is not None else None) kafka_producer.flush() if error: - logger.error(error) - return 500 - else: - logger.info("OK") - return 202 - -def getApi(): + raise Exception(error) + +def event_bridge_write(topicName, message): + if not EVENT_BUS_ARN: + logger.info("No EventBus Arn - skipping") + return + + logger.info(f"Sending to eventBridge {topicName}") + response = aws_eventbridge.put_events( + Entries=[ + { + "Source": topicName, + 'DetailType': 'JSON', + 'Detail': json.dumps(message), + 'EventBusName': EVENT_BUS_ARN, + } + ] + ) + if response["FailedEntryCount"] > 0: + raise Exception(response) + +def get_api(): return { "statusCode": 200, "body": API } -def getToken(): +def get_token(): logger.info("Handling GET Token") return { "statusCode": 303, "headers": {"Location": TOKEN_PROVIDER_URL} } -def getTopics(): +def get_topics(): logger.info("Handling GET Topics") return { "statusCode": 200, @@ -121,7 +143,7 @@ def getTopics(): "body": json.dumps([topicName for topicName in TOPICS]) } -def getTopicSchema(topicName): +def get_topic_schema(topicName): logger.info(f"Handling GET TopicSchema({topicName})") if topicName not in TOPICS: return { "statusCode": 404 } @@ -132,7 +154,7 @@ def getTopicSchema(topicName): "body": json.dumps(TOPICS[topicName]) } -def postTopicMessage(topicName, topicMessage, tokenEncoded): +def post_topic_message(topicName, topicMessage, tokenEncoded): logger.info(f"Handling POST {topicName}") try: token = jwt.decode(tokenEncoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"]) @@ -159,21 +181,35 @@ def postTopicMessage(topicName, topicMessage, tokenEncoded): "body": e.message } - return {"statusCode": kafkaWrite(topicName, topicMessage)} + wasError = False + try: + kafka_write(topicName, topicMessage) + except Exception as e: + logger.error(str(e)) + wasError = True + try: + event_bridge_write(topicName, topicMessage) + except Exception as e: + logger.error(str(e)) + wasError = True + if wasError: + return {"statusCode": 500} + else: + return {"statusCode": 202} def lambda_handler(event, context): try: if event["resource"].lower() == "/api": - return getApi() + return get_api() if event["resource"].lower() == "/token": - return getToken() + return get_token() if event["resource"].lower() == "/topics": - return getTopics() + return get_topics() if event["resource"].lower() == "/topics/{topic_name}": if event["httpMethod"] == "GET": - return getTopicSchema(event["pathParameters"]["topic_name"].lower()) + return get_topic_schema(event["pathParameters"]["topic_name"].lower()) if event["httpMethod"] == "POST": - return postTopicMessage(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), event["headers"]["bearer"]) + return post_topic_message(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), event["headers"]["bearer"]) if event["resource"].lower() == "/terminate": sys.exit("TERMINATING") return {"statusCode": 404}