# mi-proyecto/app/integrations.py
from __future__ import annotations
import re
from datetime import datetime
from typing import Any
import requests
from bs4 import BeautifulSoup
from . import db
from .models import ObraSocialCatalog, ObraSocialPageSnapshot
from .utils import hash_payload, safe_json_dumps
GEOREF_BASE = 'https://apis.datos.gob.ar/georef/api'
OBRAS_SOCIALES_URL = 'https://www.sssalud.gob.ar/?page=listRnosc&tipo={tipo}'
class IntegrationError(Exception):
    """Raised when an external integration call fails or is misconfigured."""
def normalize_space(text: str) -> str:
    """Collapse runs of whitespace into single spaces and strip the ends."""
    collapsed = re.sub(r'\s+', ' ', text or '')
    return collapsed.strip()
def get_requests_session():
    """Build a requests.Session tagged with this app's User-Agent header."""
    session = requests.Session()
    identity = {'User-Agent': 'BookAppointmentsPro/3.0 (+Flask admin integration)'}
    session.headers.update(identity)
    return session
def parse_categoria_y_origen(html: str):
    """Extract the official category and data-origin text from an SSSalud page.

    Looks for the centered div carrying the 'Orígen de datos:' legend; the
    category comes from its <b> child, the origin from the text after the
    legend. Falls back to the first plausible heading when no legend matches.
    """
    soup = BeautifulSoup(html, 'html.parser')
    categoria = ''
    origen = ''
    for div in soup.select('div.col-md-8.col-md-offset-2.text-center, div.text-center'):
        text = normalize_space(div.get_text(' ', strip=True))
        if 'Orígen de datos:' not in text and 'Origen de datos:' not in text:
            continue
        bold = div.find('b')
        if bold:  # truthiness matches bs4 semantics: empty <b> is skipped
            categoria = normalize_space(bold.get_text(' ', strip=True))
        match = re.search(r'Or[ií]gen de datos:\s*(.+)', text, re.I)
        if match:
            origen = normalize_space(match.group(1))
        break
    if not categoria:
        # Fallback: first heading-ish node that isn't the RNAS banner.
        heading_texts = (
            normalize_space(node.get_text(' ', strip=True))
            for node in soup.find_all(['h1', 'h2', 'h3', 'b'])
        )
        categoria = next(
            (txt for txt in heading_texts if txt and 'RNAS' not in txt and len(txt) > 8),
            '',
        )
    return {
        'categoria_oficial': categoria,
        'origen_datos': origen,
    }
def parse_obras_sociales_rows(html: str) -> dict[str, Any]:
    """Parse one SSSalud listing page into metadata, a status flag and rows.

    Returns a dict with ``categoria_oficial``, ``origen_datos``, ``status``
    ('ok' / 'no_data' / 'warning') and ``rows`` (one dict per data row).
    """
    soup = BeautifulSoup(html, 'html.parser')
    meta = parse_categoria_y_origen(html)
    body_text = normalize_space(soup.get_text(' ', strip=True))

    # Page-level status inferred from SSSalud's well-known messages;
    # connection/database errors take precedence over the no-data banner.
    status = 'ok'
    if 'No se reportan datos' in body_text:
        status = 'no_data'
    lowered = body_text.lower()
    if any(marker in lowered for marker in ('error de conexión', 'error de base', 'error de conexion')):
        status = 'warning'

    # Prefer rows from the first <table>; fall back to any <tr> on the page.
    table = soup.find('table')
    candidate_rows = table.find_all('tr') if table else soup.find_all('tr')

    columns = ('rnas', 'denominacion', 'domicilio', 'localidad',
               'telefono', 'linea_gratuita', 'habilitada_opciones')
    rows = []
    for tr in candidate_rows:
        cells = [normalize_space(td.get_text(' ', strip=True)) for td in tr.find_all(['td', 'th'])]
        if len(cells) < 5:
            continue  # too short to be a data row
        header_join = ' '.join(cells[:5]).lower()
        if 'rnas' in header_join and 'denominación' in header_join:
            continue  # the header row itself
        padded = cells + [''] * (len(columns) - len(cells))
        row = dict(zip(columns, padded))
        if row['rnas'] and row['denominacion']:
            rows.append(row)

    return {
        'categoria_oficial': meta['categoria_oficial'],
        'origen_datos': meta['origen_datos'],
        'status': status,
        'rows': rows,
    }
