"""Fall-alert image analysis.

Sends a camera frame to Gemini, asks whether it shows an emergency, and, when
it does (or when the analysis itself fails), pushes a Firebase notification to
the client's devices and records the alert through ``alert_manager``.

Run as a script, it takes the image path as first argument and prints a single
JSON object on stdout.
"""
import warnings

# Silence warnings so that the JSON printed on stdout stays clean.
# This must run before the imports below.
warnings.filterwarnings("ignore")

import sys
import json
import os
import time
import traceback

# Google AI (Gemini) - new SDK
from google import genai
from google.genai import types

# Firebase (app notifications)
import firebase_admin
from firebase_admin import credentials, messaging
import uuid
from datetime import datetime, timedelta

# Alert manager
from alert_manager import (
    creer_alerte,
    compter_alertes_actives,
    AlertStatus
)

# --- CONFIGURATION VIA config_loader ---
from config_loader import get_config, get_client_info, get_client_fcm_tokens

_cfg = get_config()
API_KEY = _cfg.get("gemini", "api_key")

# Log/state files. Errors go to files rather than stderr because the caller
# (api.php) merges stderr into stdout (2>&1), which would corrupt the JSON.
GEMINI_RETRY_LOG = "/tmp/smarteye_gemini_errors.log"
ERROR_LOG = "/tmp/lucas_gemini_errors.log"
ERROR_STATE_FILE = "/tmp/smarteye_error_state.json"

# No new alert is raised once this many alerts are already active.
MAX_ALERTES_ACTIVES = 3
# An error streak whose last error is older than this (seconds) is restarted.
ERROR_STATE_MAX_AGE = 3600


def _ecrire_log(chemin, texte):
    """Append ``texte`` to the file ``chemin``; never raises (best effort)."""
    try:
        with open(chemin, "a", encoding="utf-8") as fichier:
            fichier.write(texte)
    except Exception:
        pass


# --- FIREBASE INITIALISATION ---
if not firebase_admin._apps:
    try:
        firebase_cred_path = _cfg.get(
            "firebase", "credentials_path", fallback="/var/www/lucas/admin_key.json"
        )
        cred = credentials.Certificate(firebase_cred_path)
        firebase_admin.initialize_app(cred)
    except Exception as e:
        # Keep going (the analysis can still run), but leave a trace: without
        # Firebase no notification can be delivered.
        _ecrire_log(
            ERROR_LOG,
            f"[{datetime.now()}] Firebase init failed - {type(e).__name__}: {e}\n",
        )

# --- GEMINI CONFIGURATION (NEW SDK) ---
client = genai.Client(api_key=API_KEY)

generation_config = types.GenerateContentConfig(
    temperature=0.1,
    top_p=0.95,
    top_k=40,
    max_output_tokens=512,
    response_mime_type="application/json",
    safety_settings=[
        types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="OFF"),
        types.SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="OFF"),
        types.SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="OFF"),
        types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="OFF"),
    ]
)

# Prompt sent with every image (runtime text, kept exactly as it was).
_PROMPT_ANALYSE = (
    "Caméra de surveillance au domicile d'une personne âgée vivant seule. "
    "Le détecteur de chute a déclenché une alerte. Analyse cette image. "
    "URGENCE (true) : personne au sol, affalée, inerte, en détresse, position anormale. "
    "PAS URGENCE (false) : pièce vide, personne debout, assise sur meuble (chaise/canapé/lit), "
    "accroupie volontairement, en train de se pencher. "
    "En cas de doute réel entre chute et activité normale → true. \n"
    'Réponds en JSON : {"urgence": bool, "confiance": int 0-100, "message": "description courte"}'
)


# --- CLIENT INFO ---
def charger_infos_client():
    """Return the client record selected by the LUCAS_CLIENT_ID env variable.

    Returns an empty dict when the variable is unset or when the lookup
    yields nothing, so callers can always use ``.get`` on the result.
    """
    client_id = os.environ.get("LUCAS_CLIENT_ID", "")
    if not client_id:
        return {}
    return get_client_info(client_id) or {}


