Project

General

Profile

Tmp » МногопоточныйБрокер.java

Николай Пакулин, 04/02/2015 05:19 PM

 
package ru.ispras.kmb.spring6;

import java.util.HashSet;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Multithreaded {@link Broker} implementation.
 *
 * <p>Keeps a set of subscribers per topic and delivers every published message to each
 * subscriber of that topic by submitting one task per subscriber to a fixed pool of five
 * worker threads, so {@code publish} itself never calls {@code Subscriber.receive}.
 *
 * <p>Every read and write of the subscriber registry happens while holding the registry's
 * own monitor.
 */
public class МногопоточныйБрокер implements Broker {
    /**
     * Delivery task: hands a single message to a single subscriber.
     *
     * <p>Declared {@code static} because it needs nothing from the enclosing broker, which
     * avoids queued tasks holding a hidden reference to it.
     */
    private static final class Sender implements Runnable {
        private final Subscriber target;
        private final String topic;
        private final String msg;

        /**
         * @param s     subscriber that will receive the message
         * @param topic topic the message was published on
         * @param msg   message text
         */
        public Sender(Subscriber s, String topic, String msg) {
            this.target = s;
            this.topic = topic;
            this.msg = msg;
        }

        /** Invokes {@code receive(topic, msg)} on the target subscriber. */
        @Override
        public void run() {
            target.receive(topic, msg);
        }
    }

    /** Worker pool that runs the delivery tasks. */
    final ExecutorService es = Executors.newFixedThreadPool(5);

    /**
     * Topic name to the subscribers of that topic. The map object is also the lock for all
     * registry operations, so the reference is final and must never be replaced.
     */
    final Map<String, Set<Subscriber>> subscribers = new Hashtable<>();

    /**
     * Registers {@code s} for {@code topic}, creating the topic entry on first use.
     * Subscribing the same subscriber twice has no additional effect (subscribers are kept
     * in a set).
     *
     * @param s     subscriber to register
     * @param topic topic to subscribe to
     * @throws NullPointerException if {@code topic} is {@code null} (the registry is a
     *                              {@code Hashtable}, which rejects null keys)
     */
    @Override
    public void subscribe(Subscriber s, String topic) {
        synchronized (subscribers) {
            Set<Subscriber> topicSubscribers = subscribers.get(topic);
            if (topicSubscribers == null) {
                topicSubscribers = new HashSet<Subscriber>();
                subscribers.put(topic, topicSubscribers);
            }
            topicSubscribers.add(s);
        }
    }

    /**
     * Removes {@code s} from the subscribers of {@code topic}. Does nothing when the topic
     * is unknown or the subscriber was not registered for it. A topic left without
     * subscribers is dropped from the registry.
     *
     * @param s     subscriber to remove
     * @param topic topic to unsubscribe from
     */
    @Override
    public void unsubscribe(Subscriber s, String topic) {
        if (topic == null) {
            // Nobody can be subscribed to a null topic (subscribe rejects it), and the
            // Hashtable lookup below would throw on a null key, so treat it as a no-op.
            return;
        }
        synchronized (subscribers) {
            Set<Subscriber> topicSubscribers = subscribers.get(topic);
            if (topicSubscribers == null) {
                return;
            }
            topicSubscribers.remove(s);
            if (topicSubscribers.isEmpty()) {
                subscribers.remove(topic);
            }
        }
    }

    /**
     * Queues delivery of {@code msg} to every current subscriber of {@code topic}; one task
     * per subscriber is handed to the worker pool. Topics without subscribers are ignored.
     *
     * <p>Once {@link #exit()} has shut the pool down, the pool no longer accepts tasks, so
     * publishing to a topic that gained subscribers afterwards fails inside
     * {@code ExecutorService.execute}.
     *
     * @param topic topic to publish on
     * @param msg   message text
     * @throws NullPointerException if {@code topic} is {@code null}
     */
    @Override
    public void publish(String topic, String msg) {
        synchronized (subscribers) {
            Set<Subscriber> topicSubscribers = subscribers.get(topic);
            if (topicSubscribers == null) {
                return;
            }
            // Submission happens under the lock so the set cannot change mid-iteration.
            for (Subscriber s : topicSubscribers) {
                es.execute(new Sender(s, topic, msg));
            }
        }
    }

    /**
     * Stops the broker: forgets all subscriptions and shuts the worker pool down.
     * {@code shutdown()} lets already queued deliveries finish but accepts no new ones.
     */
    public void exit() {
        synchronized (subscribers) {
            subscribers.clear();
        }
        es.shutdown();
    }

}
(3-3/4)