API réception alertes chute (SmartEye/YOLO), analyse IA (Gemini 2.5 Flash), gestion alertes avec escalade (watchdog), notifications Firebase, dashboard web, documentation MkDocs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
451 lines
17 KiB
Python
Executable File
451 lines
17 KiB
Python
Executable File
import warnings
|
|
# ON FAIT TAIRE LES AVERTISSEMENTS POUR QUE LE JSON SOIT PROPRE
|
|
warnings.filterwarnings("ignore")
|
|
import sys
|
|
import json
|
|
import os
|
|
# Import pour Google AI (Gemini) - NOUVEAU SDK
|
|
from google import genai
|
|
from google.genai import types
|
|
# Import pour Firebase (Notifications App)
|
|
import firebase_admin
|
|
from firebase_admin import credentials, messaging
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
|
|
# Import du gestionnaire d'alertes
|
|
from alert_manager import (
|
|
creer_alerte,
|
|
compter_alertes_actives,
|
|
AlertStatus
|
|
)
|
|
|
|
# --- CONFIGURATION VIA config_loader ---
|
|
from config_loader import get_config, get_client_info, get_client_fcm_tokens
|
|
|
|
# Shared configuration object; the Gemini API key is read from its
# "gemini" section.
_cfg = get_config()
API_KEY = _cfg.get("gemini", "api_key")

# --- FIREBASE INITIALISATION ---
# The default Firebase app is created only when none exists yet.
# get_app() is the public way to test for it (it raises ValueError when the
# default app is missing), instead of reading the private `_apps` registry.
try:
    firebase_admin.get_app()
except ValueError:
    try:
        firebase_cred_path = _cfg.get(
            "firebase",
            "credentials_path",
            fallback="/var/www/lucas/admin_key.json",
        )
        cred = credentials.Certificate(firebase_cred_path)
        firebase_admin.initialize_app(cred)
    except Exception as e:
        # Import must not fail because of Firebase: the analysis still has
        # to run. The failure is recorded in the error log file rather than
        # printed, because this script keeps its console output limited to
        # the JSON result.
        try:
            with open("/tmp/lucas_gemini_errors.log", "a", encoding="utf-8") as _logf:
                _logf.write(
                    f"[{datetime.now()}] Firebase init failed "
                    f"({type(e).__name__}): {e}\n"
                )
        except OSError:
            # Logging is best effort; nothing else can be reported here.
            pass
|
|
|
|
# --- CONFIGURATION GEMINI (NOUVEAU SDK) ---
|
|
# Gemini client bound to the configured API key.
client = genai.Client(api_key=API_KEY)

# Generation settings shared by every request: a JSON response capped at
# 512 output tokens, low temperature, and every listed harm category set to
# the "OFF" threshold.
generation_config = types.GenerateContentConfig(
    response_mime_type="application/json",
    max_output_tokens=512,
    temperature=0.1,
    top_p=0.95,
    top_k=40,
    safety_settings=[
        types.SafetySetting(category=harm_category, threshold="OFF")
        for harm_category in (
            "HARM_CATEGORY_HARASSMENT",
            "HARM_CATEGORY_HATE_SPEECH",
            "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "HARM_CATEGORY_DANGEROUS_CONTENT",
        )
    ],
)
|
|
|
|
# --- CHARGEMENT INFOS CLIENT DEPUIS DATABASE.JSON ---
|
|
def charger_infos_client():
    """Return the client record selected by the LUCAS_CLIENT_ID variable.

    Returns an empty dict when the variable is unset or empty; otherwise
    returns whatever ``get_client_info`` yields for that identifier.
    """
    identifiant = os.environ.get("LUCAS_CLIENT_ID", "")
    return get_client_info(identifiant) if identifiant else {}
|
|
|
|
def deriver_camera_urls():
    """Build the JSON list of secondary camera URLs.

    SMARTEYE_CAMERA_MAP holds a JSON object whose keys are RTSP URLs, e.g.
    {"rtsp://...:8554/12": "192.168.1.143", ...}. Every key except the main
    camera URL (SMARTEYE_RTSP_URL) is kept, in map order.

    Returns:
        A JSON-encoded list of URLs, or "[]" when the map cannot be parsed
        or is not a JSON object.
    """
    principale = os.environ.get("SMARTEYE_RTSP_URL", "")
    brut = os.environ.get("SMARTEYE_CAMERA_MAP", "{}")
    try:
        secondaires = []
        for flux in json.loads(brut).keys():
            if flux != principale:
                secondaires.append(flux)
        return json.dumps(secondaires)
    except Exception:
        return "[]"
|
|
|
|
# --- FONCTIONS UTILITAIRES POUR JSON ---
|
|
def nettoyer_json_response(text_response):
    """Strip wrapping noise from a model reply so only the JSON text remains.

    Steps: trim whitespace, drop a leading Markdown fence (```json or ```)
    and a trailing ``` fence, keep the span from the first "{" to the last
    "}", then remove control characters other than newline, carriage return
    and tab.

    Returns:
        The cleaned string, or ``text_response`` unchanged if any step
        raises (for instance when the input is not a string).
    """
    try:
        body = text_response.strip()

        # Only one opening fence is removed; the longer marker is tried first.
        for fence in ("```json", "```"):
            if body.startswith(fence):
                body = body[len(fence):]
                break
        if body.endswith("```"):
            body = body[:-3]
        body = body.strip()

        first = body.find("{")
        last = body.rfind("}")
        if first != -1 and last >= first:
            body = body[first:last + 1]

        kept = [ch for ch in body if ch in "\n\r\t" or ord(ch) >= 32]
        return "".join(kept).strip()
    except Exception:
        return text_response
|
|
|
|
def extraire_json_intelligent(text_response):
    """Try several extraction strategies and return the first JSON object.

    Candidates, in order:
      1. the output of ``nettoyer_json_response``;
      2. the raw span from the first "{" to the last "}";
      3. the concatenation of every line containing "{", "}" or a double
         quote.

    Args:
        text_response: Raw model reply. Strategies 2 and 3 only apply to
            ``str`` input.

    Returns:
        The first candidate that parses to a ``dict``, or ``None`` when no
        candidate does.
    """
    candidates = [nettoyer_json_response(text_response)]

    # An explicit type check replaces the former bare `except:` blocks,
    # which also trapped KeyboardInterrupt and SystemExit.
    if isinstance(text_response, str):
        start = text_response.find('{')
        end = text_response.rfind('}') + 1
        if start != -1 and end > start:
            candidates.append(text_response[start:end])

        json_lines = [
            line
            for line in text_response.split('\n')
            if '{' in line or '}' in line or '"' in line
        ]
        if json_lines:
            candidates.append(''.join(json_lines))

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except (TypeError, ValueError, RecursionError):
            # TypeError: candidate is not str/bytes; ValueError: invalid
            # JSON (JSONDecodeError is a subclass); RecursionError: input
            # nested too deeply.
            continue
        if isinstance(parsed, dict):
            return parsed
    return None
|
|
|
|
def valider_reponse_ia(result):
    """Validate a parsed model reply and normalise it in place.

    Normalisation applied to ``result``:
      * "urgence": booleans are kept; other values are converted to text
        (case and surrounding whitespace ignored) and mapped to True for
        "true"/"1"/"yes"/"oui" or False for "false"/"0"/"no"/"non".
      * "confiance": set to 70 when missing or null.
      * "message": set to "Analyse effectuée" when missing or null, and
        converted to ``str`` otherwise, so callers can safely append text.

    Args:
        result: Parsed reply; mutated in place when valid.

    Returns:
        True when ``result`` is a dict with a usable "urgence" value,
        False otherwise.
    """
    if not isinstance(result, dict) or "urgence" not in result:
        return False

    urgence = result["urgence"]
    if not isinstance(urgence, bool):
        token = str(urgence).strip().lower()
        if token in ("true", "1", "yes", "oui"):
            result["urgence"] = True
        elif token in ("false", "0", "no", "non"):
            result["urgence"] = False
        else:
            return False

    # A JSON null is treated like a missing key: without this, a null
    # message passed validation and broke later string concatenation.
    if result.get("confiance") is None:
        result["confiance"] = 70

    message = result.get("message")
    if message is None:
        result["message"] = "Analyse effectuée"
    elif not isinstance(message, str):
        result["message"] = str(message)

    return True