def deriver_camera_urls():
    """Return, as a JSON string, the camera-map URLs other than the main one.

    SMARTEYE_CAMERA_MAP is expected to hold a JSON object keyed by RTSP URL;
    the key equal to SMARTEYE_RTSP_URL (the main camera) is left out.
    Any malformed or non-object value yields ``"[]"``.
    """
    camera_map_str = os.environ.get("SMARTEYE_CAMERA_MAP", "{}")
    rtsp_url = os.environ.get("SMARTEYE_RTSP_URL", "")
    try:
        camera_map = json.loads(camera_map_str)
        if not isinstance(camera_map, dict):
            return "[]"
        return json.dumps([url for url in camera_map if url != rtsp_url])
    except Exception:
        return "[]"


# --- JSON HELPERS ---
def nettoyer_json_response(text_response):
    """Strip Markdown fences, surrounding text and control characters.

    Keeps only the span from the first ``{`` to the last ``}`` when such a
    span exists. If anything goes wrong (e.g. the input is not a string) the
    input is returned unchanged.
    """
    try:
        cleaned = text_response.strip()
        if cleaned.startswith("```json"):
            cleaned = cleaned[7:]
        elif cleaned.startswith("```"):
            cleaned = cleaned[3:]
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        cleaned = cleaned.strip()

        start = cleaned.find('{')
        end = cleaned.rfind('}') + 1
        if start != -1 and end > start:
            cleaned = cleaned[start:end]

        # Drop control characters, keeping newline, carriage return and tab.
        cleaned = ''.join(char for char in cleaned if ord(char) >= 32 or char in '\n\r\t')
        return cleaned.strip()
    except Exception:
        return text_response


def extraire_json_intelligent(text_response):
    """Try several extraction strategies and return the first dict parsed.

    Strategies, in order: the cleaned response, the raw ``{...}`` span, and
    the concatenation of every line containing ``{``, ``}`` or ``"``.
    Returns ``None`` when none of them parses to a JSON object.
    """
    strategies = [nettoyer_json_response(text_response)]

    try:
        start = text_response.find('{')
        end = text_response.rfind('}') + 1
        if start != -1 and end > start:
            strategies.append(text_response[start:end])
    except Exception:
        pass

    try:
        json_lines = [
            line for line in text_response.split('\n')
            if '{' in line or '}' in line or '"' in line
        ]
        if json_lines:
            strategies.append(''.join(json_lines))
    except Exception:
        pass

    for strategy_text in strategies:
        try:
            result = json.loads(strategy_text)
        except Exception:
            continue
        if isinstance(result, dict):
            return result
    return None


_URGENCE_VRAI = ("true", "1", "yes", "oui")
_URGENCE_FAUX = ("false", "0", "no", "non")


def valider_reponse_ia(result):
    """Check (and normalise in place) the dict returned by the model.

    ``urgence`` is mandatory; textual/numeric spellings of a boolean are
    converted to ``bool``. Missing ``confiance`` and ``message`` get default
    values. Returns True when the dict is usable, False otherwise.
    """
    if not isinstance(result, dict):
        return False
    if "urgence" not in result:
        return False

    urgence = result["urgence"]
    if not isinstance(urgence, bool):
        texte = str(urgence).lower()
        if texte in _URGENCE_VRAI:
            result["urgence"] = True
        elif texte in _URGENCE_FAUX:
            result["urgence"] = False
        else:
            return False

    result.setdefault("confiance", 70)
    result.setdefault("message", "Analyse effectuée")
    return True


