From 7bc7fe45b23f048a6bd609b7fbd01270b498a20b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 8 May 2012 10:20:39 +0200 Subject: Get rid of queues in Master. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – Remove WorkQueue code. – Remove delayed operations in master. (Handled by RPC instead) --- same/src/main/java/com/orbekk/same/Client.java | 1 - same/src/main/java/com/orbekk/same/Master.java | 108 ++++------ .../java/com/orbekk/same/ServicesPbConversion.java | 14 +- same/src/main/java/com/orbekk/util/WorkQueue.java | 229 --------------------- .../test/java/com/orbekk/util/WorkQueueTest.java | 52 ----- 5 files changed, 47 insertions(+), 357 deletions(-) delete mode 100644 same/src/main/java/com/orbekk/util/WorkQueue.java delete mode 100644 same/src/test/java/com/orbekk/util/WorkQueueTest.java diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index bbf9ca2..f632241 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -32,7 +32,6 @@ import com.orbekk.paxos.MasterProposer; import com.orbekk.protobuf.Rpc; import com.orbekk.same.Services.Empty; import com.orbekk.same.Services.FullStateResponse; -import com.orbekk.same.Services.FullStateResponse.Builder; import com.orbekk.same.Services.MasterState; import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index b32fedd..90601df 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -15,7 +15,6 @@ */ package com.orbekk.same; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -29,7 +28,6 @@ import com.orbekk.same.Services.ClientState; import com.orbekk.same.Services.Empty; import com.orbekk.same.Services.MasterTakeoverResponse; import com.orbekk.same.State.Component; -import com.orbekk.util.WorkQueue; public class Master { private Logger logger = LoggerFactory.getLogger(getClass()); @@ -97,8 +95,8 @@ public class Master { private Services.Master newMasterImpl = new Services.Master() { @Override public void joinNetworkRequest(RpcController controller, ClientState request, RpcCallback done) { - logger.info("joinNetworkRequest({})", request); - sendFullStateThread.add(request.getLocation()); + sendInitialMasterTakeover(request.getLocation()); + sendFullState(request.getLocation()); addParticipant(request.getLocation()); done.run(Empty.getDefaultInstance()); } @@ -112,31 +110,35 @@ public class Master { success = true; long newRevision = revision.incrementAndGet(); state.forceUpdate(request.getId(), request.getData(), newRevision); - updateStateRequestThread.add(request.getId()); + sendStateToClients(state.getComponent(request.getId())); } done.run(Services.UpdateComponentResponse.newBuilder() .setSuccess(success).build()); } }; - WorkQueue updateStateRequestThread = new WorkQueue() { - @Override protected void onChange() { - List pending = getAndClear(); - List updatedComponents = new ArrayList(); - for (String component : pending) { - updatedComponents.add(state.getComponent(component)); - } - - logger.info("updateStateRequestThread: Updated state: {}", - pending); - for (String clientLocation : state.getList( - com.orbekk.same.State.PARTICIPANTS)) { - sendComponents(clientLocation, updatedComponents); - } + private void sendStateToClients(State.Component component) { + for (String clientLocation : state.getList( + com.orbekk.same.State.PARTICIPANTS)) { + sendComponent(clientLocation, component); + } + } + + private void sendComponent(String clientLocation, Component component) { + Services.Client client = connections.getClient0(clientLocation); + if (client == null) { + removeParticipant(clientLocation); } - }; - public void sendComponents(String clientLocation, + Services.Component componentProto = ServicesPbConversion.componentToPb(component); + Rpc rpc = rpcf.create(); + RpcCallback done = + new RemoveParticipantIfFailsCallback(clientLocation, + rpc); + client.setState(rpc, componentProto, done); + } + + private void sendComponents(String clientLocation, List components) { Services.Client client = connections.getClient0(clientLocation); if (client == null) { @@ -144,7 +146,7 @@ public class Master { } for (Component component : components) { - Services.Component componentProto = componentToProto(component); + Services.Component componentProto = ServicesPbConversion.componentToPb(component); Rpc rpc = rpcf.create(); RpcCallback done = new RemoveParticipantIfFailsCallback(clientLocation, @@ -153,51 +155,27 @@ public class Master { } } - WorkQueue sendFullStateThread = new WorkQueue() { - @Override protected void onChange() { - List pending = getAndClear(); - logger.info("Sending full state to {}", pending); - final List components = state.getComponents(); - for (String clientLocation : pending) { - Services.Client client = connections.getClient0(clientLocation); - if (client == null) { - removeParticipant(clientLocation); - continue; - } - - { // Send masterTakeover(). - Rpc rpc = rpcf.create(); - RpcCallback done = - new RemoveParticipantIfFailsCallback( - clientLocation, rpc); - client.masterTakeover(rpc, getMasterInfo(), done); - } - sendComponents(clientLocation, components); - } - } - }; - - private Services.Component componentToProto(State.Component component) { - return Services.Component.newBuilder() - .setId(component.getName()) - .setData(component.getData()) - .setRevision(component.getRevision()) - .build(); + private void sendFullState(String clientLocation) { + List components = state.getComponents(); + sendComponents(clientLocation, components); + } + + private void sendInitialMasterTakeover(String clientLocation) { + Services.Client client = connections.getClient0(clientLocation); + Rpc rpc = rpcf.create(); + RpcCallback done = + new RemoveParticipantIfFailsCallback( + clientLocation, rpc); + client.masterTakeover(rpc, getMasterInfo(), done); } void performWork() { - sendFullStateThread.performWork(); - updateStateRequestThread.performWork(); } public void start() { - sendFullStateThread.start(); - updateStateRequestThread.start(); } public void interrupt() { - sendFullStateThread.interrupt(); - updateStateRequestThread.interrupt(); } public Services.Master getNewService() { @@ -210,29 +188,19 @@ public class Master { participants.add(location); state.updateFromObject(State.PARTICIPANTS, participants, state.getRevision(State.PARTICIPANTS) + 1); - updateStateRequestThread.add(State.PARTICIPANTS); + sendStateToClients(state.getComponent(State.PARTICIPANTS)); } } private synchronized void removeParticipant(String url) { - /** TODO: Remove this code. */ - List participants = state.getList(".participants"); - if (participants.contains(url)) { - logger.info("removeParticipant({})", url); - participants.remove(url); - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - updateStateRequestThread.add(".participants"); - } - List participants0 = state.getList(State.PARTICIPANTS); if (participants0.contains(url)) { logger.info("removeParticipant({})", url); participants0.remove(url); state.updateFromObject(State.PARTICIPANTS, participants0, state.getRevision(State.PARTICIPANTS) + 1); - updateStateRequestThread.add(State.PARTICIPANTS); + sendStateToClients(state.getComponent(State.PARTICIPANTS)); } } diff --git a/same/src/main/java/com/orbekk/same/ServicesPbConversion.java b/same/src/main/java/com/orbekk/same/ServicesPbConversion.java index 5638e1a..7fc52f0 100644 --- a/same/src/main/java/com/orbekk/same/ServicesPbConversion.java +++ b/same/src/main/java/com/orbekk/same/ServicesPbConversion.java @@ -7,12 +7,16 @@ public class ServicesPbConversion { public static List componentsToPb(List components) { List results = new ArrayList(); for (State.Component c : components) { - results.add(Services.Component.newBuilder() - .setId(c.getName()) - .setRevision(c.getRevision()) - .setData(c.getData()) - .build()); + results.add(componentToPb(c)); } return results; } + + public static Services.Component componentToPb(State.Component component) { + return Services.Component.newBuilder() + .setId(component.getName()) + .setRevision(component.getRevision()) + .setData(component.getData()) + .build(); + } } diff --git a/same/src/main/java/com/orbekk/util/WorkQueue.java b/same/src/main/java/com/orbekk/util/WorkQueue.java deleted file mode 100644 index cfd5008..0000000 --- a/same/src/main/java/com/orbekk/util/WorkQueue.java +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Copyright 2012 Kjetil Ørbekk - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.orbekk.util; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.ListIterator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A WorkList is a list for pending units of work. - */ -abstract public class WorkQueue extends Thread implements List { - private Logger logger = LoggerFactory.getLogger(getClass()); - private volatile List list = null; - private volatile boolean done = false; - - public WorkQueue() { - list = new ArrayList(); - } - - public WorkQueue(Collection collection) { - list = new ArrayList(collection); - } - - public synchronized List getAndClear() { - List copy = new ArrayList(list); - list.clear(); - return copy; - } - - /** - * OnChange event. - * - * May be run even if the WorkQueue has not been changed. - * Called until the queue is empty. - */ - protected abstract void onChange(); - - /** - * Perform work until the queue is empty. - * - * Can be used for testing or for combining several WorkQueues in - * a single thread. - */ - public synchronized void performWork() { - while (!isEmpty()) { - onChange(); - } - } - - @Override - public void run() { - while (!done) { - if (!isEmpty()) { - onChange(); - } - synchronized (this) { - try { - if (isEmpty()) { - wait(); - } - } catch (InterruptedException e) { - done = true; - } - if (Thread.interrupted()) { - done = true; - } - } - } - } - - - @Override - public synchronized boolean add(E e) { - notifyAll(); - return list.add(e); - } - - @Override - public synchronized void add(int index, E element) { - notifyAll(); - list.add(index, element); - } - - @Override - public synchronized boolean addAll(Collection c) { - notifyAll(); - return list.addAll(c); - } - - @Override - public synchronized boolean addAll(int index, Collection c) { - notifyAll(); - return list.addAll(index, c); - } - - @Override - public synchronized void clear() { - notifyAll(); - list.clear(); - } - - @Override - public synchronized boolean contains(Object o) { - notifyAll(); - return list.contains(o); - } - - @Override - public synchronized boolean containsAll(Collection c) { - notifyAll(); - return containsAll(c); - } - - @Override - public synchronized E get(int index) { - notifyAll(); - return list.get(index); - } - - @Override - public synchronized int indexOf(Object o) { - notifyAll(); - return list.indexOf(o); - } - - @Override - public synchronized boolean isEmpty() { - notifyAll(); - return list.isEmpty(); - } - - @Override - public synchronized Iterator iterator() { - notifyAll(); - return list.iterator(); - } - - @Override - public synchronized int lastIndexOf(Object o) { - notifyAll(); - return list.lastIndexOf(o); - } - - @Override - public synchronized ListIterator listIterator() { - notifyAll(); - return list.listIterator(); - } - - @Override - public synchronized ListIterator listIterator(int index) { - notifyAll(); - return list.listIterator(index); - } - - @Override - public synchronized boolean remove(Object o) { - notifyAll(); - return list.remove(o); - } - - @Override - public synchronized E remove(int index) { - notifyAll(); - return list.remove(index); - } - - @Override - public synchronized boolean removeAll(Collection c) { - notifyAll(); - return list.removeAll(c); - } - - @Override - public synchronized boolean retainAll(Collection c) { - notifyAll(); - return list.retainAll(c); - } - - @Override - public synchronized E set(int index, E element) { - notifyAll(); - return list.set(index, element); - } - - @Override - public synchronized int size() { - notifyAll(); - return list.size(); - } - - @Override - public synchronized List subList(int fromIndex, int toIndex) { - notifyAll(); - return list.subList(fromIndex, toIndex); - } - - @Override - public synchronized Object[] toArray() { - notifyAll(); - return list.toArray(); - } - - @Override - public synchronized T[] toArray(T[] a) { - notifyAll(); - return list.toArray(a); - } - -} diff --git a/same/src/test/java/com/orbekk/util/WorkQueueTest.java b/same/src/test/java/com/orbekk/util/WorkQueueTest.java deleted file mode 100644 index f39281c..0000000 --- a/same/src/test/java/com/orbekk/util/WorkQueueTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright 2012 Kjetil Ørbekk - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.orbekk.util; - -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.Test; - -public class WorkQueueTest { - @Test - public void testPerformsWork() throws Exception { - final ArrayList doubled = new ArrayList(); - WorkQueue worker = new WorkQueue() { - @Override protected void onChange() { - List list = getAndClear(); - for (int x : list) { - doubled.add(x * 2); - } - synchronized (doubled) { - doubled.notifyAll(); - } - } - }; - - synchronized (doubled) { - worker.start(); - worker.add(1); - doubled.wait(); - worker.interrupt(); - } - - worker.join(); - assertEquals(2, (int)doubled.get(0)); - } - -} -- cgit v1.2.3