|
package ru.ispras.kmb.spring6;
|
|
|
|
import java.util.HashSet;
|
|
import java.util.Hashtable;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
|
|
public class МногопоточныйБрокер implements Broker {
|
|
private final class Sender implements Runnable {
|
|
private Subscriber target;
|
|
private String topic;
|
|
private String msg;
|
|
|
|
public Sender(Subscriber s, String topic, String msg) {
|
|
target = s;
|
|
this.topic = topic;
|
|
this.msg = msg;
|
|
}
|
|
|
|
@Override
|
|
public void run() {
|
|
target.receive(topic, msg);
|
|
}
|
|
}
|
|
|
|
ExecutorService es = Executors.newFixedThreadPool(5);
|
|
Map<String, Set<Subscriber>> subscribers = new Hashtable<>();
|
|
|
|
@Override
|
|
public void subscribe(Subscriber s, String topic) {
|
|
synchronized (subscribers) {
|
|
if (!subscribers.containsKey(topic)) {
|
|
subscribers.put(topic, new HashSet<Subscriber>());
|
|
}
|
|
subscribers.get(topic).add(s);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void unsubscribe(Subscriber s, String topic) {
|
|
// TODO Auto-generated method stub
|
|
|
|
}
|
|
|
|
@Override
|
|
public void publish(String topic, String msg) {
|
|
synchronized (subscribers) {
|
|
if (subscribers.containsKey(topic)) {
|
|
for (Subscriber s : subscribers.get(topic)) {
|
|
es.execute(new Sender(s, topic, msg));
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
public void exit() {
|
|
synchronized (subscribers) {
|
|
subscribers.clear();
|
|
}
|
|
es.shutdown();
|
|
}
|
|
|
|
}
|