# --- APP NOTIFICATION ---
def _construire_payload_alerte(image_path, alert_id):
    """Build the FCM data payload (string keys and string values only)."""
    domain = _cfg.get("server", "domain", fallback="https://lucas.unigest.fr")
    url_image = f"{domain}/{image_path}"

    client_info = charger_infos_client()
    senior_name = client_info.get("senior_name", client_info.get("name", "Senior"))
    senior_nickname = client_info.get("senior_nickname", senior_name)
    senior_photo_rel = client_info.get("senior_photo", "photos/mamie.jpg")
    senior_photo = (
        f"{domain}/{senior_photo_rel}" if senior_photo_rel else f"{domain}/photos/mamie.jpg"
    )

    # The timeline entries are fixed offsets back from the sending time.
    now = datetime.now()
    horodatage = "%H:%M:%S"

    payload = {
        "urgence": "true",
        "type": "chute",
        "alert_id": alert_id,
        "is_reminder": "false",
        "escalation_level": "0",
        "senior_name": senior_name,
        "senior_nickname": senior_nickname,
        "senior_photo_url": senior_photo,
        "emergency_number": client_info.get("emergency_number", "15"),
        "timeline_standing": (now - timedelta(seconds=45)).strftime(horodatage),
        "timeline_fall_detected": (now - timedelta(seconds=30)).strftime(horodatage),
        "timeline_immobile": (now - timedelta(seconds=15)).strftime(horodatage),
        "timeline_alert_sent": now.strftime(horodatage),
        "senior_latitude": str(client_info.get("latitude", "")),
        "senior_longitude": str(client_info.get("longitude", "")),
        "image_url": url_image,
        "rtsp_url": os.environ.get("SMARTEYE_RTSP_URL", ""),
        "camera_urls": deriver_camera_urls(),
        "audio_relay_ws": os.environ.get("SMARTEYE_AUDIO_WS", ""),
        "camera_ip_map": os.environ.get("SMARTEYE_CAMERA_MAP", "{}"),
        "click_action": "FLUTTER_NOTIFICATION_CLICK",
    }
    # FCM rejects a message whose data values are not strings; client fields
    # come from JSON and may be numbers or null, so coerce everything.
    return {cle: "" if valeur is None else str(valeur) for cle, valeur in payload.items()}


def envoyer_alerte_app(image_path, alert_id):
    """Send the fall notification to every FCM token registered for the client.

    The client is the one named by LUCAS_CLIENT_ID. Never raises: a failure
    for one token does not prevent the others from being tried, and every
    failure is appended to the error log file.
    """
    try:
        client_id = os.environ.get("LUCAS_CLIENT_ID", "")
        fcm_tokens = get_client_fcm_tokens(client_id)
        if not fcm_tokens:
            return

        data_payload = _construire_payload_alerte(image_path, alert_id)

        for token in fcm_tokens:
            try:
                message = messaging.Message(
                    notification=messaging.Notification(
                        title="ALERTE CHUTE",
                        body="Appuyez pour voir la preuve photo",
                    ),
                    android=messaging.AndroidConfig(
                        priority='high',
                        notification=messaging.AndroidNotification(
                            channel_id='fall_alert_channel',
                            priority='high',
                            default_sound=True,
                            default_vibrate_timings=True,
                            visibility='public',
                        ),
                    ),
                    data=data_payload,
                    token=token,
                )
                messaging.send(message)
            except Exception as e:
                _ecrire_log(
                    ERROR_LOG,
                    f"[{datetime.now()}] FCM send failed (alert {alert_id}) - "
                    f"{type(e).__name__}: {e}\n",
                )
    except Exception as e:
        _ecrire_log(
            ERROR_LOG,
            f"[{datetime.now()}] Notification aborted (alert {alert_id}) - "
            f"{type(e).__name__}: {e}\n",
        )


# --- GEMINI CALL WITH RETRY ---
def appeler_gemini_avec_retry(image_path, prompt, max_retries=3):
    """Call Gemini on the image, retrying with exponential backoff.

    Waits 1s, 2s, 4s... between attempts. Each failed attempt is appended to
    the retry log file.

    Returns:
        On success ``{"success": True, "data": <validated dict>, "attempt": n}``.
        On failure ``{"success": False, "error": <last exception>,
        "error_type": <its class name>, "last_response": <first 200 chars of
        the last text received, or None>}``.

    Raises:
        OSError: if the image file cannot be read (this happens before the
        retry loop).
    """
    with open(image_path, "rb") as f:
        image_bytes = f.read()

    last_error = None
    last_response_text = None

    for attempt in range(max_retries):
        try:
            response = client.models.generate_content(
                model="gemini-2.5-flash",
                contents=[
                    types.Part(text=prompt),
                    types.Part(inline_data=types.Blob(mime_type="image/jpeg", data=image_bytes))
                ],
                config=generation_config
            )

            if not response or not response.text:
                raise ValueError("Réponse vide de Gemini")

            if hasattr(response, 'candidates') and response.candidates:
                candidate = response.candidates[0]
                finish_reason = getattr(candidate, 'finish_reason', None)
                if finish_reason and 'STOP' not in str(finish_reason):
                    raise ValueError(f"Réponse incomplète: {finish_reason}")

            text_response = response.text.strip()
            last_response_text = text_response

            result = extraire_json_intelligent(text_response)
            if result and valider_reponse_ia(result):
                return {"success": True, "data": result, "attempt": attempt + 1}

            raise json.JSONDecodeError(
                "JSON invalide après extraction intelligente",
                text_response,
                0
            )
        except Exception as e:
            last_error = e
            if isinstance(e, json.JSONDecodeError):
                etiquette = "JSONDecodeError"
            else:
                etiquette = f"Erreur {type(e).__name__}"
            entree = (
                f"\n--- TENTATIVE {attempt + 1}/{max_retries} - {datetime.now()} ---\n"
                f"{etiquette}: {str(e)}\n"
            )
            if last_response_text:
                entree += f"Réponse brute: {repr(last_response_text[:500])}\n"
            _ecrire_log(GEMINI_RETRY_LOG, entree)

        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)

    return {
        "success": False,
        "error": last_error,
        "error_type": type(last_error).__name__ if last_error else "UnknownError",
        "last_response": last_response_text[:200] if last_response_text else None
    }


# --- ALERT BOOKKEEPING ---
def _enregistrer_alerte(alert_id, image_path, analyse_data):
    """Record the alert in the alert manager with the camera context."""
    creer_alerte(
        alert_id=alert_id,
        image_path=image_path,
        camera_id=os.environ.get("SMARTEYE_CAMERA_ID", "cam_default"),
        audio_relay_ws=os.environ.get("SMARTEYE_AUDIO_WS", "ws://57.128.74.87:8800"),
        camera_ip_map=os.environ.get("SMARTEYE_CAMERA_MAP", "{}"),
        analyse_data=analyse_data
    )


def _sauver_etat_erreurs(etat):
    """Persist the error-streak state; best effort."""
    try:
        with open(ERROR_STATE_FILE, 'w') as f:
            json.dump(etat, f)
    except Exception:
        pass


def _mettre_a_jour_etat_erreurs(now):
    """Load, update and save the consecutive-error state; return it."""
    etat = {"consecutive": 0, "first_error": now, "last_alert_sent": 0}
    try:
        if os.path.exists(ERROR_STATE_FILE):
            with open(ERROR_STATE_FILE, 'r') as f:
                charge = json.load(f)
            # Ignore a state file that does not hold a JSON object.
            if isinstance(charge, dict):
                etat = charge
    except Exception:
        pass

    try:
        # Restart the streak when the previous error is more than one hour old.
        derniere = etat.get("last_error", etat.get("first_error", now))
        if now - derniere > ERROR_STATE_MAX_AGE:
            etat = {"consecutive": 0, "first_error": now, "last_alert_sent": 0}
        etat["consecutive"] = etat.get("consecutive", 0) + 1
    except TypeError:
        # Unusable values in the state file: start a fresh streak.
        etat = {"consecutive": 1, "first_error": now, "last_alert_sent": 0}

    etat["last_error"] = now
    if etat["consecutive"] == 1:
        etat["first_error"] = now

    _sauver_etat_erreurs(etat)
    return etat


def _alerte_de_securite(image_path, erreur):
    """Fallback used when the image could not be analysed: alert anyway.

    Guiding rule: better ten false alerts than one missed fall. The alert is
    skipped only when enough alerts are already active. Never raises, so the
    caller can always print a JSON result.
    """
    now = time.time()
    error_type = type(erreur).__name__

    _ecrire_log(
        ERROR_LOG,
        f"\n{'='*50}\n"
        f"[{datetime.now()}] {error_type}: {erreur}\n"
        f"Image: {image_path}\n"
        + "".join(traceback.format_exception(type(erreur), erreur, erreur.__traceback__)),
    )

    etat = _mettre_a_jour_etat_erreurs(now)
    alert_id = str(uuid.uuid4())

    try:
        alertes_actives = compter_alertes_actives()
    except Exception as e:
        # Unknown count: behave as if nothing were active so the alert goes out.
        _ecrire_log(
            ERROR_LOG,
            f"[{datetime.now()}] Active-alert count failed - {type(e).__name__}: {e}\n",
        )
        alertes_actives = 0

    if alertes_actives < MAX_ALERTES_ACTIVES:
        envoyer_alerte_app(image_path, alert_id)
        try:
            _enregistrer_alerte(
                alert_id,
                image_path,
                {"error": True, "error_type": error_type, "consecutive": etat["consecutive"]},
            )
        except Exception as e:
            _ecrire_log(
                ERROR_LOG,
                f"[{datetime.now()}] Alert {alert_id} not recorded - "
                f"{type(e).__name__}: {e}\n",
            )
        etat["last_alert_sent"] = now
        _sauver_etat_erreurs(etat)

    return {
        "urgence": True,
        "confiance": 30,
        "message": f"⚠️ Erreur IA ({error_type}), alerte de sécurité par précaution",
        "alert_id": alert_id
    }


# --- ANALYSIS ENTRY POINT ---
def analyze_image(image_path):
    """Analyse one camera image and raise an alert when needed.

    Returns the dict produced by the model (``urgence``, ``confiance``,
    ``message``), plus ``alert_id`` when an alert was created. If the image
    does not exist, returns a non-urgent error dict. If the analysis fails for
    any reason, a precautionary alert is raised and an ``urgence: True``
    result with confidence 30 is returned.
    """
    if not os.path.exists(image_path):
        return {"urgence": False, "message": "Erreur : Image introuvable"}

    try:
        api_result = appeler_gemini_avec_retry(image_path, _PROMPT_ANALYSE, max_retries=3)
        if not api_result["success"]:
            raise Exception(f"{api_result['error_type']}: Échec après 3 tentatives")

        result = api_result["data"]

        # Gemini answered: clear the consecutive-error state.
        try:
            if os.path.exists(ERROR_STATE_FILE):
                os.remove(ERROR_STATE_FILE)
        except Exception:
            pass

        if result.get("urgence") is True:
            # De-duplication: do not pile up alerts while the person is still
            # on the floor and earlier alerts are unhandled.
            if compter_alertes_actives() >= MAX_ALERTES_ACTIVES:
                result["message"] = f"{result['message']} [alerte existante en cours]"
                return result

            alert_id = str(uuid.uuid4())
            # 1. Push the notification (carrying the alert id).
            envoyer_alerte_app(image_path, alert_id)
            # 2. Record the alert in the alert manager.
            _enregistrer_alerte(alert_id, image_path, result)
            # 3. Expose the id in the result for traceability.
            result["alert_id"] = alert_id

        return result
    except Exception as e:
        return _alerte_de_securite(image_path, e)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(json.dumps({"urgence": False, "message": "No image provided"}))
        sys.exit(1)

    image_path = sys.argv[1]
    result = analyze_image(image_path)
    print(json.dumps(result))