From 3c2339cc340be00739274f4bf66cacf00a448927 Mon Sep 17 00:00:00 2001 From: Thomas Jungblut Date: Wed, 24 Apr 2024 09:55:52 +0200 Subject: [PATCH 1/2] mark int type explicitly as int64 fixes cube2222#330 Signed-off-by: Thomas Jungblut --- aggregates/count.go | 2 +- aggregates/sum.go | 2 +- cmd/root.go | 2 +- datasources/csv/execution.go | 2 +- datasources/lines/execution.go | 2 +- datasources/parquet/reconstruct.go | 4 +-- execution/nodes/limit.go | 2 +- execution/nodes/order_sensitive_transform.go | 8 ++--- functions/functions.go | 32 ++++++++++---------- octosql/values.go | 4 +-- outputs/batch/live_output.go | 10 +++--- outputs/formats/json_format.go | 2 +- parser/parser.go | 2 +- plugins/internal/plugins/plugins.go | 2 +- 14 files changed, 38 insertions(+), 38 deletions(-) diff --git a/aggregates/count.go b/aggregates/count.go index c8ae6f94..e29a103a 100644 --- a/aggregates/count.go +++ b/aggregates/count.go @@ -15,7 +15,7 @@ var CountOverloads = []physical.AggregateDescriptor{ } type Count struct { - count int + count int64 } func NewCountPrototype() func() nodes.Aggregate { diff --git a/aggregates/sum.go b/aggregates/sum.go index 87dff0ba..1e3410bd 100644 --- a/aggregates/sum.go +++ b/aggregates/sum.go @@ -27,7 +27,7 @@ var SumOverloads = []physical.AggregateDescriptor{ } type SumInt struct { - sum int + sum int64 } func NewSumIntPrototype() func() nodes.Aggregate { diff --git a/cmd/root.go b/cmd/root.go index ccf81cb6..6f45d7cc 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -385,7 +385,7 @@ octosql "SELECT * FROM plugins.plugins"`, switch output { case "live_table", "batch_table": - var limit *int + var limit *int64 if limitExpression != nil { val, err := (*limitExpression).Evaluate(execCtx) if err != nil { diff --git a/datasources/csv/execution.go b/datasources/csv/execution.go index c609a700..ac64e5a1 100644 --- a/datasources/csv/execution.go +++ b/datasources/csv/execution.go @@ -71,7 +71,7 @@ func (d *DatasourceExecuting) Run(ctx ExecutionContext, produce ProduceFn, metaS if octosql.Int.Is(d.fields[i].Type) == octosql.TypeRelationIs { integer, err := fastfloat.ParseInt64(str) if err == nil { - values[i] = octosql.NewInt(int(integer)) + values[i] = octosql.NewInt(integer) continue } } diff --git a/datasources/lines/execution.go b/datasources/lines/execution.go index ab7d537e..6eba426f 100644 --- a/datasources/lines/execution.go +++ b/datasources/lines/execution.go @@ -45,7 +45,7 @@ func (d *DatasourceExecuting) Run(ctx ExecutionContext, produce ProduceFn, metaS }) } - line := 0 + line := int64(0) for sc.Scan() { values := make([]octosql.Value, len(d.fields)) for i := range d.fields { diff --git a/datasources/parquet/reconstruct.go b/datasources/parquet/reconstruct.go index 41148902..645ded1e 100644 --- a/datasources/parquet/reconstruct.go +++ b/datasources/parquet/reconstruct.go @@ -293,9 +293,9 @@ func assignValue(dst *octosql.Value, src parquet.Value) error { case parquet.Boolean: *dst = octosql.NewBoolean(src.Boolean()) case parquet.Int32: - *dst = octosql.NewInt(int(src.Int32())) + *dst = octosql.NewInt(src.Int64()) case parquet.Int64: - *dst = octosql.NewInt(int(src.Int64())) + *dst = octosql.NewInt(src.Int64()) case parquet.Int96: *dst = octosql.NewString(src.Int96().String()) case parquet.Float: diff --git a/execution/nodes/limit.go b/execution/nodes/limit.go index 8a39c2ab..e037e07d 100644 --- a/execution/nodes/limit.go +++ b/execution/nodes/limit.go @@ -30,7 +30,7 @@ func (m *Limit) Run(ctx ExecutionContext, produce ProduceFn, metaSend MetaSendFn limitNodeID := ulid.MustNew(ulid.Now(), rand.Reader).String() - i := 0 + i := int64(0) if err := m.source.Run(ctx, func(produceCtx ProduceContext, record Record) error { if err := produce(produceCtx, record); err != nil { return fmt.Errorf("couldn't produce: %w", err) diff --git a/execution/nodes/order_sensitive_transform.go b/execution/nodes/order_sensitive_transform.go index 55234f63..88006520 100644 --- a/execution/nodes/order_sensitive_transform.go +++ b/execution/nodes/order_sensitive_transform.go @@ -58,7 +58,7 @@ func (item *orderByItem) Less(than btree.Item) bool { } func (o *OrderSensitiveTransform) Run(execCtx ExecutionContext, produce ProduceFn, metaSend MetaSendFn) error { - var limit *int + var limit *int64 if o.limit != nil { val, err := (*o.limit).Evaluate(execCtx) if err != nil { @@ -112,7 +112,7 @@ func (o *OrderSensitiveTransform) Run(execCtx ExecutionContext, produce ProduceF } else { recordCounts.Delete(itemTyped) } - if limit != nil && o.noRetractionsPossible && recordCounts.Len() > *limit { + if limit != nil && o.noRetractionsPossible && int64(recordCounts.Len()) > *limit { // This doesn't mean we'll always keep just the records that are needed, because tree nodes might have count > 1. // That said, it's a good approximation, and we'll definitely not lose something that we need to have. recordCounts.DeleteMax() @@ -130,8 +130,8 @@ func (o *OrderSensitiveTransform) Run(execCtx ExecutionContext, produce ProduceF return nil } -func produceOrderByItems(ctx ProduceContext, recordCounts *btree.BTree, limit *int, produce ProduceFn) error { - i := 0 +func produceOrderByItems(ctx ProduceContext, recordCounts *btree.BTree, limit *int64, produce ProduceFn) error { + i := int64(0) var outErr error recordCounts.Ascend(func(item btree.Item) bool { if limit != nil && i >= *limit { diff --git a/functions/functions.go b/functions/functions.go index fa70d728..fb089309 100644 --- a/functions/functions.go +++ b/functions/functions.go @@ -304,7 +304,7 @@ func FunctionMap() map[string]physical.FunctionDetails { OutputType: octosql.String, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - return octosql.NewString(strings.Repeat(values[0].Str, values[1].Int)), nil + return octosql.NewString(strings.Repeat(values[0].Str, int(values[1].Int))), nil }, }, { @@ -312,7 +312,7 @@ func FunctionMap() map[string]physical.FunctionDetails { OutputType: octosql.String, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - return octosql.NewString(strings.Repeat(values[1].Str, values[0].Int)), nil + return octosql.NewString(strings.Repeat(values[1].Str, int(values[0].Int))), nil }, }, }, @@ -718,7 +718,7 @@ func FunctionMap() map[string]physical.FunctionDetails { OutputType: octosql.String, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - if len(values[0].Str) <= values[1].Int { + if int64(len(values[0].Str)) <= values[1].Int { return octosql.NewString(""), nil } return octosql.NewString(values[0].Str[values[1].Int:]), nil @@ -729,12 +729,12 @@ func FunctionMap() map[string]physical.FunctionDetails { OutputType: octosql.String, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - if len(values[0].Str) <= values[1].Int { + if int64(len(values[0].Str)) <= values[1].Int { return octosql.NewString(""), nil } end := values[1].Int + values[2].Int - if end > len(values[0].Str) { - end = len(values[0].Str) + if end > int64(len(values[0].Str)) { + end = int64(len(values[0].Str)) } return octosql.NewString(values[0].Str[values[1].Int:end]), nil }, @@ -766,7 +766,7 @@ func FunctionMap() map[string]physical.FunctionDetails { if i == -1 { return octosql.NewNull(), nil } - return octosql.NewInt(i), nil + return octosql.NewInt(int64(i)), nil }, }, }, @@ -779,7 +779,7 @@ func FunctionMap() map[string]physical.FunctionDetails { OutputType: octosql.Int, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - return octosql.NewInt(len(values[0].Str)), nil + return octosql.NewInt(int64(len(values[0].Str))), nil }, }, { @@ -794,7 +794,7 @@ func FunctionMap() map[string]physical.FunctionDetails { }, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - return octosql.NewInt(len(values[0].List)), nil + return octosql.NewInt(int64(len(values[0].List))), nil }, }, { @@ -809,7 +809,7 @@ func FunctionMap() map[string]physical.FunctionDetails { }, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - return octosql.NewInt(len(values[0].Struct)), nil + return octosql.NewInt(int64(len(values[0].Struct))), nil }, }, { @@ -824,7 +824,7 @@ func FunctionMap() map[string]physical.FunctionDetails { }, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - return octosql.NewInt(len(values[0].Tuple)), nil + return octosql.NewInt(int64(len(values[0].Tuple))), nil }, }, }, @@ -891,7 +891,7 @@ func FunctionMap() map[string]physical.FunctionDetails { OutputType: octosql.Int, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - return octosql.NewInt(int(values[0].Time.Unix())), nil + return octosql.NewInt(values[0].Time.Unix()), nil }, }, }, @@ -927,7 +927,7 @@ func FunctionMap() map[string]physical.FunctionDetails { OutputType: octosql.Int, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - return octosql.NewInt(int(values[0].Float)), nil + return octosql.NewInt(int64(values[0].Float)), nil }, }, { @@ -935,7 +935,7 @@ func FunctionMap() map[string]physical.FunctionDetails { OutputType: octosql.Int, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - n, err := strconv.Atoi(values[0].Str) + n, err := strconv.ParseInt(values[0].Str, 10, 64) if err != nil { log.Printf("couldn't parse string '%s' as int: %s", values[0].Str, err) return octosql.NewNull(), nil @@ -948,7 +948,7 @@ func FunctionMap() map[string]physical.FunctionDetails { OutputType: octosql.Int, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - return octosql.NewInt(int(values[0].Duration)), nil + return octosql.NewInt(int64(values[0].Duration)), nil }, }, }, @@ -1033,7 +1033,7 @@ func FunctionMap() map[string]physical.FunctionDetails { }, Strict: true, Function: func(values []octosql.Value) (octosql.Value, error) { - if values[1].Int >= len(values[0].List) { + if values[1].Int >= int64(len(values[0].List)) { return octosql.NewNull(), nil } return values[0].List[values[1].Int], nil diff --git a/octosql/values.go b/octosql/values.go index f6b35d46..f00b30c3 100644 --- a/octosql/values.go +++ b/octosql/values.go @@ -14,7 +14,7 @@ var ZeroValue = Value{} // Value represents a single row value. The zero value of it is conveniently NULL. type Value struct { TypeID TypeID - Int int + Int int64 Float float64 Boolean bool Str string @@ -31,7 +31,7 @@ func NewNull() Value { } } -func NewInt(value int) Value { +func NewInt(value int64) Value { return Value{ TypeID: TypeIDInt, Int: value, diff --git a/outputs/batch/live_output.go b/outputs/batch/live_output.go index 29433c3e..8e4b42e3 100644 --- a/outputs/batch/live_output.go +++ b/outputs/batch/live_output.go @@ -24,7 +24,7 @@ type OutputPrinter struct { source Node keyExprs []Expression directionMultipliers []int - limit *int + limit *int64 noRetractionsPossible bool schema physical.Schema @@ -32,7 +32,7 @@ type OutputPrinter struct { live bool } -func NewOutputPrinter(source Node, keyExprs []Expression, directionMultipliers []int, limit *int, noRetractionsPossible bool, schema physical.Schema, format func(io.Writer) Format, live bool) *OutputPrinter { +func NewOutputPrinter(source Node, keyExprs []Expression, directionMultipliers []int, limit *int64, noRetractionsPossible bool, schema physical.Schema, format func(io.Writer) Format, live bool) *OutputPrinter { return &OutputPrinter{ source: source, keyExprs: keyExprs, @@ -89,7 +89,7 @@ func (o *OutputPrinter) Run(execCtx ExecutionContext) error { format := o.format(&buf) format.SetSchema(o.schema) - i := 0 + i := int64(0) recordCounts.Ascend(func(item btree.Item) bool { itemTyped := item.(*outputItem) for j := 0; j < itemTyped.Count; j++ { @@ -157,7 +157,7 @@ func (o *OutputPrinter) Run(execCtx ExecutionContext) error { if onlyZeroEventTimesSeen && !record.EventTime.IsZero() { onlyZeroEventTimesSeen = false } - if o.limit != nil && o.noRetractionsPossible && recordCounts.Len() > *o.limit { + if o.limit != nil && o.noRetractionsPossible && int64(recordCounts.Len()) > *o.limit { // This doesn't mean we'll always keep just the records that are needed, because tree nodes might have count > 1. // That said, it's a good approximation, and we'll definitely not lose something that we need to have. recordCounts.DeleteMax() @@ -183,7 +183,7 @@ func (o *OutputPrinter) Run(execCtx ExecutionContext) error { var buf bytes.Buffer format := o.format(&buf) format.SetSchema(o.schema) - i := 0 + i := int64(0) recordCounts.Ascend(func(item btree.Item) bool { itemTyped := item.(*outputItem) for j := 0; j < itemTyped.Count; j++ { diff --git a/outputs/formats/json_format.go b/outputs/formats/json_format.go index e74fc202..9f474f8c 100644 --- a/outputs/formats/json_format.go +++ b/outputs/formats/json_format.go @@ -60,7 +60,7 @@ func ValueToJson(arena *fastjson.Arena, t octosql.Type, value octosql.Value) *fa case octosql.TypeIDNull: return arena.NewNull() case octosql.TypeIDInt: - return arena.NewNumberInt(value.Int) + return arena.NewNumberInt(int(value.Int)) case octosql.TypeIDFloat: return arena.NewNumberFloat64(value.Float) case octosql.TypeIDBoolean: diff --git a/parser/parser.go b/parser/parser.go index 1f6aa9a8..2f458db4 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -629,7 +629,7 @@ func ParseExpression(expr sqlparser.Expr) (logical.Expression, error) { case sqlparser.IntVal: var i int64 i, err = strconv.ParseInt(string(expr.Val), 10, 64) - value = octosql.NewInt(int(i)) + value = octosql.NewInt(i) case sqlparser.FloatVal: var val float64 val, err = strconv.ParseFloat(string(expr.Val), 64) diff --git a/plugins/internal/plugins/plugins.go b/plugins/internal/plugins/plugins.go index d5e81935..b4e2122f 100644 --- a/plugins/internal/plugins/plugins.go +++ b/plugins/internal/plugins/plugins.go @@ -103,7 +103,7 @@ func (x *Value) ToNativeValue() octosql.Value { switch octosql.TypeID(x.TypeId) { case octosql.TypeIDNull: case octosql.TypeIDInt: - out.Int = int(x.Int) + out.Int = x.Int case octosql.TypeIDFloat: out.Float = x.Float case octosql.TypeIDBoolean: From d25a1ef44dedce0520c468355b6ce7153c752344 Mon Sep 17 00:00:00 2001 From: Thomas Jungblut Date: Mon, 29 Apr 2024 08:50:28 +0200 Subject: [PATCH 2/2] use 32bit truncation for parquet --- datasources/parquet/reconstruct.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasources/parquet/reconstruct.go b/datasources/parquet/reconstruct.go index 645ded1e..cc83b9ae 100644 --- a/datasources/parquet/reconstruct.go +++ b/datasources/parquet/reconstruct.go @@ -293,7 +293,7 @@ func assignValue(dst *octosql.Value, src parquet.Value) error { case parquet.Boolean: *dst = octosql.NewBoolean(src.Boolean()) case parquet.Int32: - *dst = octosql.NewInt(src.Int64()) + *dst = octosql.NewInt(int64(src.Int32())) case parquet.Int64: *dst = octosql.NewInt(src.Int64()) case parquet.Int96: