Title | Phases 234 - prac1 |
---|---|
Course | Sistemas distribuidos |
Institution | Universitat Oberta de Catalunya |
Pages | 16 |
File Size | 349.5 KB |
File Type | |
Total Downloads | 151 |
Total Views | 642 |
Distributed Systems1. IntroducciónLa práctica consiste en el estudio de un proyecto proporcionado junto con el enunciado de lamisma, basado en un ejemplo práctico de implementación del protocolo TSAE (“Time StampedAnti-Entropy) para la gestión y replicación de operaciones entre un grupo de nodos.Con...
75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019
Distributed Systems
1.
Introducción
La práctica consiste en el estudio de un proyecto proporcionado junto con el enunciado de la misma, basado en un ejemplo práctico de implementación del protocolo TSAE (“Time Stamped Anti-Entropy) para la gestión y replicación de operaciones entre un grupo de nodos. Con el enunciado se han proporcionado todos los paquetes y clases necesarios para construir el aplicativo, si bien éste está incompleto. La práctica se divide en las fases 2, 3, y 4.1, estableciéndose unos objetivos a lograr en cada una para obtener favorablemente la nota correspondiente a su evaluación. La práctica se divide en fases, debiéndose alcanzar los objetivos establecidos para cada una con objeto de obtener la correspondiente calificación favorable. Para comprender mejor el funcionamiento del entorno experimental en el que se desarrollará la práctica, a continuación se describe el árbol de ejecución de los procesos que constituyen el sistema: El experimento se inicia mediante la ejecución del script “start.sh”, al que se le pasan los parámetros básicos de definición del entorno, y que junto con los parámetros obtenidos desde el fichero “config.properties” establecerán las condiciones de contorno para nuestras simulaciones. Desde el script se iniciará, por una parte el proceso del TestServer, encargado de orquestar la formación del grupo de nodos participantes, y que a su vez iniciará el TestServerExperimentManager, encargado de modular las simulaciones. Por otra parte, el script lanzará tantas instancias de procesos Server como se hayan indicado en los parámetros, que a su vez cada uno de ellos invocará la clase TSAESessionOriginatorSide para planificar la ejecución del inicio de sesiones TSAE de forma periódica, y por otra parte invocará la clase TSAESessionPartnerSide para iniciar un hilo que permanecerá escuchando en un socket para atender los mensajes recibidos.
1
75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019
2.
Phase 2
La fase 2 de la práctica consistirá en implementar el protocolo TSAE (sin acks ) descrito en la , entre pares sección 5.1.2 del artículo “Weak-consistency group communication and membership ”[1] con sincronización de relojes lógicos (loosely-synchronized clocks ), y emitiendo sólo operaciones de agregación (add ), y sin purgado del registro de operaciones (logs ). Para asegurar que la implementación desarrollada es conforme con las clases TSAESessionOriginatorSide y TSAESessionPartnerSide, el código resultante deberá seguir las plantillas indicadas en el enunciado de la práctica. El protocolo propuesto por el citado artículo para implementar las sesiones de anti-entropía entre dos pares, se puede resumir en el intercambio de mensajes entre pares descrito en la siguiente tabla: TSAESessionOriginatorSide
TSAESessionPartnerSide
Envía A E_REQUEST con vectores
Recibe AE_REQUEST con vectores
Message
msg = (Message) i n.readObject();
msg = new MessageAErequest(localSummary, localAck);
Recibe A E_REQUEST con vectores
Envía AE_REQUEST con vectores
msg = (Message) in.readObject();
msg = n ew MessageAErequest(localSummary, localAck );
Envía O PERATION con operaciones diferentes
Recibe OPERATION con operaciones diferentes
out.writeObject(new MessageOperation(op));
msg = (Message) i n.readObject();
Recibe O PERATION con operaciones diferentes
Envía OPERATION con operaciones diferentes
msg = (Message) in.readObject();
ew MessageOperation(op)); out.writeObject(n
Envía E ND_TSAE y recibe confirmación
Envía END_TSAE y recibe confirmación
msg = n ew MessageEndTSAE(); msg = (Message) in .readObject()
msg = n ew MessageEndTSAE(); msg = (Message) in .readObject()
Actualiza vectores con respecto al partner
Actualiza vectores con respecto al originador
serverData.getSummary().updateMax(msgAe.getSummary());
serverData.getSummary().updateMax(msgAe.getSummary());
serverData.getAck().updateMax(msgAe.getAck());
serverData.getAck().updateMax(m sgAe.getAck());
Procesa cola de operaciones recibidas
Procesa cola de operaciones recibidas
p.getOperation()) serverData.deliverOperation((AddOperation) o
p.getOperation()) serverData.deliverOperation((AddOperation) o
2.1.
Implementación
Con carácter general, hemos implementado aquellas operaciones que manipulan las estructuras de serverData con métodos de tipo sincronizado (synchronized ), es decir, que solamente un subproceso puede acceder a dicho método a la vez. Ello evita que dos acciones emitidas por hilos de ejecución diferentes interfieran entre sí al intentar modificar estas estructuras de forma concurrente, debiendo esperar el resto de hilos a que finalice el proceso que bloquea la estructura con acceso exclusivo. Atendiendo al objetivo del presente informe, a continuación se comentan sólo los extractos de las clases y métodos que han sido necesarios modificar para alcanzar los objetivos planteados por el 2
75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019 enunciado de la práctica: [recipes_services.tsae.data_structures.Log] Con respecto al código desarrollado para la fase 1, hemos tenido que preparar esta clase para recibir la lista de operaciones desde otros nodos, y determinar aquéllas que se deben agregar al registro local mediante la comparación de sus marcas de tiempo (timestamps ) con las conocidas en el resumen (summary): public synchronized List listNewer(TimestampVector sum ){ List ops = new ArrayList(); // get all the keys = hosts from hashmap Set hosts = log.keySet(); for (String host : hosts) { //t = last timestamp for index host in sumVector Timestamp tRef = s um. getTimestampVector().get(host); //get the list of ops from a host List l ogOp = log.get(host); //iterate over list of operations from host(i) and compare if timestamp is new than tRef.-> add to list for (int i = 0; i < logOp .size(); i+ +) { if (logOp . get(i) .getTimestamp().compare(tRef) > 0){ ops.add(logOp.get(i) ); } } //Sort of the ops list Collections.sort ( ops, n ew Comparator() { @Override public int compare(Operation o1 , Operation o2 ) { return (int) o 1. getTimestamp().compare(o2.getTimestamp()); } }); } return ops; }
[recipes_services.tsae.data_structures.TimestampVector] Con respecto al código desarrollado para la fase 1, hemos tenido que completar el desarrollo de esta clase para soportar todos los métodos necesarios para implementar el protocolo TSAE. El método updateMax(Timestampvector tsVector) recorre los resúmenes de cada host del grupo, y actualiza el resumen del propio host con los valores más altos encontrados para cada host: public synchronized void updateMax(TimestampVector tsVector){ if( t sVector != null ){ Set hosts = this.timestampVector.keySet(); for (String host : hosts) { Timestamp max = tsVector.t imestampVector.get(host); if( m ax != null ) { //replace if the timestampVector is higher than the current if ( this.timestampVector .get(h ost).compare(max) < 0 ) { timestampVector.replace(host, max); } } } } }
El método clone() genera una copia de la estructura de resumen, que será usada por el resto de clases para realizar ciertas operaciones que no requieren de la manipulación directa de la estructura en serverData:
3
75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019 public TimestampVector clone(){ Set h osts = t imestampVector.keySet(); ConcurrentHashMap c opy = n ew ConcurrentHashMap(); for (String h ost : hosts ) { copy.put(host, timestampVector.get(host)); } List h ostsList = new ArrayList (hosts); TimestampVector tsVec = n ew TimestampVector(hostsList); tsVec.setTimestampVector(copy); return tsVec; }
El método getLatest(String host) devuelve la estructura de resumen para un host determinado: public Timestamp getLatest(String host) { return this .timestampVector.get(host); }
[recipes_services.ServerData] Esta clase contiene las estructuras de datos del servidor necesarias para implementar el protocolo TSAE (log, summary, ack ), así como para soportar la aplicación de recetas (recipes ), que en esta fase sólo dará soporte a las operaciones de adición (add ) . Estas estructuras son instancias de la clase [java.util.concurrent.concurrentHashMap ], que permite concurrencia de operaciones para evitar el bloqueo entre hilos al acceder a éstas. Por ello es necesario implementar métodos sincronizados para los accesos que realicen cambios sobre estas estructuras. El método deliverOperation(AddOperation op) evalúa la incorporación de una operación recibida desde un nodo remoto al registro (log ) del nodo local: public synchronized void deliverOperation(AddOperation op) { if (this.log .add(op)) { this.recipes.add(op.getRecipe()); } }
[recipes_services.tsae.sessions.TSAESessionOriginatorSide] Esta clase implementa la parte del protocolo correspondiente al iniciador de la sesión de anti-entropía, tal como se describe más arriba en la introducción de esta fase 2. Para asegurar su compatibilidad con el resto del sistema, su desarrollo se ha realizado conforme a la plantilla de pseudocódigo descrita en el enunciado de la práctica. Para evitar la concurrencia de operaciones de modificación sobre las estructuras de serverData, instanciamos de forma síncrona sendas copias de dichas estructuras: synchronized (serverData) { // Initiate localSummary with a copy of the summary in the serverData server localSummary = serverData.getSummary().clone(); serverData. getAck().update(serverData.getId(), localSummary); // Initiate localAck with a copy of the acknowledge vector in the serverData server. localAck = s erverData.getAck().clone(); }
4
75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019 Evaluamos si el mensaje recibido es del tipo OPERATION para añadir la operación recibida a la lista que será procesada localmente con posterioridad: while (msg.type() == MsgType.OPERATION){ // Add received operation to queue-list of pending operations listOperations. add((MessageOperation) msg); msg = (Message) in.readObject(); lsim.log(Level.TRACE , "[TSAESessionOriginatorSide] [session: "+c urrent_session_number+" ] received message: "+msg); }
Evaluamos el mensaje recibido es del tipo A E_REQUEST para intercambiar los vectores de resumen y reconocimiento (si bien estos últimos no se tendrán en cuenta en la fase 2), evaluar las operaciones que no conoce la otra parte, y enviárselas: if (msg .type() == MsgType.AE_REQUEST){ MessageAErequest msgAe = (MessageAErequest) m sg; List n ewLogs = s erverData.getLog().listNewer(msgAe.getSummary()); //send operations for (Operation op : newLogs) { out.writeObject(new MessageOperation(op)); } }
Evaluamos si el mensaje recibido es del tipo E ND_TSAE , para finalizar la sesión de anti-entropia y actualizar de forma síncrona las estructuras de serverData , estaleciendo los vectores con los valores más altos de entre los conocidos, y añadiendo las operaciones que no conoce localmente: // catch message for ending the TSAE session if (msg .type() == MsgType.END_TSAE){ // deliver serverData structures (ADD / REMOVE) concurrently using synchronized method synchronized (serverData ){ for (MessageOperation op : listOperations) { if (op.getOperation().getType() == OperationType.ADD) { serverData.deliverOperation( (AddOperation) op.getOperation() ); } } //Update summary serverData.getSummary().updateMax(msgAe.getSummary()); serverData.getAck().updateMax(msgAe.getAck()); serverData.getLog().purgeLog(serverData.getAck()); } }
[recipes_services.tsae.sessions.TSAESessionPartnerSide] Esta clase implementa la parte del protocolo correspondiente a la contraparte de la sesión de anti-entropía, tal como se describe más arriba en la introducción de esta fase 2. Para asegurar su compatibilidad con el resto del sistema, su desarrollo se ha realizado conforme a la plantilla de pseudocódigo descrita en el enunciado de la práctica. Para evitar la concurrencia de operaciones de modificación sobre las estructuras de serverData, instanciamos de forma síncrona sendas copias de dichas estructuras: synchronized (serverData) { // Initiate localSummary with a copy of the summary in the serverData server. localSummary = serverData.getSummary().clone(); serverData. getAck().update(serverData.getId(), localSummary); // Initiate localAck with a copy of the acknowledge vector in the serverData server. localAck = s erverData.getAck().clone(); }
5
75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019 Enviamos al originador las operaciones que éste no conoce, mediante un mensaje del tipo OPERATION: List n ewLogs = serverData.getLog().listNewer(msgAe.getSummary()); for (Operation op : newLogs ) { out. writeObject(new MessageOperation(op)); msg. setSessionNumber(current_session_number); // send an operation to the partner out. writeObject(msg); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+current_session_number+" ] sent message: "+ msg); }
Enviamos al originador un mensaje del tipo A E_REQUEST para intercambiar los vectores de resumen y reconocimiento (si bien este último no se tendrá en cuenta en la fase 2): msg = new MessageAErequest(localSummary, localAck); msg. setSessionNumber(current_session_number); out. writeObject(msg); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+c urrent_session_number+"] sent message: "+ msg);
Evaluamos si el mensaje recibido es del tipo O PERATION para añadir la operación recibida a la lista que será procesada localmente con posterioridad: List listOperations = n ew ArrayList(); msg = (Message) in.readObject(); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+c urrent_session_number+"] received message: "+ msg); while (msg.type() == MsgType.OPERATION){ // add received operation to queue-list of pending operations listOperations. add((MessageOperation) msg); // process next message msg = (Message) in.readObject(); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+current_session_number+" ] received message: "+ msg); }
Evaluamos si el mensaje recibido es del tipo E ND_TSAE para finalizar la sesión respondiendo también con un mensaje la operación recibida a la lista que será procesada localmente con posterioridad // catch message for ending the TSAE session if (msg .type() == MsgType.END_TSAE){ msg = n ew MessageEndTSAE(); msg. setSessionNumber(current_session_number); // send "end of TSAE session" message to the partner out. writeObject(msg); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+current_session_number+" ] sent message: "+ msg); }
Procesamos la cola de operaciones recibidas desde el partner, y la añadimos al registro (log ) de ). En esta fase no se implementarán las serverData si se trata de una operación de adición (add operaciones de eliminación (remove ). synchronized (serverData) { for (MessageOperation op : listOperations) { if (op .getOperation().getType() == OperationType.ADD) { serverData.deliverOperation( (AddOperation) o p.getOperation() ); } } //Update summary serverData.getSummary().updateMax(msgAe.getSummary()); serverData.getAck().updateMax(msgAe.getAck()); serverData. getLog().purgeLog(serverData.getAck()); }
6
75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019
2.2.
Tests
1. Ejecución local de la simulación para la fase 2 Una vez realizadas las pruebas interactivas durante el desarrollo de nuestra solución, validamos su correcto funcionamiento mediante una simulación desatendida según la sentencia siguiente, y obteniendo los resultados que se muestran a continuación de forma resumida: n@ubuntu1804:~/tsae-group470832533-v4/scripts$ ./start.sh 20004 15 --logResults -path ../results --nopurge --noremove start recipes_service.test.TestServer TestServer -- current experiment of group 470832533 will run on port 20005 TestServerExperimentManager -- params : {sessionPeriod=20, simulationStop=300, numSes=1, groupId=470832533 , simulationPeriod=20, executionMode=localMode, purge=nopurge , probReconnect=0.2, propDegree=0, executionStop=180 , probDisconnect=0.05, samplingTime=5, sessionDelay=0, simulationDelay=5, probDel=0, probCreate=0.25, serverBasePort=35000} Server -- Initializing ...Server -- Initializing ...
Server -- Initializing ...
-- *** --> Server -- local node: [127.0.1.1,35001,[email protected]:35001] -- *** --> Server -- participants: [[email protected]:35000, [email protected]:35001, [email protected]:35002,
[email protected]:35011, [email protected]:35012, [email protected]:35013, [email protected]:35014]
[[email protected]:35008] ADD recipe: ixogcuur
[[email protected]:35007] >> Server DISCONNECTION [[email protected]:35001] >> Server RECONNECTION
Server [email protected]:35001 finishes Activity Simulation. It will stop because is not connected ##### [iteration: 0] partial result from server: [email protected]:35002 Server [email protected]:35004 finishes Activity Simulation
##### [iteration: 35] partial result from server: [email protected]:35013 Server [email protected]:35002 Ends Execution
##### Final result from server: [email protected]:35009
END OF EVALUATION
RESULTS
##### [[email protected]:35009] Result: Group id: 470832533 Node id: [email protected]:35009 Recipes: {aekelcjc=[aekelcjc, Content--aekelcjc, 470832533], bitqpktd=[bitqpktd, Content--bitqpktd, 470832533], bninuubo=[bninuubo,
Log: AddOperation [recipe=[earfpvby, Content--earfpvby, 470832533], [email protected]:35007: 0]
Summary: [email protected]:35007: 1
Ack: [email protected]:35007: [email protected]:35007: 1
Results are equal Nodes converged at the iteration 1
================================================
*********** num received results: 8 *********** % received results: 53 *********** minimal required number of results: 8
Puesto que el experimento se ha ejecutado según los parámetros de simulación prestablecidos en el fichero config.properties según el enunciado de la práctica, podemos considerar la prueba como satisfactoria. 7
75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019
2. Ejecución en el laboratorio DSLab de la simulación para la fase 2 El objeto de esta prueba será la evaluación formal del código implementado, en el entorno experimental controlado, proporcionado por DSLab de la UOC. Tras generar el correspondiente proyecto, subir los ficheros ServerData.java, Log.java, TimestampVector.java, TSAESessionOriginatorSide.java, TSAESessionPartnerSide.java, construir la compilación, y generar el experimento para su ejecución, se obtienen los siguientes resultados: ##### [All instances have the same value in all data structures (recipes, log, summary, Ack)] Result: Group id: 470832533 Node id: [email protected]:350...