From 785e70aa96db3876a7fc06a7831536fa3400d255 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Thu, 15 May 2025 15:14:49 +0200 Subject: [PATCH 01/12] fixing topics --- conf/config.json | 1 - conf/topic_dlchange.json | 55 ++++++++++++++++++ conf/topic_runs.json | 73 ++++++++++++++++++++++++ conf/topics.json | 119 --------------------------------------- src/event_gate_lambda.py | 30 +++++----- 5 files changed, 143 insertions(+), 135 deletions(-) create mode 100644 conf/topic_dlchange.json create mode 100644 conf/topic_runs.json delete mode 100644 conf/topics.json diff --git a/conf/config.json b/conf/config.json index 023fe64..84ab9f8 100644 --- a/conf/config.json +++ b/conf/config.json @@ -1,6 +1,5 @@ { "access_config": "s3:///access.json", - "topics_config": "s3:///topics.json", "token_provider_url": "https://", "token_public_key_url": "https://", "kafka_bootstrap_server": "localhost:9092", diff --git a/conf/topic_dlchange.json b/conf/topic_dlchange.json new file mode 100644 index 0000000..7cd7c22 --- /dev/null +++ b/conf/topic_dlchange.json @@ -0,0 +1,55 @@ +{ + "type": "object", + "properties": { + "event_id": { + "type": "string", + "description": "Unique identifier for the event (GUID)" + }, + "tenant_id": { + "type": "string", + "description": "Application ID or ServiceNow identifier" + }, + "source_app": { + "type": "string", + "description": " Standardized source application name (aqueduct, unify, lum, etc)" + }, + "source_app_version": { + "type": "string", + "description": "Source application version (SemVer preferred)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp_event": { + "type": "number", + "description": "Timestamp of the event in epoch milliseconds" + }, + "catalog_id": { + "type": "string", + "description": "Identifier for the data definition (Glue/Hive) database and table name for example " + }, + "operation": { + "type": "string", + "enum": ["overwrite", "append", "archive", "delete"], + "description": "Operation performed" + }, + "location": { + "type": "string", + "description": "Location of the data" + }, + "format": { + "type": "string", + "description": "Format of the data (parquet, delta, crunch, etc)." + }, + "format_options": { + "type": "object", + "description": "When possible, add additional options related to the format" + }, + "additional_info": { + "type": "object", + "description": "Optional additional fields structured as an inner JSON" + } + }, + "required": ["event_id", "tenant_id", "source_app", "source_app_version", "environment", "timestamp_event", "catalog_id", "operation", "format"] +} diff --git a/conf/topic_runs.json b/conf/topic_runs.json new file mode 100644 index 0000000..a13b6f5 --- /dev/null +++ b/conf/topic_runs.json @@ -0,0 +1,73 @@ +{ + "type": "object", + "properties": { + "event_id": { + "type": "string", + "description": "Unique identifier for the event (GUID), generated for each unique event, for de-duplication purposes" + }, + "job_ref": { + "type": "string", + "description": "Identifier of the job in it’s respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." + }, + "tenant_id ": { + "type": "string", + "description": "Application ID (4 letter code) or ServiceNow identifier related to the pipeline/domain/process owner (tenant of the tool)" + }, + "source_app": { + "type": "string", + "description": "Standardized source application name (aqueduct, unify, lum, etc)" + }, + "source_app_version": { + "type": "string", + "description": "Source application version (SemVer preferred)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp_start": { + "type": "number", + "description": "Start timestamp of the run in epoch milliseconds" + }, + "timestamp_end": { + "type": "number", + "description": "End timestamp of the run in epoch milliseconds" + }, + "jobs": { + "type": "array", + "description": "List of individual jobs withing the run", + "items": { + "type": "object", + "properties": { + "catalog_id": { + "type": "string", + "description": "Identifier for the data definition (Glue/Hive) database and table name for example" + }, + "status": { + "type": "string", + "enum": ["succeeded", "failed", "killed", "skipped"], + "description": "Status of the job." + }, + "timestamp_start": { + "type": "number", + "description": "Start timestamp of a job that is a part of a run in epoch milliseconds" + }, + "timestamp_end": { + "type": "number", + "description": "End timestamp of a job that is a part of a run in epoch milliseconds" + }, + "message": { + "type": "string", + "description": "Job status/error message." + }, + "additional_info": { + "type": "object", + "description": "Optional additional fields structured as an inner JSON" + } + }, + "required": ["catalog_id", "status", "timestamp_start", "timestamp_end"] + } + } + }, + "required": ["event_id", "job_ref", "tenant_id", "source_app", "source_app_version", "environment", "timestamp_start", "timestamp_end", "jobs"] +} diff --git a/conf/topics.json b/conf/topics.json deleted file mode 100644 index 2f83383..0000000 --- a/conf/topics.json +++ /dev/null @@ -1,119 +0,0 @@ -{ - "run.topic": { - "type": "object", - "properties": { - "event_id": { - "type": "string", - "description": "Unique identifier for the event (GUID), generated for each unique event, for de-duplication purposes" - }, - "job_ref": { - "type": "string", - "description": "Identifier of the job in it’s respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." - }, - "tenant_id ": { - "type": "string", - "description": "Application ID (4 letter code) or ServiceNow identifier related to the pipeline/domain/process owner (tenant of the tool)" - }, - "source_app": { - "type": "string", - "description": "Standardized source application name (aqueduct, unify, lum, etc)" - }, - "source_app_version": { - "type": "string", - "description": "Source application version (SemVer preferred)" - }, - "environment": { - "type": "string", - "description": "Environment (dev, uat, pre-prod, prod, test or others)" - }, - "timestamp_start": { - "type": "number", - "description": "Start timestamp of the run in epoch milliseconds" - }, - "timestamp_end": { - "type": "number", - "description": "End timestamp of the run in epoch milliseconds" - }, - "jobs": { - "type": "array", - "description": "List of individual jobs withing the run", - "element_type": "object", - "object_schema": { - "catalog_id": { - "type": "string", - "description": "Identifier for the data definition (Glue/Hive) database and table name for example" - }, - "status": { - "type": "string", - "enum": ["succeeded", "failed", "killed", "skipped"], - "description": "Status of the job." - }, - "timestamp_start": { - "type": "number", - "description": "Start timestamp of a job that is a part of a run in epoch milliseconds" - }, - "timestamp_end": { - "type": "number", - "description": "End timestamp of a job that is a part of a run in epoch milliseconds" - }, - "message": { - "type": "string", - "description": "Job status/error message." - } - } - } - }, - "required": ["event_id", "job_ref", "tenant_id", "source_app", "source_app_version", "environment", "timestamp_start", "timestamp_end", "jobs"] - }, - "edla.change.topic": { - "type": "object", - "properties": { - "event_id": { - "type": "string", - "description": "Unique identifier for the event (GUID)" - }, - "tenant_id": { - "type": "string", - "description": "Application ID or ServiceNow identifier" - }, - "source_app": { - "type": "string", - "description": " Standardized source application name (aqueduct, unify, lum, etc)" - }, - "source_app_version": { - "type": "string", - "description": "Source application version (SemVer preferred)" - }, - "environment": { - "type": "string", - "description": "Environment (dev, uat, pre-prod, prod, test or others)" - }, - "timestamp_event": { - "type": "number", - "description": "Timestamp of the event in epoch milliseconds" - }, - "catalog_id": { - "type": "string", - "description": "Identifier for the data definition (Glue/Hive) database and table name for example " - }, - "operation": { - "type": "string", - "enum": ["overwrite", "append", "archive", "delete"], - "description": "Operation performed" - }, - "location": { - "type": "string", - "description": "Location of the data" - }, - "format": { - "type": "string", - "description": "Format of the data (parquet, delta, crunch, etc)." - }, - "format_options": { - "type": "object", - "description": "When possible, add additional options related to the format" - } - }, - "required": ["event_id", "tenant_id", "source_app", "source_app_version", "environment", "timestamp_event", "catalog_id", "operation", "format"] - } -} diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 5345566..709b113 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -34,25 +34,27 @@ log_level = os.environ.get('LOG_LEVEL', 'INFO') logger.setLevel(log_level) logger.addHandler(logging.StreamHandler()) +logger.debug("Initialized LOGGER") with open("conf/api.yaml", "r") as file: API = file.read() +logger.debug("Loaded API definition") + +TOPICS = {} +with open("conf/topic_runs.json", "r") as file: + TOPICS["public.cps.za.runs"] = json.load(file) +with open("conf/topic_dlchange.json", "r") as file: + TOPICS["public.cps.za.dlchange"] = json.load(file) +logger.debug("Loaded TOPICS") with open("conf/config.json", "r") as file: CONFIG = json.load(file) +logger.debug("Loaded main CONFIG") aws_session = boto3.Session() aws_s3 = aws_session.resource('s3', verify=False) aws_eventbridge = boto3.client('events') - -if CONFIG["topics_config"].startswith("s3://"): - name_parts = CONFIG["topics_config"].split('/') - bucket_name = name_parts[2] - bucket_object = "/".join(name_parts[3:]) - TOPICS = json.loads(aws_s3.Bucket(bucket_name).Object(bucket_object).get()["Body"].read().decode("utf-8")) -else: - with open(CONFIG["topics_config"], "r") as file: - TOPICS = json.load(file) +logger.debug("Initialized AWS Clients") if CONFIG["access_config"].startswith("s3://"): name_parts = CONFIG["access_config"].split('/') @@ -62,19 +64,17 @@ else: with open(CONFIG["access_config"], "r") as file: ACCESS = json.load(file) +logger.debug("Loaded ACCESS definitions") -TOKEN_PROVIDER_URL = CONFIG["token_provider_url"] - if "event_bus_arn" in CONFIG: EVENT_BUS_ARN = CONFIG["event_bus_arn"] else: EVENT_BUS_ARN = "" - -logger.debug("Loaded configs") +TOKEN_PROVIDER_URL = CONFIG["token_provider_url"] token_public_key_encoded = requests.get(CONFIG["token_public_key_url"], verify=False).json()["key"] TOKEN_PUBLIC_KEY = serialization.load_der_public_key(base64.b64decode(token_public_key_encoded)) -logger.debug("Loaded token public key") +logger.debug("Loaded TOKEN_PUBLIC_KEY") producer_config = {"bootstrap.servers": CONFIG["kafka_bootstrap_server"]} if "kafka_sasl_kerberos_principal" in CONFIG and "kafka_ssl_key_path" in CONFIG: @@ -92,7 +92,7 @@ logger.debug("producer will use SASL_SSL") kafka_producer = Producer(producer_config) -logger.debug("Initialized kafka producer") +logger.debug("Initialized KAFKA producer") def kafka_write(topicName, message): logger.debug(f"Sending to kafka {topicName}") From c95f41b761f445d1da4f4e8bcb9ad6fad86971c0 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Fri, 16 May 2025 14:36:54 +0200 Subject: [PATCH 02/12] edla postgres writer --- src/event_gate_lambda.py | 78 ++++++++++++++++++++++++++++++++++++++++ src/requirements.txt | 3 +- 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 709b113..a882354 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -26,6 +26,7 @@ from jsonschema.exceptions import ValidationError import jwt import requests +import psycopg2 import boto3 from confluent_kafka import Producer @@ -124,6 +125,78 @@ def event_bridge_write(topicName, message): if response["FailedEntryCount"] > 0: raise Exception(response) +def postgres_edla_write(cursor, table, message): + cursor.execute(f""" + INSERT INTO {table} + ( + event_id, + tenant_id, + source_app, + source_app_version, + environment, + timestamp_event, + catalog_id, + operation, + "location", + "format", + format_options, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", ( + message["event_id"], + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_event"], + message["catalog_id"], + message["operation"], + message["location"] if "location" in message else None, + message["format"], + json.dumps(message["format_options"]) if "format_options" in message else None, + json.dumps(message["additional_info"] if "additional_info" in message else None) + ) + ) + +def postgres_run_write(message): + pass + +def postgres_write(topicName, message): + if not POSTGRES: + logger.debug("No Postgress - skipping") + return + + with psycopg2.connect( + database=POSTGRES["database"], + host=POSTGRES["host"], + user=POSTGRES["user"], + password=POSTGRES["password"], + port=POSTGRESS["port"] + ) as connection: + with connection.cursor() as cursor: + if topicName == "public.cps.za.dlchange": + postgres_edla_write(cursor, "public_cps_za_dlchange", message) + else if topic == "public.cps.za.runs" + postgres_run_write(cursor, "public_cps_za_runs", message) + else: + raise Exception(f"unknown topic for postgres {topicName}") + + connection.commit() + def get_api(): return { "statusCode": 200, @@ -194,6 +267,11 @@ def post_topic_message(topicName, topicMessage, tokenEncoded): except Exception as e: logger.error(str(e)) wasError = True + try: + postgress_write(topicName, topicMessage) + except Exception as e: + logger.error(str(e)) + wasError = True if wasError: return {"statusCode": 500} else: diff --git a/src/requirements.txt b/src/requirements.txt index 6411363..89ddd0e 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -4,4 +4,5 @@ jsonschema PyJWT requests boto3 -confluent_kafka \ No newline at end of file +confluent_kafka +psycopg2 \ No newline at end of file From 76bf64b501140edae6316b13c8cce651b89bc823 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Mon, 19 May 2025 08:44:59 +0200 Subject: [PATCH 03/12] syntax fix --- src/event_gate_lambda.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index a882354..9d17d44 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -190,7 +190,7 @@ def postgres_write(topicName, message): with connection.cursor() as cursor: if topicName == "public.cps.za.dlchange": postgres_edla_write(cursor, "public_cps_za_dlchange", message) - else if topic == "public.cps.za.runs" + elif topic == "public.cps.za.runs": postgres_run_write(cursor, "public_cps_za_runs", message) else: raise Exception(f"unknown topic for postgres {topicName}") @@ -268,7 +268,7 @@ def post_topic_message(topicName, topicMessage, tokenEncoded): logger.error(str(e)) wasError = True try: - postgress_write(topicName, topicMessage) + postgres_write(topicName, topicMessage) except Exception as e: logger.error(str(e)) wasError = True From 4e1fe9c0d7f02689267ed591e1ff469571e30738 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Mon, 19 May 2025 09:39:05 +0200 Subject: [PATCH 04/12] postgres config --- src/event_gate_lambda.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 9d17d44..d800918 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -32,7 +32,7 @@ from confluent_kafka import Producer logger = logging.getLogger(__name__) -log_level = os.environ.get('LOG_LEVEL', 'INFO') +log_level = os.environ.get("LOG_LEVEL", "INFO") logger.setLevel(log_level) logger.addHandler(logging.StreamHandler()) logger.debug("Initialized LOGGER") @@ -66,11 +66,8 @@ with open(CONFIG["access_config"], "r") as file: ACCESS = json.load(file) logger.debug("Loaded ACCESS definitions") - -if "event_bus_arn" in CONFIG: - EVENT_BUS_ARN = CONFIG["event_bus_arn"] -else: - EVENT_BUS_ARN = "" + +EVENT_BUS_ARN = CONFIG["event_bus_arn"] if "event_bus_arn" in CONFIG else "" TOKEN_PROVIDER_URL = CONFIG["token_provider_url"] token_public_key_encoded = requests.get(CONFIG["token_public_key_url"], verify=False).json()["key"] @@ -91,10 +88,19 @@ "ssl.key.password": CONFIG["kafka_ssl_key_password"] }) logger.debug("producer will use SASL_SSL") - kafka_producer = Producer(producer_config) logger.debug("Initialized KAFKA producer") +POSTGRES = { + "host": os.environ.get("POSTGRES_HOST", ""), + "port": os.environ.get("POSTGRES_PORT", ""), + "user": os.environ.get("POSTGRES_USER", ""), + "password": os.environ.get("POSTGRES_PASSWORD", ""), + "database": os.environ.get("POSTGRES_DATABASE", "") +} + +logger.debug("Loaded POSTGRES parameters") + def kafka_write(topicName, message): logger.debug(f"Sending to kafka {topicName}") error = [] @@ -126,6 +132,7 @@ def event_bridge_write(topicName, message): raise Exception(response) def postgres_edla_write(cursor, table, message): + logger.debug(f"Sending to Postgres - {table}") cursor.execute(f""" INSERT INTO {table} ( @@ -176,7 +183,7 @@ def postgres_run_write(message): pass def postgres_write(topicName, message): - if not POSTGRES: + if not POSTGRES["database"]: logger.debug("No Postgress - skipping") return @@ -185,7 +192,7 @@ def postgres_write(topicName, message): host=POSTGRES["host"], user=POSTGRES["user"], password=POSTGRES["password"], - port=POSTGRESS["port"] + port=POSTGRES["port"] ) as connection: with connection.cursor() as cursor: if topicName == "public.cps.za.dlchange": From 7bff6a992fba400b3ba7119daf3ea1eb3d78adca Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Mon, 19 May 2025 09:47:26 +0200 Subject: [PATCH 05/12] up to date configs/notebook --- conf/access.json | 4 ++-- scripts/notebook.ipynb | 37 ++++++++++++++++++++++++++++++------- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/conf/access.json b/conf/access.json index a401f38..9329d15 100644 --- a/conf/access.json +++ b/conf/access.json @@ -1,8 +1,8 @@ { - "run.topic": [ + "public.cps.za.runs": [ "FooBarUser" ], - "edla.change.topic": [ + "public.cps.za.dlchange": [ "FooUser", "BarUser" ] diff --git a/scripts/notebook.ipynb b/scripts/notebook.ipynb index ccdb3b8..ab6e1b6 100644 --- a/scripts/notebook.ipynb +++ b/scripts/notebook.ipynb @@ -3,16 +3,32 @@ { "cell_type": "code", "execution_count": null, - "id": "e8e37945-f8c5-4769-bdde-226edeb8465f", + "id": "b0ddfccc-0a08-4c37-89c0-fa144ef516e3", "metadata": {}, "outputs": [], "source": [ - "%load_ext autoreload\n", - "%autoreload 2\n", + "# Set postgres variables\n", + "import os\n", "\n", + "os.environ[\"POSTGRES_HOST\"] = \"\"\n", + "os.environ[\"POSTGRES_PORT\"] = \"5432\"\n", + "os.environ[\"POSTGRES_USER\"] = \"postgres\"\n", + "os.environ[\"POSTGRES_PASSWORD\"] = \"postgres\"\n", + "os.environ[\"POSTGRES_DATABASE\"] = \"postgres\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e8e37945-f8c5-4769-bdde-226edeb8465f", + "metadata": {}, + "outputs": [], + "source": [ + "# Load lambda core\n", "# Jump out of the \"Scripts\" box for the lambda source\n", "import sys\n", "import os\n", + "os.environ[\"LOG_LEVEL\"] = \"DEBUG\"\n", "parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))\n", "sys.path.insert(0, parent_dir)\n", "if not os.path.exists('src'):\n", @@ -27,7 +43,8 @@ "metadata": {}, "outputs": [], "source": [ - "jwtToken = \"eyJhb...\"" + "# Set token for querying lambda\n", + "jwtToken = \"eyJhb\"" ] }, { @@ -37,6 +54,7 @@ "metadata": {}, "outputs": [], "source": [ + "# GET API\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"GET\",\n", " \"resource\": \"/api\"\n", @@ -50,6 +68,7 @@ "metadata": {}, "outputs": [], "source": [ + "# GET TOKEN => path to token source\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"GET\",\n", " \"resource\": \"/token\"\n", @@ -63,6 +82,7 @@ "metadata": {}, "outputs": [], "source": [ + "# GET TOPICS\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"GET\",\n", " \"resource\": \"/topics\"\n", @@ -76,10 +96,11 @@ "metadata": {}, "outputs": [], "source": [ + "# GET TOPIC SCHEMA\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"GET\",\n", " \"resource\": \"/topics/{topic_name}\",\n", - " \"pathParameters\": {\"topic_name\": \"run.topic\"}\n", + " \"pathParameters\": {\"topic_name\": \"public.cps.za.dlchange\"}\n", "}, {})" ] }, @@ -90,17 +111,18 @@ "metadata": {}, "outputs": [], "source": [ + "# POST MESSAGE\n", "import json\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"POST\",\n", " \"resource\": \"/topics/{topic_name}\",\n", - " \"pathParameters\": {\"topic_name\": \"edla.change.topic\"},\n", + " \"pathParameters\": {\"topic_name\": \"public.cps.za.dlchange\"},\n", " \"headers\": {\"bearer\": jwtToken},\n", " \"body\": json.dumps({\n", " \"event_id\": \"JupyterEventId\",\n", " \"tenant_id\": \"JupyterTenantId\",\n", " \"source_app\": \"JupyterSrc\",\n", - " \"source_app_version\": \"v2024-10-17\",\n", + " \"source_app_version\": \"v2025-05-19\",\n", " \"environment\": \"JupyterEnv\",\n", " \"timestamp_event\": 1729602770000,\n", " \"catalog_id\": \"TestCatalog\",\n", @@ -119,6 +141,7 @@ "metadata": {}, "outputs": [], "source": [ + "# CYCLE LAMBDA ENVIRONMENT\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"POST\",\n", " \"resource\": \"/terminate\"\n", From 8da21be2262644ad4c6604fd7fcb57085cca07a7 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Mon, 19 May 2025 13:04:15 +0200 Subject: [PATCH 06/12] runs postgres formating --- src/event_gate_lambda.py | 75 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index d800918..fa02127 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -175,12 +175,77 @@ def postgres_edla_write(cursor, table, message): message["location"] if "location" in message else None, message["format"], json.dumps(message["format_options"]) if "format_options" in message else None, - json.dumps(message["additional_info"] if "additional_info" in message else None) + json.dumps(message["additional_info"]) if "additional_info" in message else None ) ) -def postgres_run_write(message): - pass +def postgres_run_write(cursor, table_runs, table_jobs, message): + logger.debug(f"Sending to Postgres - {table_runs} and {table_jobs}") + cursor.execute(f""" + INSERT INTO {table_runs} + ( + event_id, + job_ref, + tenant_id, + soure_app, + source_app_version, + environment, + timestamp_start, + timestamp_end + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", ( + message["event_id"], + message["job_ref"], + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_start"], + message["timestamp_end"] + ) + ) + + for job in message["jobs"]: + cursor.execute(f""" + INSERT INTO {table_jobs} + ( + event_id, + catalog_id, + status, + timestamp_start, + timestamp_end, + message, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", ( + message["event_id"], + job["catalog_id"], + job["status"], + job["timestamp_start"], + job["timestamp_end"], + job["message"] if "message" in job else None, + json.dumps(job["additional_info"]) if "additional_info" in job else None + ) + ) def postgres_write(topicName, message): if not POSTGRES["database"]: @@ -197,8 +262,8 @@ def postgres_write(topicName, message): with connection.cursor() as cursor: if topicName == "public.cps.za.dlchange": postgres_edla_write(cursor, "public_cps_za_dlchange", message) - elif topic == "public.cps.za.runs": - postgres_run_write(cursor, "public_cps_za_runs", message) + elif topicName == "public.cps.za.runs": + postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) else: raise Exception(f"unknown topic for postgres {topicName}") From d9512012bdf831849c76ab5411e2a6471d16c9ec Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Tue, 20 May 2025 10:23:29 +0200 Subject: [PATCH 07/12] docker fix --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 494f094..7e2c05e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,7 +13,7 @@ # test via (provide payload): # curl http://localhost:9000/2015-03-31/functions/function/invocations -d "{payload}" # -# Deploy to AWS Lambda via ACR +# Deploy to AWS Lambda via ECR FROM --platform=linux/arm64 public.ecr.aws/lambda/python:3.13-arm64 @@ -53,7 +53,7 @@ RUN \ echo "###################" && \ echo "### pip installs ###" && \ echo "###################" && \ - pip install requests==2.31.0 urllib3==1.26.18 setuptools cryptography jsonschema PyJWT && \ + pip install requests==2.31.0 urllib3==1.26.18 setuptools cryptography jsonschema PyJWT psycopg2-binary && \ echo "######################" && \ echo "### confluent-kafka ###" && \ echo "######################" && \ From 17cb04fec772e70359f240d4985eb23ed5c328b6 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Tue, 20 May 2025 13:28:54 +0200 Subject: [PATCH 08/12] split to more files --- src/event_gate_lambda.py | 246 +++----------------------------------- src/writer_eventbridge.py | 34 ++++++ src/writer_kafka.py | 40 +++++++ src/writer_postgres.py | 157 ++++++++++++++++++++++++ 4 files changed, 250 insertions(+), 227 deletions(-) create mode 100644 src/writer_eventbridge.py create mode 100644 src/writer_kafka.py create mode 100644 src/writer_postgres.py diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index fa02127..a5870fe 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -13,23 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import os import base64 import json import logging +import os import sys import urllib3 -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +import jwt +import requests from cryptography.hazmat.primitives import serialization from jsonschema import validate from jsonschema.exceptions import ValidationError -import jwt -import requests -import psycopg2 -import boto3 -from confluent_kafka import Producer +import writer_eventbridge +import writer_kafka +import writer_postgres + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) logger = logging.getLogger(__name__) log_level = os.environ.get("LOG_LEVEL", "INFO") @@ -52,10 +53,8 @@ CONFIG = json.load(file) logger.debug("Loaded main CONFIG") -aws_session = boto3.Session() -aws_s3 = aws_session.resource('s3', verify=False) -aws_eventbridge = boto3.client('events') -logger.debug("Initialized AWS Clients") +aws_s3 = boto3.Session().resource('s3', verify=False) +logger.debug("Initialized AWS S3 Client") if CONFIG["access_config"].startswith("s3://"): name_parts = CONFIG["access_config"].split('/') @@ -67,207 +66,14 @@ ACCESS = json.load(file) logger.debug("Loaded ACCESS definitions") -EVENT_BUS_ARN = CONFIG["event_bus_arn"] if "event_bus_arn" in CONFIG else "" - TOKEN_PROVIDER_URL = CONFIG["token_provider_url"] token_public_key_encoded = requests.get(CONFIG["token_public_key_url"], verify=False).json()["key"] TOKEN_PUBLIC_KEY = serialization.load_der_public_key(base64.b64decode(token_public_key_encoded)) logger.debug("Loaded TOKEN_PUBLIC_KEY") -producer_config = {"bootstrap.servers": CONFIG["kafka_bootstrap_server"]} -if "kafka_sasl_kerberos_principal" in CONFIG and "kafka_ssl_key_path" in CONFIG: - producer_config.update({ - "security.protocol": "SASL_SSL", - "sasl.mechanism": "GSSAPI", - "sasl.kerberos.service.name": "kafka", - "sasl.kerberos.keytab": CONFIG["kafka_sasl_kerberos_keytab_path"], - "sasl.kerberos.principal": CONFIG["kafka_sasl_kerberos_principal"], - "ssl.ca.location": CONFIG["kafka_ssl_ca_path"], - "ssl.certificate.location": CONFIG["kafka_ssl_cert_path"], - "ssl.key.location": CONFIG["kafka_ssl_key_path"], - "ssl.key.password": CONFIG["kafka_ssl_key_password"] - }) - logger.debug("producer will use SASL_SSL") -kafka_producer = Producer(producer_config) -logger.debug("Initialized KAFKA producer") - -POSTGRES = { - "host": os.environ.get("POSTGRES_HOST", ""), - "port": os.environ.get("POSTGRES_PORT", ""), - "user": os.environ.get("POSTGRES_USER", ""), - "password": os.environ.get("POSTGRES_PASSWORD", ""), - "database": os.environ.get("POSTGRES_DATABASE", "") -} - -logger.debug("Loaded POSTGRES parameters") - -def kafka_write(topicName, message): - logger.debug(f"Sending to kafka {topicName}") - error = [] - kafka_producer.produce(topicName, - key="", - value=json.dumps(message).encode("utf-8"), - callback = lambda err, msg: error.append(err) if err is not None else None) - kafka_producer.flush() - if error: - raise Exception(error) - -def event_bridge_write(topicName, message): - if not EVENT_BUS_ARN: - logger.debug("No EventBus Arn - skipping") - return - - logger.debug(f"Sending to eventBridge {topicName}") - response = aws_eventbridge.put_events( - Entries=[ - { - "Source": topicName, - 'DetailType': 'JSON', - 'Detail': json.dumps(message), - 'EventBusName': EVENT_BUS_ARN, - } - ] - ) - if response["FailedEntryCount"] > 0: - raise Exception(response) - -def postgres_edla_write(cursor, table, message): - logger.debug(f"Sending to Postgres - {table}") - cursor.execute(f""" - INSERT INTO {table} - ( - event_id, - tenant_id, - source_app, - source_app_version, - environment, - timestamp_event, - catalog_id, - operation, - "location", - "format", - format_options, - additional_info - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s - )""", ( - message["event_id"], - message["tenant_id"], - message["source_app"], - message["source_app_version"], - message["environment"], - message["timestamp_event"], - message["catalog_id"], - message["operation"], - message["location"] if "location" in message else None, - message["format"], - json.dumps(message["format_options"]) if "format_options" in message else None, - json.dumps(message["additional_info"]) if "additional_info" in message else None - ) - ) - -def postgres_run_write(cursor, table_runs, table_jobs, message): - logger.debug(f"Sending to Postgres - {table_runs} and {table_jobs}") - cursor.execute(f""" - INSERT INTO {table_runs} - ( - event_id, - job_ref, - tenant_id, - soure_app, - source_app_version, - environment, - timestamp_start, - timestamp_end - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s - )""", ( - message["event_id"], - message["job_ref"], - message["tenant_id"], - message["source_app"], - message["source_app_version"], - message["environment"], - message["timestamp_start"], - message["timestamp_end"] - ) - ) - - for job in message["jobs"]: - cursor.execute(f""" - INSERT INTO {table_jobs} - ( - event_id, - catalog_id, - status, - timestamp_start, - timestamp_end, - message, - additional_info - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s - )""", ( - message["event_id"], - job["catalog_id"], - job["status"], - job["timestamp_start"], - job["timestamp_end"], - job["message"] if "message" in job else None, - json.dumps(job["additional_info"]) if "additional_info" in job else None - ) - ) - -def postgres_write(topicName, message): - if not POSTGRES["database"]: - logger.debug("No Postgress - skipping") - return - - with psycopg2.connect( - database=POSTGRES["database"], - host=POSTGRES["host"], - user=POSTGRES["user"], - password=POSTGRES["password"], - port=POSTGRES["port"] - ) as connection: - with connection.cursor() as cursor: - if topicName == "public.cps.za.dlchange": - postgres_edla_write(cursor, "public_cps_za_dlchange", message) - elif topicName == "public.cps.za.runs": - postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) - else: - raise Exception(f"unknown topic for postgres {topicName}") - - connection.commit() +writer_eventbridge.init() +writer_kafka.init() +writer_postgres.init() def get_api(): return { @@ -328,26 +134,12 @@ def post_topic_message(topicName, topicMessage, tokenEncoded): "body": e.message } - wasError = False - try: - kafka_write(topicName, topicMessage) - except Exception as e: - logger.error(str(e)) - wasError = True - try: - event_bridge_write(topicName, topicMessage) - except Exception as e: - logger.error(str(e)) - wasError = True - try: - postgres_write(topicName, topicMessage) - except Exception as e: - logger.error(str(e)) - wasError = True - if wasError: - return {"statusCode": 500} - else: - return {"statusCode": 202} + success = ( + writer_kafka.write(topicName, topicMessage) and + writer_eventbridge.write(topicName, topicMessage) and + writer_postgres.write(topicName, topicMessage) + ) + return {"statusCode": 202} if success else {"statusCode": 500} def lambda_handler(event, context): try: diff --git a/src/writer_eventbridge.py b/src/writer_eventbridge.py new file mode 100644 index 0000000..fde3234 --- /dev/null +++ b/src/writer_eventbridge.py @@ -0,0 +1,34 @@ +import json + +import boto3 + +def init(): + aws_eventbridge = boto3.client('events') + EVENT_BUS_ARN = CONFIG["event_bus_arn"] if "event_bus_arn" in CONFIG else "" + logger.debug("Initialized EVENTBRIDGE writer") + +def write(topicName, message): + if not EVENT_BUS_ARN: + logger.debug("No EventBus Arn - skipping") + return True + + try: + logger.debug(f"Sending to eventBridge {topicName}") + response = aws_eventbridge.put_events( + Entries=[ + { + "Source": topicName, + 'DetailType': 'JSON', + 'Detail': json.dumps(message), + 'EventBusName': EVENT_BUS_ARN, + } + ] + ) + if response["FailedEntryCount"] > 0: + logger.error(str(response)) + return False + except Exception as e: + logger.error(str(e)) + return False + + return True diff --git a/src/writer_kafka.py b/src/writer_kafka.py new file mode 100644 index 0000000..07e1671 --- /dev/null +++ b/src/writer_kafka.py @@ -0,0 +1,40 @@ +import json + +import boto3 +from confluent_kafka import Producer + +def init(): + producer_config = {"bootstrap.servers": CONFIG["kafka_bootstrap_server"]} + if "kafka_sasl_kerberos_principal" in CONFIG and "kafka_ssl_key_path" in CONFIG: + producer_config.update({ + "security.protocol": "SASL_SSL", + "sasl.mechanism": "GSSAPI", + "sasl.kerberos.service.name": "kafka", + "sasl.kerberos.keytab": CONFIG["kafka_sasl_kerberos_keytab_path"], + "sasl.kerberos.principal": CONFIG["kafka_sasl_kerberos_principal"], + "ssl.ca.location": CONFIG["kafka_ssl_ca_path"], + "ssl.certificate.location": CONFIG["kafka_ssl_cert_path"], + "ssl.key.location": CONFIG["kafka_ssl_key_path"], + "ssl.key.password": CONFIG["kafka_ssl_key_password"] + }) + logger.debug("producer will use SASL_SSL") + kafka_producer = Producer(producer_config) + logger.debug("Initialized KAFKA writer") + + +def write(topicName, message): + try: + logger.debug(f"Sending to kafka {topicName}") + error = [] + kafka_producer.produce(topicName, + key="", + value=json.dumps(message).encode("utf-8"), + callback = lambda err, msg: error.append(err) if err is not None else None) + kafka_producer.flush() + if error: + logger.error(str(e)) + return False + except Exception as e: + return False + + return True diff --git a/src/writer_postgres.py b/src/writer_postgres.py new file mode 100644 index 0000000..92e8549 --- /dev/null +++ b/src/writer_postgres.py @@ -0,0 +1,157 @@ +import psycopg2 + +def init(): + POSTGRES = { + "host": os.environ.get("POSTGRES_HOST", ""), + "port": os.environ.get("POSTGRES_PORT", ""), + "user": os.environ.get("POSTGRES_USER", ""), + "password": os.environ.get("POSTGRES_PASSWORD", ""), + "database": os.environ.get("POSTGRES_DATABASE", "") + } + logger.debug("Initialized POSTGRES writer") + + +def postgres_edla_write(cursor, table, message): + logger.debug(f"Sending to Postgres - {table}") + cursor.execute(f""" + INSERT INTO {table} + ( + event_id, + tenant_id, + source_app, + source_app_version, + environment, + timestamp_event, + catalog_id, + operation, + "location", + "format", + format_options, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", ( + message["event_id"], + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_event"], + message["catalog_id"], + message["operation"], + message["location"] if "location" in message else None, + message["format"], + json.dumps(message["format_options"]) if "format_options" in message else None, + json.dumps(message["additional_info"]) if "additional_info" in message else None + ) + ) + +def postgres_run_write(cursor, table_runs, table_jobs, message): + logger.debug(f"Sending to Postgres - {table_runs} and {table_jobs}") + cursor.execute(f""" + INSERT INTO {table_runs} + ( + event_id, + job_ref, + tenant_id, + soure_app, + source_app_version, + environment, + timestamp_start, + timestamp_end + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", ( + message["event_id"], + message["job_ref"], + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_start"], + message["timestamp_end"] + ) + ) + + for job in message["jobs"]: + cursor.execute(f""" + INSERT INTO {table_jobs} + ( + event_id, + catalog_id, + status, + timestamp_start, + timestamp_end, + message, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", ( + message["event_id"], + job["catalog_id"], + job["status"], + job["timestamp_start"], + job["timestamp_end"], + job["message"] if "message" in job else None, + json.dumps(job["additional_info"]) if "additional_info" in job else None + ) + ) + +def write(topicName, message): + try: + if not POSTGRES["database"]: + logger.debug("No Postgress - skipping") + return + + with psycopg2.connect( + database=POSTGRES["database"], + host=POSTGRES["host"], + user=POSTGRES["user"], + password=POSTGRES["password"], + port=POSTGRES["port"] + ) as connection: + with connection.cursor() as cursor: + if topicName == "public.cps.za.dlchange": + postgres_edla_write(cursor, "public_cps_za_dlchange", message) + elif topicName == "public.cps.za.runs": + postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) + else: + logger.error(f"unknown topic for postgres {topicName}") + return False + + connection.commit() + except Exception as e: + logger.error(str(e)) + return False + + return True From 46823b248d96842c8695072a89d2d544435ba9b8 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Tue, 20 May 2025 14:24:05 +0200 Subject: [PATCH 09/12] polishing and fixes --- src/event_gate_lambda.py | 13 +++++++------ src/writer_eventbridge.py | 20 +++++++++++++------- src/writer_kafka.py | 17 +++++++++++------ src/writer_postgres.py | 23 +++++++++++++++-------- 4 files changed, 46 insertions(+), 27 deletions(-) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index a5870fe..92ad8cf 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -20,15 +20,16 @@ import sys import urllib3 +import boto3 import jwt import requests from cryptography.hazmat.primitives import serialization from jsonschema import validate from jsonschema.exceptions import ValidationError -import writer_eventbridge -import writer_kafka -import writer_postgres +from . import writer_eventbridge +from . import writer_kafka +from . import writer_postgres urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -71,9 +72,9 @@ TOKEN_PUBLIC_KEY = serialization.load_der_public_key(base64.b64decode(token_public_key_encoded)) logger.debug("Loaded TOKEN_PUBLIC_KEY") -writer_eventbridge.init() -writer_kafka.init() -writer_postgres.init() +writer_eventbridge.init(logger, CONFIG) +writer_kafka.init(logger, CONFIG) +writer_postgres.init(logger) def get_api(): return { diff --git a/src/writer_eventbridge.py b/src/writer_eventbridge.py index fde3234..1a9446d 100644 --- a/src/writer_eventbridge.py +++ b/src/writer_eventbridge.py @@ -2,18 +2,24 @@ import boto3 -def init(): +def init(logger, CONFIG): + global _logger + global EVENT_BUS_ARN + global aws_eventbridge + + _logger = logger + aws_eventbridge = boto3.client('events') EVENT_BUS_ARN = CONFIG["event_bus_arn"] if "event_bus_arn" in CONFIG else "" - logger.debug("Initialized EVENTBRIDGE writer") + _logger.debug("Initialized EVENTBRIDGE writer") def write(topicName, message): - if not EVENT_BUS_ARN: - logger.debug("No EventBus Arn - skipping") + if not aws_eventbridge: + _logger.debug("No EventBus Arn - skipping") return True try: - logger.debug(f"Sending to eventBridge {topicName}") + _logger.debug(f"Sending to eventBridge {topicName}") response = aws_eventbridge.put_events( Entries=[ { @@ -25,10 +31,10 @@ def write(topicName, message): ] ) if response["FailedEntryCount"] > 0: - logger.error(str(response)) + _logger.error(str(response)) return False except Exception as e: - logger.error(str(e)) + _logger.error(str(e)) return False return True diff --git a/src/writer_kafka.py b/src/writer_kafka.py index 07e1671..13289d8 100644 --- a/src/writer_kafka.py +++ b/src/writer_kafka.py @@ -3,7 +3,12 @@ import boto3 from confluent_kafka import Producer -def init(): +def init(logger, CONFIG): + global _logger + global kafka_producer + + _logger = logger + producer_config = {"bootstrap.servers": CONFIG["kafka_bootstrap_server"]} if "kafka_sasl_kerberos_principal" in CONFIG and "kafka_ssl_key_path" in CONFIG: producer_config.update({ @@ -17,14 +22,13 @@ def init(): "ssl.key.location": CONFIG["kafka_ssl_key_path"], "ssl.key.password": CONFIG["kafka_ssl_key_password"] }) - logger.debug("producer will use SASL_SSL") + _logger.debug("producer will use SASL_SSL") kafka_producer = Producer(producer_config) - logger.debug("Initialized KAFKA writer") - + _logger.debug("Initialized KAFKA writer") def write(topicName, message): try: - logger.debug(f"Sending to kafka {topicName}") + _logger.debug(f"Sending to kafka {topicName}") error = [] kafka_producer.produce(topicName, key="", @@ -32,9 +36,10 @@ def write(topicName, message): callback = lambda err, msg: error.append(err) if err is not None else None) kafka_producer.flush() if error: - logger.error(str(e)) + _logger.error(str(error)) return False except Exception as e: + _logger.error(str(e)) return False return True diff --git a/src/writer_postgres.py b/src/writer_postgres.py index 92e8549..86d6593 100644 --- a/src/writer_postgres.py +++ b/src/writer_postgres.py @@ -1,6 +1,14 @@ +import json +import os + import psycopg2 -def init(): +def init(logger): + global _logger + global POSTGRES + + _logger = logger + POSTGRES = { "host": os.environ.get("POSTGRES_HOST", ""), "port": os.environ.get("POSTGRES_PORT", ""), @@ -8,11 +16,10 @@ def init(): "password": os.environ.get("POSTGRES_PASSWORD", ""), "database": os.environ.get("POSTGRES_DATABASE", "") } - logger.debug("Initialized POSTGRES writer") - + _logger.debug("Initialized POSTGRES writer") def postgres_edla_write(cursor, table, message): - logger.debug(f"Sending to Postgres - {table}") + _logger.debug(f"Sending to Postgres - {table}") cursor.execute(f""" INSERT INTO {table} ( @@ -60,7 +67,7 @@ def postgres_edla_write(cursor, table, message): ) def postgres_run_write(cursor, table_runs, table_jobs, message): - logger.debug(f"Sending to Postgres - {table_runs} and {table_jobs}") + _logger.debug(f"Sending to Postgres - {table_runs} and {table_jobs}") cursor.execute(f""" INSERT INTO {table_runs} ( @@ -130,7 +137,7 @@ def postgres_run_write(cursor, table_runs, table_jobs, message): def write(topicName, message): try: if not POSTGRES["database"]: - logger.debug("No Postgress - skipping") + _logger.debug("No Postgress - skipping") return with psycopg2.connect( @@ -146,12 +153,12 @@ def write(topicName, message): elif topicName == "public.cps.za.runs": postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) else: - logger.error(f"unknown topic for postgres {topicName}") + _logger.error(f"unknown topic for postgres {topicName}") return False connection.commit() except Exception as e: - logger.error(str(e)) + _logger.error(str(e)) return False return True From 3d3e824f600360531986f392746f72634569bb7f Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Tue, 20 May 2025 15:17:45 +0200 Subject: [PATCH 10/12] postgres config sourced from secrets --- scripts/notebook.ipynb | 13 +++++-------- src/writer_postgres.py | 17 ++++++++++------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/scripts/notebook.ipynb b/scripts/notebook.ipynb index ab6e1b6..945a0d9 100644 --- a/scripts/notebook.ipynb +++ b/scripts/notebook.ipynb @@ -7,14 +7,11 @@ "metadata": {}, "outputs": [], "source": [ - "# Set postgres variables\n", + "# Set postgres secret\n", "import os\n", "\n", - "os.environ[\"POSTGRES_HOST\"] = \"\"\n", - "os.environ[\"POSTGRES_PORT\"] = \"5432\"\n", - "os.environ[\"POSTGRES_USER\"] = \"postgres\"\n", - "os.environ[\"POSTGRES_PASSWORD\"] = \"postgres\"\n", - "os.environ[\"POSTGRES_DATABASE\"] = \"postgres\"" + "os.environ[\"POSTGRES_SECRET_NAME\"] = \"\"\n", + "os.environ[\"POSTGRES_SECRET_REGION\"] = \"\"" ] }, { @@ -122,14 +119,14 @@ " \"event_id\": \"JupyterEventId\",\n", " \"tenant_id\": \"JupyterTenantId\",\n", " \"source_app\": \"JupyterSrc\",\n", - " \"source_app_version\": \"v2025-05-19\",\n", + " \"source_app_version\": \"v2025-05-20\",\n", " \"environment\": \"JupyterEnv\",\n", " \"timestamp_event\": 1729602770000,\n", " \"catalog_id\": \"TestCatalog\",\n", " \"operation\": \"delete\",\n", " \"location\": \"UnitTest\",\n", " \"format\": \"TestFormat\",\n", - " \"formatOptions\": {\"Foo\" : \"Bar\"}\n", + " \"format_options\": {\"Foo\" : \"Bar\"}\n", " })\n", "}, {})" ] diff --git a/src/writer_postgres.py b/src/writer_postgres.py index 86d6593..cd56488 100644 --- a/src/writer_postgres.py +++ b/src/writer_postgres.py @@ -1,6 +1,8 @@ import json import os +import boto3 +from botocore.exceptions import ClientError import psycopg2 def init(logger): @@ -9,13 +11,14 @@ def init(logger): _logger = logger - POSTGRES = { - "host": os.environ.get("POSTGRES_HOST", ""), - "port": os.environ.get("POSTGRES_PORT", ""), - "user": os.environ.get("POSTGRES_USER", ""), - "password": os.environ.get("POSTGRES_PASSWORD", ""), - "database": os.environ.get("POSTGRES_DATABASE", "") - } + secret_name = os.environ.get("POSTGRES_SECRET_NAME", "") + secret_region = os.environ.get("POSTGRES_SECRET_REGION", "") + + if secret_name and secret_region: + aws_secrets = boto3.Session().client(service_name='secretsmanager', region_name=secret_region) + postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"] + POSTGRES = json.loads(postgres_secret) + _logger.debug("Initialized POSTGRES writer") def postgres_edla_write(cursor, table, message): From 1df79d531cc52542d4353d6f416ce2977d81fb1b Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Tue, 20 May 2025 15:22:06 +0200 Subject: [PATCH 11/12] docker update --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 7e2c05e..726afef 100644 --- a/Dockerfile +++ b/Dockerfile @@ -71,7 +71,7 @@ RUN \ # Lambda and SASL_SSL_Artifacts COPY $SASL_SSL_ARTIFACTS /opt/sasl_ssl_artifacts/ -COPY src/event_gate_lambda.py $LAMBDA_TASK_ROOT +COPY src/ $LAMBDA_TASK_ROOT/ COPY conf $LAMBDA_TASK_ROOT/conf # Mark librdkafka to LD_LIBRARY_PATH From 0d509df8f099fd700aea44887b79fc4072062bb0 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Wed, 21 May 2025 10:12:45 +0200 Subject: [PATCH 12/12] local module import fixes --- src/event_gate_lambda.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 92ad8cf..49eaa06 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -27,9 +27,11 @@ from jsonschema import validate from jsonschema.exceptions import ValidationError -from . import writer_eventbridge -from . import writer_kafka -from . import writer_postgres +sys.path.append(os.path.join(os.path.dirname(__file__))) + +import writer_eventbridge +import writer_kafka +import writer_postgres urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)