forked from cube2222/octosql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathorder_sensitive_transform.go
154 lines (139 loc) · 4.24 KB
/
order_sensitive_transform.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package nodes
import (
"fmt"
"time"
"github.com/google/btree"
. "github.com/cube2222/octosql/execution"
"github.com/cube2222/octosql/octosql"
)
// OrderSensitiveTransform is an execution node that buffers its entire source
// stream, keeps the records ordered by the ORDER BY key expressions, and emits
// them (optionally truncated by a LIMIT) once the source has finished.
type OrderSensitiveTransform struct {
	// source is the node whose records are ordered.
	source Node
	// orderByKeyExprs are evaluated against each record to build its sort key.
	orderByKeyExprs []Expression
	// orderByDirectionMultipliers has one entry per key expression; the key's
	// comparison result is multiplied by it, so 1 sorts ascending and -1
	// descending (presumably only ±1 is ever passed — confirm with callers).
	orderByDirectionMultipliers []int
	// limit is an optional LIMIT expression; nil means no limit.
	limit *Expression
	// noRetractionsPossible, when true and a limit is set, lets Run discard the
	// largest buffered entries as soon as more than limit distinct entries exist.
	noRetractionsPossible bool
}
// NewOrderSensitiveTransform returns an OrderSensitiveTransform that orders the
// records of source by orderByKeyExprs (with per-key direction multipliers),
// optionally truncated to the value of limit (nil for no limit).
//
// noRetractionsPossible signals that the source never emits retractions, which
// allows the node to prune buffered entries beyond the limit while running.
// The slices and the limit pointer are stored as-is, without copying.
func NewOrderSensitiveTransform(source Node, orderByKeyExprs []Expression, orderByDirectionMultipliers []int, limit *Expression, noRetractionsPossible bool) *OrderSensitiveTransform {
	transform := new(OrderSensitiveTransform)
	transform.source = source
	transform.orderByKeyExprs = orderByKeyExprs
	transform.orderByDirectionMultipliers = orderByDirectionMultipliers
	transform.limit = limit
	transform.noRetractionsPossible = noRetractionsPossible
	return transform
}
// orderByItem is a btree entry representing one distinct record together with
// its sort key and how many times it is currently present.
type orderByItem struct {
	// Key holds the evaluated ORDER BY key values for the record.
	Key []octosql.Value
	// Values are the record's values; they break ties between equal keys and
	// are what gets emitted as output.
	Values []octosql.Value
	// Count is the record's multiplicity: incremented for each non-retraction
	// and decremented for each retraction of an identical record.
	Count int
	// DirectionMultipliers are the per-key sort direction multipliers (shared
	// with the owning OrderSensitiveTransform).
	DirectionMultipliers []int
}
// Less reports whether item sorts before than, implementing btree.Item.
//
// Keys are compared in order and the first differing key decides. Its
// comparison result is multiplied by the matching direction multiplier, so a
// multiplier of -1 reverses that key's order. If all keys are equal, record
// values are compared in ascending order so distinct records with equal keys
// stay distinct tree entries. Items with equal keys and values compare equal,
// which is what lets identical records share one entry (tracked by Count).
//
// It panics if than is not an *orderByItem. It assumes than has at least as
// many Key and Values entries as item (true for records of one stream —
// NOTE(review): confirm schemas are always uniform).
func (item *orderByItem) Less(than btree.Item) bool {
	thanTyped, ok := than.(*orderByItem)
	if !ok {
		panic(fmt.Sprintf("invalid order by key comparison: %T", than))
	}
	for i := range item.Key {
		// Test the sign rather than == -1 so the ordering stays a valid strict
		// weak ordering even if Compare returns negative values other than -1.
		if comp := item.Key[i].Compare(thanTyped.Key[i]); comp != 0 {
			return comp*item.DirectionMultipliers[i] < 0
		}
	}
	// If keys are equal, differentiate by values.
	for i := range item.Values {
		if comp := item.Values[i].Compare(thanTyped.Values[i]); comp != 0 {
			return comp < 0
		}
	}
	return false
}
// Run evaluates the optional limit, consumes the whole source stream while
// maintaining a multiset of records ordered by the ORDER BY keys, and then
// emits the ordered records (at most limit of them, if set) through produce.
//
// A limit of 0 returns immediately without running the source; a negative
// limit is an error. Retractions decrement a record's multiplicity and the
// entry is removed once it drops to zero. Metadata messages from the source
// are ignored and metaSend is not used.
//
// Errors from evaluating the limit, from running the source (including key
// evaluation failures raised inside the produce callback), and from producing
// the output are wrapped and returned.
func (o *OrderSensitiveTransform) Run(execCtx ExecutionContext, produce ProduceFn, metaSend MetaSendFn) error {
	var limit *int64
	if o.limit != nil {
		val, err := (*o.limit).Evaluate(execCtx)
		if err != nil {
			return fmt.Errorf("couldn't evaluate limit: %w", err)
		}
		if val.Int == 0 {
			return nil
		}
		if val.Int < 0 {
			return fmt.Errorf("limit must be positive, got %d", val.Int)
		}
		limit = &val.Int
	}

	recordCounts := btree.New(BTreeDefaultDegree)

	if err := o.source.Run(
		execCtx,
		func(ctx ProduceContext, record Record) error {
			// The record context is the same for every key expression.
			recordCtx := execCtx.WithRecord(record)
			key := make([]octosql.Value, len(o.orderByKeyExprs))
			for i, expr := range o.orderByKeyExprs {
				keyValue, err := expr.Evaluate(recordCtx)
				if err != nil {
					return fmt.Errorf("couldn't evaluate order by %d key expression: %w", i, err)
				}
				key[i] = keyValue
			}

			// The lookup probe doubles as the new entry if the record is not yet
			// buffered. NOTE(review): record.Values is retained without copying;
			// this assumes the source does not reuse that slice.
			itemTyped := &orderByItem{
				Key:                  key,
				Values:               record.Values,
				Count:                0,
				DirectionMultipliers: o.orderByDirectionMultipliers,
			}
			if existing := recordCounts.Get(itemTyped); existing != nil {
				var ok bool
				itemTyped, ok = existing.(*orderByItem)
				if !ok {
					panic(fmt.Sprintf("invalid order by item: %v", existing))
				}
			}

			if !record.Retraction {
				itemTyped.Count++
			} else {
				itemTyped.Count--
			}
			if itemTyped.Count > 0 {
				recordCounts.ReplaceOrInsert(itemTyped)
			} else {
				recordCounts.Delete(itemTyped)
			}

			if limit != nil && o.noRetractionsPossible && int64(recordCounts.Len()) > *limit {
				// This doesn't mean we'll always keep just the records that are needed, because tree nodes might have count > 1.
				// That said, it's a good approximation, and we'll definitely not lose something that we need to have.
				recordCounts.DeleteMax()
			}
			return nil
		},
		func(ctx ProduceContext, msg MetadataMessage) error {
			return nil
		},
	); err != nil {
		return fmt.Errorf("couldn't run source: %w", err)
	}

	if err := produceOrderByItems(ProduceFromExecutionContext(execCtx), recordCounts, limit, produce); err != nil {
		return fmt.Errorf("couldn't produce ordered items: %w", err)
	}
	return nil
}
// produceOrderByItems emits the records stored in recordCounts in ascending
// tree order (as defined by orderByItem.Less). Each entry's Values are produced
// Count times as non-retraction records with a zero event time.
//
// When limit is non-nil, at most *limit records are produced in total. The
// limit counts emitted records, not distinct tree entries, so an entry whose
// Count exceeds the remaining budget is only partially emitted.
//
// The first error returned by produce stops iteration and is returned. It
// panics if the tree contains an item that is not an *orderByItem.
func produceOrderByItems(ctx ProduceContext, recordCounts *btree.BTree, limit *int64, produce ProduceFn) error {
	var produced int64
	var outErr error
	recordCounts.Ascend(func(item btree.Item) bool {
		if limit != nil && produced >= *limit {
			return false
		}
		itemTyped, ok := item.(*orderByItem)
		if !ok {
			panic(fmt.Sprintf("invalid order by item: %v", item))
		}
		for copyIdx := 0; copyIdx < itemTyped.Count; copyIdx++ {
			// Duplicates count toward the limit individually.
			if limit != nil && produced >= *limit {
				return false
			}
			if err := produce(ctx, NewRecord(itemTyped.Values, false, time.Time{})); err != nil {
				outErr = err
				return false
			}
			produced++
		}
		return true
	})
	return outErr
}