From 58ec090017bbdca7ac06343d0d224648d5f7ac6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Wed, 28 Mar 2012 14:56:54 +0200 Subject: Add TimeoutManager. --- .../java/com/orbekk/protobuf/TimeoutManager.java | 85 ++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 src/main/java/com/orbekk/protobuf/TimeoutManager.java diff --git a/src/main/java/com/orbekk/protobuf/TimeoutManager.java b/src/main/java/com/orbekk/protobuf/TimeoutManager.java new file mode 100644 index 0000000..a19070b --- /dev/null +++ b/src/main/java/com/orbekk/protobuf/TimeoutManager.java @@ -0,0 +1,85 @@ +package com.orbekk.protobuf; + +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.PriorityQueue; +import java.io.Closeable; + +public class TimeoutManager extends Thread { + private static final Logger logger = + Logger.getLogger(TimeoutManager.class.getName()); + private Environment environment; + private PriorityQueue entries = new PriorityQueue(); + + private static class Entry implements Comparable { + // May not be null. + public Long timeout; + public Closeable closeable; + + public Entry(long timeout, Closeable closeable) { + this.timeout = timeout; + this.closeable = closeable; + } + + @Override public int compareTo(Entry other) { + return timeout.compareTo(other.timeout); + } + } + + public interface Environment { + long currentTimeMillis(); + void sleep(long millis) throws InterruptedException; + } + + public static class DefaultEnvironment { + @Override public long currentTimeMillis() { + System.currentTimeMillis(); + } + @Override public void sleep(long millis) throws InterruptedException { + Thread.sleep(millis); + } + } + + TimeoutManager(Environment environment) { + this.environment = environment; + } + + public TimeoutManager() { + self(new DefaultTime()); + } + + @Override public void run() { + while (!Thread.interrupted()) { + synchronized (this) { + if (entries.isEmpty()) { + environment.wait(); + } else { + long sleepTime = entries.peek().timeout - + environment.currentTimeMillis(); + if (sleepTime > 0) { + environment.sleep(sleepTime); + } + } + closeExpiredEntries(); + } + } + } + + public synchronized void closeExpiredEntries() { + long currentTime = environment.currentTimeMillis(); + while (entries.peek().timeout <= currentTime) { + try { + entries.poll().close(); + } catch (IOException e) { + logger.log(Level.INFO, "Could not close entry. ", e); + } + } + } + + public synchronized void addEntry(long timeoutTime, Closeable closeable) { + if (entries.isEmpty() || timeoutTime <= entries.peek().timeout) { + notify(); + } + entries.add(new Entry(timeoutTime, closeable)); + } +} -- cgit v1.2.3