|
|
|
|
# --- FONCTION D'ALERTE APP ---
|
|
def _journaliser_erreur_fcm(contexte, erreur):
    """Append one notification failure to the error log file (best effort)."""
    try:
        with open("/tmp/lucas_gemini_errors.log", "a", encoding="utf-8") as logf:
            logf.write(
                f"[{datetime.now()}] FCM - {contexte}: "
                f"{type(erreur).__name__}: {erreur}\n"
            )
    except OSError:
        # Nothing else can be reported: console output is reserved for JSON.
        pass


def envoyer_alerte_app(image_path, alert_id):
    """Push a fall alert to every FCM token registered for the client.

    The client is selected by the LUCAS_CLIENT_ID environment variable. The
    data payload combines the client record, the camera settings read from
    the SMARTEYE_* environment variables and a timeline derived from the
    current time.

    Args:
        image_path: Path of the snapshot, appended to the configured server
            domain to form the image URL.
        alert_id: Identifier of the alert, forwarded in the payload.

    Returns:
        None. The function never raises: each failure is written to
        /tmp/lucas_gemini_errors.log and the remaining tokens are still
        tried.
    """
    try:
        client_id = os.environ.get("LUCAS_CLIENT_ID", "")
        fcm_tokens = get_client_fcm_tokens(client_id)
        if not fcm_tokens:
            return

        domain = _cfg.get("server", "domain", fallback="https://lucas.unigest.fr")

        # A missing record (None) must not abort the notification.
        client_info = charger_infos_client() or {}
        senior_name = client_info.get("senior_name", client_info.get("name", "Senior"))
        senior_photo_rel = client_info.get("senior_photo") or "photos/mamie.jpg"

        # Timeline shown in the app: fixed offsets before the send time.
        now = datetime.now()

        def _heure(secondes_avant):
            return (now - timedelta(seconds=secondes_avant)).strftime("%H:%M:%S")

        valeurs = {
            "urgence": "true",
            "type": "chute",
            "alert_id": alert_id,
            "is_reminder": "false",
            "escalation_level": "0",
            "senior_name": senior_name,
            "senior_nickname": client_info.get("senior_nickname", senior_name),
            "senior_photo_url": f"{domain}/{senior_photo_rel}",
            "emergency_number": client_info.get("emergency_number", "15"),
            "timeline_standing": _heure(45),
            "timeline_fall_detected": _heure(30),
            "timeline_immobile": _heure(15),
            "timeline_alert_sent": _heure(0),
            "senior_latitude": client_info.get("latitude", ""),
            "senior_longitude": client_info.get("longitude", ""),
            "image_url": f"{domain}/{image_path}",
            "rtsp_url": os.environ.get("SMARTEYE_RTSP_URL", ""),
            "camera_urls": deriver_camera_urls(),
            "audio_relay_ws": os.environ.get("SMARTEYE_AUDIO_WS", ""),
            "camera_ip_map": os.environ.get("SMARTEYE_CAMERA_MAP", "{}"),
            "click_action": "FLUTTER_NOTIFICATION_CLICK",
        }
        # FCM only accepts string values in the data payload; a numeric
        # emergency number or a null field from the client record would
        # otherwise make every send fail.
        data_payload = {
            cle: "" if valeur is None else str(valeur)
            for cle, valeur in valeurs.items()
        }

        # These parts are identical for every recipient.
        notification = messaging.Notification(
            title="ALERTE CHUTE",
            body="Appuyez pour voir la preuve photo",
        )
        android = messaging.AndroidConfig(
            priority='high',
            notification=messaging.AndroidNotification(
                channel_id='fall_alert_channel',
                priority='high',
                default_sound=True,
                default_vibrate_timings=True,
                visibility='public',
            ),
        )

        for index, token in enumerate(fcm_tokens):
            try:
                messaging.send(
                    messaging.Message(
                        notification=notification,
                        android=android,
                        data=data_payload,
                        token=token,
                    )
                )
            except Exception as erreur:
                # One bad token must not block the others. Tokens are not
                # written to the log; the position is enough to find it.
                _journaliser_erreur_fcm(f"envoi au token #{index}", erreur)
    except Exception as erreur:
        _journaliser_erreur_fcm("préparation de la notification", erreur)
|
|
|
|
# --- FONCTION D'ANALYSE AVEC RETRY ---
|
|
def _journaliser_tentative_gemini(attempt, max_retries, ligne_erreur, reponse):
    """Append one failed attempt to the Gemini error log (best effort).

    Args:
        attempt: Zero-based attempt index.
        max_retries: Total number of attempts allowed.
        ligne_erreur: Already formatted error line, newline included.
        reponse: Reply text received during this attempt, or None.
    """
    try:
        # Explicit UTF-8: the records contain accented characters.
        with open("/tmp/smarteye_gemini_errors.log", 'a', encoding="utf-8") as f:
            f.write(f"\n--- TENTATIVE {attempt + 1}/{max_retries} - {datetime.now()} ---\n")
            f.write(ligne_erreur)
            if reponse:
                f.write(f"Réponse brute: {repr(reponse[:500])}\n")
    except OSError:
        # A logging failure must never abort the retry loop.
        pass


def appeler_gemini_avec_retry(image_path, prompt, max_retries=3):
    """Send the image and prompt to Gemini, retrying with exponential backoff.

    An attempt succeeds when the reply is non-empty, its finish reason (if
    any) contains "STOP", and the text yields a dict accepted by
    ``valider_reponse_ia``. Between failed attempts the function sleeps
    1 s, 2 s, 4 s, ... (2 ** attempt).

    Args:
        image_path: Path of the image, sent with the "image/jpeg" MIME type.
        prompt: Instruction text sent with the image.
        max_retries: Maximum number of attempts.

    Returns:
        On success: {"success": True, "data": <dict>, "attempt": <1-based>}.
        On failure: {"success": False, "error": <last exception or None>,
        "error_type": <its class name or "UnknownError">,
        "last_response": <first 200 chars of the last reply, or None>}.

    Raises:
        OSError: If the image file cannot be read.
    """
    import time

    with open(image_path, "rb") as f:
        image_bytes = f.read()

    last_error = None
    last_response_text = None

    for attempt in range(max_retries):
        # Reply of this attempt only, so the log never pairs an error with
        # a stale reply from an earlier attempt.
        attempt_text = None
        try:
            response = client.models.generate_content(
                model="gemini-2.5-flash",
                contents=[
                    types.Part(text=prompt),
                    types.Part(inline_data=types.Blob(mime_type="image/jpeg", data=image_bytes)),
                ],
                config=generation_config,
            )

            if not response or not response.text:
                raise ValueError("Réponse vide de Gemini")

            candidates = getattr(response, 'candidates', None)
            if candidates:
                finish_reason = getattr(candidates[0], 'finish_reason', None)
                if finish_reason and 'STOP' not in str(finish_reason):
                    raise ValueError(f"Réponse incomplète: {finish_reason}")

            attempt_text = response.text.strip()
            last_response_text = attempt_text

            result = extraire_json_intelligent(attempt_text)
            if result and valider_reponse_ia(result):
                return {"success": True, "data": result, "attempt": attempt + 1}

            raise json.JSONDecodeError(
                "JSON invalide après extraction intelligente",
                attempt_text, 0
            )

        except json.JSONDecodeError as e:
            last_error = e
            _journaliser_tentative_gemini(
                attempt, max_retries, f"JSONDecodeError: {str(e)}\n", attempt_text
            )

        except Exception as e:
            last_error = e
            _journaliser_tentative_gemini(
                attempt, max_retries, f"Erreur {type(e).__name__}: {str(e)}\n", attempt_text
            )

        # No sleep after the final attempt.
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)

    return {
        "success": False,
        "error": last_error,
        "error_type": type(last_error).__name__ if last_error else "UnknownError",
        "last_response": last_response_text[:200] if last_response_text else None
    }
|
|
|
|
# --- FONCTION D'ANALYSE ---
|
|
# File holding the streak of consecutive analysis failures.
_ERROR_STATE_FILE = "/tmp/smarteye_error_state.json"
# Dedicated error log (a file, not stderr: the source notes that api.php
# runs this script with 2>&1).
_ANALYSE_ERROR_LOG = "/tmp/lucas_gemini_errors.log"
# Above this number of active alerts no new alert is raised.
_MAX_ALERTES_ACTIVES = 3
# A failure streak is restarted when the previous error is older than this.
_ERROR_STREAK_TIMEOUT_S = 3600


def _journaliser_echec_analyse(image_path, error_type, erreur):
    """Append a failure record with its traceback to the error log."""
    import traceback

    try:
        with open(_ANALYSE_ERROR_LOG, 'a', encoding="utf-8") as logf:
            logf.write(f"\n{'='*50}\n")
            logf.write(f"[{datetime.now()}] {error_type}: {erreur}\n")
            logf.write(f"Image: {image_path}\n")
            traceback.print_exception(
                type(erreur), erreur, erreur.__traceback__, file=logf
            )
    except Exception:
        # Logging is best effort and must never break the alert path.
        pass


def _sauvegarder_etat_erreurs(etat):
    """Write the failure-streak state to disk (best effort)."""
    try:
        with open(_ERROR_STATE_FILE, 'w', encoding="utf-8") as f:
            json.dump(etat, f)
    except (OSError, TypeError, ValueError):
        pass


def _incrementer_erreurs_consecutives(now):
    """Load, update and save the failure streak; return the new state."""
    etat = None
    try:
        with open(_ERROR_STATE_FILE, 'r', encoding="utf-8") as f:
            etat = json.load(f)
    except (OSError, ValueError):
        etat = None
    # A missing, unreadable or malformed file starts a fresh streak.
    if not isinstance(etat, dict):
        etat = {"consecutive": 0, "first_error": now, "last_alert_sent": 0}

    # Restart the streak when the LAST error is older than the timeout.
    # The previous code compared against "first_error", which also reset
    # the counter in the middle of an ongoing streak.
    derniere = etat.get("last_error", etat.get("first_error", now))
    if not isinstance(derniere, (int, float)) or now - derniere > _ERROR_STREAK_TIMEOUT_S:
        etat = {"consecutive": 0, "first_error": now, "last_alert_sent": 0}

    precedentes = etat.get("consecutive", 0)
    if not isinstance(precedentes, int):
        precedentes = 0
    etat["consecutive"] = precedentes + 1
    etat["last_error"] = now
    if etat["consecutive"] == 1:
        etat["first_error"] = now

    _sauvegarder_etat_erreurs(etat)
    return etat


def _nombre_alertes_actives(image_path):
    """Return the number of active alerts, or 0 when it cannot be read.

    Falling back to 0 means an alert is still raised when the count is
    unavailable: a duplicate alert is preferred to a missed fall.
    """
    try:
        return compter_alertes_actives()
    except Exception as erreur:
        _journaliser_echec_analyse(image_path, type(erreur).__name__, erreur)
        return 0


def _declencher_alerte(image_path, alert_id, analyse_data):
    """Send the push notification, then record the alert.

    A failure while recording is logged, not raised: the notification has
    already gone out and must not be sent a second time.
    """
    envoyer_alerte_app(image_path, alert_id)
    try:
        creer_alerte(
            alert_id=alert_id,
            image_path=image_path,
            camera_id=os.environ.get("SMARTEYE_CAMERA_ID", "cam_default"),
            audio_relay_ws=os.environ.get("SMARTEYE_AUDIO_WS", "ws://57.128.74.87:8800"),
            camera_ip_map=os.environ.get("SMARTEYE_CAMERA_MAP", "{}"),
            analyse_data=analyse_data,
        )
    except Exception as erreur:
        _journaliser_echec_analyse(image_path, type(erreur).__name__, erreur)


def _traiter_echec_gemini(image_path, error_type, erreur):
    """Handle an image that Gemini could not analyse.

    Logs the failure, updates the failure streak and raises a precautionary
    alert unless enough alerts are already active. The returned dict always
    reports an emergency.
    """
    import time

    now = time.time()
    _journaliser_echec_analyse(image_path, error_type, erreur)
    etat = _incrementer_erreurs_consecutives(now)

    # Safety rule: an image that was NOT analysed is treated as a fall.
    alert_id = str(uuid.uuid4())
    if _nombre_alertes_actives(image_path) < _MAX_ALERTES_ACTIVES:
        _declencher_alerte(
            image_path,
            alert_id,
            {"error": True, "error_type": error_type, "consecutive": etat["consecutive"]},
        )
        etat["last_alert_sent"] = now
        _sauvegarder_etat_erreurs(etat)

    return {
        "urgence": True,
        "confiance": 30,
        "message": f"⚠️ Erreur IA ({error_type}), alerte de sécurité par précaution",
        "alert_id": alert_id
    }


def analyze_image(image_path):
    """Analyse a snapshot with Gemini and raise an alert when needed.

    Args:
        image_path: Path of the image to analyse.

    Returns:
        A dict with at least "urgence" and "message":
          * image missing: {"urgence": False, "message": ...};
          * analysis succeeded: the validated model reply, with "alert_id"
            added when a new alert was raised, or a suffix appended to
            "message" when 3 or more alerts are already active;
          * analysis failed: {"urgence": True, "confiance": 30, "message",
            "alert_id"}; the alert itself is raised only when fewer than 3
            alerts are active.
    """
    if not os.path.exists(image_path):
        return {"urgence": False, "message": "Erreur : Image introuvable"}

    prompt = """Caméra de surveillance au domicile d'une personne âgée vivant seule. Le détecteur de chute a déclenché une alerte. Analyse cette image.

URGENCE (true) : personne au sol, affalée, inerte, en détresse, position anormale.
PAS URGENCE (false) : pièce vide, personne debout, assise sur meuble (chaise/canapé/lit), accroupie volontairement, en train de se pencher.

En cas de doute réel entre chute et activité normale → true.

Réponds en JSON : {"urgence": bool, "confiance": int 0-100, "message": "description courte"}"""

    # Only the Gemini call is guarded here. Alert dispatch happens outside
    # this try block so a dispatch problem is neither reported as an AI
    # error nor followed by a second notification.
    try:
        api_result = appeler_gemini_avec_retry(image_path, prompt, max_retries=3)
    except Exception as erreur:
        return _traiter_echec_gemini(image_path, type(erreur).__name__, erreur)

    if not api_result.get("success"):
        # Report the real failure type instead of a generic "Exception".
        cause = api_result.get("error") or Exception("Échec après 3 tentatives")
        return _traiter_echec_gemini(
            image_path, api_result.get("error_type", "UnknownError"), cause
        )

    result = api_result["data"]

    # Gemini answered: clear the failure streak.
    try:
        if os.path.exists(_ERROR_STATE_FILE):
            os.remove(_ERROR_STATE_FILE)
    except OSError:
        pass

    if result.get("urgence") is not True:
        return result

    # Anti-duplicate: do not pile up alerts while earlier ones are pending.
    if _nombre_alertes_actives(image_path) >= _MAX_ALERTES_ACTIVES:
        result["message"] = f"{result.get('message', '')} [alerte existante en cours]"
        return result

    alert_id = str(uuid.uuid4())
    _declencher_alerte(image_path, alert_id, result)
    # Added after recording, so the stored analysis matches the model reply.
    result["alert_id"] = alert_id
    return result
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: the first argument is the image path and
    # the analysis result is printed as a single JSON object.
    cli_args = sys.argv[1:]
    if not cli_args:
        print(json.dumps({"urgence": False, "message": "No image provided"}))
        sys.exit(1)

    print(json.dumps(analyze_image(cli_args[0])))
|