Skip to content

Commit 6191cc3

Browse files
committed
[unitdb] update block reader/writer
1 parent e09dc9f commit 6191cc3

18 files changed

+229
-395
lines changed

block.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import (
2222
)
2323

2424
const (
25-
slotSize = 16
26-
blockSize uint32 = 4096
25+
slotSize = 16
26+
blockSize int32 = 4096
2727
)
2828

2929
type (
@@ -55,7 +55,7 @@ func blockOffset(idx int32) int64 {
5555
if idx == -1 {
5656
return int64(0)
5757
}
58-
return int64(blockSize * uint32(idx))
58+
return int64(blockSize * idx)
5959
}
6060

6161
func (e _IndexEntry) mSize() uint32 {

block_reader.go

+23-19
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,31 @@ package unitdb
1919
type _BlockReader struct {
2020
indexBlock _IndexBlock
2121

22-
fs *_FileSet
22+
fs *_FileSet
23+
indexFile, dataFile *_File
2324
}
2425

2526
func newBlockReader(fs *_FileSet) *_BlockReader {
26-
return &_BlockReader{fs: fs}
27-
}
27+
r := &_BlockReader{fs: fs}
2828

29-
func (r *_BlockReader) readIndexBlock(seq uint64) (_IndexBlock, error) {
30-
off := blockOffset(blockIndex(seq))
31-
indexFile, err := r.fs.getFile(_FileDesc{fileType: typeIndex})
29+
indexFile, err := fs.getFile(_FileDesc{fileType: typeIndex})
3230
if err != nil {
33-
return _IndexBlock{}, err
31+
return nil
3432
}
35-
buf, err := indexFile.slice(off, off+int64(blockSize))
33+
r.indexFile = indexFile
34+
35+
dataFile, err := fs.getFile(_FileDesc{fileType: typeData})
36+
if err != nil {
37+
return nil
38+
}
39+
r.dataFile = dataFile
40+
41+
return r
42+
}
43+
44+
func (r *_BlockReader) readIndexBlock(blockIdx int32) (_IndexBlock, error) {
45+
off := blockOffset(blockIdx)
46+
buf, err := r.indexFile.slice(off, off+int64(blockSize))
3647
if err != nil {
3748
return _IndexBlock{}, err
3849
}
@@ -42,7 +53,8 @@ func (r *_BlockReader) readIndexBlock(seq uint64) (_IndexBlock, error) {
4253
}
4354

4455
func (r *_BlockReader) readIndexEntry(seq uint64) (_IndexEntry, error) {
45-
if _, err := r.readIndexBlock(seq); err != nil {
56+
bIdx := blockIndex(seq)
57+
if _, err := r.readIndexBlock(bIdx); err != nil {
4658
return _IndexEntry{}, err
4759
}
4860

@@ -65,11 +77,7 @@ func (r *_BlockReader) readMessage(e _IndexEntry) ([]byte, []byte, error) {
6577
if e.cache != nil {
6678
return e.cache[:idSize], e.cache[e.topicSize+idSize:], nil
6779
}
68-
dataFile, err := r.fs.getFile(_FileDesc{fileType: typeData})
69-
if err != nil {
70-
return nil, nil, err
71-
}
72-
message, err := dataFile.slice(e.msgOffset, e.msgOffset+int64(e.mSize()))
80+
message, err := r.dataFile.slice(e.msgOffset, e.msgOffset+int64(e.mSize()))
7381
if err != nil {
7482
return nil, nil, err
7583
}
@@ -80,9 +88,5 @@ func (r *_BlockReader) readTopic(e _IndexEntry) ([]byte, error) {
8088
if e.cache != nil {
8189
return e.cache[idSize : e.topicSize+idSize], nil
8290
}
83-
dataFile, err := r.fs.getFile(_FileDesc{fileType: typeData})
84-
if err != nil {
85-
return nil, err
86-
}
87-
return dataFile.slice(e.msgOffset+int64(idSize), e.msgOffset+int64(e.topicSize)+int64(idSize))
91+
return r.dataFile.slice(e.msgOffset+int64(idSize), e.msgOffset+int64(e.topicSize)+int64(idSize))
8892
}

block_writer.go

+65-33
Original file line numberDiff line numberDiff line change
@@ -24,43 +24,69 @@ import (
2424
)
2525

2626
type _BlockWriter struct {
27+
blockIdx int32
2728
indexBlocks map[int32]_IndexBlock // map[blockIdx]block
2829

2930
fs *_FileSet
3031
lease *_Lease
3132
buffer *bpool.Buffer
3233

33-
indexLeases map[uint64]struct{} //map[seq]struct
34-
dataLeases map[int64]uint32 // map[offset]size
35-
dataOffset int64
34+
indexLeases map[uint64]struct{} //map[seq]struct
35+
dataLeases map[int64]uint32 // map[offset]size
36+
indexFile, dataFile *_File
37+
offset, indexOffset, dataOffset int64
3638
}
3739

3840
func newBlockWriter(fs *_FileSet, lease *_Lease, buf *bpool.Buffer) (*_BlockWriter, error) {
39-
w := &_BlockWriter{indexBlocks: make(map[int32]_IndexBlock), fs: fs, lease: lease, buffer: buf}
41+
w := &_BlockWriter{blockIdx: -1, indexBlocks: make(map[int32]_IndexBlock), fs: fs, lease: lease, buffer: buf}
4042
w.indexLeases = make(map[uint64]struct{})
4143
w.dataLeases = make(map[int64]uint32)
4244

45+
indexFile, err := fs.getFile(_FileDesc{fileType: typeIndex})
46+
if err != nil {
47+
return nil, err
48+
}
49+
w.indexFile = indexFile
50+
w.indexOffset = indexFile.currSize()
51+
if w.indexOffset > 0 {
52+
w.blockIdx = int32(w.indexOffset / int64(blockSize))
53+
// read final block from index file.
54+
if w.indexOffset > int64(w.blockIdx*blockSize) {
55+
r := newBlockReader(w.fs)
56+
b, err := r.readIndexBlock(w.blockIdx)
57+
if err != nil {
58+
return nil, err
59+
}
60+
w.indexBlocks[w.blockIdx] = b
61+
}
62+
}
63+
4364
dataFile, err := fs.getFile(_FileDesc{fileType: typeData})
4465
if err != nil {
4566
return nil, err
4667
}
47-
w.dataOffset = dataFile.Size()
68+
w.dataFile = dataFile
69+
w.offset = dataFile.currSize()
70+
w.dataOffset = dataFile.currSize()
4871
return w, nil
4972
}
5073

51-
func (w *_BlockWriter) extend(size uint32) (int64, error) {
52-
indexFile, err := w.fs.getFile(_FileDesc{fileType: typeIndex})
53-
if err != nil {
54-
return 0, err
74+
func (w *_BlockWriter) extend(upperSeq uint64) (int64, error) {
75+
off := blockOffset(blockIndex(upperSeq))
76+
if off <= w.indexFile.currSize() {
77+
return w.indexFile.currSize(), nil
5578
}
56-
return indexFile.extend(size)
79+
return w.indexFile.extend(uint32(off - w.indexFile.currSize()))
5780
}
5881

5982
func (w *_BlockWriter) del(seq uint64) (_IndexEntry, error) {
6083
var delEntry _IndexEntry
6184
bIdx := blockIndex(seq)
85+
if bIdx > w.blockIdx {
86+
return delEntry, nil // no entry in db to delete
87+
}
6288
r := newBlockReader(w.fs)
63-
b, err := r.readIndexBlock(seq)
89+
b, err := r.readIndexBlock(bIdx)
6490
if err != nil {
6591
return _IndexEntry{}, err
6692
}
@@ -89,7 +115,7 @@ func (w *_BlockWriter) del(seq uint64) (_IndexEntry, error) {
89115
return delEntry, nil
90116
}
91117

92-
func (w *_BlockWriter) append(e _IndexEntry, blockIdx int32) (err error) {
118+
func (w *_BlockWriter) append(e _IndexEntry) (err error) {
93119
var b _IndexBlock
94120
var ok bool
95121
if e.seq == 0 {
@@ -98,9 +124,9 @@ func (w *_BlockWriter) append(e _IndexEntry, blockIdx int32) (err error) {
98124
bIdx := blockIndex(e.seq)
99125
b, ok = w.indexBlocks[bIdx]
100126
if !ok {
101-
if bIdx <= blockIdx {
127+
if bIdx < w.blockIdx {
102128
r := newBlockReader(w.fs)
103-
b, err = r.readIndexBlock(e.seq)
129+
b, err = r.readIndexBlock(bIdx)
104130
if err != nil {
105131
return err
106132
}
@@ -128,24 +154,20 @@ func (w *_BlockWriter) append(e _IndexEntry, blockIdx int32) (err error) {
128154
if off != -1 {
129155
buf := make([]byte, dataLen)
130156
copy(buf, e.cache)
131-
dataFile, err := w.fs.getFile(_FileDesc{fileType: typeData})
132-
if err != nil {
133-
return err
134-
}
135-
if _, err = dataFile.WriteAt(buf, off); err != nil {
157+
if _, err = w.dataFile.WriteAt(buf, off); err != nil {
136158
return err
137159
}
138160
w.dataLeases[off] = uint32(dataLen)
139161
} else {
140-
off = w.dataOffset
162+
off = w.offset
141163
offset, err := w.buffer.Extend(int64(dataLen))
142164
if err != nil {
143165
return err
144166
}
145167
if _, err := w.buffer.WriteAt(e.cache, offset); err != nil {
146168
return err
147169
}
148-
w.dataOffset += int64(dataLen)
170+
w.offset += int64(dataLen)
149171
}
150172
e.msgOffset = off
151173

@@ -166,20 +188,12 @@ func (w *_BlockWriter) append(e _IndexEntry, blockIdx int32) (err error) {
166188

167189
func (w *_BlockWriter) write() error {
168190
// write data blocks
169-
dataFile, err := w.fs.getFile(_FileDesc{fileType: typeData})
170-
if err != nil {
171-
return err
172-
}
173-
if _, err := dataFile.write(w.buffer.Bytes()); err != nil {
191+
if _, err := w.dataFile.write(w.buffer.Bytes()); err != nil {
174192
return err
175193
}
176194

177195
// Reset buffer before reusing it.
178196
w.buffer.Reset()
179-
indexFile, err := w.fs.getFile(_FileDesc{fileType: typeIndex})
180-
if err != nil {
181-
return err
182-
}
183197
for bIdx, b := range w.indexBlocks {
184198
if !b.leased || !b.dirty {
185199
continue
@@ -189,7 +203,7 @@ func (w *_BlockWriter) write() error {
189203
}
190204
off := blockOffset(bIdx)
191205
buf := b.MarshalBinary()
192-
if _, err := indexFile.WriteAt(buf, off); err != nil {
206+
if _, err := w.indexFile.WriteAt(buf, off); err != nil {
193207
return err
194208
}
195209
b.dirty = false
@@ -219,7 +233,7 @@ func (w *_BlockWriter) write() error {
219233
return err
220234
}
221235
buf := b.MarshalBinary()
222-
if _, err := indexFile.WriteAt(buf, off); err != nil {
236+
if _, err := w.indexFile.WriteAt(buf, off); err != nil {
223237
return err
224238
}
225239
b.dirty = false
@@ -240,7 +254,7 @@ func (w *_BlockWriter) write() error {
240254
if err != nil {
241255
return err
242256
}
243-
if _, err := indexFile.WriteAt(blockData, blockOff); err != nil {
257+
if _, err := w.indexFile.WriteAt(blockData, blockOff); err != nil {
244258
return err
245259
}
246260
bufOff = w.buffer.Size()
@@ -294,3 +308,21 @@ func (w *_BlockWriter) rollback() error {
294308
}
295309
return nil
296310
}
311+
312+
func (w *_BlockWriter) reset() error {
313+
w.buffer.Reset()
314+
315+
w.indexOffset = w.indexFile.currSize()
316+
w.blockIdx = int32(w.indexOffset / int64(blockSize))
317+
318+
w.dataOffset = w.dataFile.currSize()
319+
320+
return nil
321+
}
322+
323+
func (w *_BlockWriter) abort() error {
324+
w.indexFile.truncate(w.indexOffset)
325+
w.dataFile.truncate(w.dataOffset)
326+
327+
return w.rollback()
328+
}

db.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,12 @@ func Open(path string, opts ...Options) (*DB, error) {
9393
}
9494

9595
dbInfo := _DBInfo{}
96-
if infoFile.Size() == 0 {
96+
if infoFile.currSize() == 0 {
9797
dbInfo = _DBInfo{
9898
header: _Header{
9999
signature: signature,
100100
version: version,
101101
},
102-
blockIdx: -1,
103-
windowIdx: -1,
104102
}
105103
if _, err = infoFile.extend(fixed); err != nil {
106104
return nil, err
@@ -134,7 +132,7 @@ func Open(path string, opts ...Options) (*DB, error) {
134132
mutex: newMutex(),
135133
dbInfo: dbInfo,
136134
info: infoFile,
137-
timeWindow: newTimeWindowBucket(dbInfo.windowIdx, timeOptions),
135+
timeWindow: newTimeWindowBucket(timeOptions),
138136
filter: Filter{file: filterFile, filterBlock: fltr.NewFilterGenerator()},
139137
freeList: lease,
140138
reader: newBlockReader(fileset),

db_info.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222

2323
var (
2424
signature = [7]byte{'u', 'n', 'i', 't', 'd', 'b', '\x0e'}
25-
fixed = uint32(64)
25+
fixed = uint32(32)
2626
)
2727

2828
type (
@@ -35,8 +35,6 @@ type (
3535
encryption int8
3636
sequence uint64
3737
count uint64
38-
blockIdx int32
39-
windowIdx int32
4038
}
4139
)
4240

@@ -48,8 +46,6 @@ func (inf _DBInfo) MarshalBinary() ([]byte, error) {
4846
buf[12] = uint8(inf.encryption)
4947
binary.LittleEndian.PutUint64(buf[12:20], inf.sequence)
5048
binary.LittleEndian.PutUint64(buf[20:28], inf.count)
51-
binary.LittleEndian.PutUint32(buf[28:32], uint32(inf.windowIdx))
52-
binary.LittleEndian.PutUint32(buf[32:36], uint32(inf.blockIdx))
5349

5450
return buf, nil
5551
}
@@ -61,8 +57,6 @@ func (inf *_DBInfo) UnmarshalBinary(data []byte) error {
6157
inf.encryption = int8(data[7])
6258
inf.sequence = binary.LittleEndian.Uint64(data[12:20])
6359
inf.count = binary.LittleEndian.Uint64(data[20:28])
64-
inf.windowIdx = int32(binary.LittleEndian.Uint32(data[28:32]))
65-
inf.blockIdx = int32(binary.LittleEndian.Uint32(data[32:36]))
6660

6761
return nil
6862
}

0 commit comments

Comments
 (0)