Commit a93e01b6 authored by Jose Hugo Torres's avatar Jose Hugo Torres
Browse files

Servidor CHEC - sin cola de entrada

Mensajes que llegan de CHEC pasan directo a POSBC sin cola
parent da7a2689
......@@ -30,7 +30,7 @@ namespace GatewaySCO
public static byte[] ConvierteEnBufferBytes(string mensaje)
{
// Codifica longitud del mensaje en un entero sin signo en los 4 primeros bytes.
uint longitud = Convert.ToUInt32(mensaje.Length);
int longitud = Convert.ToInt32(mensaje.Length);
byte[] bytesConLongMensaje = BitConverter.GetBytes(longitud);
// Bytes mas significativos deben ir primero, usa 'big-endian'.
if (BitConverter.IsLittleEndian)
......@@ -45,17 +45,18 @@ namespace GatewaySCO
}
/// <summary>
/// Longitud del mensaje, retorna entero sin signo representado en los 4 primeros bytes del parámetro.
/// Longitud del mensaje, retorna entero representado en los 4 primeros bytes
/// del arreglo de bytes del parámetro, que corresponde a un mensaje codificado en bytes.
/// </summary>
public static uint LongitudMensaje(byte[] bytesMensaje)
public static int LongitudCodificada(byte[] bytesMensaje)
{
// Extrae longitud. Primeros 4 bytes del arreglo.
byte[] bytesLongitudMensaje = new byte[4];
Buffer.BlockCopy(bytesMensaje, 0, bytesLongitudMensaje, 0, 4);
if (BitConverter.IsLittleEndian)
Array.Reverse(bytesLongitudMensaje);
uint enteroSinSigno = BitConverter.ToUInt32(bytesLongitudMensaje, 0);
return enteroSinSigno;
int longitud = BitConverter.ToInt32(bytesLongitudMensaje, 0);
return longitud;
}
/// <summary>
......@@ -110,7 +111,7 @@ namespace GatewaySCO
while (continua)
{
// Extrae longitud.
int longitud = Convert.ToInt32(LongitudMensaje(buffer));
int longitud = LongitudCodificada(buffer);
contenido.Append($"\n\tlongitud: {longitud}");
if (longitud + puntero > buffer.Length)
{
......
......@@ -16,7 +16,7 @@ public class ClienteServidorPOSBC(string ip, int pto)
private static Socket _socket = null;
public bool ConexionActiva { get; private set; } = false;
private int _nroMsjEnviados = 0;
private int _nroMsjRecibidos = 0;
private int _nroMsjRecibidos;
public void AbreConexion()
{
......@@ -66,8 +66,8 @@ public class ClienteServidorPOSBC(string ip, int pto)
byte[] buffer = Util.ConvierteEnBufferBytes(msj);
await _socket.SendAsync(buffer);
_nroMsjEnviados++;
Log.Information("{tid} Gateway -> POSBC {nro} - {bytes} bytes total", taskId, _nroMsjEnviados, buffer.Length);
Log.Verbose("{tid} Gateway -> POSBC mensaje\n{msj}", taskId, msj);
Log.Information("{tid} GTWY -> POSBC {nro} - {bytes} bytes total", taskId, _nroMsjEnviados, buffer.Length);
Log.Verbose("{tid} GTWY -> POSBC mensaje\n{msj}", taskId, msj);
}
public async Task Recibe(Channel<string> canalSalida)
......@@ -89,21 +89,20 @@ public class ClienteServidorPOSBC(string ip, int pto)
// Mientras tenga conexión, se queda esperando datos.
if (nroBytesLeidos == 0) break;
uint longitudMensaje = Util.LongitudMensaje(bufferLongitud);
int lMensaje = Convert.ToInt32(longitudMensaje);
Log.Debug("{tid} POSBC bytes cabecera {nroBytes}, longitud mensaje {longitud} uint {uint}", taskId, nroBytesLeidos, lMensaje, longitudMensaje);
int longitudMensaje = Util.LongitudCodificada(bufferLongitud);
Log.Debug("{tid} POSBC bytes cabecera {nroBytes}, longitud mensaje {longitud}", taskId, nroBytesLeidos, longitudMensaje);
// Prepara buffer de entrada según la longitud esperada del mensaje.
var bufferMensaje = new byte[longitudMensaje];
nroBytesLeidos = await _socket.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
Log.Debug("{tid} POSBC bytes mensaje {nroBytes}", taskId, nroBytesLeidos);
_nroMsjRecibidos++;
Log.Information("{tid} Gateway <- POSBC {nro} - {bytes} bytes cuerpo", taskId, _nroMsjRecibidos, nroBytesLeidos);
Log.Information("{tid} GTWY <- POSBC {nro} - {bytes} bytes cuerpo", taskId, _nroMsjRecibidos, nroBytesLeidos);
// Arreglo de bytes de entrada se transforma en string,
// constituye el mensaje.
string mensaje = Encoding.UTF8.GetString(bufferMensaje, 0, nroBytesLeidos);
Log.Verbose("{tid} Gateway <- POSBC mensaje\n{msj}", taskId, mensaje);
Log.Verbose("{tid} GTWY <- POSBC mensaje\n{msj}", taskId, mensaje);
// Se agrega mensaje a la cola de entrada de mensajes.
await canalSalida.Writer.WriteAsync(mensaje);
......
......@@ -276,7 +276,7 @@ namespace EvaPosSCOSrv
Log.Debug("Arriba un mensaje.");
// Lee porción de datos del mensaje, hasta la longitud indicada en los 4 primeros bytes.
uint longitudMensaje = Util.LongitudMensaje(bufferLongitud);
int longitudMensaje = Util.LongitudCodificada(bufferLongitud);
if (longitudMensaje > LongMaxMensaje) throw new Exception($"Mensaje {longitudMensaje} bytes supera máximo permitido de {LongMaxMensaje} bytes.");
var bufferEntrada = new byte[longitudMensaje];
bytesLeidos = 0;
......
......@@ -111,7 +111,7 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
Log.Information("Gateway servidor socket en {ip} : {puerto}, proceso id {id}", IpGateway, PuertoGateway, idProceso);
if (_isDebug)
{
Log.Debug("Versión framework {version} #6", Environment.Version);
Log.Debug("Versión framework {version} # 10", Environment.Version);
Log.Debug("Tcp Socket configuración:");
Log.Debug($" Blocking {tcpSocket.Blocking}");
Log.Debug($" ExclusiveAddressUse {tcpSocket.ExclusiveAddressUse}");
......@@ -128,7 +128,7 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
{
// Activa el POSBC
if (_posbc == null) ActivaPOSBC();
var tareaProcesaMsj = Task.Run(() => ProcesaEntradaCHEC());
// var tareaProcesaMsj = Task.Run(() => ProcesaEntradaCHEC());
var tareaRecibePOSBC = Task.Run(() => _posbc.Recibe(_canalSalida));
var tareaRemiteRespuesta = Task.Run(() => ProcesaSalidaPOSBC());
......@@ -155,39 +155,74 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
Log.Debug("{tid} Tarea acepta entradas iniciada", taskId);
while (true)
{
// ------------------- LECTURA SINCRONICA
// Lee longitud mensaje entrante, 4 bytes.
Log.Information("{tid} Esperando CHEC", taskId);
var bufferLongitud = new byte[4];
int bytesLeidos = 0;
while (bytesLeidos < 4)
{
bytesLeidos += cliente.Receive(bufferLongitud, bytesLeidos, bufferLongitud.Length, SocketFlags.None);
}
// Lee porción de datos del mensaje, hasta la longitud indicada en los 4 primeros bytes.
int longitudMensaje = Util.LongitudCodificada(bufferLongitud);
Log.Debug("{tid} CHEC -> GTWY entra nuevo mensaje, longitud en cabecera {nroBytes}", taskId, longitudMensaje);
var bufferEntrada = new byte[longitudMensaje];
bytesLeidos = 0;
while (bytesLeidos < longitudMensaje)
{
bytesLeidos += cliente.Receive(bufferEntrada, bytesLeidos, bufferEntrada.Length, SocketFlags.None);
}
// ---------------------- LECTURA ASINCRONICA
// Lee los primeros 4 bytes de los datos de entrada, los cuales indican
// la longitud del resto del mensaje.
int nroBytesLeidos = await cliente.ReceiveAsync(new ArraySegment<byte>(bufferLongitud), SocketFlags.None);
//int nroBytesLeidos = await cliente.ReceiveAsync(new ArraySegment<byte>(bufferLongitud), SocketFlags.None);
// Si el número de bytes leidos es cero, la conexión se ha cerrado.
// El método Receive es sincróno, luego si retorna sin datos, es que no hay conexión.
// Mientras tenga conexión, se queda esperando datos.
if (nroBytesLeidos == 0) break;
uint longitudMensaje = Util.LongitudMensaje(bufferLongitud);
Log.Debug("{tid} CHECK bytes cabecera {nroBytes}, longitud mensaje {longitud}", taskId, nroBytesLeidos, longitudMensaje);
// Prepara buffer de entrada según la longitud esperada del mensaje.
var bufferMensaje = new byte[longitudMensaje];
//nroBytesLeidos = cliente.Receive(bufferMensaje, 0, bufferMensaje.Length, SocketFlags.None);
nroBytesLeidos = await cliente.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
Log.Debug("{tid} CHECK bytes mensaje {nroBytes}", taskId, nroBytesLeidos);
// if (nroBytesLeidos == 0)
// {
// Log.Warning("{tid} CHEC -> GTWY emite 0 bytes - señal cierra conexión.");
// break;
// }
// int longitudMensaje = Util.LongitudCodificada(bufferLongitud);
// Log.Debug("{tid} CHEC -> GTWY bytes cabecera {nroBytes}, longitud cuerpo mensaje {longitud}", taskId, nroBytesLeidos, longitudMensaje);
// // Prepara buffer de entrada según la longitud esperada del mensaje.
// var bufferMensaje = new byte[longitudMensaje];
// nroBytesLeidos = await cliente.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
_nroMsjEntrada++;
Log.Information("{tid} CHEC -> Gateway {nro} - {bytes} bytes cuerpo", taskId, _nroMsjEntrada, nroBytesLeidos);
Log.Information("{tid} CHEC -> GTWY nuevo mensaje #{nro} - cuerpo {bytes} bytes", taskId, _nroMsjEntrada, bytesLeidos);
// Arreglo de bytes de entrada se transforma en string,
// constituye el mensaje.
string mensaje = Encoding.UTF8.GetString(bufferMensaje, 0, nroBytesLeidos);
Log.Verbose("{tid} CHEC -> Gateway mensaje\n{msj}", taskId, mensaje);
string mensaje = Encoding.UTF8.GetString(bufferEntrada, 0, bytesLeidos);
if (bytesLeidos != longitudMensaje)
{
Log.Warning("{tid} Longitud en cabecera {longitudMensaje} no corresponde a longitud {bytesLeidos} mensaje leido.", taskId, longitudMensaje, bytesLeidos);
}
Log.Verbose("{tid} CHEC -> GTWY mensaje\n{msj}", taskId, mensaje);
// Se agrega mensaje a la cola de entrada de mensajes.
await _canalEntrada.Writer.WriteAsync(mensaje);
Log.Debug("{tid} mensaje -> [cola entrada]", taskId);
// await _canalEntrada.Writer.WriteAsync(mensaje);
// Log.Debug("{tid} mensaje {nro} -> [cola entrada]", taskId, _nroMsjEntrada);
//
//
// PROCESA EL MENSAJE CHEC -> GATEWAY
//
//
await _posbc.Envia(mensaje);
}
cliente.Close();
Log.Warning("{tid} CHEC conexión cerrada", taskId);
}
private static async Task ProcesaEntradaCHEC()
......@@ -225,8 +260,8 @@ public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSB
byte[] bufferSalida = Util.ConvierteEnBufferBytes(mensaje);
await _socketCHEC.SendAsync(bufferSalida);
_nroMsjSalida++;
Log.Information("{tid} CHEC <- Gateway {nro} - {bytes} bytes total", taskId, _nroMsjSalida, bufferSalida.Length);
Log.Verbose("{tid} CHEC <- Gateway mensaje\n{msj}", taskId, mensaje);
Log.Information("{tid} CHEC <- GTWY {nro} - {bytes} bytes total", taskId, _nroMsjSalida, bufferSalida.Length);
Log.Verbose("{tid} CHEC <- GTWY mensaje\n{msj}", taskId, mensaje);
}
}
}
\ No newline at end of file
......@@ -165,7 +165,8 @@ public class SocketClientECO
byte[] bufferMsj = Util.ConvierteEnBufferBytes(message);
await socket.SendAsync(new ArraySegment<byte>(bufferMsj), SocketFlags.None);
Console.WriteLine($"Mensaje enviado: {message}");
Console.WriteLine($"Mensaje enviado: {message} espera 1 segundo");
Task.Delay(1000).Wait();
}
// Recibir mensajes de forma asincrónica en un ciclo infinito
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment