forked from TonyYoun77/Embedded_SW_Team_2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathExample_code
More file actions
155 lines (131 loc) · 5.15 KB
/
Example_code
File metadata and controls
155 lines (131 loc) · 5.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import time
import json
import subprocess
import cv2
import numpy as np
import requests
import RPi.GPIO as GPIO
import tflite_runtime.interpreter as tflite
# === 센서 핀 ===
PIR_PIN = 17
# === 경로 설정 ===
BASE_DIR = "/home/pi/security_project"
MODEL_PATH = os.path.join(BASE_DIR, "yolov8n.tflite")
RECORD_DIR = os.path.join(BASE_DIR, "recordings")
os.makedirs(RECORD_DIR, exist_ok=True)
# === 위험 클래스 정의 ===
RISK_CLASSES = {"knife", "gun", "fire", "person"} # person은 예시로 포함
# YOLOv8 COCO 클래스 일부 (필요시 전체 목록 넣기)
COCO_CLASSES = [
"person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck",
"boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
"bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra",
"giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
"skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove",
"skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork",
"knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli",
"carrot", "hot dog", "pizza", "donut", "cake", "chair", "sofa", "pottedplant",
"bed", "diningtable", "toilet", "tvmonitor", "laptop", "mouse", "remote",
"keyboard", "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator",
"book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush"
]
# === Seafile 설정 ===
SEAFILE_UPLOAD_URL = "https://your-seafile-server.com/seafhttp/upload-api-url"
SEAFILE_TOKEN = "your-token"
# === GPIO 설정 ===
GPIO.setmode(GPIO.BCM)
GPIO.setup(PIR_PIN, GPIO.IN)
# === 밝기 감지 함수 (LDR + ADC 없을 경우 더미 처리) ===
def is_dark():
return True # 실제 조도 센서 추가 시 구현
# === 센서 트리거 조건 ===
def sensor_triggered():
return GPIO.input(PIR_PIN) == GPIO.HIGH and is_dark()
# === 녹화 ===
def record_video(path, duration=10):
command = [
'ffmpeg',
'-y',
'-f', 'v4l2', '-framerate', '20', '-video_size', '640x480', '-i', '/dev/video0',
'-f', 'alsa', '-i', 'plughw:1',
'-c:v', 'libx264', '-preset', 'ultrafast',
'-c:a', 'aac',
'-t', str(duration),
path
]
subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# === TFLite YOLO 위험 감지 ===
def detect_risk_tflite(video_path):
interpreter = tflite.Interpreter(model_path=MODEL_PATH)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
cap = cv2.VideoCapture(video_path)
risk_detected = False
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# 전처리: 640x640, normalize
img = cv2.resize(frame, (640, 640))
img = img / 255.0
img = img.astype(np.float32)
input_tensor = np.expand_dims(img, axis=0)
interpreter.set_tensor(input_details[0]['index'], input_tensor)
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index']) # [1, N, 6]
detections = output_data[0]
for det in detections:
conf = det[4]
cls_id = int(det[5])
if conf > 0.4:
label = COCO_CLASSES[cls_id]
print(f"감지: {label} ({conf:.2f})")
if label in RISK_CLASSES:
risk_detected = True
break
if risk_detected:
break
cap.release()
return risk_detected
# === Seafile 업로드 ===
def upload_to_seafile(file_path):
headers = {'Authorization': f'Token {SEAFILE_TOKEN}'}
files = {'file': open(file_path, 'rb')}
data = {'parent_dir': '/'}
response = requests.post(SEAFILE_UPLOAD_URL, headers=headers, data=data, files=files)
return response.status_code == 200
# === 메인 루프 ===
def main():
print("✅ 시스템 시작: 센서 감지 대기 중...")
while True:
if sensor_triggered():
timestamp = time.strftime('%Y-%m-%d_%H-%M-%S')
folder = os.path.join(RECORD_DIR, timestamp)
os.makedirs(folder, exist_ok=True)
video_path = os.path.join(folder, "video.mp4")
flag_path = os.path.join(folder, "flag.json")
print("🎬 녹화 중...")
record_video(video_path)
print("🔍 위험 분석 중...")
risk = detect_risk_tflite(video_path)
if risk:
with open(flag_path, 'w') as f:
json.dump({"timestamp": timestamp, "risk": True}, f)
print("⚠️ 위험 플래그 저장 완료")
print("☁️ Seafile 업로드 중...")
upload_to_seafile(video_path)
if risk:
upload_to_seafile(flag_path)
print("✅ 처리 완료. 대기 중...\n")
time.sleep(3)
else:
time.sleep(1)
# === 실행 ===
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
GPIO.cleanup()
print("\n🛑 종료됨.")