Phases 234 - prac1 PDF

Title Phases 234 - prac1
Course Sistemas distribuidos
Institution Universitat Oberta de Catalunya
Pages 16
File Size 349.5 KB
File Type PDF
Total Downloads 151
Total Views 642

Summary

Distributed Systems1. IntroducciónLa práctica consiste en el estudio de un proyecto proporcionado junto con el enunciado de lamisma, basado en un ejemplo práctico de implementación del protocolo TSAE (“Time StampedAnti-Entropy) para la gestión y replicación de operaciones entre un grupo de nodos.Con...


Description

75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019

Distributed Systems

1.

Introducción

La práctica consiste en el estudio de un proyecto proporcionado junto con el enunciado de la misma, basado en un ejemplo práctico de implementación del protocolo TSAE (“Time Stamped Anti-Entropy) para la gestión y replicación de operaciones entre un grupo de nodos. Con el enunciado se han proporcionado todos los paquetes y clases necesarios para construir el aplicativo, si bien éste está incompleto. La práctica se divide en las fases 2, 3, y 4.1, estableciéndose unos objetivos a lograr en cada una para obtener favorablemente la nota correspondiente a su evaluación. La práctica se divide en fases, debiéndose alcanzar los objetivos establecidos para cada una con objeto de obtener la correspondiente calificación favorable. Para comprender mejor el funcionamiento del entorno experimental en el que se desarrollará la práctica, a continuación se describe el árbol de ejecución de los procesos que constituyen el sistema: El experimento se inicia mediante la ejecución del script “start.sh”, al que se le pasan los parámetros básicos de definición del entorno, y que junto con los parámetros obtenidos desde el fichero “config.properties” establecerán las condiciones de contorno para nuestras simulaciones. Desde el script se iniciará, por una parte el proceso del TestServer, encargado de orquestar la formación del grupo de nodos participantes, y que a su vez iniciará el TestServerExperimentManager, encargado de modular las simulaciones. Por otra parte, el script lanzará tantas instancias de procesos Server como se hayan indicado en los parámetros, que a su vez cada uno de ellos invocará la clase TSAESessionOriginatorSide para planificar la ejecución del inicio de sesiones TSAE de forma periódica, y por otra parte invocará la clase TSAESessionPartnerSide para iniciar un hilo que permanecerá escuchando en un socket para atender los mensajes recibidos.

1

75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019

2.

Phase 2

La fase 2 de la práctica consistirá en implementar el protocolo TSAE (sin acks ) descrito en la  , entre pares sección 5.1.2 del artículo “Weak-consistency group communication and membership ”[1] con sincronización de relojes lógicos (loosely-synchronized clocks ), y emitiendo sólo operaciones de agregación (add ), y sin purgado del registro de operaciones (logs ). Para asegurar que la implementación desarrollada es conforme con las clases TSAESessionOriginatorSide y  TSAESessionPartnerSide,  el código resultante deberá seguir las plantillas indicadas en el enunciado de la práctica. El protocolo propuesto por el citado artículo para implementar las sesiones de anti-entropía entre dos pares, se puede resumir en el intercambio de mensajes entre pares descrito en la siguiente tabla: TSAESessionOriginatorSide

TSAESessionPartnerSide

Envía A  E_REQUEST con vectores

Recibe AE_REQUEST con vectores

Message

msg = (Message) i  n.readObject();

msg = new MessageAErequest(localSummary, localAck);

Recibe A  E_REQUEST con vectores

Envía AE_REQUEST con vectores

msg = (Message) in.readObject();

msg = n  ew MessageAErequest(localSummary, localAck  );

Envía O  PERATION con operaciones diferentes

Recibe OPERATION con operaciones diferentes

out.writeObject(new MessageOperation(op));

msg = (Message) i  n.readObject();

Recibe O  PERATION con operaciones diferentes

Envía OPERATION con operaciones diferentes