def fetch_obras_sociales_tipo(tipo: int) -> dict[str, Any]:
    """Download and parse the SSSalud obras-sociales listing for one tipo."""
    url = OBRAS_SOCIALES_URL.format(tipo=tipo)
    response = get_requests_session().get(url, timeout=30)
    response.raise_for_status()
    result = parse_obras_sociales_rows(response.text)
    result['tipo'] = tipo
    return result
def sync_obras_sociales(tipo_desde: int = 1, tipo_hasta: int = 20) -> dict[str, Any]:
    """Sync the local obras-sociales catalog from SSSalud pages tipo_desde..tipo_hasta.

    For each tipo, fetches and parses the listing page, refreshes the
    per-tipo ObraSocialPageSnapshot (hash, status, row count) and upserts
    each row into ObraSocialCatalog keyed by RNAS. Rows not seen in this
    run are flagged ``vigente=False``. Commits once at the end.

    Returns a summary dict with counters and per-page error messages.
    """
    summary = {
        'pages_processed': 0,
        'pages_changed': 0,
        'rows_new': 0,
        'rows_updated': 0,
        'pages_with_error': 0,
        'messages': [],
    }
    # Naive UTC timestamp shared by every record touched in this run.
    # NOTE(review): datetime.utcnow() is naive and deprecated in 3.12+.
    now = datetime.utcnow()
    seen_ids = set()  # catalog ids confirmed present during this run
    for tipo in range(tipo_desde, tipo_hasta + 1):
        # One snapshot row per tipo; create it lazily on first sync.
        page = ObraSocialPageSnapshot.query.filter_by(tipo=tipo).first()
        if not page:
            page = ObraSocialPageSnapshot(tipo=tipo)
            db.session.add(page)
            db.session.flush()
        try:
            snapshot = fetch_obras_sociales_tipo(tipo)
            # Hash the parsed payload (not the raw HTML) so cosmetic markup
            # changes don't register as content changes.
            payload_for_hash = {
                'tipo': tipo,
                'categoria_oficial': snapshot['categoria_oficial'],
                'origen_datos': snapshot['origen_datos'],
                'status': snapshot['status'],
                'rows': snapshot['rows'],
            }
            current_hash = hash_payload(payload_for_hash)
            changed = page.last_hash != current_hash
            if changed:
                summary['pages_changed'] += 1
            # Snapshot metadata is refreshed even when the hash is unchanged.
            page.categoria_oficial = snapshot['categoria_oficial']
            page.origen_datos = snapshot['origen_datos']
            page.status = snapshot['status']
            page.row_count = len(snapshot['rows'])
            page.last_hash = current_hash
            page.last_error = None
            page.last_synced_at = now
            for row in snapshot['rows']:
                # Per-row hash: only used to drive the rows_updated counter.
                row_hash = hash_payload({
                    'tipo': tipo,
                    'categoria_oficial': snapshot['categoria_oficial'],
                    **row,
                })
                # Upsert keyed on the RNAS registry number.
                item = ObraSocialCatalog.query.filter_by(rnas=row['rnas']).first()
                if not item:
                    item = ObraSocialCatalog(rnas=row['rnas'])
                    db.session.add(item)
                    summary['rows_new'] += 1
                else:
                    if item.row_hash != row_hash:
                        summary['rows_updated'] += 1
                item.tipo = tipo
                item.categoria_oficial = snapshot['categoria_oficial']
                item.denominacion = row['denominacion']
                item.domicilio = row['domicilio']
                item.localidad = row['localidad']
                item.telefono = row['telefono']
                item.linea_gratuita = row['linea_gratuita']
                item.habilitada_opciones = row['habilitada_opciones']
                item.vigente = True
                item.row_hash = row_hash
                item.last_seen_at = now
                item.page_snapshot_id = page.id
                db.session.flush()  # ensure item.id is populated before recording it
                seen_ids.add(item.id)
            summary['pages_processed'] += 1
        except Exception as exc:
            # Best-effort: record the failure on the snapshot and continue
            # with the remaining tipos; nothing is rolled back here.
            page.status = 'error'
            page.last_error = str(exc)
            page.last_synced_at = now
            summary['pages_with_error'] += 1
            summary['messages'].append(f'tipo={tipo}: {exc}')
    # Mark catalog rows not seen in this run as no longer vigente.
    # NOTE(review): this also de-flags rows from tipos outside the requested
    # range and rows whose page errored this run — confirm that is intended.
    if seen_ids:
        ObraSocialCatalog.query.filter(~ObraSocialCatalog.id.in_(seen_ids)).update(
            {'vigente': False}, synchronize_session=False
        )
    db.session.commit()
    return summary
