Project

General

Profile

Tmp » TestBroker.java

Николай Пакулин, 04/02/2015 05:19 PM

 
package ru.ispras.kmb.spring6;

import static org.junit.Assert.*;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class TestBroker {
static class ReceivedMessage {
public String topic;
public String message;
public ReceivedMessage(String topic, String message) {
super();
this.topic = topic;
this.message = message;
}
}
static class TestSubscriber implements Subscriber {
BlockingQueue<ReceivedMessage> sink;
public TestSubscriber(BlockingQueue<ReceivedMessage> sink) {
super();
this.sink = sink;
}

@Override
public void receive(String topic, String msg) {
try {
sink.put(new ReceivedMessage(topic, msg));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Test
public void test() throws InterruptedException {
Broker target = new МногопоточныйБрокер();
SynchronousQueue<ReceivedMessage> fromSubs = new SynchronousQueue<>();
// Создать рабочих подписчиков.
TestSubscriber[] subA = new TestSubscriber[] {
new TestSubscriber(fromSubs),
new TestSubscriber(fromSubs),
new TestSubscriber(fromSubs),
};
TestSubscriber[] subB = new TestSubscriber[] {
new TestSubscriber(fromSubs),
new TestSubscriber(fromSubs),
};
// подписчик при инициализации регистрируется у брокера
for (TestSubscriber ts: subA) {
target.subscribe(ts, "A");
}
for (TestSubscriber ts: subB) {
target.subscribe(ts, "B");
}
// Опубликовать сообщение в брокере
target.publish("A", "Hello, world!");
// Дождаться уведомления
for (int i = 0; i < subA.length; i++) {
ReceivedMessage actual = fromSubs.poll(100, TimeUnit.MILLISECONDS);
// вынести вердикт
assertNotNull(actual);
assertEquals("A", actual.topic);
assertEquals("Hello, world!", actual.message);
}
ReceivedMessage expectedNull = fromSubs.poll(100, TimeUnit.MILLISECONDS);
assertNull(expectedNull);
((МногопоточныйБрокер)target).exit();
}

}
(4-4/4)