msg = (Message) in.readObject();

 ew MessageOperation(op)); out.writeObject(n

Envía E  ND_TSAE y recibe confirmación

Envía END_TSAE y recibe confirmación

msg = n  ew MessageEndTSAE(); msg = (Message) in  .readObject()

msg = n  ew MessageEndTSAE(); msg = (Message) in  .readObject()

Actualiza vectores con respecto al partner

Actualiza vectores con respecto al originador

serverData.getSummary().updateMax(msgAe.getSummary());

serverData.getSummary().updateMax(msgAe.getSummary());

serverData.getAck().updateMax(msgAe.getAck());

serverData.getAck().updateMax(m  sgAe.getAck());

Procesa cola de operaciones recibidas

Procesa cola de operaciones recibidas

 p.getOperation()) serverData.deliverOperation((AddOperation) o

 p.getOperation()) serverData.deliverOperation((AddOperation) o

2.1.

Implementación

Con carácter general, hemos implementado aquellas operaciones que manipulan las estructuras de serverData con métodos de tipo sincronizado (synchronized ), es decir, que solamente un subproceso puede acceder a dicho método a la vez. Ello evita que dos acciones emitidas por hilos de ejecución diferentes interfieran entre sí al intentar modificar estas estructuras de forma concurrente, debiendo esperar el resto de hilos a que finalice el proceso que bloquea la estructura con acceso exclusivo. Atendiendo al objetivo del presente informe, a continuación se comentan sólo los extractos de las clases y métodos que han sido necesarios modificar para alcanzar los objetivos planteados por el 2

75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019 enunciado de la práctica: [recipes_services.tsae.data_structures.Log] Con respecto al código desarrollado para la fase 1, hemos tenido que preparar esta clase para recibir la lista de operaciones desde otros nodos, y determinar aquéllas que se deben agregar al registro local mediante la comparación de sus marcas de tiempo (timestamps ) con las conocidas en el resumen (summary): public synchronized  List listNewer(TimestampVector sum  ){ List ops = new ArrayList(); // get all the keys = hosts from hashmap Set hosts = log.keySet(); for (String host : hosts) { //t = last timestamp for index host in sumVector Timestamp tRef = s um. getTimestampVector().get(host); //get the list of ops from a host List l ogOp = log.get(host); //iterate over list of operations from host(i) and compare if timestamp is new than tRef.-> add to list for (int i = 0; i  < logOp  .size(); i+  +) { if (logOp  . get(i) .getTimestamp().compare(tRef) > 0){ ops.add(logOp.get(i) ); } } //Sort of the ops list Collections.sort ( ops, n  ew Comparator() { @Override public int  compare(Operation o1  , Operation o2  ) { return (int) o  1. getTimestamp().compare(o2.getTimestamp()); } }); } return ops; }

[recipes_services.tsae.data_structures.TimestampVector] Con respecto al código desarrollado para la fase 1, hemos tenido que completar el desarrollo de esta clase para soportar todos los métodos necesarios para implementar el protocolo TSAE. El método updateMax(Timestampvector  tsVector) recorre  los resúmenes de cada host del grupo, y actualiza el resumen del propio host con los valores más altos encontrados para cada host: public synchronized void updateMax(TimestampVector tsVector){ if( t sVector != null ){ Set hosts = this.timestampVector.keySet(); for (String host : hosts) { Timestamp max = tsVector.t imestampVector.get(host); if( m  ax != null ) { //replace if the timestampVector is higher than the current if ( this.timestampVector  .get(h  ost).compare(max) < 0 ) { timestampVector.replace(host, max); } } } } }

El método clone()  genera una copia de la estructura de resumen, que será usada por el resto de clases para realizar ciertas operaciones que no requieren de la manipulación directa de la estructura en serverData:

3

75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019 public TimestampVector clone(){ Set h  osts = t imestampVector.keySet(); ConcurrentHashMap c opy = n  ew ConcurrentHashMap(); for (String h  ost : hosts  ) { copy.put(host, timestampVector.get(host)); } List h  ostsList = new ArrayList (hosts); TimestampVector tsVec = n  ew TimestampVector(hostsList); tsVec.setTimestampVector(copy); return tsVec; }

El método getLatest(String host) devuelve  la estructura de resumen para un host determinado: public Timestamp getLatest(String host) { return this  .timestampVector.get(host); }

[recipes_services.ServerData] Esta clase contiene las estructuras de datos del servidor necesarias para implementar el protocolo TSAE (log, summary, ack ), así como para soportar la aplicación de recetas (recipes ), que en esta fase sólo dará soporte a las operaciones de adición (add ) . Estas estructuras son instancias de la clase [java.util.concurrent.concurrentHashMap   ], que permite concurrencia de operaciones para evitar el bloqueo entre hilos al acceder a éstas. Por ello es necesario implementar métodos sincronizados para los accesos que realicen cambios sobre estas estructuras. El método deliverOperation(AddOperation  op) evalúa la incorporación de una operación recibida desde un nodo remoto al registro (log ) del nodo local: public synchronized void deliverOperation(AddOperation op) { if (this.log  .add(op)) { this.recipes.add(op.getRecipe()); } }

[recipes_services.tsae.sessions.TSAESessionOriginatorSide] Esta clase implementa la parte del protocolo correspondiente al iniciador de la sesión de anti-entropía, tal como se describe más arriba en la introducción de esta fase 2. Para asegurar su compatibilidad con el resto del sistema, su desarrollo se ha realizado conforme a la plantilla de pseudocódigo descrita en el enunciado de la práctica. Para evitar la concurrencia de operaciones de modificación sobre las estructuras de serverData,   instanciamos de forma síncrona sendas copias de dichas estructuras: synchronized (serverData) { // Initiate localSummary with a copy of the summary in the serverData server localSummary = serverData.getSummary().clone(); serverData. getAck().update(serverData.getId(), localSummary); // Initiate localAck with a copy of the acknowledge vector in the serverData server. localAck = s erverData.getAck().clone(); }

4

75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019 Evaluamos si el mensaje recibido es del tipo OPERATION para añadir la operación recibida a la lista que será procesada localmente con posterioridad: while (msg.type() == MsgType.OPERATION){ // Add received operation to queue-list of pending operations listOperations. add((MessageOperation) msg); msg = (Message) in.readObject(); lsim.log(Level.TRACE , "[TSAESessionOriginatorSide] [session: "+c urrent_session_number+" ] received message: "+msg); }

Evaluamos el mensaje recibido es del tipo A  E_REQUEST para intercambiar los vectores de resumen y reconocimiento (si bien estos últimos no se tendrán en cuenta en la fase 2), evaluar las operaciones que no conoce la otra parte, y enviárselas: if (msg  .type() == MsgType.AE_REQUEST){ MessageAErequest msgAe = (MessageAErequest) m  sg; List n  ewLogs = s erverData.getLog().listNewer(msgAe.getSummary()); //send operations for (Operation op   : newLogs) { out.writeObject(new MessageOperation(op)); } }

Evaluamos si el mensaje recibido es del tipo E  ND_TSAE , para finalizar la sesión de anti-entropia y actualizar de forma síncrona las estructuras de serverData  , estaleciendo los vectores con los valores más altos de entre los conocidos, y añadiendo las operaciones que no conoce localmente: // catch message for ending the TSAE session if (msg  .type() == MsgType.END_TSAE){ // deliver serverData structures (ADD / REMOVE) concurrently using synchronized method synchronized (serverData  ){ for (MessageOperation op : listOperations) { if (op.getOperation().getType() == OperationType.ADD) { serverData.deliverOperation( (AddOperation) op.getOperation() ); } } //Update summary serverData.getSummary().updateMax(msgAe.getSummary()); serverData.getAck().updateMax(msgAe.getAck()); serverData.getLog().purgeLog(serverData.getAck()); } }

[recipes_services.tsae.sessions.TSAESessionPartnerSide] Esta clase implementa la parte del protocolo correspondiente a la contraparte de la sesión de anti-entropía, tal como se describe más arriba en la introducción de esta fase 2. Para asegurar su compatibilidad con el resto del sistema, su desarrollo se ha realizado conforme a la plantilla de pseudocódigo descrita en el enunciado de la práctica. Para evitar la concurrencia de operaciones de modificación sobre las estructuras de serverData,   instanciamos de forma síncrona sendas copias de dichas estructuras: synchronized (serverData) { // Initiate localSummary with a copy of the summary in the serverData server. localSummary = serverData.getSummary().clone(); serverData. getAck().update(serverData.getId(), localSummary); // Initiate localAck with a copy of the acknowledge vector in the serverData server. localAck = s erverData.getAck().clone(); }

5

75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019 Enviamos al originador las operaciones que éste no conoce, mediante un mensaje del tipo OPERATION: List n  ewLogs = serverData.getLog().listNewer(msgAe.getSummary()); for (Operation op   : newLogs  ) { out. writeObject(new MessageOperation(op)); msg. setSessionNumber(current_session_number); // send an operation to the partner out. writeObject(msg); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+current_session_number+" ] sent message: "+ msg); }

Enviamos al originador un mensaje del tipo A  E_REQUEST para intercambiar los vectores de resumen y reconocimiento (si bien este último no se tendrá en cuenta en la fase 2): msg = new MessageAErequest(localSummary, localAck); msg. setSessionNumber(current_session_number); out. writeObject(msg); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+c urrent_session_number+"] sent message: "+ msg);

Evaluamos si el mensaje recibido es del tipo O  PERATION para añadir la operación recibida a la lista que será procesada localmente con posterioridad: List listOperations = n  ew ArrayList(); msg = (Message) in.readObject(); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+c urrent_session_number+"] received message: "+ msg); while (msg.type() == MsgType.OPERATION){ // add received operation to queue-list of pending operations listOperations. add((MessageOperation) msg); // process next message msg = (Message) in.readObject(); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+current_session_number+" ] received message: "+ msg); }

Evaluamos si el mensaje recibido es del tipo E  ND_TSAE para finalizar la sesión respondiendo también con un mensaje la operación recibida a la lista que será procesada localmente con posterioridad // catch message for ending the TSAE session if (msg  .type() == MsgType.END_TSAE){ msg = n  ew MessageEndTSAE(); msg. setSessionNumber(current_session_number); // send "end of TSAE session" message to the partner out. writeObject(msg); lsim.log(Level.TRACE , "[TSAESessionPartnerSide] [session: "+current_session_number+" ] sent message: "+ msg); }

Procesamos la cola de operaciones recibidas desde el partner, y la añadimos al registro (log ) de ). En esta fase no se implementarán las serverData si se trata de una operación de adición (add  operaciones de eliminación (remove ). synchronized (serverData) { for (MessageOperation op   : listOperations) { if (op  .getOperation().getType() == OperationType.ADD) { serverData.deliverOperation( (AddOperation) o  p.getOperation() ); } } //Update summary serverData.getSummary().updateMax(msgAe.getSummary()); serverData.getAck().updateMax(msgAe.getAck()); serverData. getLog().purgeLog(serverData.getAck()); }

6

75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019

2.2.

Tests

1. Ejecución local de la simulación para la fase 2 Una vez realizadas las pruebas interactivas durante el desarrollo de nuestra solución, validamos su correcto funcionamiento mediante una simulación desatendida según la sentencia siguiente, y obteniendo los resultados que se muestran a continuación de forma resumida: n@ubuntu1804:~/tsae-group470832533-v4/scripts$ ./start.sh 20004 15 --logResults -path ../results --nopurge --noremove start recipes_service.test.TestServer TestServer -- current experiment  of group 470832533 will run on port 20005 TestServerExperimentManager -- params  : {sessionPeriod=20, simulationStop=300, numSes=1, groupId=470832533  , simulationPeriod=20, executionMode=localMode, purge=nopurge  , probReconnect=0.2, propDegree=0, executionStop=180  , probDisconnect=0.05, samplingTime=5, sessionDelay=0, simulationDelay=5, probDel=0, probCreate=0.25, serverBasePort=35000} Server -- Initializing ...Server -- Initializing ...

Server -- Initializing ...

-- *** --> Server -- local node: [127.0.1.1,35001,[email protected]:35001] -- *** --> Server -- participants: [[email protected]:35000, [email protected]:35001, [email protected]:35002,

[email protected]:35011, [email protected]:35012, [email protected]:35013, [email protected]:35014]

[[email protected]:35008] ADD recipe: ixogcuur

[[email protected]:35007] >> Server DISCONNECTION [[email protected]:35001] >> Server RECONNECTION

Server [email protected]:35001 finishes Activity Simulation. It will stop because is not connected ##### [iteration: 0] partial result from server: [email protected]:35002 Server [email protected]:35004 finishes Activity Simulation

##### [iteration: 35] partial result from server: [email protected]:35013 Server [email protected]:35002 Ends Execution

##### Final result from server: [email protected]:35009

END OF EVALUATION

RESULTS

##### [[email protected]:35009] Result: Group id: 470832533 Node id: [email protected]:35009 Recipes: {aekelcjc=[aekelcjc, Content--aekelcjc, 470832533], bitqpktd=[bitqpktd, Content--bitqpktd, 470832533], bninuubo=[bninuubo,

Log: AddOperation [recipe=[earfpvby, Content--earfpvby, 470832533], [email protected]:35007: 0]

Summary: [email protected]:35007: 1

Ack: [email protected]:35007: [email protected]:35007: 1

Results are equal Nodes converged at the iteration 1

================================================

*********** num received results: 8 *********** % received results: 53 *********** minimal required number of results: 8

Puesto que el experimento se ha ejecutado según los parámetros de simulación prestablecidos en el fichero config.properties   según el enunciado de la práctica, podemos considerar la prueba como satisfactoria. 7

75.589 Sistemas Distribuidos – Grupo 470832533 – Práctica 2 - Diciembre 2019

2. Ejecución en el laboratorio DSLab de la simulación para la fase 2 El objeto de esta prueba será la evaluación formal del código implementado, en el entorno experimental controlado, proporcionado por DSLab de la UOC. Tras generar el correspondiente proyecto, subir los ficheros ServerData.java, Log.java, TimestampVector.java, TSAESessionOriginatorSide.java, TSAESessionPartnerSide.java, construir la compilación, y generar el experimento para su ejecución, se obtienen los siguientes resultados: ##### [All instances have the same value in all data structures (recipes, log, summary, Ack)] Result: Group id: 470832533 Node id: [email protected]:350...


Similar Free PDFs