Commit da7a2689 authored by Jose Hugo Torres's avatar Jose Hugo Torres
Browse files

Gateway colas mensaje

Maneja colas de procesamiento mensajes para aceptar múltiples respuestas desde POSBC.
parent 7990dd1d
...@@ -24,9 +24,10 @@ namespace GatewaySCO ...@@ -24,9 +24,10 @@ namespace GatewaySCO
/// <summary> /// <summary>
/// Convierte el string del parámetro en un arreglo de bytes, anteponiendo /// Convierte el string del parámetro en un arreglo de bytes, anteponiendo
/// al inicio del arreglo 4 bytes con la longitud del resto del arreglo. /// al inicio del arreglo 4 bytes con la longitud del resto del arreglo, en los
/// primeros 4 bytes.
/// </summary> /// </summary>
public static byte[] ConvertirEnBytes(string mensaje) public static byte[] ConvierteEnBufferBytes(string mensaje)
{ {
// Codifica longitud del mensaje en un entero sin signo en los 4 primeros bytes. // Codifica longitud del mensaje en un entero sin signo en los 4 primeros bytes.
uint longitud = Convert.ToUInt32(mensaje.Length); uint longitud = Convert.ToUInt32(mensaje.Length);
...@@ -37,14 +38,14 @@ namespace GatewaySCO ...@@ -37,14 +38,14 @@ namespace GatewaySCO
// Codifica en bytes texto del mensaje. // Codifica en bytes texto del mensaje.
byte[] bytesConMensaje = Encoding.UTF8.GetBytes(mensaje); byte[] bytesConMensaje = Encoding.UTF8.GetBytes(mensaje);
// Copia los 2 arreglos de bytes en un arreglo unificado. // Copia los 2 arreglos de bytes en un arreglo unificado.
byte[] bytes = new byte[bytesConLongMensaje.Length + bytesConMensaje.Length]; byte[] buffer = new byte[bytesConLongMensaje.Length + bytesConMensaje.Length];
Buffer.BlockCopy(bytesConLongMensaje, 0, bytes, 0, bytesConLongMensaje.Length); Buffer.BlockCopy(bytesConLongMensaje, 0, buffer, 0, bytesConLongMensaje.Length);
Buffer.BlockCopy(bytesConMensaje, 0, bytes, bytesConLongMensaje.Length, bytesConMensaje.Length); Buffer.BlockCopy(bytesConMensaje, 0, buffer, bytesConLongMensaje.Length, bytesConMensaje.Length);
return bytes; return buffer;
} }
/// <summary> /// <summary>
/// Longitud del mensaje, retorna entero representado en 4 bytes, sin signo. /// Longitud del mensaje, retorna entero sin signo representado en los 4 primeros bytes del parámetro.
/// </summary> /// </summary>
public static uint LongitudMensaje(byte[] bytesMensaje) public static uint LongitudMensaje(byte[] bytesMensaje)
{ {
...@@ -53,7 +54,8 @@ namespace GatewaySCO ...@@ -53,7 +54,8 @@ namespace GatewaySCO
Buffer.BlockCopy(bytesMensaje, 0, bytesLongitudMensaje, 0, 4); Buffer.BlockCopy(bytesMensaje, 0, bytesLongitudMensaje, 0, 4);
if (BitConverter.IsLittleEndian) if (BitConverter.IsLittleEndian)
Array.Reverse(bytesLongitudMensaje); Array.Reverse(bytesLongitudMensaje);
return BitConverter.ToUInt32(bytesLongitudMensaje, 0); uint enteroSinSigno = BitConverter.ToUInt32(bytesLongitudMensaje, 0);
return enteroSinSigno;
} }
/// <summary> /// <summary>
...@@ -102,17 +104,17 @@ namespace GatewaySCO ...@@ -102,17 +104,17 @@ namespace GatewaySCO
public static string DetalleMensajes(byte[] buffer) public static string DetalleMensajes(byte[] buffer)
{ {
StringBuilder contenido = new(); StringBuilder contenido = new();
contenido.Append($"Mensje en buffer de {buffer.Length} bytes."); contenido.Append($"\tMensaje en buffer de {buffer.Length} bytes.");
bool continua = true; bool continua = true;
int puntero = 4; // Se salta los primeros 4 bytes con la longitud. int puntero = 3; // Se salta los primeros 4 bytes con la longitud.
while (continua) while (continua)
{ {
// Extrae longitud. // Extrae longitud.
int longitud = Convert.ToInt32(LongitudMensaje(buffer)); int longitud = Convert.ToInt32(LongitudMensaje(buffer));
contenido.Append($"\nlongitud: {longitud}"); contenido.Append($"\n\tlongitud: {longitud}");
if (longitud + puntero > buffer.Length) if (longitud + puntero > buffer.Length)
{ {
contenido.Append($"\nlongitud {longitud} codificada en mensaje supera longitud restante en buffer."); contenido.Append($"\n\tlongitud {longitud} codificada en mensaje supera longitud restante en buffer.");
continua = false; continua = false;
continue; continue;
} }
...@@ -120,10 +122,10 @@ namespace GatewaySCO ...@@ -120,10 +122,10 @@ namespace GatewaySCO
// Notar que se asume que puede haber mas de un mensaje incluido en el buffer. // Notar que se asume que puede haber mas de un mensaje incluido en el buffer.
// Extrae encabezado. // Extrae encabezado.
string encabezado = ExtraeEncabezado(mensaje); string encabezado = ExtraeEncabezado(mensaje);
contenido.Append($"\nencabezado:\n{encabezado}"); contenido.Append($"\n\tencabezado:\n{encabezado}");
// Extrae mensaje. // Extrae mensaje.
string cuerpo = ExtraeCuerpo(mensaje); string cuerpo = ExtraeCuerpo(mensaje);
contenido.Append($"\ncuerpo:\n{cuerpo}"); contenido.Append($"\n\tcuerpo:\n{cuerpo}");
// Hay mas datos?. // Hay mas datos?.
if (longitud + puntero < buffer.Length) if (longitud + puntero < buffer.Length)
{ {
......
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text; using System.Text;
using System.Threading.Channels;
using GatewaySCO; using GatewaySCO;
using Serilog; using Serilog;
...@@ -9,13 +10,13 @@ namespace gatewaySCO.POSBC; ...@@ -9,13 +10,13 @@ namespace gatewaySCO.POSBC;
// Maneja conexión al POSBC. // Maneja conexión al POSBC.
public class ClienteServidorPOSBC(string ip, int pto) public class ClienteServidorPOSBC(string ip, int pto)
{ {
ILogger log = Log.ForContext<ClienteServidorPOSBC>(); readonly ILogger log = Log.ForContext<ClienteServidorPOSBC>();
private int _pto = pto; private int _pto = pto;
private string _ip = ip; private string _ip = ip;
private Socket _socket = null; private static Socket _socket = null;
public bool ConexionActiva { get; private set; } = false; public bool ConexionActiva { get; private set; } = false;
private int _contadorMensajesEnviados = 0; private int _nroMsjEnviados = 0;
private int _contadorMensajesRecibidos = 0; private int _nroMsjRecibidos = 0;
public void AbreConexion() public void AbreConexion()
{ {
...@@ -55,6 +56,61 @@ public class ClienteServidorPOSBC(string ip, int pto) ...@@ -55,6 +56,61 @@ public class ClienteServidorPOSBC(string ip, int pto)
Log.Information("Conexión cerrada a POSBC {ip}:{pto} - {conectado}", _ip, _pto, _socket.Connected); Log.Information("Conexión cerrada a POSBC {ip}:{pto} - {conectado}", _ip, _pto, _socket.Connected);
} }
/// <summary>
/// Envia mensaje al POSBC
/// </summary>
public async Task Envia(string msj)
{
var taskId = Task.CurrentId?.ToString() ?? "no-task";
// Empaca mensaje en bytes.
byte[] buffer = Util.ConvierteEnBufferBytes(msj);
await _socket.SendAsync(buffer);
_nroMsjEnviados++;
Log.Information("{tid} Gateway -> POSBC {nro} - {bytes} bytes total", taskId, _nroMsjEnviados, buffer.Length);
Log.Verbose("{tid} Gateway -> POSBC mensaje\n{msj}", taskId, msj);
}
public async Task Recibe(Channel<string> canalSalida)
{
var taskId = Task.CurrentId?.ToString() ?? "no-task";
Log.Debug("{tid} Tarea POSBC acepta entradas iniciada", taskId);
while (true)
{
// Lee longitud mensaje entrante, 4 bytes.
Log.Information("{tid} Esperando POSBC", taskId);
var bufferLongitud = new byte[4];
// Lee los primeros 4 bytes de los datos de entrada, los cuales indican
// la longitud del resto del mensaje.
int nroBytesLeidos = await _socket.ReceiveAsync(new ArraySegment<byte>(bufferLongitud), SocketFlags.None);
// Si el número de bytes leidos es cero, la conexión se ha cerrado.
// El método Receive es sincróno, luego si retorna sin datos, es que no hay conexión.
// Mientras tenga conexión, se queda esperando datos.
if (nroBytesLeidos == 0) break;
uint longitudMensaje = Util.LongitudMensaje(bufferLongitud);
int lMensaje = Convert.ToInt32(longitudMensaje);
Log.Debug("{tid} POSBC bytes cabecera {nroBytes}, longitud mensaje {longitud} uint {uint}", taskId, nroBytesLeidos, lMensaje, longitudMensaje);
// Prepara buffer de entrada según la longitud esperada del mensaje.
var bufferMensaje = new byte[longitudMensaje];
nroBytesLeidos = await _socket.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
Log.Debug("{tid} POSBC bytes mensaje {nroBytes}", taskId, nroBytesLeidos);
_nroMsjRecibidos++;
Log.Information("{tid} Gateway <- POSBC {nro} - {bytes} bytes cuerpo", taskId, _nroMsjRecibidos, nroBytesLeidos);
// Arreglo de bytes de entrada se transforma en string,
// constituye el mensaje.
string mensaje = Encoding.UTF8.GetString(bufferMensaje, 0, nroBytesLeidos);
Log.Verbose("{tid} Gateway <- POSBC mensaje\n{msj}", taskId, mensaje);
// Se agrega mensaje a la cola de entrada de mensajes.
await canalSalida.Writer.WriteAsync(mensaje);
Log.Debug("{tid} POSBC mensaje -> [cola salida]", taskId);
}
}
// Envia mensaje, espera respuesta y la retorna como un arreglo de bytes. // Envia mensaje, espera respuesta y la retorna como un arreglo de bytes.
// El mensaje es un string, convertido a arreglo de bytes, y le antepone // El mensaje es un string, convertido a arreglo de bytes, y le antepone
// 4 bytes con la longitud del mensaje. // 4 bytes con la longitud del mensaje.
...@@ -63,8 +119,8 @@ public class ClienteServidorPOSBC(string ip, int pto) ...@@ -63,8 +119,8 @@ public class ClienteServidorPOSBC(string ip, int pto)
{ {
// Remite mensaje. // Remite mensaje.
_socket.Send(bytesMensaje); _socket.Send(bytesMensaje);
_contadorMensajesEnviados++; _nroMsjEnviados++;
Log.Information("Mensaje #{contador} para POSBIC {ip}", _contadorMensajesEnviados, _ip); Log.Information("Mensaje #{contador} para POSBIC {ip}", _nroMsjEnviados, _ip);
Log.Information("Esperando respuesta.."); Log.Information("Esperando respuesta..");
// Leer la respuesta del servidor // Leer la respuesta del servidor
...@@ -88,10 +144,10 @@ public class ClienteServidorPOSBC(string ip, int pto) ...@@ -88,10 +144,10 @@ public class ClienteServidorPOSBC(string ip, int pto)
// Obtener todos los datos recibidos como un arreglo de bytes // Obtener todos los datos recibidos como un arreglo de bytes
bufferEntrada = ms.ToArray(); bufferEntrada = ms.ToArray();
} }
_contadorMensajesRecibidos++; _nroMsjRecibidos++;
Log.Information("Respuesta #{contador} de POSBC {ip}", _contadorMensajesRecibidos, _ip); Log.Information("Respuesta #{contador} de POSBC {ip}", _nroMsjRecibidos, _ip);
Log.Debug("Mensaje - {msj}", _contadorMensajesRecibidos, _ip, Encoding.UTF8.GetString(bufferEntrada)); Log.Debug("Mensaje - {msj}", _nroMsjRecibidos, _ip, Encoding.UTF8.GetString(bufferEntrada));
Log.Debug(Util.DetalleMensajes(bufferEntrada)); Log.Debug(Util.DetalleMensajes(bufferEntrada));
return bufferEntrada; return bufferEntrada;
} }
......
using Serilog; using Serilog;
using Serilog.Core;
using Serilog.Events;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using EvaPosSCOSrv; using EvaPosSCOSrv;
using EvaPosSrvAplicacionImp; using EvaPosSrvAplicacionImp;
using EvaPosSrvRespImp; using EvaPosSrvRespImp;
using gatewaySCO.POSBC; using gatewaySCO.POSBC;
using GatewayCHEC.Servidor;
namespace GatewaySCO namespace GatewaySCO
{ {
public class TaskIdEnricher : ILogEventEnricher
{
public const string TaskIdPropertyName = "TaskId";
public void Enrich(LogEvent logEvent, ILogEventPropertyFactory propertyFactory)
{
var taskId = Task.CurrentId?.ToString() ?? "no-task";
logEvent.AddPropertyIfAbsent(propertyFactory.CreateProperty(TaskIdPropertyName, taskId));
}
}
/// <summary> /// <summary>
/// EvaPos-API : servidor api, sockets y rest. /// EvaPos-API : servidor api, sockets y rest.
/// Usa Serilog para bitácora. /// Usa Serilog para bitácora.
...@@ -106,12 +121,12 @@ namespace GatewaySCO ...@@ -106,12 +121,12 @@ namespace GatewaySCO
Entorno<EntornoPOSBC>.Instancia.get().PortPOSBC = configPOSBC.PortPOSBC; Entorno<EntornoPOSBC>.Instancia.get().PortPOSBC = configPOSBC.PortPOSBC;
// Activa cliente de conexión a POSBC. La conexión con el POSBC // Activa cliente de conexión a POSBC. La conexión con el POSBC
// se almacena en el entorno para uso posterior en la emisión y recepción de mensajes. // se almacena en el entorno para uso posterior en la emisión y recepción de mensajes.
Entorno<EntornoPOSBC>.Instancia.get().ClientePOSBC = new ClienteServidorPOSBC(configPOSBC.IpPOSBC, configPOSBC.PortPOSBC); //Entorno<EntornoPOSBC>.Instancia.get().ClientePOSBC = new ClienteServidorPOSBC(configPOSBC.IpPOSBC, configPOSBC.PortPOSBC);
Entorno<EntornoPOSBC>.Instancia.get().ClientePOSBC.AbreConexion(); //Entorno<EntornoPOSBC>.Instancia.get().ClientePOSBC.AbreConexion();
if (Entorno<EntornoPOSBC>.Instancia.get().ClientePOSBC.ConexionActiva == false) // if (Entorno<EntornoPOSBC>.Instancia.get().ClientePOSBC.ConexionActiva == false)
{ // {
throw new ApplicationException("Error en conexión al POSBC."); // throw new ApplicationException("Error en conexión al POSBC.");
} // }
break; break;
} }
default: default:
...@@ -120,7 +135,21 @@ namespace GatewaySCO ...@@ -120,7 +135,21 @@ namespace GatewaySCO
} }
} }
ActivaServidor(config.IpGateway, config.PortGateway, config.POS); //ActivaServidor(config.IpGateway, config.PortGateway, config.POS);
// Activa servidor Gateway.
try
{
string ipPOSBC = Entorno<EntornoPOSBC>.Instancia.get().IpPOSBC;
int ptoPOSBC = Entorno<EntornoPOSBC>.Instancia.get().PortPOSBC;
ServidorGatewayCHEC gateway = new(config.IpGateway, config.PortGateway, ipPOSBC, ptoPOSBC);
gateway.Activa();
}
catch (Exception e)
{
Log.Error("Excepción {e}", e.ToString);
Environment.Exit(1);
}
} }
// Servidor sockets: acepta peticiones de SCO CHEC. // Servidor sockets: acepta peticiones de SCO CHEC.
......
...@@ -300,9 +300,6 @@ namespace EvaPosSCOSrv ...@@ -300,9 +300,6 @@ namespace EvaPosSCOSrv
// su respeusta se remite sin cambios a CHEC. // su respeusta se remite sin cambios a CHEC.
byte[] mensajeEntrada = Util.ConcatenaArreglosBytes(bufferLongitud, bufferEntrada); byte[] mensajeEntrada = Util.ConcatenaArreglosBytes(bufferLongitud, bufferEntrada);
// Enviar mensaje de entrada a POSBC y retornar respuestas. // Enviar mensaje de entrada a POSBC y retornar respuestas.
byte[] bufferSalida = Entorno<EntornoPOSBC>.Instancia.get().ClientePOSBC.EnviaRecibe(mensajeEntrada);
// Remitir respuestas sin cambio a CHEC.
socket.Send(bufferSalida, 0, bufferSalida.Length, SocketFlags.None);
// Procesando entrada: se obtiene mensaje, con el cual se // Procesando entrada: se obtiene mensaje, con el cual se
// identifica comando que lo procesa. // identifica comando que lo procesa.
...@@ -312,6 +309,11 @@ namespace EvaPosSCOSrv ...@@ -312,6 +309,11 @@ namespace EvaPosSCOSrv
{ {
continua = false; continua = false;
} }
byte[] mensajeSalida = Entorno<EntornoPOSBC>.Instancia.get().ClientePOSBC.EnviaRecibe(mensajeEntrada);
// Remitir respuestas sin cambio a CHEC.
socket.Send(mensajeSalida, 0, mensajeSalida.Length, SocketFlags.None);
} }
else else
{ {
......
using System.ComponentModel;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Channels;
using gatewaySCO.POSBC;
using GatewaySCO;
using Serilog;
using Serilog.Events;
namespace GatewayCHEC.Servidor;
/// <summary>
/// Constructor servidor socket. Usa ip y puerto defualt.
/// El método IniciarAync() activa el servidor.
/// <param name="direccion">Dirección IP servidor, típicamente '127.0.0.1'.</param>
/// <param name="puerto">Número de puerto para el socket, default 11.000</param>
/// <returns>Retorna tipo Task.</returns>
/// </summary>
public class ServidorGatewayCHEC(string ipGateway, int ptoGateway, string ipPOSBC, int ptoPOSBC)
{
readonly static bool _isDebug = Log.IsEnabled(LogEventLevel.Debug);
/// <summary>
/// Dirección ip para vincular el socket.
/// </summary>
public string IpGateway { get; private set; } = ipGateway;
/// <summary>
/// Puerto a vincular socket.
/// </summary>
public Int32 PuertoGateway { get; private set; } = ptoGateway;
/// <summary>
/// Dirección ip servidor POSBC.
/// </summary>
public string IpPOSBC { get; private set; } = ipPOSBC;
/// <summary>
/// Puerto servidro POSBC
/// </summary>
public Int32 PuertoPOSBC { get; private set; } = ptoPOSBC;
/// <summary>
/// Longitud de la cola de conexión al socket.
/// </summary>
public int LongColaConexiones { get; private set; } = 10;
/// <summary>
/// Fija timeout usado para enviar / recibir em modo sincrónico, en milisegundos.
/// Si el timeout se excede, se presenta una excepción.
/// Default a 5 segundos.
/// </summary>
public int TimeOutSincMs { get; private set; } = 3000;
/// <summary>
/// Servidor POSBC
/// </summary>
static ClienteServidorPOSBC _posbc;
long _numeroConexionesEntrantes = 0;
static long _nroMsjEntrada = 0;
static long _nroMsjSalida = 0;
static Socket _socketCHEC;
// Canal de entrada.
private static readonly Channel<string> _canalEntrada = Channel.CreateUnbounded<string>();
// Canal de salida.
private static readonly Channel<string> _canalSalida = Channel.CreateUnbounded<string>();
/// <summary>
/// El POSBC debe ser activado antes de invocar el método Activa.
/// </summary>
/// <param name="ip"></param>
/// <param name="pto"></param>
private void ActivaPOSBC()
{
_posbc = new(IpPOSBC, PuertoPOSBC);
_posbc.AbreConexion();
}
public void Activa()
{
IPEndPoint ip = new(IPAddress.Parse(IpGateway), PuertoGateway);
int cont = 0;
// Clave usar 'using' para liberar correctamente recursos.
using Socket tcpSocket = new(
ip.AddressFamily,
SocketType.Stream,
ProtocolType.Tcp);
// Configuración comportamiento socket tcp.
// No se permite a otro socket compartir el puerto.
// TODO - Investigar: cuando servidor y cliente están en la misma instanción de s.o.
// no comparten el puerto?.
// tcpSocket.ExclusiveAddressUse = true;
// El socket espera los segundos del parámetro para terminar de enviar
// datos (si hay en el buffer de salida), despues que se llama Socket.Close.
tcpSocket.LingerState = new LingerOption(true, 3);
// Desactiva algoritmo Nagle.
tcpSocket.NoDelay = true;
// Timeout entrada / salida.
tcpSocket.ReceiveTimeout = 0;
tcpSocket.SendTimeout = TimeOutSincMs;
//tcpSocket.Blocking = false;
tcpSocket.Bind(ip);
tcpSocket.Listen(LongColaConexiones);
// Id proceso. Compilación difiere según .net usado.
int idProceso = -1;
#if NETFRAMEWORK
idProceso = Process.GetCurrentProcess().Id;
#elif (NETSTANDARD || NET5_0_OR_GREATER)
idProceso = Environment.ProcessId;
#endif
Log.Information("Gateway servidor socket en {ip} : {puerto}, proceso id {id}", IpGateway, PuertoGateway, idProceso);
if (_isDebug)
{
Log.Debug("Versión framework {version} #6", Environment.Version);
Log.Debug("Tcp Socket configuración:");
Log.Debug($" Blocking {tcpSocket.Blocking}");
Log.Debug($" ExclusiveAddressUse {tcpSocket.ExclusiveAddressUse}");
Log.Debug($" LingerState {tcpSocket.LingerState.Enabled}, {tcpSocket.LingerState.LingerTime}");
Log.Debug($" NoDelay {tcpSocket.NoDelay}");
Log.Debug($" ReceiveBufferSize {tcpSocket.ReceiveBufferSize}");
Log.Debug($" ReceiveTimeout {tcpSocket.ReceiveTimeout}");
Log.Debug($" SendBufferSize {tcpSocket.SendBufferSize}");
Log.Debug($" SendTimeout {tcpSocket.SendTimeout}");
Log.Debug($" Ttl {tcpSocket.Ttl}");
Log.Debug($" IsBound {tcpSocket.IsBound}");
}
try
{
// Activa el POSBC
if (_posbc == null) ActivaPOSBC();
var tareaProcesaMsj = Task.Run(() => ProcesaEntradaCHEC());
var tareaRecibePOSBC = Task.Run(() => _posbc.Recibe(_canalSalida));
var tareaRemiteRespuesta = Task.Run(() => ProcesaSalidaPOSBC());
// Procesa conexiones al Gateway.
while (true)
{
Log.Information("Esperando conexión");
Socket cliente = tcpSocket.Accept();
Log.Information("Conexión remota ip {ip} en puerto {pto}", IPAddress.Parse(((IPEndPoint)cliente.RemoteEndPoint).Address.ToString()), ((IPEndPoint)cliente.RemoteEndPoint).Port.ToString());
_ = Task.Run(() => AceptaEntradas(cliente));
}
}
finally
{
tcpSocket.Close();
_posbc.CierraConexion();
}
}
public static async Task AceptaEntradas(Socket cliente)
{
var taskId = Task.CurrentId?.ToString() ?? "no-task";
_socketCHEC = cliente;
Log.Debug("{tid} Tarea acepta entradas iniciada", taskId);
while (true)
{
// Lee longitud mensaje entrante, 4 bytes.
Log.Information("{tid} Esperando CHEC", taskId);
var bufferLongitud = new byte[4];
// Lee los primeros 4 bytes de los datos de entrada, los cuales indican
// la longitud del resto del mensaje.
int nroBytesLeidos = await cliente.ReceiveAsync(new ArraySegment<byte>(bufferLongitud), SocketFlags.None);
// Si el número de bytes leidos es cero, la conexión se ha cerrado.
// El método Receive es sincróno, luego si retorna sin datos, es que no hay conexión.
// Mientras tenga conexión, se queda esperando datos.
if (nroBytesLeidos == 0) break;
uint longitudMensaje = Util.LongitudMensaje(bufferLongitud);
Log.Debug("{tid} CHECK bytes cabecera {nroBytes}, longitud mensaje {longitud}", taskId, nroBytesLeidos, longitudMensaje);
// Prepara buffer de entrada según la longitud esperada del mensaje.
var bufferMensaje = new byte[longitudMensaje];
//nroBytesLeidos = cliente.Receive(bufferMensaje, 0, bufferMensaje.Length, SocketFlags.None);
nroBytesLeidos = await cliente.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
Log.Debug("{tid} CHECK bytes mensaje {nroBytes}", taskId, nroBytesLeidos);
_nroMsjEntrada++;
Log.Information("{tid} CHEC -> Gateway {nro} - {bytes} bytes cuerpo", taskId, _nroMsjEntrada, nroBytesLeidos);
// Arreglo de bytes de entrada se transforma en string,
// constituye el mensaje.
string mensaje = Encoding.UTF8.GetString(bufferMensaje, 0, nroBytesLeidos);
Log.Verbose("{tid} CHEC -> Gateway mensaje\n{msj}", taskId, mensaje);
// Se agrega mensaje a la cola de entrada de mensajes.
await _canalEntrada.Writer.WriteAsync(mensaje);
Log.Debug("{tid} mensaje -> [cola entrada]", taskId);
}
cliente.Close();
}
private static async Task ProcesaEntradaCHEC()
{
var taskId = Task.CurrentId?.ToString() ?? "no-task";
Log.Debug("{tid} Tarea procesa entrada CHEC inicia", taskId);
while (true)
{
// Espera por mensajes en cola.
string mensaje = await _canalEntrada.Reader.ReadAsync();
Log.Verbose("Procesa mensaje de [cola entrada CHEC]\n{msj}", mensaje);
//
//
// PROCESA EL MENSAJE CHEC -> GATEWAY
//
//
// Remita mensaje procesado al POSBC
await _posbc.Envia(mensaje);
}
}
private static async Task ProcesaSalidaPOSBC()
{
var taskId = Task.CurrentId?.ToString() ?? "no-task";
Log.Debug("{tid} Tarea procesa salida POSBC inicia", taskId);
while (true)
{
string mensaje = await _canalSalida.Reader.ReadAsync();
Log.Verbose("Procesa mensaje de [cola salida POSBC]\n{msj}", mensaje);
//
//
// PROCESA MENSAJE POSBC --> CHEC
//
//
byte[] bufferSalida = Util.ConvierteEnBufferBytes(mensaje);
await _socketCHEC.SendAsync(bufferSalida);
_nroMsjSalida++;
Log.Information("{tid} CHEC <- Gateway {nro} - {bytes} bytes total", taskId, _nroMsjSalida, bufferSalida.Length);
Log.Verbose("{tid} CHEC <- Gateway mensaje\n{msj}", taskId, mensaje);
}
}
}
\ No newline at end of file
...@@ -4,14 +4,14 @@ ...@@ -4,14 +4,14 @@
"POS_comment": "Indicates the set of commands to instantiate, according to the type of POS: evapos, tests, gk, etc.", "POS_comment": "Indicates the set of commands to instantiate, according to the type of POS: evapos, tests, gk, etc.",
"IpGateway": "127.0.0.1", "IpGateway": "127.0.0.1",
"IpGateway_comment": "Gateway IP, local or remote", "IpGateway_comment": "Gateway IP, local or remote",
"PortGateway": 6690, "PortGateway": 6697,
"PortGateway_comment": "Gateway IP Port", "PortGateway_comment": "Gateway IP Port",
"Language": "es", "Language": "es",
"Language_comment": "Language code as needed by the POS application" "Language_comment": "Language code as needed by the POS application"
}, },
"POSBC": { "POSBC": {
"IpPOSBC": "127.0.0.1", "IpPOSBC": "127.0.0.1",
"PortPOSBC": 6697 "PortPOSBC": 6698
}, },
"DataGK": { "DataGK": {
"IpGkSmartPOS": "10.10.117.10", "IpGkSmartPOS": "10.10.117.10",
...@@ -33,12 +33,12 @@ ...@@ -33,12 +33,12 @@
"Serilog.Sinks.Console", "Serilog.Sinks.Console",
"Serilog.Sinks.File" "Serilog.Sinks.File"
], ],
"MinimumLevel": "Debug", "MinimumLevel": "Verbose",
"WriteTo": [ "WriteTo": [
{ {
"Name": "Console", "Name": "Console",
"Args": { "Args": {
"restrictedToMinimumLevel": "Information" "restrictedToMinimumLevel": "Verbose"
} }
}, },
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
<PackageReference Include="Serilog.Settings.Configuration" Version="7.0.0" /> <PackageReference Include="Serilog.Settings.Configuration" Version="7.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" /> <PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="Serilog.Sinks.File" Version="5.0.0" /> <PackageReference Include="Serilog.Sinks.File" Version="5.0.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="8.0.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text; using System.Text;
using GatewaySCO;
namespace gatewayPruebaECO; namespace gatewayPruebaECO;
// Cliente de pruebas del gateway, remite mensajes, espera respuesta. // Cliente de pruebas del gateway, remite mensajes, espera respuesta.
// public class SocketClientECO
// {
// private const int Port = 6697; // Puerto del servidor gateway
// private const string Server = "127.0.0.1"; // Dirección IP del servidor
// // Crear un socket
// static Socket socket = new(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
// static int numeroRespuestas = 0;
// public static void Main()
// {
// try
// {
// // Conectarse al servidor
// socket.Connect(new IPEndPoint(IPAddress.Parse(Server), Port));
// Console.WriteLine("Conectado al servidor.");
// var tareaRemiteRespuesta = Task.Run(() => EsperarMensajesAsync());
// string mensaje1 = """
// soeps~Message-Type=REQ|Session-Id=400|~<?xml version="1.0" encoding="UTF-8"?><scsns:Initialize xmlns:scsns="http://bc.si.retail.ibm.com/POSBCSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://bc.si.retail.ibm.com/POSBCSchema C:\PosBc\POSBCSchema_main.xsd">
// <InitializeRequest>
// <OperatorID>NO_DEFAULT</OperatorID>
// <TerminalNumber>400</TerminalNumber>
// <Recovery>false</Recovery>
// </InitializeRequest>
// </scsns:Initialize>
// """;
// Mensaje(mensaje1);
// string mensaje2 = """
// soeps~Message-Type=REQ|Session-Id=400|~<?xml version="1.0" encoding="UTF-8"?><scsns:Initialize xmlns:scsns="http://bc.si.retail.ibm.com/POSBCSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://bc.si.retail.ibm.com/POSBCSchema C:\PosBc\POSBCSchema_main.xsd">
// <InitializeRequest>
// <OperatorID>NO_DEFAULT</OperatorID>
// <TerminalNumber>800</TerminalNumber>
// <Recovery>false</Recovery>
// </InitializeRequest>
// </scsns:Initialize>
// """;
// Mensaje(mensaje2);
// string mensaje3 = """
// soeps~Message-Type=REQ|Session-Id=400|~<?xml version="1.0" encoding="UTF-8"?><scsns:Initialize xmlns:scsns="http://bc.si.retail.ibm.com/POSBCSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://bc.si.retail.ibm.com/POSBCSchema C:\PosBc\POSBCSchema_main.xsd">
// <InitializeRequest>
// <OperatorID>NO_DEFAULT</OperatorID>
// <TerminalNumber>900</TerminalNumber>
// <Recovery>false</Recovery>
// </InitializeRequest>
// </scsns:Initialize>
// """;
// Mensaje(mensaje3);
// // Cerrar el socket
// //socket.Shutdown(SocketShutdown.Both);
// //socket.Close();
// }
// catch (SocketException e)
// {
// Console.WriteLine($"Error de socket: {e.Message}");
// }
// catch (Exception e)
// {
// Console.WriteLine($"Error: {e.Message}");
// }
// }
// public static void Mensaje(string mensaje)
// {
// // Enviar un mensaje al servidor
// socket.Send(Salida(mensaje));
// Console.WriteLine($"Mensaje #1 enviado: {mensaje}");
// Random random = new Random();
// int delay = random.Next(100, 5001); // Tiempo de espera aleatorio entre 0.1 y 5 segundos
// Console.WriteLine($"Espera {delay}ms para enviar siguiente mensaje...");
// Task.Delay(delay).Wait(); // Espera asincrónica
// }
// public static void Respuesta()
// {
// // Recibir la respuesta del servidor
// byte[] buffer = new byte[4096];
// int bytesRead = socket.Receive(buffer);
// numeroRespuestas++;
// string response = Encoding.UTF8.GetString(buffer, 0, bytesRead);
// Console.WriteLine($"Respuesta #{numeroRespuestas} recibida: {response}");
// }
// public static async Task EsperarMensajesAsync()
// {
// Console.WriteLine("Eperando mensajes...");
// byte[] buffer = new byte[4096];
// while (true)
// {
// try
// {
// int bytesRead = await socket.ReceiveAsync(buffer, SocketFlags.None);
// if (bytesRead > 0)
// {
// numeroRespuestas++;
// string response = Encoding.UTF8.GetString(buffer, 0, bytesRead);
// Console.WriteLine($"Respuesta #{numeroRespuestas} recibida: {response}");
// }
// }
// catch (SocketException e)
// {
// Console.WriteLine($"Error de socket: {e.Message}");
// break;
// }
// catch (Exception e)
// {
// Console.WriteLine($"Error: {e.Message}");
// break;
// }
// }
// }
// public static byte[] Salida(string trama)
// {
// // Codifica longitud del mensaje en los 4 primeros bytes.
// var longitud = Convert.ToUInt32(trama.Length);
// byte[] bytesConLongMensaje = BitConverter.GetBytes(longitud);
// // Bytes mas significativos deben ir primero, usa 'big-endian'.
// if (BitConverter.IsLittleEndian)
// Array.Reverse(bytesConLongMensaje);
// // Codifica en bytes texto del mensaje.
// byte[] bytesConMensaje = Encoding.UTF8.GetBytes(trama);
// // Copia los 2 arreglos de bytes en un arreglo unificado.
// byte[] bytes = new byte[bytesConLongMensaje.Length + bytesConMensaje.Length];
// Buffer.BlockCopy(bytesConLongMensaje, 0, bytes, 0, bytesConLongMensaje.Length);
// Buffer.BlockCopy(bytesConMensaje, 0, bytes, bytesConLongMensaje.Length, bytesConMensaje.Length);
// return bytes;
// }
// }
public class SocketClientECO public class SocketClientECO
{ {
private const int Port = 6697; // Puerto del servidor gateway private const int Port = 6697; // Puerto del servidor
private const string Server = "127.0.0.1"; // Dirección IP del servidor private const string Server = "127.0.0.1"; // Dirección IP del servidor
// Crear un socket public static async Task Main(string[] args)
static Socket socket = new(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
static int numeroRespuestas = 0;
public static void Main()
{ {
try try
{ {
// Conectarse al servidor using Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect(new IPEndPoint(IPAddress.Parse(Server), Port)); await socket.ConnectAsync(Server, Port);
Console.WriteLine("Conectado al servidor."); Console.WriteLine("Cliente ECO #1 Conectado al servidor.");
// Enviar 4 mensajes de texto en formato binario
string mensaje1 = """ for (int i = 1; i <= 4; i++)
soeps~Message-Type=REQ|Session-Id=400|~<?xml version="1.0" encoding="UTF-8"?><scsns:Initialize xmlns:scsns="http://bc.si.retail.ibm.com/POSBCSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://bc.si.retail.ibm.com/POSBCSchema C:\PosBc\POSBCSchema_main.xsd"> {
<InitializeRequest> string message = $"Mensaje {i} desde el cliente CHEC";
<OperatorID>NO_DEFAULT</OperatorID>
<TerminalNumber>400</TerminalNumber> byte[] bufferMsj = Util.ConvierteEnBufferBytes(message);
<Recovery>false</Recovery>
</InitializeRequest> await socket.SendAsync(new ArraySegment<byte>(bufferMsj), SocketFlags.None);
</scsns:Initialize> Console.WriteLine($"Mensaje enviado: {message}");
"""; }
Mensaje(mensaje1); // Recibir mensajes de forma asincrónica en un ciclo infinito
Respuesta(); await RecibirMensajes(socket);
string mensaje2 = """
soeps~Message-Type=REQ|Session-Id=400|~<?xml version="1.0" encoding="UTF-8"?><scsns:Initialize xmlns:scsns="http://bc.si.retail.ibm.com/POSBCSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://bc.si.retail.ibm.com/POSBCSchema C:\PosBc\POSBCSchema_main.xsd">
<InitializeRequest>
<OperatorID>NO_DEFAULT</OperatorID>
<TerminalNumber>800</TerminalNumber>
<Recovery>false</Recovery>
</InitializeRequest>
</scsns:Initialize>
""";
Mensaje(mensaje2);
Respuesta();
string mensaje3 = """
soeps~Message-Type=REQ|Session-Id=400|~<?xml version="1.0" encoding="UTF-8"?><scsns:Initialize xmlns:scsns="http://bc.si.retail.ibm.com/POSBCSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://bc.si.retail.ibm.com/POSBCSchema C:\PosBc\POSBCSchema_main.xsd">
<InitializeRequest>
<OperatorID>NO_DEFAULT</OperatorID>
<TerminalNumber>900</TerminalNumber>
<Recovery>false</Recovery>
</InitializeRequest>
</scsns:Initialize>
""";
Mensaje(mensaje3);
Respuesta();
// Cerrar el socket
socket.Shutdown(SocketShutdown.Both);
socket.Close();
} }
catch (SocketException e) catch (SocketException e)
{ {
...@@ -76,37 +181,33 @@ public class SocketClientECO ...@@ -76,37 +181,33 @@ public class SocketClientECO
} }
} }
public static void Mensaje(string mensaje) private static async Task RecibirMensajes(Socket socket)
{ {
// Enviar un mensaje al servidor byte[] buffer = new byte[4096];
socket.Send(Salida(mensaje));
Console.WriteLine($"Mensaje #1 enviado: {mensaje}");
}
public static void Respuesta() while (true)
{ {
// Recibir la respuesta del servidor int bytesRead = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), SocketFlags.None);
byte[] buffer = new byte[4096]; if (bytesRead > 0)
int bytesRead = socket.Receive(buffer); {
numeroRespuestas++; // Obtener la longitud del mensaje
string response = Encoding.UTF8.GetString(buffer, 0, bytesRead); byte[] lengthBytes = new byte[4];
Console.WriteLine($"Respuesta #{numeroRespuestas} recibida: {response}"); Array.Copy(buffer, 0, lengthBytes, 0, 4);
if (BitConverter.IsLittleEndian)
{
Array.Reverse(lengthBytes);
} }
public static byte[] Salida(string trama) int messageLength = BitConverter.ToInt32(lengthBytes, 0);
string response = Encoding.UTF8.GetString(buffer, 4, messageLength);
Console.WriteLine($"Respuesta recibida: {response}");
}
else
{ {
// Codifica longitud del mensaje en los 4 primeros bytes. break; // La conexión se ha cerrado
var longitud = Convert.ToUInt32(trama.Length); }
byte[] bytesConLongMensaje = BitConverter.GetBytes(longitud); }
// Bytes mas significativos deben ir primero, usa 'big-endian'.
if (BitConverter.IsLittleEndian)
Array.Reverse(bytesConLongMensaje);
// Codifica en bytes texto del mensaje.
byte[] bytesConMensaje = Encoding.UTF8.GetBytes(trama);
// Copia los 2 arreglos de bytes en un arreglo unificado.
byte[] bytes = new byte[bytesConLongMensaje.Length + bytesConMensaje.Length];
Buffer.BlockCopy(bytesConLongMensaje, 0, bytes, 0, bytesConLongMensaje.Length);
Buffer.BlockCopy(bytesConMensaje, 0, bytes, bytesConLongMensaje.Length, bytesConMensaje.Length);
return bytes;
} }
} }
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text; using System.Text;
using GatewaySCO;
namespace gatewayPruebasECO_POSBC; namespace gatewayPruebasECO_POSBC;
...@@ -18,7 +19,7 @@ public class SocketServer ...@@ -18,7 +19,7 @@ public class SocketServer
listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Any, Port)); listener.Bind(new IPEndPoint(IPAddress.Any, Port));
listener.Listen(10); listener.Listen(10);
Console.WriteLine($"Servidor ECO_POSBC escuchando en el puerto {Port}..."); Console.WriteLine($"Servidor #1 ECO_POSBC escuchando en el puerto {Port}...");
// Aceptar una conexión de cliente // Aceptar una conexión de cliente
using Socket clientSocket = listener.Accept(); using Socket clientSocket = listener.Accept();
...@@ -35,6 +36,22 @@ public class SocketServer ...@@ -35,6 +36,22 @@ public class SocketServer
// Enviar el mismo mensaje de vuelta al cliente // Enviar el mismo mensaje de vuelta al cliente
clientSocket.Send(buffer, bytesRead, SocketFlags.None); clientSocket.Send(buffer, bytesRead, SocketFlags.None);
Console.WriteLine("Mensaje enviado de vuelta al cliente."); Console.WriteLine("Mensaje enviado de vuelta al cliente.");
// Enviar mensajes aleatorios adicionales
Random random = new Random();
int additionalMessages = random.Next(0, 6); // Número aleatorio entre 0 y 5
for (int i = 1; i <= additionalMessages; i++)
{
int delay = random.Next(100, 5001); // Tiempo de espera aleatorio entre 0.1 y 5 segundos
Task.Delay(delay).Wait(); // Espera asincrónica
string additionalMessage = $"mensaje aleatorio {i} después de {delay}ms";
byte[] additionalData = Util.ConvierteEnBufferBytes(additionalMessage);
clientSocket.Send(additionalData, SocketFlags.None);
Console.WriteLine($"Mensaje adicional enviado: {additionalMessage}");
Console.WriteLine(Util.DetalleMensajes(additionalData));
}
} }
} }
catch (SocketException e) catch (SocketException e)
...@@ -49,3 +66,76 @@ public class SocketServer ...@@ -49,3 +66,76 @@ public class SocketServer
} }
} }
// using System;
// using System.Net;
// using System.Net.Sockets;
// using System.Text;
// using System.Threading.Tasks;
// public class SocketServer
// {
// private const int Port = 6698; // Puerto en el que el servidor escuchará
// private static readonly Random Random = new Random();
// public static async Task Main()
// {
// Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
// listener.Bind(new IPEndPoint(IPAddress.Any, Port));
// listener.Listen(10);
// Console.WriteLine($"POSBC #4 Servidor escuchando en el puerto {Port}...");
// while (true)
// {
// Socket clientSocket = await listener.AcceptAsync();
// _ = Task.Run(() => HandleClientAsync(clientSocket));
// }
// }
// private static async Task HandleClientAsync(Socket clientSocket)
// {
// while (true)
// {
// // Leer los primeros 4 bytes para obtener la longitud del mensaje
// byte[] bufferCabecera = new byte[4];
// int bytesRead = await clientSocket.ReceiveAsync(new ArraySegment<byte>(bufferCabecera), SocketFlags.None);
// if (bytesRead == 0) break; // La conexión se ha cerrado
// uint longitudMsj = Util.LongitudMensaje(bufferCabecera);
// byte[] bufferMensaje = new byte[longitudMsj];
// // Leer el mensaje completo basado en la longitud
// bytesRead = await clientSocket.ReceiveAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
// if (bytesRead == 0) break; // La conexión se ha cerrado
// string message = Encoding.UTF8.GetString(bufferMensaje);
// Console.WriteLine($"Mensaje recibido: >{message}< longitud en cabecera {longitudMsj}, longitud del texto {message.Length}");
// // Enviar el mismo mensaje de vuelta al clientA
// bufferMensaje = Util.ConvierteEnBufferBytes(message);
// await clientSocket.SendAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
// Console.WriteLine("Mensaje enviado de vuelta al cliente >{msj}<", message);
// //await EnviarMensaje(clientSocket, message);
// // Enviar de 0 a 3 mensajes adicionales
// int additionalMessages = Random.Next(0, 4);
// Console.WriteLine("Número mensajes aleatorioes de respuesta {n}", additionalMessages);
// for (int i = 1; i <= additionalMessages; i++)
// {
// int delay = Random.Next(100, 3001);
// await Task.Delay(delay);
// string additionalMessage = $"{message} - Mensaje aleatorio {i} - generado POSBC";
// bufferMensaje = Util.ConvierteEnBufferBytes(additionalMessage);
// await clientSocket.SendAsync(new ArraySegment<byte>(bufferMensaje), SocketFlags.None);
// Console.WriteLine($"Mensaje adicional enviado: {additionalMessage}");
// }
// }
// clientSocket.Close();
// }
// }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment