Commit 70f4b5ae authored by Evan Ward's avatar Evan Ward Committed by Bryan Cazabonne
Browse files

Improve concurrency issue in NtripClientTest

Previously it would wait 1 second for all messages. This proved to be
too short on some platforms. Now it waits up to 30 seconds for 30
messages, then 1 additional second for the message that causes the
error.

The concurrency constructs used may not be the best, but they seem to
work.
parent 7eb9cf6e
......@@ -16,6 +16,10 @@
*/
package org.orekit.gnss.metric.ntrip;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.orekit.gnss.metric.messages.ParsedMessage;
......@@ -25,21 +29,35 @@ import org.orekit.gnss.metric.messages.ParsedMessage;
public class CountingObserver implements MessageObserver {
private Function<ParsedMessage, Boolean> filter;
private int count;
private AtomicInteger received = new AtomicInteger(0);
private Phaser phaser = new Phaser(1);
public CountingObserver(final Function<ParsedMessage, Boolean> filter) {
this.filter = filter;
this.count = 0;
}
public void messageAvailable(String mountPoint, ParsedMessage message) {
if (filter.apply(message)) {
++count;
final int i = received.incrementAndGet();
phaser.arrive();
}
}
public int getCount() {
return count;
/**
* Wait for a certain number of messages to be received.
*
* @param count number of messages to wait for.
* @param timeout when waiting in ms.
* @throws InterruptedException if interrupted while waiting.
* @throws TimeoutException if timeout is reached while waiting.
*/
public void awaitCount(int count, long timeout) throws InterruptedException, TimeoutException {
final long start = System.currentTimeMillis();
final long end = start + timeout;
int phase = phaser.getPhase();
while (received.get() < count && (timeout = end - System.currentTimeMillis()) > 0) {
phase = phaser.awaitAdvanceInterruptibly(phase, timeout, TimeUnit.MILLISECONDS);
}
}
}
......@@ -27,13 +27,18 @@ import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class DummyServer {
private final String[] fileNames;
private final ServerSocket server;
private final Map<String, String> requestProperties;
private final CountDownLatch done = new CountDownLatch(1);
private final AtomicReference<Exception> error = new AtomicReference<>(null);
public DummyServer(final String... fileNames) throws IOException {
this.fileNames = fileNames.clone();
......@@ -49,6 +54,13 @@ public class DummyServer {
return requestProperties.get(key);
}
public void await(long timeout, TimeUnit unit) throws Exception {
done.await(timeout, unit);
final Exception e = error.get();
if (e != null)
throw e;
}
public void run() {
Executors.newSingleThreadExecutor().execute(() -> {
try {
......@@ -98,8 +110,11 @@ public class DummyServer {
socket.close();
}
done.countDown();
} catch (IOException ioe) {
error.set(ioe);
done.countDown();
throw new RuntimeException(ioe);
}
});
......
......@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import org.hipparchus.util.FastMath;
import org.junit.Assert;
......@@ -162,21 +163,23 @@ public class NtripClientTest {
}
@Test
public void testUnknownMessage() {
public void testUnknownMessage() throws Exception {
DummyServer server = prepareServer("/gnss/ntrip/sourcetable-products.igs-ip.net.txt",
"/gnss/ntrip/RTCM3EPH01.dat");
server.run();
NtripClient client = new NtripClient("localhost", server.getServerPort());
client.setTimeout(100);
client.setReconnectParameters(0.001, 2.0, 2);
client.addObserver(1042, "RTCM3EPH01", new CountingObserver(m -> true));
final CountingObserver counter = new CountingObserver(m -> true);
client.addObserver(1042, "RTCM3EPH01", counter);
client.addObserver(1042, "RTCM3EPH01", new LoggingObserver());
client.startStreaming("RTCM3EPH01", Type.RTCM, false, false);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
// ignored
}
server.await(10, TimeUnit.SECONDS);
// the 31st message causes the exception
counter.awaitCount(30, 30 * 1000);
// wait a bit for next message, the 31st
// better condition would be to wait for StreamMonitor.exception to not be null
Thread.sleep(1000);
try {
client.stopStreaming(100);
Assert.fail("an exception should have been thrown");
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment