Skip to content

Commit

Permalink
Implement configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Superioz committed Feb 3, 2021
1 parent 0e25362 commit 8ad7983
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 24 deletions.
2 changes: 1 addition & 1 deletion hagrid-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>hagrid-api</artifactId>
<version>1.1.0</version>
<version>1.1.1</version>

<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package dev.volix.rewinside.odyssey.hagrid;

import dev.volix.lib.grape.Service;
import dev.volix.rewinside.odyssey.hagrid.config.PropertiesConfig;
import dev.volix.rewinside.odyssey.hagrid.exception.HagridConnectionException;

/**
* @author Tobias Büser
*/
public interface HagridService extends Service {

PropertiesConfig getConfiguration();

default HagridWizard wizard() {
return new HagridWizard(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package dev.volix.rewinside.odyssey.hagrid.config;

import java.util.Properties;

/**
* @author Tobias Büser
*/
public class PropertiesConfig {

private final Properties properties;

public PropertiesConfig(final Properties properties) {
this.properties = properties;
}

public <V> V get(final String key) {
if (!this.properties.contains(key)) {
throw new IllegalArgumentException(String.format("unknown configuration for key '%s'", key));
}

return (V) this.properties.get(key);
}

public String getString(final String key) {
return this.get(key);
}

public int getInt(final String key) {
return this.get(key);
}

public double getDouble(final String key) {
return this.get(key);
}

public boolean getBoolean(final String key) {
return this.get(key);
}

public Properties getProperties() {
return this.properties;
}

}
2 changes: 1 addition & 1 deletion hagrid-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>hagrid-core</artifactId>
<version>1.1.0</version>
<version>1.1.1</version>

<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public HagridCommunicationHandler(final HagridService service) {
this.service = service;

final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
threadPool.scheduleAtFixedRate(new CleanupTask(this.listenerRegistry), 2, 2, TimeUnit.SECONDS);

final int cleanupDelayInSeconds = service.getConfiguration().getInt(HagridConfig.LISTENER_CLEANUP_DELAY_IN_SECONDS);
threadPool.scheduleAtFixedRate(new CleanupTask(this.listenerRegistry), cleanupDelayInSeconds, cleanupDelayInSeconds, TimeUnit.SECONDS);
}

private String getTopicPrefix(final String pattern) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package dev.volix.rewinside.odyssey.hagrid;

import dev.volix.rewinside.odyssey.hagrid.config.PropertiesConfig;
import java.util.Properties;

/**
* @author Tobias Büser
*/
public class HagridConfig extends PropertiesConfig {

public static final String MAX_SUBSCRIBER = "downstream.max_subscriber";
public static final String LISTENER_CLEANUP_DELAY_IN_SECONDS = "listener.cleanup.delay";
public static final String RECONNECT_DELAY_IN_SECONDS = "connection.reconnect.delay";

public HagridConfig(final Properties properties) {
super(properties);

properties.putIfAbsent(MAX_SUBSCRIBER, 10);
properties.putIfAbsent(LISTENER_CLEANUP_DELAY_IN_SECONDS, 2);
properties.putIfAbsent(RECONNECT_DELAY_IN_SECONDS, 10);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ public void handleError(final Throwable error) {
}

if (this.reconnectTask != null && this.reconnectTask.isRunning()) return;
this.reconnectTask = new ReconnectTask(Duration.of(10, ChronoUnit.SECONDS));
final int delayInSeconds = this.service.getConfiguration().getInt(HagridConfig.RECONNECT_DELAY_IN_SECONDS);

this.reconnectTask = new ReconnectTask(Duration.of(delayInSeconds, ChronoUnit.SECONDS));
this.threadPool.execute(this.reconnectTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@ public class HagridDownstreamHandler implements DownstreamHandler {
private final HagridService service;
private final Supplier<HagridSubscriber> createSubscriberFunction;

private final ExecutorService threadPool = Executors.newFixedThreadPool(10, new DaemonThreadFactory());
private final int maxSubscriber;
private final ExecutorService threadPool;

private final List<ConsumerTask> consumerTasks = new ArrayList<>();
private final Map<HagridTopic<?>, ConsumerTask> topicsToConsumer = new HashMap<>();

public HagridDownstreamHandler(final HagridService service, final Supplier<HagridSubscriber> createSubscriberFunction) {
this.service = service;
this.createSubscriberFunction = createSubscriberFunction;

this.maxSubscriber = service.getConfiguration().getInt(HagridConfig.MAX_SUBSCRIBER);
this.threadPool = Executors.newFixedThreadPool(this.maxSubscriber, new DaemonThreadFactory());
}

@Override
Expand Down Expand Up @@ -58,7 +62,7 @@ public <T> void receive(final String topic, final HagridPacket<T> packet) {

@Override
public void addToNewSubscriber(final HagridTopic<?> topic) {
if (this.consumerTasks.size() >= 10) {
if (this.consumerTasks.size() >= this.maxSubscriber) {
throw new IllegalStateException("reached maximum of parallel consumer tasks");
}
final HagridSubscriber subscriber = this.createSubscriberFunction.get();
Expand Down
2 changes: 1 addition & 1 deletion hagrid-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>hagrid-kafka</artifactId>
<version>0.5.0</version>
<version>0.5.1</version>

<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
import dev.volix.rewinside.odyssey.hagrid.ConnectionHandler;
import dev.volix.rewinside.odyssey.hagrid.DownstreamHandler;
import dev.volix.rewinside.odyssey.hagrid.HagridCommunicationHandler;
import dev.volix.rewinside.odyssey.hagrid.HagridConfig;
import dev.volix.rewinside.odyssey.hagrid.HagridDownstreamHandler;
import dev.volix.rewinside.odyssey.hagrid.HagridService;
import dev.volix.rewinside.odyssey.hagrid.HagridUpstreamHandler;
import dev.volix.rewinside.odyssey.hagrid.UpstreamHandler;
import dev.volix.rewinside.odyssey.hagrid.config.PropertiesConfig;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -22,37 +25,69 @@
*/
public class KafkaHagridService implements HagridService {

private final Properties properties;
private final PropertiesConfig hagridConfig;
private final Properties kafkaProperties;

private final KafkaConnectionHandler connectionHandler;
private final HagridUpstreamHandler upstreamHandler;
private final HagridDownstreamHandler downstreamHandler;
private final HagridCommunicationHandler communicationHandler;

public KafkaHagridService(final List<String> brokerAddresses, final String groupId, final KafkaAuth auth) {
this.properties = new Properties();
this.properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, String.join(",", brokerAddresses));
this.properties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 3000);
this.properties.put(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG, 3000);
public KafkaHagridService(final List<String> brokerAddresses, final String groupId, final KafkaAuth auth, final Properties hagridConfig) {
this.hagridConfig = new HagridConfig(hagridConfig);

this.kafkaProperties = new Properties();
this.kafkaProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, String.join(",", brokerAddresses));
this.kafkaProperties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 3000);
this.kafkaProperties.put(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG, 3000);

auth.getProperties().forEach(this.properties::put);
auth.getProperties().forEach(this.kafkaProperties::put);

this.properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
this.properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
this.properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaPacketSerializer.class);
this.kafkaProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
this.kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
this.kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaPacketSerializer.class);

this.properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
this.properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
this.properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaPacketDeserializer.class);
this.kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
this.kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
this.kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaPacketDeserializer.class);

this.connectionHandler = new KafkaConnectionHandler(this, this.properties);
this.upstreamHandler = new HagridUpstreamHandler(this, new KafkaHagridPublisher(this.properties));
this.downstreamHandler = new HagridDownstreamHandler(this, () -> new KafkaHagridSubscriber(this.properties));
this.connectionHandler = new KafkaConnectionHandler(this, this.kafkaProperties);
this.upstreamHandler = new HagridUpstreamHandler(this, new KafkaHagridPublisher(this.kafkaProperties));
this.downstreamHandler = new HagridDownstreamHandler(this, () -> new KafkaHagridSubscriber(this.kafkaProperties));
this.communicationHandler = new HagridCommunicationHandler(this);
}

public KafkaHagridService(final List<String> brokerAddresses, final String groupId, final KafkaAuth auth) {
this(brokerAddresses, groupId, auth, new Properties());
}

public KafkaHagridService(final List<String> brokerAddresses, final KafkaAuth auth, final Properties hagridConfig) {
this(brokerAddresses, UUID.randomUUID().toString(), auth, hagridConfig);
}

public KafkaHagridService(final List<String> brokerAddresses, final KafkaAuth auth) {
this(brokerAddresses, auth, new Properties());
}

public KafkaHagridService(final String address, final String groupId, final KafkaAuth auth, final Properties hagridConfig) {
this(Collections.singletonList(address), groupId, auth, hagridConfig);
}

public KafkaHagridService(final String address, final String groupId, final KafkaAuth auth) {
this(Collections.singletonList(address), groupId, auth);
this(address, groupId, auth, new Properties());
}

public KafkaHagridService(final String address, final KafkaAuth auth, final Properties hagridConfig) {
this(Collections.singletonList(address), auth, hagridConfig);
}

public KafkaHagridService(final String address, final KafkaAuth auth) {
this(address, auth, new Properties());
}

@Override
public PropertiesConfig getConfiguration() {
return this.hagridConfig;
}

@Override
Expand Down

0 comments on commit 8ad7983

Please sign in to comment.