def georef_get(path: str, params: dict[str, Any] | None = None):
    """GET a Georef API endpoint and return the decoded JSON body."""
    query = params or {}
    response = get_requests_session().get(f'{GEOREF_BASE}/{path}', params=query, timeout=30)
    response.raise_for_status()
    return response.json()
def get_provinces():
    """List all Argentine provinces (id + name) from the Georef API."""
    payload = georef_get('provincias', {'campos': 'id,nombre', 'max': 100})
    return payload.get('provincias', [])
def get_municipios(provincia_id: str):
    """List the municipalities of one province from the Georef API."""
    payload = georef_get('municipios', {'provincia': provincia_id, 'campos': 'id,nombre', 'max': 500})
    return payload.get('municipios', [])
def get_localidades(provincia_id: str = '', municipio_id: str = ''):
    """List Georef localidades, optionally filtered by province and/or municipality."""
    query: dict[str, Any] = {'campos': 'id,nombre', 'max': 500}
    # Only non-empty filters are forwarded to the API.
    for key, value in (('provincia', provincia_id), ('municipio', municipio_id)):
        if value:
            query[key] = value
    return georef_get('localidades', query).get('localidades', [])
def normalize_sisa_item(item: Any) -> dict[str, Any]:
    """Flatten a raw SISA record (dict or SOAP object) into the app's schema.

    Field names vary between SISA operations, so each output key is resolved
    from a list of candidate attribute/key names, first non-empty value wins.
    """
    if isinstance(item, dict):
        read = item.get
    else:
        def read(key, default=None):
            return getattr(item, key, default)

    def first_truthy(*keys):
        # First non-empty candidate value, or '' when none is present.
        for key in keys:
            value = read(key)
            if value:
                return value
        return ''

    # Display name: "apellido nombre" when available, else a full-name field.
    visible = normalize_space(' '.join(part for part in (read('apellido'), read('nombre')) if part))
    if not visible:
        visible = first_truthy('nombreCompleto', 'displayName', 'nombre_apellido')

    return {
        'documento': first_truthy('dni', 'documento'),
        'display_name': normalize_space(visible),
        'matricula': first_truthy('matricula', 'matriculaNacional', 'numeroMatricula'),
        'profession_name': first_truthy('profesion', 'profesionNombre', 'titulo'),
        'specialty': first_truthy('especialidad', 'specialty', 'especialidadNombre'),
        'jurisdiction_name': first_truthy('jurisdiccion', 'jurisdiccionNombre', 'provincia'),
        'state_name': first_truthy('estado', 'estadoRegistro', 'situacion'),
        'raw': safe_json_dumps(item),
    }
def _zeep_client(wsdl: str):
    """Build a zeep SOAP client over the shared requests session (30s timeout)."""
    # Imported lazily so the module loads even when zeep isn't installed.
    from zeep import Client
    from zeep.transports import Transport
    transport = Transport(session=get_requests_session(), timeout=30)
    return Client(wsdl=wsdl, transport=transport)
def sisa_test_connection(wsdl: str, user: str, password: str, operation: str) -> dict[str, Any]:
    """Validate SISA settings: load the WSDL and check the operation exists.

    Raises IntegrationError when configuration is incomplete or the operation
    is not exposed by the WSDL; otherwise returns an ok/message dict.
    """
    if not wsdl:
        raise IntegrationError('Falta configurar la URL WSDL.')
    if not user or not password:
        raise IntegrationError('Faltan las credenciales SISA.')
    client = _zeep_client(wsdl)
    if not hasattr(client.service, operation):
        ops = ''
        if client.wsdl.bindings:
            # Offer up to 10 operation names from the first binding as a hint.
            first_key = next(iter(client.wsdl.bindings))
            ops = ', '.join(sorted(client.wsdl.bindings[first_key].all())[:10])
        raise IntegrationError(f'La operación {operation} no existe en el WSDL. Operaciones detectadas: {ops}')
    return {'ok': True, 'message': f'WSDL cargado correctamente. Operación disponible: {operation}'}
def sisa_search_professionals(wsdl: str, user: str, password: str, operation: str, dni: str = '', query: str = '', matricula: str = '') -> list[dict[str, Any]]:
    """Search SISA professionals via SOAP and return normalized result dicts.

    Tries several known argument layouts for the configured operation until
    one call succeeds, then unwraps the response (list / dict / SOAP object)
    and normalizes each item with ``normalize_sisa_item``.

    Raises IntegrationError when configuration is incomplete or every
    candidate argument layout fails.
    """
    if not wsdl:
        raise IntegrationError('Falta configurar la URL WSDL.')
    if not user or not password:
        raise IntegrationError('Faltan las credenciales SISA.')
    client = _zeep_client(wsdl)
    service = client.service
    if not hasattr(service, operation):
        raise IntegrationError(f'La operación {operation} no existe en el WSDL configurado.')
    method = getattr(service, operation)
    # Known argument layouts used by different SISA WSDL revisions.
    candidates = [
        {'usuario': user, 'clave': password, 'dni': dni, 'apellidoNombre': query, 'matricula': matricula},
        {'usuario': user, 'password': password, 'dni': dni, 'apellidoNombre': query, 'matricula': matricula},
        {'user': user, 'password': password, 'dni': dni, 'apellidoNombre': query, 'matricula': matricula},
        {'usuario': user, 'clave': password, 'documento': dni, 'nombreApellido': query, 'matricula': matricula},
        {'usuario': user, 'clave': password, 'matricula': matricula, 'query': query, 'dni': dni},
        {'dni': dni, 'apellidoNombre': query, 'matricula': matricula, 'usuario': user, 'clave': password},
    ]
    last_error = None
    # Sentinel distinguishes "no candidate call succeeded" from a successful
    # call that legitimately returned None (previously that case wrongly
    # raised IntegrationError and the `return []` branch was unreachable).
    _unset = object()
    response = _unset
    for kwargs in candidates:
        clean_kwargs = {k: v for k, v in kwargs.items() if v not in (None, '')}
        try:
            response = method(**clean_kwargs)
            break
        except Exception as exc:
            last_error = exc
    if response is _unset:
        raise IntegrationError(f'No se pudo ejecutar la operación SISA: {last_error}')
    if response is None:
        # A successful call with no payload means no matches.
        return []
    # Unwrap the item list from whatever container shape came back.
    if isinstance(response, list):
        items = response
    elif isinstance(response, dict):
        for key in ('profesionales', 'items', 'return', 'resultado', 'results'):
            if key in response and response[key]:
                items = response[key]
                break
        else:
            items = [response]
    else:
        possible = None
        for key in ('profesionales', 'items', 'return', 'resultado', 'results'):
            possible = getattr(response, key, None)
            if possible:
                break
        items = possible if possible is not None else [response]
    normalized = [normalize_sisa_item(item) for item in items if item]
    # Drop records with no usable identity at all.
    return [item for item in normalized if item['display_name'] or item['documento'] or item['matricula']]