summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-05-08 10:20:39 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-05-08 10:20:39 +0200
commit7bc7fe45b23f048a6bd609b7fbd01270b498a20b (patch)
tree69bb11e40dbc9c119f7461c430804d3ae9ef86af
parent561b6ab936f1b60d364e81b08322899a931ecc2e (diff)
Get rid of queues in Master.
– Remove WorkQueue code. – Remove delayed operations in master. (Handled by RPC instead)
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java1
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java108
-rw-r--r--same/src/main/java/com/orbekk/same/ServicesPbConversion.java14
-rw-r--r--same/src/main/java/com/orbekk/util/WorkQueue.java229
-rw-r--r--same/src/test/java/com/orbekk/util/WorkQueueTest.java52
5 files changed, 47 insertions, 357 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index bbf9ca2..f632241 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -32,7 +32,6 @@ import com.orbekk.paxos.MasterProposer;
import com.orbekk.protobuf.Rpc;
import com.orbekk.same.Services.Empty;
import com.orbekk.same.Services.FullStateResponse;
-import com.orbekk.same.Services.FullStateResponse.Builder;
import com.orbekk.same.Services.MasterState;
import com.orbekk.same.Services.MasterTakeoverResponse;
import com.orbekk.same.State.Component;
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index b32fedd..90601df 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -15,7 +15,6 @@
*/
package com.orbekk.same;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,7 +28,6 @@ import com.orbekk.same.Services.ClientState;
import com.orbekk.same.Services.Empty;
import com.orbekk.same.Services.MasterTakeoverResponse;
import com.orbekk.same.State.Component;
-import com.orbekk.util.WorkQueue;
public class Master {
private Logger logger = LoggerFactory.getLogger(getClass());
@@ -97,8 +95,8 @@ public class Master {
private Services.Master newMasterImpl = new Services.Master() {
@Override public void joinNetworkRequest(RpcController controller,
ClientState request, RpcCallback<Empty> done) {
- logger.info("joinNetworkRequest({})", request);
- sendFullStateThread.add(request.getLocation());
+ sendInitialMasterTakeover(request.getLocation());
+ sendFullState(request.getLocation());
addParticipant(request.getLocation());
done.run(Empty.getDefaultInstance());
}
@@ -112,31 +110,35 @@ public class Master {
success = true;
long newRevision = revision.incrementAndGet();
state.forceUpdate(request.getId(), request.getData(), newRevision);
- updateStateRequestThread.add(request.getId());
+ sendStateToClients(state.getComponent(request.getId()));
}
done.run(Services.UpdateComponentResponse.newBuilder()
.setSuccess(success).build());
}
};
- WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- List<Component> updatedComponents = new ArrayList<Component>();
- for (String component : pending) {
- updatedComponents.add(state.getComponent(component));
- }
-
- logger.info("updateStateRequestThread: Updated state: {}",
- pending);
- for (String clientLocation : state.getList(
- com.orbekk.same.State.PARTICIPANTS)) {
- sendComponents(clientLocation, updatedComponents);
- }
+ private void sendStateToClients(State.Component component) {
+ for (String clientLocation : state.getList(
+ com.orbekk.same.State.PARTICIPANTS)) {
+ sendComponent(clientLocation, component);
+ }
+ }
+
+ private void sendComponent(String clientLocation, Component component) {
+ Services.Client client = connections.getClient0(clientLocation);
+ if (client == null) {
+ removeParticipant(clientLocation);
}
- };
- public void sendComponents(String clientLocation,
+ Services.Component componentProto = ServicesPbConversion.componentToPb(component);
+ Rpc rpc = rpcf.create();
+ RpcCallback<Empty> done =
+ new RemoveParticipantIfFailsCallback<Empty>(clientLocation,
+ rpc);
+ client.setState(rpc, componentProto, done);
+ }
+
+ private void sendComponents(String clientLocation,
List<Component> components) {
Services.Client client = connections.getClient0(clientLocation);
if (client == null) {
@@ -144,7 +146,7 @@ public class Master {
}
for (Component component : components) {
- Services.Component componentProto = componentToProto(component);
+ Services.Component componentProto = ServicesPbConversion.componentToPb(component);
Rpc rpc = rpcf.create();
RpcCallback<Empty> done =
new RemoveParticipantIfFailsCallback<Empty>(clientLocation,
@@ -153,51 +155,27 @@ public class Master {
}
}
- WorkQueue<String> sendFullStateThread = new WorkQueue<String>() {
- @Override protected void onChange() {
- List<String> pending = getAndClear();
- logger.info("Sending full state to {}", pending);
- final List<Component> components = state.getComponents();
- for (String clientLocation : pending) {
- Services.Client client = connections.getClient0(clientLocation);
- if (client == null) {
- removeParticipant(clientLocation);
- continue;
- }
-
- { // Send masterTakeover().
- Rpc rpc = rpcf.create();
- RpcCallback<MasterTakeoverResponse> done =
- new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>(
- clientLocation, rpc);
- client.masterTakeover(rpc, getMasterInfo(), done);
- }
- sendComponents(clientLocation, components);
- }
- }
- };
-
- private Services.Component componentToProto(State.Component component) {
- return Services.Component.newBuilder()
- .setId(component.getName())
- .setData(component.getData())
- .setRevision(component.getRevision())
- .build();
+ private void sendFullState(String clientLocation) {
+ List<Component> components = state.getComponents();
+ sendComponents(clientLocation, components);
+ }
+
+ private void sendInitialMasterTakeover(String clientLocation) {
+ Services.Client client = connections.getClient0(clientLocation);
+ Rpc rpc = rpcf.create();
+ RpcCallback<MasterTakeoverResponse> done =
+ new RemoveParticipantIfFailsCallback<MasterTakeoverResponse>(
+ clientLocation, rpc);
+ client.masterTakeover(rpc, getMasterInfo(), done);
}
void performWork() {
- sendFullStateThread.performWork();
- updateStateRequestThread.performWork();
}
public void start() {
- sendFullStateThread.start();
- updateStateRequestThread.start();
}
public void interrupt() {
- sendFullStateThread.interrupt();
- updateStateRequestThread.interrupt();
}
public Services.Master getNewService() {
@@ -210,29 +188,19 @@ public class Master {
participants.add(location);
state.updateFromObject(State.PARTICIPANTS, participants,
state.getRevision(State.PARTICIPANTS) + 1);
- updateStateRequestThread.add(State.PARTICIPANTS);
+ sendStateToClients(state.getComponent(State.PARTICIPANTS));
}
}
private synchronized void removeParticipant(String url) {
- /** TODO: Remove this code. */
- List<String> participants = state.getList(".participants");
- if (participants.contains(url)) {
- logger.info("removeParticipant({})", url);
- participants.remove(url);
- state.updateFromObject(".participants", participants,
- state.getRevision(".participants") + 1);
- updateStateRequestThread.add(".participants");
- }
-
List<String> participants0 = state.getList(State.PARTICIPANTS);
if (participants0.contains(url)) {
logger.info("removeParticipant({})", url);
participants0.remove(url);
state.updateFromObject(State.PARTICIPANTS, participants0,
state.getRevision(State.PARTICIPANTS) + 1);
- updateStateRequestThread.add(State.PARTICIPANTS);
+ sendStateToClients(state.getComponent(State.PARTICIPANTS));
}
}
diff --git a/same/src/main/java/com/orbekk/same/ServicesPbConversion.java b/same/src/main/java/com/orbekk/same/ServicesPbConversion.java
index 5638e1a..7fc52f0 100644
--- a/same/src/main/java/com/orbekk/same/ServicesPbConversion.java
+++ b/same/src/main/java/com/orbekk/same/ServicesPbConversion.java
@@ -7,12 +7,16 @@ public class ServicesPbConversion {
public static List<Services.Component> componentsToPb(List<State.Component> components) {
List<Services.Component> results = new ArrayList<Services.Component>();
for (State.Component c : components) {
- results.add(Services.Component.newBuilder()
- .setId(c.getName())
- .setRevision(c.getRevision())
- .setData(c.getData())
- .build());
+ results.add(componentToPb(c));
}
return results;
}
+
+ public static Services.Component componentToPb(State.Component component) {
+ return Services.Component.newBuilder()
+ .setId(component.getName())
+ .setRevision(component.getRevision())
+ .setData(component.getData())
+ .build();
+ }
}
diff --git a/same/src/main/java/com/orbekk/util/WorkQueue.java b/same/src/main/java/com/orbekk/util/WorkQueue.java
deleted file mode 100644
index cfd5008..0000000
--- a/same/src/main/java/com/orbekk/util/WorkQueue.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Copyright 2012 Kjetil Ørbekk <kjetil.orbekk@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.orbekk.util;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A WorkList is a list for pending units of work.
- */
-abstract public class WorkQueue<E> extends Thread implements List<E> {
- private Logger logger = LoggerFactory.getLogger(getClass());
- private volatile List<E> list = null;
- private volatile boolean done = false;
-
- public WorkQueue() {
- list = new ArrayList<E>();
- }
-
- public WorkQueue(Collection<? extends E> collection) {
- list = new ArrayList<E>(collection);
- }
-
- public synchronized List<E> getAndClear() {
- List<E> copy = new ArrayList<E>(list);
- list.clear();
- return copy;
- }
-
- /**
- * OnChange event.
- *
- * May be run even if the WorkQueue has not been changed.
- * Called until the queue is empty.
- */
- protected abstract void onChange();
-
- /**
- * Perform work until the queue is empty.
- *
- * Can be used for testing or for combining several WorkQueues in
- * a single thread.
- */
- public synchronized void performWork() {
- while (!isEmpty()) {
- onChange();
- }
- }
-
- @Override
- public void run() {
- while (!done) {
- if (!isEmpty()) {
- onChange();
- }
- synchronized (this) {
- try {
- if (isEmpty()) {
- wait();
- }
- } catch (InterruptedException e) {
- done = true;
- }
- if (Thread.interrupted()) {
- done = true;
- }
- }
- }
- }
-
-
- @Override
- public synchronized boolean add(E e) {
- notifyAll();
- return list.add(e);
- }
-
- @Override
- public synchronized void add(int index, E element) {
- notifyAll();
- list.add(index, element);
- }
-
- @Override
- public synchronized boolean addAll(Collection<? extends E> c) {
- notifyAll();
- return list.addAll(c);
- }
-
- @Override
- public synchronized boolean addAll(int index, Collection<? extends E> c) {
- notifyAll();
- return list.addAll(index, c);
- }
-
- @Override
- public synchronized void clear() {
- notifyAll();
- list.clear();
- }
-
- @Override
- public synchronized boolean contains(Object o) {
- notifyAll();
- return list.contains(o);
- }
-
- @Override
- public synchronized boolean containsAll(Collection<?> c) {
- notifyAll();
- return containsAll(c);
- }
-
- @Override
- public synchronized E get(int index) {
- notifyAll();
- return list.get(index);
- }
-
- @Override
- public synchronized int indexOf(Object o) {
- notifyAll();
- return list.indexOf(o);
- }
-
- @Override
- public synchronized boolean isEmpty() {
- notifyAll();
- return list.isEmpty();
- }
-
- @Override
- public synchronized Iterator<E> iterator() {
- notifyAll();
- return list.iterator();
- }
-
- @Override
- public synchronized int lastIndexOf(Object o) {
- notifyAll();
- return list.lastIndexOf(o);
- }
-
- @Override
- public synchronized ListIterator<E> listIterator() {
- notifyAll();
- return list.listIterator();
- }
-
- @Override
- public synchronized ListIterator<E> listIterator(int index) {
- notifyAll();
- return list.listIterator(index);
- }
-
- @Override
- public synchronized boolean remove(Object o) {
- notifyAll();
- return list.remove(o);
- }
-
- @Override
- public synchronized E remove(int index) {
- notifyAll();
- return list.remove(index);
- }
-
- @Override
- public synchronized boolean removeAll(Collection<?> c) {
- notifyAll();
- return list.removeAll(c);
- }
-
- @Override
- public synchronized boolean retainAll(Collection<?> c) {
- notifyAll();
- return list.retainAll(c);
- }
-
- @Override
- public synchronized E set(int index, E element) {
- notifyAll();
- return list.set(index, element);
- }
-
- @Override
- public synchronized int size() {
- notifyAll();
- return list.size();
- }
-
- @Override
- public synchronized List<E> subList(int fromIndex, int toIndex) {
- notifyAll();
- return list.subList(fromIndex, toIndex);
- }
-
- @Override
- public synchronized Object[] toArray() {
- notifyAll();
- return list.toArray();
- }
-
- @Override
- public synchronized <T> T[] toArray(T[] a) {
- notifyAll();
- return list.toArray(a);
- }
-
-}
diff --git a/same/src/test/java/com/orbekk/util/WorkQueueTest.java b/same/src/test/java/com/orbekk/util/WorkQueueTest.java
deleted file mode 100644
index f39281c..0000000
--- a/same/src/test/java/com/orbekk/util/WorkQueueTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Copyright 2012 Kjetil Ørbekk <kjetil.orbekk@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.orbekk.util;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Test;
-
-public class WorkQueueTest {
- @Test
- public void testPerformsWork() throws Exception {
- final ArrayList<Integer> doubled = new ArrayList<Integer>();
- WorkQueue<Integer> worker = new WorkQueue<Integer>() {
- @Override protected void onChange() {
- List<Integer> list = getAndClear();
- for (int x : list) {
- doubled.add(x * 2);
- }
- synchronized (doubled) {
- doubled.notifyAll();
- }
- }
- };
-
- synchronized (doubled) {
- worker.start();
- worker.add(1);
- doubled.wait();
- worker.interrupt();
- }
-
- worker.join();
- assertEquals(2, (int)doubled.get(0));
- }
-
-}