Skip to content

Commit 4d1a007

Browse files
authored
[ADDED] Support for disabling queue groups at service, group, and endpoint levels (#1797)
1 parent ba95fce commit 4d1a007

File tree

3 files changed

+212
-45
lines changed

3 files changed

+212
-45
lines changed

Diff for: micro/README.md

+25-4
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,10 @@ _ = numbersGroup.AddEndpoint("multiply", micro.HandlerFunc(multiplyHandler))
9898
## Customizing queue groups
9999

100100
For each service, group and endpoint the queue group used to gather responses
101-
can be customized. If not provided a default queue group will be used (`q`).
102-
Customizing queue groups can be useful to e.g. implement fanout request pattern
103-
or hedged request pattern (to reduce tail latencies by only waiting for the
104-
first response for multiple service instances).
101+
can be customized or disabled. If not provided a default queue group will be
102+
used (`q`). Customizing queue groups can be useful to e.g. implement fanout
103+
request pattern or hedged request pattern (to reduce tail latencies by only
104+
waiting for the first response for multiple service instances).
105105

106106
Let's say we have multiple services listening on the same subject, but with
107107
different queue groups:
@@ -154,6 +154,27 @@ Queue groups can be overwritten by setting them on groups and endpoints as well:
154154
g.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q3"))
155155
```
156156

157+
Similarly, queue groups can be disabled on service config, group and endpoint levels. If disabled,
158+
a standard NATS subscription will be created for the endpoint.
159+
160+
```go
161+
// disable queue group for the service
162+
srv, _ := micro.AddService(nc, micro.Config{
163+
Name: "EchoService",
164+
Version: "1.0.0",
165+
QueueGroupDisabled: true,
166+
})
167+
168+
// create a group with queue group disabled
169+
srv.AddGroup("g", micro.WithEndpointQueueGroupDisabled())
170+
171+
// create an endpoint with queue group disabled
172+
srv.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroupDisabled())
173+
```
174+
175+
When disabling queue groups, same inheritance rules apply as for customizing
176+
queue groups. (service config -> group -> endpoint)
177+
157178
## Discovery and Monitoring
158179

159180
Each service is assigned a unique ID on creation. A service instance is

Diff for: micro/service.go

+77-36
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,12 @@ type (
7373
subject string
7474
metadata map[string]string
7575
queueGroup string
76+
qgDisabled bool
7677
}
7778

7879
groupOpts struct {
7980
queueGroup string
81+
qgDisabled bool
8082
}
8183

8284
// ErrHandler is a function used to configure a custom error handler for a service,
@@ -152,9 +154,10 @@ type (
152154
}
153155

154156
group struct {
155-
service *service
156-
prefix string
157-
queueGroup string
157+
service *service
158+
prefix string
159+
queueGroup string
160+
queueGroupDisabled bool
158161
}
159162

160163
// Verb represents a name of the monitoring service.
@@ -182,6 +185,9 @@ type (
182185
// QueueGroup can be used to override the default queue group name.
183186
QueueGroup string `json:"queue_group"`
184187

188+
// QueueGroupDisabled disables the queue group for the service.
189+
QueueGroupDisabled bool `json:"queue_group_disabled"`
190+
185191
// StatsHandler is a user-defined custom function.
186192
// used to calculate additional service stats.
187193
StatsHandler StatsHandler
@@ -205,6 +211,9 @@ type (
205211

206212
// QueueGroup can be used to override the default queue group name.
207213
QueueGroup string `json:"queue_group"`
214+
215+
// QueueGroupDisabled disables the queue group for the endpoint.
216+
QueueGroupDisabled bool `json:"queue_group_disabled"`
208217
}
209218

210219
// NATSError represents an error returned by a NATS Subscription.
@@ -397,11 +406,11 @@ func (s *service) AddEndpoint(name string, handler Handler, opts ...EndpointOpt)
397406
if options.subject != "" {
398407
subject = options.subject
399408
}
400-
queueGroup := queueGroupName(options.queueGroup, s.Config.QueueGroup)
401-
return addEndpoint(s, name, subject, handler, options.metadata, queueGroup)
409+
queueGroup, noQueue := resolveQueueGroup(options.queueGroup, s.Config.QueueGroup, options.qgDisabled, s.Config.QueueGroupDisabled)
410+
return addEndpoint(s, name, subject, handler, options.metadata, queueGroup, noQueue)
402411
}
403412

404-
func addEndpoint(s *service, name, subject string, handler Handler, metadata map[string]string, queueGroup string) error {
413+
func addEndpoint(s *service, name, subject string, handler Handler, metadata map[string]string, queueGroup string, noQueue bool) error {
405414
if !nameRegexp.MatchString(name) {
406415
return fmt.Errorf("%w: invalid endpoint name", ErrConfigValidation)
407416
}
@@ -414,21 +423,34 @@ func addEndpoint(s *service, name, subject string, handler Handler, metadata map
414423
endpoint := &Endpoint{
415424
service: s,
416425
EndpointConfig: EndpointConfig{
417-
Subject: subject,
418-
Handler: handler,
419-
Metadata: metadata,
420-
QueueGroup: queueGroup,
426+
Subject: subject,
427+
Handler: handler,
428+
Metadata: metadata,
429+
QueueGroup: queueGroup,
430+
QueueGroupDisabled: noQueue,
421431
},
422432
Name: name,
423433
}
424434

425-
sub, err := s.nc.QueueSubscribe(
426-
subject,
427-
queueGroup,
428-
func(m *nats.Msg) {
429-
s.reqHandler(endpoint, &request{msg: m})
430-
},
431-
)
435+
var sub *nats.Subscription
436+
var err error
437+
438+
if !noQueue {
439+
sub, err = s.nc.QueueSubscribe(
440+
subject,
441+
queueGroup,
442+
func(m *nats.Msg) {
443+
s.reqHandler(endpoint, &request{msg: m})
444+
},
445+
)
446+
} else {
447+
sub, err = s.nc.Subscribe(
448+
subject,
449+
func(m *nats.Msg) {
450+
s.reqHandler(endpoint, &request{msg: m})
451+
},
452+
)
453+
}
432454
if err != nil {
433455
return err
434456
}
@@ -449,11 +471,12 @@ func (s *service) AddGroup(name string, opts ...GroupOpt) Group {
449471
for _, opt := range opts {
450472
opt(&o)
451473
}
452-
queueGroup := queueGroupName(o.queueGroup, s.Config.QueueGroup)
474+
queueGroup, noQueue := resolveQueueGroup(o.queueGroup, s.Config.QueueGroup, o.qgDisabled, s.Config.QueueGroupDisabled)
453475
return &group{
454-
service: s,
455-
prefix: name,
456-
queueGroup: queueGroup,
476+
service: s,
477+
prefix: name,
478+
queueGroup: queueGroup,
479+
queueGroupDisabled: noQueue,
457480
}
458481
}
459482

@@ -797,29 +820,33 @@ func (g *group) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) e
797820
if g.prefix == "" {
798821
endpointSubject = subject
799822
}
800-
queueGroup := queueGroupName(options.queueGroup, g.queueGroup)
823+
queueGroup, noQueue := resolveQueueGroup(options.queueGroup, g.queueGroup, options.qgDisabled, g.queueGroupDisabled)
801824

802-
return addEndpoint(g.service, name, endpointSubject, handler, options.metadata, queueGroup)
825+
return addEndpoint(g.service, name, endpointSubject, handler, options.metadata, queueGroup, noQueue)
803826
}
804827

805-
func queueGroupName(customQG, parentQG string) string {
806-
queueGroup := customQG
807-
if queueGroup == "" {
808-
if parentQG != "" {
809-
queueGroup = parentQG
810-
} else {
811-
queueGroup = DefaultQueueGroup
812-
}
828+
func resolveQueueGroup(customQG, parentQG string, disabled, parentDisabled bool) (string, bool) {
829+
if disabled {
830+
return "", true
831+
}
832+
if customQG != "" {
833+
return customQG, false
834+
}
835+
if parentDisabled {
836+
return "", true
813837
}
814-
return queueGroup
838+
if parentQG != "" {
839+
return parentQG, false
840+
}
841+
return DefaultQueueGroup, false
815842
}
816843

817844
func (g *group) AddGroup(name string, opts ...GroupOpt) Group {
818845
var o groupOpts
819846
for _, opt := range opts {
820847
opt(&o)
821848
}
822-
queueGroup := queueGroupName(o.queueGroup, g.queueGroup)
849+
queueGroup, noQueue := resolveQueueGroup(o.queueGroup, g.queueGroup, o.qgDisabled, g.queueGroupDisabled)
823850

824851
parts := make([]string, 0, 2)
825852
if g.prefix != "" {
@@ -831,9 +858,10 @@ func (g *group) AddGroup(name string, opts ...GroupOpt) Group {
831858
prefix := strings.Join(parts, ".")
832859

833860
return &group{
834-
service: g.service,
835-
prefix: prefix,
836-
queueGroup: queueGroup,
861+
service: g.service,
862+
prefix: prefix,
863+
queueGroup: queueGroup,
864+
queueGroupDisabled: noQueue,
837865
}
838866
}
839867

@@ -907,8 +935,21 @@ func WithEndpointQueueGroup(queueGroup string) EndpointOpt {
907935
}
908936
}
909937

938+
func WithEndpointQueueGroupDisabled() EndpointOpt {
939+
return func(e *endpointOpts) error {
940+
e.qgDisabled = true
941+
return nil
942+
}
943+
}
944+
910945
func WithGroupQueueGroup(queueGroup string) GroupOpt {
911946
return func(g *groupOpts) {
912947
g.queueGroup = queueGroup
913948
}
914949
}
950+
951+
func WithGroupQueueGroupDisabled() GroupOpt {
952+
return func(g *groupOpts) {
953+
g.qgDisabled = true
954+
}
955+
}

Diff for: micro/test/service_test.go

+110-5
Original file line numberDiff line numberDiff line change
@@ -1574,6 +1574,59 @@ func TestCustomQueueGroup(t *testing.T) {
15741574
"baz": "custom",
15751575
},
15761576
},
1577+
{
1578+
name: "disable queue group on service config",
1579+
endpointInit: func(t *testing.T, nc *nats.Conn) micro.Service {
1580+
srv, err := micro.AddService(nc, micro.Config{
1581+
Name: "test_service",
1582+
Version: "0.0.1",
1583+
QueueGroupDisabled: true,
1584+
Endpoint: &micro.EndpointConfig{
1585+
Subject: "foo",
1586+
Handler: micro.HandlerFunc(func(r micro.Request) {}),
1587+
},
1588+
})
1589+
if err != nil {
1590+
t.Fatalf("Unexpected error: %v", err)
1591+
}
1592+
1593+
// add endpoint on service directly, should have inherited disabled queue group
1594+
err = srv.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}))
1595+
if err != nil {
1596+
t.Fatalf("Unexpected error: %v", err)
1597+
}
1598+
1599+
// add group with queue group from service config
1600+
g1 := srv.AddGroup("g1")
1601+
1602+
// add endpoint on group, should have queue group disabled
1603+
err = g1.AddEndpoint("baz", micro.HandlerFunc(func(r micro.Request) {}))
1604+
if err != nil {
1605+
t.Fatalf("Unexpected error: %v", err)
1606+
}
1607+
1608+
// add endpoint on a service with queue group enabled
1609+
err = srv.AddEndpoint("qux", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q-qux"))
1610+
if err != nil {
1611+
t.Fatalf("Unexpected error: %v", err)
1612+
}
1613+
1614+
// add endpoint on group and set custom queue group
1615+
err = g1.AddEndpoint("quux", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q-quux"))
1616+
if err != nil {
1617+
t.Fatalf("Unexpected error: %v", err)
1618+
}
1619+
1620+
return srv
1621+
},
1622+
expectedQueueGroups: map[string]string{
1623+
"default": "",
1624+
"bar": "",
1625+
"baz": "",
1626+
"qux": "q-qux",
1627+
"quux": "q-quux",
1628+
},
1629+
},
15771630
{
15781631
name: "overwriting queue groups",
15791632
endpointInit: func(t *testing.T, nc *nats.Conn) micro.Service {
@@ -1598,6 +1651,9 @@ func TestCustomQueueGroup(t *testing.T) {
15981651
// overwrite parent group queue group
15991652
g3 := g2.AddGroup("g3", micro.WithGroupQueueGroup("q-g3"))
16001653

1654+
// disable queue group on group
1655+
g4 := g2.AddGroup("g4", micro.WithGroupQueueGroupDisabled())
1656+
16011657
// add endpoint on service directly, overwriting the queue group
16021658
err = srv.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q-bar"))
16031659
if err != nil {
@@ -1621,14 +1677,20 @@ func TestCustomQueueGroup(t *testing.T) {
16211677
if err != nil {
16221678
t.Fatalf("Unexpected error: %v", err)
16231679
}
1680+
1681+
err = g4.AddEndpoint("foo-disabled", micro.HandlerFunc(func(r micro.Request) {}))
1682+
if err != nil {
1683+
t.Fatalf("Unexpected error: %v", err)
1684+
}
16241685
return srv
16251686
},
16261687
expectedQueueGroups: map[string]string{
1627-
"default": "q-default",
1628-
"bar": "q-bar",
1629-
"baz": "q-g1",
1630-
"qux": "q-qux",
1631-
"quux": "q-g3",
1688+
"default": "q-default",
1689+
"bar": "q-bar",
1690+
"baz": "q-g1",
1691+
"qux": "q-qux",
1692+
"quux": "q-g3",
1693+
"foo-disabled": "",
16321694
},
16331695
},
16341696
{
@@ -1805,3 +1867,46 @@ func TestCustomQueueGroupMultipleResponses(t *testing.T) {
18051867
}
18061868
}
18071869
}
1870+
1871+
func TestDisableQueueGroup(t *testing.T) {
1872+
s := RunServerOnPort(-1)
1873+
defer s.Shutdown()
1874+
1875+
nc, err := nats.Connect(s.ClientURL())
1876+
if err != nil {
1877+
t.Fatalf("Expected to connect to server, got %v", err)
1878+
}
1879+
defer nc.Close()
1880+
wg := sync.WaitGroup{}
1881+
1882+
// Create 5 service responders.
1883+
config := micro.Config{
1884+
Name: "CoolAddService",
1885+
Version: "0.1.0",
1886+
Description: "Add things together",
1887+
Metadata: map[string]string{"basic": "metadata"},
1888+
Endpoint: &micro.EndpointConfig{
1889+
Subject: "svc.add",
1890+
Handler: micro.HandlerFunc(func(r micro.Request) {
1891+
r.Respond(nil)
1892+
wg.Done()
1893+
}),
1894+
},
1895+
QueueGroupDisabled: true,
1896+
}
1897+
1898+
for range 10 {
1899+
srv, err := micro.AddService(nc, config)
1900+
if err != nil {
1901+
t.Fatalf("Unexpected error: %v", err)
1902+
}
1903+
defer srv.Stop()
1904+
}
1905+
wg.Add(10)
1906+
// Send a request to the service.
1907+
if err = nc.PublishRequest("svc.add", "rply", []byte("req")); err != nil {
1908+
t.Fatalf("Unexpected error: %v", err)
1909+
}
1910+
wg.Wait()
1911+
1912+
}

0 commit comments

Comments
 (0)