-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path2.py
More file actions
71 lines (62 loc) · 3.19 KB
/
2.py
File metadata and controls
71 lines (62 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from kafka import KafkaConsumer, KafkaProducer
import datetime as dt
import json
import pandas as pd
import numpy as np
import joblib
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report
from xgboost.spark import SparkXGBClassifierModel
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import lit, col
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
COLUMNS_LIST = ["StartTime", "Dur", "TotPkts", "TotBytes", "SrcBytes", "Proto_tcp", "Proto_udp", "Dir_one",
"sTosone", "Proto_others", "Dir_others", "Proto", "SrcAddr", "Sport", "Dir", "DstAddr",
"Dport", "State", "sTos", "dTos", "Label"]
def doingpotty(df):
model_save_path = "Model/pyspark"
loaded_model = SparkXGBClassifierModel.load(model_save_path)
feature_cols = ['Dur', 'TotPkts', 'TotBytes', 'SrcBytes', 'Proto_tcp', 'Proto_udp', 'Dir_one', 'sTosone', 'Proto_others', 'Dir_others']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_new = assembler.transform(df)
selected_cols = ["StartTime", "features", "Label"]
df_test = df_new.select(selected_cols)
df_test = df_test.withColumn("Label", col("Label").cast('int'))
predictions = loaded_model.transform(df_test)
evaluator = MulticlassClassificationEvaluator(
labelCol="Label",
predictionCol="prediction",
metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")
predictions = predictions.select(["StartTime", "prediction"])
predictions = predictions.withColumn("prediction", col("prediction").cast('int'))
df_finalboob = df.join(predictions, on='StartTime')
final_cols = ["StartTime", "Dur", "TotPkts", "TotBytes", "SrcBytes", "Proto_tcp", "Proto_udp", "Dir_one",
"sTosone", "Proto_others", "Dir_others", "Proto", "SrcAddr", "Sport", "Dir", "DstAddr",
"Dport", "State", "sTos", "dTos", "prediction", "Label"]
df_finalboob = df_finalboob.select(final_cols)
return df_finalboob
consumer = KafkaConsumer('logsprocessed',
group_id='test-consumer-group',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=True)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')
"""----------------------------------------------------------------------------------------------------------------"""
for message in consumer:
rows = [Row(**{key: value[str(idx)] for key, value in message.value.items()}) for idx in range(len(message.value['StartTime']))]
spark = SparkSession.builder.appName("example").getOrCreate()
df = spark.createDataFrame(rows)
combined_df = doingpotty(df)
row = combined_df.first()
row_dict = row.asDict()
data = json.dumps(row_dict, default=str).encode('utf-8')
print(data)
print("-" * 75)
producer.send(topic="logslabelled", value=data)