|
package ru.ispras.kmb.spring6;
|
|
|
|
import static org.junit.Assert.*;
|
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.SynchronousQueue;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import org.junit.Test;
|
|
|
|
public class TestBroker {
|
|
static class ReceivedMessage {
|
|
public String topic;
|
|
public String message;
|
|
public ReceivedMessage(String topic, String message) {
|
|
super();
|
|
this.topic = topic;
|
|
this.message = message;
|
|
}
|
|
|
|
}
|
|
static class TestSubscriber implements Subscriber {
|
|
BlockingQueue<ReceivedMessage> sink;
|
|
|
|
public TestSubscriber(BlockingQueue<ReceivedMessage> sink) {
|
|
super();
|
|
this.sink = sink;
|
|
}
|
|
|
|
@Override
|
|
public void receive(String topic, String msg) {
|
|
try {
|
|
sink.put(new ReceivedMessage(topic, msg));
|
|
} catch (InterruptedException e) {
|
|
e.printStackTrace();
|
|
}
|
|
}
|
|
}
|
|
|
|
@Test
|
|
public void test() throws InterruptedException {
|
|
Broker target = new МногопоточныйБрокер();
|
|
SynchronousQueue<ReceivedMessage> fromSubs = new SynchronousQueue<>();
|
|
// Создать рабочих подписчиков.
|
|
TestSubscriber[] subA = new TestSubscriber[] {
|
|
new TestSubscriber(fromSubs),
|
|
new TestSubscriber(fromSubs),
|
|
new TestSubscriber(fromSubs),
|
|
};
|
|
TestSubscriber[] subB = new TestSubscriber[] {
|
|
new TestSubscriber(fromSubs),
|
|
new TestSubscriber(fromSubs),
|
|
};
|
|
// подписчик при инициализации регистрируется у брокера
|
|
for (TestSubscriber ts: subA) {
|
|
target.subscribe(ts, "A");
|
|
}
|
|
for (TestSubscriber ts: subB) {
|
|
target.subscribe(ts, "B");
|
|
}
|
|
// Опубликовать сообщение в брокере
|
|
target.publish("A", "Hello, world!");
|
|
// Дождаться уведомления
|
|
for (int i = 0; i < subA.length; i++) {
|
|
ReceivedMessage actual = fromSubs.poll(100, TimeUnit.MILLISECONDS);
|
|
// вынести вердикт
|
|
assertNotNull(actual);
|
|
assertEquals("A", actual.topic);
|
|
assertEquals("Hello, world!", actual.message);
|
|
}
|
|
ReceivedMessage expectedNull = fromSubs.poll(100, TimeUnit.MILLISECONDS);
|
|
assertNull(expectedNull);
|
|
((МногопоточныйБрокер)target).exit();
|
|
}
|
|
|
|
}
|