Skip to content

Commit e545e3b

Browse files
authored
[ISSUE #6803] Benchmark support reportInterval option (#6804)
1 parent ea32980 commit e545e3b

File tree

4 files changed

+39
-13
lines changed

4 files changed

+39
-13
lines changed

example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,19 @@ public static void main(String[] args) throws MQClientException {
7474
final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
7575
final boolean aclEnable = getOptionValue(commandLine, 'a', false);
7676
final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
77+
final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
7778

7879
System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, " +
79-
"aclEnable: %s%n compressEnable: %s%n",
80-
topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress);
80+
"aclEnable: %s%n compressEnable: %s, reportInterval: %d%n",
81+
topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress, reportInterval);
8182

8283
StringBuilder sb = new StringBuilder(messageSize);
8384
for (int i = 0; i < messageSize; i++) {
8485
sb.append(RandomStringUtils.randomAlphanumeric(1));
8586
}
8687
msgBody = sb.toString().getBytes(StandardCharsets.UTF_8);
8788

88-
final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer();
89+
final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(reportInterval);
8990
statsBenchmark.start();
9091

9192
RPCHook rpcHook = null;
@@ -253,6 +254,10 @@ public static Options buildCommandlineOptions(final Options options) {
253254
opt.setRequired(false);
254255
options.addOption(opt);
255256

257+
opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
258+
opt.setRequired(false);
259+
options.addOption(opt);
260+
256261
return options;
257262
}
258263

@@ -359,6 +364,12 @@ class StatsBenchmarkBatchProducer {
359364

360365
private final LinkedList<Long[]> snapshotList = new LinkedList<>();
361366

367+
private final int reportInterval;
368+
369+
public StatsBenchmarkBatchProducer(int reportInterval) {
370+
this.reportInterval = reportInterval;
371+
}
372+
362373
public Long[] createSnapshot() {
363374
Long[] snap = new Long[] {
364375
System.currentTimeMillis(),
@@ -432,7 +443,7 @@ public void run() {
432443
e.printStackTrace();
433444
}
434445
}
435-
}, 10000, 10000, TimeUnit.MILLISECONDS);
446+
}, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
436447
}
437448

438449
public void shutdown() {

example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,15 @@ public static void main(String[] args) throws MQClientException, IOException {
6868
final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
6969
final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
7070
final boolean clientRebalanceEnable = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : true;
71+
final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
7172

7273
String group = groupPrefix;
7374
if (Boolean.parseBoolean(isSuffixEnable)) {
7475
group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
7576
}
7677

77-
System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
78-
topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
78+
System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s, reportInterval: %d%n",
79+
topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable, reportInterval);
7980

8081
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
8182

@@ -124,7 +125,7 @@ public void run() {
124125
e.printStackTrace();
125126
}
126127
}
127-
}, 10000, 10000, TimeUnit.MILLISECONDS);
128+
}, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
128129

129130
RPCHook rpcHook = null;
130131
if (aclEnable) {
@@ -235,6 +236,10 @@ public static Options buildCommandlineOptions(final Options options) {
235236
opt.setRequired(false);
236237
options.addOption(opt);
237238

239+
opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
240+
opt.setRequired(false);
241+
options.addOption(opt);
242+
238243
return options;
239244
}
240245

example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,13 @@ public static void main(String[] args) throws MQClientException {
8282
final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
8383
final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
8484
final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
85+
final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
8586

8687
System.out.printf("topic: %s, threadCount: %d, messageSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, " +
8788
"traceEnable: %s, aclEnable: %s, messageQuantity: %d, delayEnable: %s, delayLevel: %s, " +
88-
"asyncEnable: %s%n compressEnable: %s%n",
89+
"asyncEnable: %s%n compressEnable: %s, reportInterval: %d%n",
8990
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
90-
delayEnable, delayLevel, asyncEnable, enableCompress);
91+
delayEnable, delayLevel, asyncEnable, enableCompress, reportInterval);
9192

9293
StringBuilder sb = new StringBuilder(messageSize);
9394
for (int i = 0; i < messageSize; i++) {
@@ -139,7 +140,7 @@ public void run() {
139140
e.printStackTrace();
140141
}
141142
}
142-
}, 10000, 10000, TimeUnit.MILLISECONDS);
143+
}, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
143144

144145
RPCHook rpcHook = null;
145146
if (aclEnable) {
@@ -370,6 +371,10 @@ public static Options buildCommandlineOptions(final Options options) {
370371
opt.setRequired(false);
371372
options.addOption(opt);
372373

374+
opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
375+
opt.setRequired(false);
376+
options.addOption(opt);
377+
373378
return options;
374379
}
375380

example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public static void main(String[] args) throws MQClientException, UnsupportedEnco
7979
config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0;
8080
config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
8181
config.msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
82+
config.reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
8283

8384
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount);
8485

@@ -105,8 +106,7 @@ private void printStats() {
105106
Snapshot begin = snapshotList.getFirst();
106107
Snapshot end = snapshotList.getLast();
107108

108-
final long sendCount = (end.sendRequestSuccessCount - begin.sendRequestSuccessCount)
109-
+ (end.sendRequestFailedCount - begin.sendRequestFailedCount);
109+
final long sendCount = end.sendRequestSuccessCount - begin.sendRequestSuccessCount;
110110
final long sendTps = (sendCount * 1000L) / (end.endTime - begin.endTime);
111111
final double averageRT = (end.sendMessageTimeTotal - begin.sendMessageTimeTotal) / (double) (end.sendRequestSuccessCount - begin.sendRequestSuccessCount);
112112

@@ -131,7 +131,7 @@ public void run() {
131131
e.printStackTrace();
132132
}
133133
}
134-
}, 10000, 10000, TimeUnit.MILLISECONDS);
134+
}, config.reportInterval, config.reportInterval, TimeUnit.MILLISECONDS);
135135

136136
RPCHook rpcHook = null;
137137
if (config.aclEnable) {
@@ -291,6 +291,10 @@ public static Options buildCommandlineOptions(final Options options) {
291291
opt.setRequired(false);
292292
options.addOption(opt);
293293

294+
opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
295+
opt.setRequired(false);
296+
options.addOption(opt);
297+
294298
return options;
295299
}
296300
}
@@ -475,6 +479,7 @@ class TxSendConfig {
475479
int sendInterval;
476480
boolean aclEnable;
477481
boolean msgTraceEnable;
482+
int reportInterval;
478483
}
479484

480485
class LRUMap<K, V> extends LinkedHashMap<K, V> {

0 commit comments

Comments
 (0)