Commit 494da98c authored by Jose Hugo Torres's avatar Jose Hugo Torres
Browse files

Gateway simplificado

Se eliminaron las colas de mensajes.
parent a93e01b6
...@@ -13,20 +13,20 @@ public class ClienteServidorPOSBC(string ip, int pto) ...@@ -13,20 +13,20 @@ public class ClienteServidorPOSBC(string ip, int pto)
readonly ILogger log = Log.ForContext<ClienteServidorPOSBC>(); readonly ILogger log = Log.ForContext<ClienteServidorPOSBC>();
private int _pto = pto; private int _pto = pto;
private string _ip = ip; private string _ip = ip;
private static Socket _socket = null; public Socket Socket { get; private set; }
public bool ConexionActiva { get; private set; } = false; public bool ConexionActiva { get; private set; } = false;
private int _nroMsjEnviados = 0; private int _nroMsjEnviados = 0;
private int _nroMsjRecibidos; private int _nroMsjRecibidos;
public void AbreConexion() public void AbreConexion()
{ {
_socket = new(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); Socket = new(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
try try
{ {
// Conectarse al servidor // Conectarse al servidor
_socket.Connect(new IPEndPoint(IPAddress.Parse(_ip), _pto)); Socket.Connect(new IPEndPoint(IPAddress.Parse(_ip), _pto));
ConexionActiva = true; ConexionActiva = true;
Log.Information("Conectado a POSBC {ip}:{pto} - {conectado}", _ip, _pto, _socket.Connected); Log.Information("Conectado a POSBC #1 {ip}:{pto} - {conectado}", _ip, _pto, Socket.Connected);
} }
catch (SocketException ex) catch (SocketException ex)
{ {
...@@ -46,14 +46,14 @@ public class ClienteServidorPOSBC(string ip, int pto) ...@@ -46,14 +46,14 @@ public class ClienteServidorPOSBC(string ip, int pto)
{ {
// Cerrar el socket // Cerrar el socket
ConexionActiva = false; ConexionActiva = false;
_socket.Shutdown(SocketShutdown.Both); Socket.Shutdown(SocketShutdown.Both);
_socket.Close(); Socket.Close();
} }
catch (Exception e) catch (Exception e)
{ {
Log.Warning("Excepción cerrando conexión a POSBC {ip}:{pto} - {e}", _ip, _pto, e); Log.Warning("Excepción cerrando conexión a POSBC {ip}:{pto} - {e}", _ip, _pto, e);
} }
Log.Information("Conexión cerrada a POSBC {ip}:{pto} - {conectado}", _ip, _pto, _socket.Connected); Log.Information("Conexión cerrada a POSBC {ip}:{pto} - {conectado}", _ip, _pto, Socket.Connected);
} }
/// <summary> /// <summary>
...@@ -64,51 +64,51 @@ public class ClienteServidorPOSBC(string ip, int pto) ...@@ -64,51 +64,51 @@ public class ClienteServidorPOSBC(string ip, int pto)
var taskId = Task.CurrentId?.ToString() ?? "no-task"; var taskId = Task.CurrentId?.ToString() ?? "no-task";
// Empaca mensaje en bytes. // Empaca mensaje en bytes.
byte[] buffer = Util.ConvierteEnBufferBytes(msj); byte[] buffer = Util.ConvierteEnBufferBytes(msj);
await _socket.SendAsync(buffer); await Socket.SendAsync(buffer);
_nroMsjEnviados++; _nroMsjEnviados++;
Log.Information("{tid} GTWY -> POSBC {nro} - {bytes} bytes total", taskId, _nroMsjEnviados, buffer.Length); Log.Information("{tid} GTWY -> POSBC {nro} - {bytes} bytes total", taskId, _nroMsjEnviados, buffer.Length);
Log.Verbose("{tid} GTWY -> POSBC mensaje\n{msj}", taskId, msj); Log.Verbose("{tid} GTWY -> POSBC mensaje\n{msj}", taskId, msj);
} }
public async Task Recibe(Channel<string> canalSalida) // public async Task Recibe(Channel<string> canalSalida)
{ // {
var taskId = Task.CurrentId?.ToString() ?? "no-task"; // var taskId = Task.CurrentId?.ToString() ?? "no-task";
Log.Debug("{tid} Tarea POSBC acepta entradas iniciada", taskId); // Log.Debug("{tid} Tarea POSBC acepta entradas iniciada", taskId);
while (true) // while (true)
{ // {
// Lee longitud mensaje entrante, 4 bytes. // // Lee longitud mensaje entrante, 4 bytes.
Log.Information("{tid} Esperando POSBC", taskId); // Log.Information("{tid} Esperando POSBC", taskId);
var bufferLongitud = new byte[4]; // var bufferLongitud = new byte[4];
// Lee los primeros 4 bytes de los datos de entrada, los cuales indican // // Lee los primeros 4 bytes de los datos de entrada, los cuales indican
// la longitud del resto del mensaje. // // la longitud del resto del mensaje.
int nroBytesLeidos = await _socket.ReceiveAsync(new ArraySegment<byte>(bufferLongitud), SocketFlags.None); // int nroBytesLeidos = await Socket.ReceiveAsync(new ArraySegment<byte>(bufferLongitud), SocketFlags.None);
// Si el número de bytes leidos es cero, la conexión se ha cerrado. // // Si el número de bytes leidos es cero, la conexión se ha cerrado.
// El método Receive es sincróno, luego si retorna sin datos, es que no hay conexión. // // El método Receive es sincróno, luego si retorna sin datos, es que no hay conexión.
// Mientras tenga conexión, se queda esperando datos. // // Mientras tenga conexión, se queda esperando datos.
if (nroBytesLeidos == 0) break; // if (nroBytesLeidos == 0) break;
int longitudMensaje = Util.LongitudCodificada(bufferLongitud); // int longitudMensaje = Util.LongitudCodificada(bufferLongitud);
Log.Debug("{tid} POSBC bytes cabecera {nroBytes}, longitud mensaje {longitud}", taskId, nroBytesLeidos, longitudMensaje); // Log.Debug("{tid} POSBC bytes cabecera {nroBytes}, longitud mensaje {longitud}", taskId, nroBytesLeidos, longitudMensaje);
// Prepara buffer de entrada según la longitud esperada del mensaje. // // Prepara buffer de entrada según la longitud esperada del mensaje.
var bufferMensaje = new byte[longitudMensaje]; // var bufferMensaje = new byte[longitudMensaje];
nroBytesLeidos = await _socket.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None); // nroBytesLeidos = await Socket.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
Log.Debug("{tid} POSBC bytes mensaje {nroBytes}", taskId, nroBytesLeidos); // Log.Debug("{tid} POSBC bytes mensaje {nroBytes}", taskId, nroBytesLeidos);
_nroMsjRecibidos++; // _nroMsjRecibidos++;
Log.Information("{tid} GTWY <- POSBC {nro} - {bytes} bytes cuerpo", taskId, _nroMsjRecibidos, nroBytesLeidos); // Log.Information("{tid} GTWY <- POSBC {nro} - {bytes} bytes cuerpo", taskId, _nroMsjRecibidos, nroBytesLeidos);
// Arreglo de bytes de entrada se transforma en string, // // Arreglo de bytes de entrada se transforma en string,
// constituye el mensaje. // // constituye el mensaje.
string mensaje = Encoding.UTF8.GetString(bufferMensaje, 0, nroBytesLeidos); // string mensaje = Encoding.UTF8.GetString(bufferMensaje, 0, nroBytesLeidos);
Log.Verbose("{tid} GTWY <- POSBC mensaje\n{msj}", taskId, mensaje); // Log.Verbose("{tid} GTWY <- POSBC mensaje\n{msj}", taskId, mensaje);
// Se agrega mensaje a la cola de entrada de mensajes. // // Se agrega mensaje a la cola de entrada de mensajes.
await canalSalida.Writer.WriteAsync(mensaje); // await canalSalida.Writer.WriteAsync(mensaje);
Log.Debug("{tid} POSBC mensaje -> [cola salida]", taskId); // Log.Debug("{tid} POSBC mensaje -> [cola salida]", taskId);
} // }
} // }
// Envia mensaje, espera respuesta y la retorna como un arreglo de bytes. // Envia mensaje, espera respuesta y la retorna como un arreglo de bytes.
// El mensaje es un string, convertido a arreglo de bytes, y le antepone // El mensaje es un string, convertido a arreglo de bytes, y le antepone
...@@ -117,7 +117,7 @@ public class ClienteServidorPOSBC(string ip, int pto) ...@@ -117,7 +117,7 @@ public class ClienteServidorPOSBC(string ip, int pto)
public byte[] EnviaRecibe(byte[] bytesMensaje) public byte[] EnviaRecibe(byte[] bytesMensaje)
{ {
// Remite mensaje. // Remite mensaje.
_socket.Send(bytesMensaje); Socket.Send(bytesMensaje);
_nroMsjEnviados++; _nroMsjEnviados++;
Log.Information("Mensaje #{contador} para POSBIC {ip}", _nroMsjEnviados, _ip); Log.Information("Mensaje #{contador} para POSBIC {ip}", _nroMsjEnviados, _ip);
Log.Information("Esperando respuesta.."); Log.Information("Esperando respuesta..");
...@@ -131,13 +131,13 @@ public class ClienteServidorPOSBC(string ip, int pto) ...@@ -131,13 +131,13 @@ public class ClienteServidorPOSBC(string ip, int pto)
// Usar un MemoryStream para manejar datos de longitud arbitraria // Usar un MemoryStream para manejar datos de longitud arbitraria
using (var ms = new System.IO.MemoryStream(1024)) using (var ms = new System.IO.MemoryStream(1024))
{ {
while ((bytesRead = _socket.Receive(buffer)) > 0) while ((bytesRead = Socket.Receive(buffer)) > 0)
{ {
ms.Write(buffer, 0, bytesRead); ms.Write(buffer, 0, bytesRead);
totalBytesRead += bytesRead; totalBytesRead += bytesRead;
// Romper el ciclo si se ha leído todo el mensaje // Romper el ciclo si se ha leído todo el mensaje
if (_socket.Available == 0) break; if (Socket.Available == 0) break;
} }
// Obtener todos los datos recibidos como un arreglo de bytes // Obtener todos los datos recibidos como un arreglo de bytes
......
...@@ -53,8 +53,10 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB ...@@ -53,8 +53,10 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
/// </summary> /// </summary>
static ClienteServidorPOSBC _posbc; static ClienteServidorPOSBC _posbc;
long _numeroConexionesEntrantes = 0; long _numeroConexionesEntrantes = 0;
static long _nroMsjEntrada = 0; static long _nroMsjEntradaCHEC = 0;
static long _nroMsjSalida = 0; static long _nroMsjEntradaPOSBC = 0;
static long _nroMsjSalidaCHEC = 0;
static long _nroMsjSalidaPOSBC = 0;
static Socket _socketCHEC; static Socket _socketCHEC;
// Canal de entrada. // Canal de entrada.
...@@ -111,7 +113,7 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB ...@@ -111,7 +113,7 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
Log.Information("Gateway servidor socket en {ip} : {puerto}, proceso id {id}", IpGateway, PuertoGateway, idProceso); Log.Information("Gateway servidor socket en {ip} : {puerto}, proceso id {id}", IpGateway, PuertoGateway, idProceso);
if (_isDebug) if (_isDebug)
{ {
Log.Debug("Versión framework {version} # 10", Environment.Version); Log.Debug("Versión framework {version} # 13", Environment.Version);
Log.Debug("Tcp Socket configuración:"); Log.Debug("Tcp Socket configuración:");
Log.Debug($" Blocking {tcpSocket.Blocking}"); Log.Debug($" Blocking {tcpSocket.Blocking}");
Log.Debug($" ExclusiveAddressUse {tcpSocket.ExclusiveAddressUse}"); Log.Debug($" ExclusiveAddressUse {tcpSocket.ExclusiveAddressUse}");
...@@ -126,19 +128,20 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB ...@@ -126,19 +128,20 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
} }
try try
{ {
// Activa el POSBC //var tareaRecibePOSBC = Task.Run(() => _posbc.Recibe(_canalSalida));
if (_posbc == null) ActivaPOSBC();
// var tareaProcesaMsj = Task.Run(() => ProcesaEntradaCHEC());
var tareaRecibePOSBC = Task.Run(() => _posbc.Recibe(_canalSalida));
var tareaRemiteRespuesta = Task.Run(() => ProcesaSalidaPOSBC());
// Procesa conexiones al Gateway. // Procesa conexiones al Gateway.
while (true) while (true)
{ {
Log.Information("Esperando conexión"); Log.Information("Esperando conexión");
Socket cliente = tcpSocket.Accept(); Socket clienteCHEC = tcpSocket.Accept();
Log.Information("Conexión remota ip {ip} en puerto {pto}", IPAddress.Parse(((IPEndPoint)cliente.RemoteEndPoint).Address.ToString()), ((IPEndPoint)cliente.RemoteEndPoint).Port.ToString()); Log.Information("Conexión remota ip {ip} en puerto {pto}",
_ = Task.Run(() => AceptaEntradas(cliente)); IPAddress.Parse(((IPEndPoint)clienteCHEC.RemoteEndPoint).Address.ToString()),
((IPEndPoint)clienteCHEC.RemoteEndPoint).Port.ToString());
// Activa el POSBC
ActivaPOSBC();
var tareaAceptaEntradasCHEC = Task.Run(() => AceptaEntradasCHEC(clienteCHEC));
var tareaRemiteRespuestaPOSBC = Task.Run(() => ProcesaSalidaPOSBC(clienteCHEC, _posbc.Socket));
} }
} }
finally finally
...@@ -148,7 +151,13 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB ...@@ -148,7 +151,13 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
} }
} }
public static async Task AceptaEntradas(Socket cliente) /// <summary>
/// Acepta entrada de datos desde CHEC.
/// Procesa la entrada.
/// </summary>
/// <param name="cliente">Socket de conexión a CHEC.</param>
/// <returns></returns>
public async Task AceptaEntradasCHEC(Socket cliente)
{ {
var taskId = Task.CurrentId?.ToString() ?? "no-task"; var taskId = Task.CurrentId?.ToString() ?? "no-task";
_socketCHEC = cliente; _socketCHEC = cliente;
...@@ -198,8 +207,8 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB ...@@ -198,8 +207,8 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
// var bufferMensaje = new byte[longitudMensaje]; // var bufferMensaje = new byte[longitudMensaje];
// nroBytesLeidos = await cliente.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None); // nroBytesLeidos = await cliente.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
_nroMsjEntrada++; _nroMsjEntradaCHEC++;
Log.Information("{tid} CHEC -> GTWY nuevo mensaje #{nro} - cuerpo {bytes} bytes", taskId, _nroMsjEntrada, bytesLeidos); Log.Information("{tid} CHEC -> GTWY nuevo mensaje #{nro} - cuerpo {bytes} bytes", taskId, _nroMsjEntradaCHEC, bytesLeidos);
// Arreglo de bytes de entrada se transforma en string, // Arreglo de bytes de entrada se transforma en string,
// constituye el mensaje. // constituye el mensaje.
...@@ -214,54 +223,96 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB ...@@ -214,54 +223,96 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
// await _canalEntrada.Writer.WriteAsync(mensaje); // await _canalEntrada.Writer.WriteAsync(mensaje);
// Log.Debug("{tid} mensaje {nro} -> [cola entrada]", taskId, _nroMsjEntrada); // Log.Debug("{tid} mensaje {nro} -> [cola entrada]", taskId, _nroMsjEntrada);
// // Punto de procesamiento mensajes desde CHEC antes de enviar a POSBC.
// string mensajeProcesado = ProcesaMensajeCHEC(mensaje);
// PROCESA EL MENSAJE CHEC -> GATEWAY await _posbc.Envia(mensajeProcesado);
//
//
await _posbc.Envia(mensaje);
} }
cliente.Close(); cliente.Close();
Log.Warning("{tid} CHEC conexión cerrada", taskId); Log.Warning("{tid} Tarea acepta entradas CHEC TERMINADA", taskId);
} }
private static async Task ProcesaEntradaCHEC() /// <summary>
/// Procesa los mensajes d entrada CHEC.
/// Punto de tratamiento de mensajes CHEC si hay lugar a procesarlos.
/// Retorna mensaje procesado para remitir a POSBC.
/// </summary>
/// <param name="mensaje"></param>
/// <returns></returns>
public string ProcesaMensajeCHEC(string mensaje)
{ {
var taskId = Task.CurrentId?.ToString() ?? "no-task"; Log.Debug("GTWY procesador mensaje CHEC para POSBC invocado.");
Log.Debug("{tid} Tarea procesa entrada CHEC inicia", taskId); var copiaMensaje = new String(mensaje);
while (true) return copiaMensaje;
{
// Espera por mensajes en cola.
string mensaje = await _canalEntrada.Reader.ReadAsync();
Log.Verbose("Procesa mensaje de [cola entrada CHEC]\n{msj}", mensaje);
//
//
// PROCESA EL MENSAJE CHEC -> GATEWAY
//
//
// Remita mensaje procesado al POSBC
await _posbc.Envia(mensaje);
}
} }
private static async Task ProcesaSalidaPOSBC()
/// <summary>
/// Toma mensajes de la cola de mensaje de POSBC, los procesa y los remite a CHEC.
/// </summary>
/// <param name="clienteCHEC">Socket de conexión de CHEC</param>
/// <returns></returns>
private async Task ProcesaSalidaPOSBC(Socket clienteCHEC, Socket clientePOSBC)
{ {
var taskId = Task.CurrentId?.ToString() ?? "no-task"; var taskId = Task.CurrentId?.ToString() ?? "no-task";
Log.Debug("{tid} Tarea procesa salida POSBC inicia", taskId); Log.Debug("{tid} Tarea procesa salida POSBC inicia", taskId);
while (true) while (true)
{ {
string mensaje = await _canalSalida.Reader.ReadAsync(); // Lee longitud mensaje entrante, 4 bytes.
Log.Verbose("Procesa mensaje de [cola salida POSBC]\n{msj}", mensaje); Log.Information("{tid} Esperando POSBC", taskId);
// var bufferLongitud = new byte[4];
// int bytesLeidos = 0;
while (bytesLeidos < 4)
{
bytesLeidos += clientePOSBC.Receive(bufferLongitud, bytesLeidos, bufferLongitud.Length, SocketFlags.None);
}
// Lee porción de datos del mensaje, hasta la longitud indicada en los 4 primeros bytes.
int longitudMensaje = Util.LongitudCodificada(bufferLongitud);
Log.Debug("{tid} GTWY <- POSBC entra nuevo mensaje, longitud en cabecera {nroBytes}", taskId, longitudMensaje);
var bufferEntrada = new byte[longitudMensaje];
bytesLeidos = 0;
while (bytesLeidos < longitudMensaje)
{
bytesLeidos += clientePOSBC.Receive(bufferEntrada, bytesLeidos, bufferEntrada.Length, SocketFlags.None);
}
_nroMsjEntradaPOSBC++;
Log.Information("{tid} GTWY <- POSBC {nro} - {bytes} bytes cuerpo", taskId, _nroMsjEntradaPOSBC, bytesLeidos);
// Arreglo de bytes de entrada se transforma en string,
// constituye el mensaje.
string mensaje = Encoding.UTF8.GetString(bufferEntrada, 0, bytesLeidos);
Log.Verbose("{tid} GTWY <- POSBC mensaje\n{msj}", taskId, mensaje);
// PROCESA MENSAJE POSBC --> CHEC // PROCESA MENSAJE POSBC --> CHEC
// String mensajeProcesado = ProcesaMensajePOSBC(mensaje);
// if (mensajeProcesado.Length > 0)
byte[] bufferSalida = Util.ConvierteEnBufferBytes(mensaje); {
await _socketCHEC.SendAsync(bufferSalida); byte[] bufferSalida = Util.ConvierteEnBufferBytes(mensajeProcesado);
_nroMsjSalida++; await clienteCHEC.SendAsync(bufferSalida);
Log.Information("{tid} CHEC <- GTWY {nro} - {bytes} bytes total", taskId, _nroMsjSalida, bufferSalida.Length); _nroMsjSalidaCHEC++;
Log.Verbose("{tid} CHEC <- GTWY mensaje\n{msj}", taskId, mensaje); Log.Information("{tid} CHEC <- GTWY {nro} - {bytes} bytes total", taskId, _nroMsjSalidaCHEC, bufferSalida.Length);
Log.Verbose("{tid} CHEC <- GTWY mensaje\n{msj}", taskId, mensaje);
}
else
{
Log.Warning("GTWY mensaje longitud 0 - no se remite a CHEC");
}
} }
Log.Debug("{tid} Tarea procesa salida POSBC TERMINADA", taskId);
}
/// <summary>
/// Procesa los mensajes de respuesta de POSBC.
/// Punto de tratamiento de mensajes POSBC si hay lugar a procesarlos.
/// Retorna mensaje procesado para remitir a CHEC.
/// </summary>
/// <param name="mensaje"></param>
/// <returns></returns>
public string ProcesaMensajePOSBC(string mensaje)
{
Log.Debug("GTWY procesador mensaje POSBC para CHEC invocado.");
var copiaMensaje = new String(mensaje);
return copiaMensaje;
} }
} }
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment