Skip to content

Commit

Permalink
Update release related files (#535)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Sep 13, 2024
1 parent a528a5d commit 0e734c4
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 333 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ RELEASE_SCRIPTS := $(mk_dir)/scripts/release.sh
release-binary: release-source ## Package binary archive
${RELEASE_SCRIPTS} -b

release-source: clean ## Package source archive
release-source: ## Package source archive
${RELEASE_SCRIPTS} -s

release-sign: ## Sign artifacts
Expand All @@ -196,7 +196,11 @@ release-sign: ## Sign artifacts

release-assembly: release-binary release-sign ## Generate release package

PUSH_RELEASE_SCRIPTS := $(mk_dir)/scripts/push-release.sh

release-push-candidate: ## Push release candidate
${PUSH_RELEASE_SCRIPTS}

.PHONY: all $(PROJECTS) clean build default nuke
.PHONY: lint check tidy format pre-push
.PHONY: test test-race test-coverage test-ci
Expand Down
7 changes: 7 additions & 0 deletions banyand/measure/part.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ func (pw *partWrapper) ID() uint64 {
return pw.p.partMetadata.ID
}

func (pw *partWrapper) String() string {
if pw.mp != nil {
return fmt.Sprintf("mem part %v", pw.mp.partMetadata)
}
return fmt.Sprintf("part %v", pw.p.partMetadata)
}

func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
var p part
partPath := partPath(root, id)
Expand Down
152 changes: 0 additions & 152 deletions banyand/measure/tstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)

Expand Down Expand Up @@ -266,155 +263,6 @@ func Test_tstIter(t *testing.T) {
})
}
})

t.Run("file snapshot", func(t *testing.T) {
tests := []testCtx{
{
name: "Test with no data points",
dpsList: []*dataPoints{},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 1,
},
{
name: "Test with single part",
dpsList: []*dataPoints{dpsTS1},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 1,
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with different ts, the block will be merged",
dpsList: []*dataPoints{dpsTS1, dpsTS2},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
{seriesID: 1, count: 2, uncompressedSizeBytes: 3368},
{seriesID: 2, count: 2, uncompressedSizeBytes: 126},
{seriesID: 3, count: 2, uncompressedSizeBytes: 64},
},
},
{
name: "Test with multiple parts with same ts, duplicated blocks will be merged",
dpsList: []*dataPoints{dpsTS1, dpsTS1},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Run("merging on the fly", func(t *testing.T) {
tmpPath, defFn := test.Space(require.New(t))
fileSystem := fs.NewLocalFileSystem()
defer defFn()

tst, err := newTSTable(fileSystem, tmpPath, common.Position{},
logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDefaultMergePolicyForTesting()}, nil)
require.NoError(t, err)
for i, dps := range tt.dpsList {
tst.mustAddDataPoints(dps)
timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
OUTER:
for {
select {
case <-timeout:
t.Fatalf("timeout waiting for snapshot %d to be introduced", i)
default:
snp := tst.currentSnapshot()
if snp == nil {
t.Logf("waiting for snapshot %d to be introduced", i)
time.Sleep(100 * time.Millisecond)
continue
}
if snp.creator != snapshotCreatorMemPart {
snp.decRef()
break OUTER
}
t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
i, snp.creator, snp.parts)
snp.decRef()
time.Sleep(100 * time.Millisecond)
}
}
}
// wait until some parts are merged
if len(tt.dpsList) > 0 {
timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
OUTER1:
for {
select {
case <-timeout:
t.Fatalf("timeout waiting for snapshot to be merged")
default:
snp := tst.currentSnapshot()
if snp == nil {
time.Sleep(100 * time.Millisecond)
continue
}
if len(snp.parts) == 1 {
snp.decRef()
break OUTER1
}
t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
snp.decRef()
time.Sleep(100 * time.Millisecond)
}
}
}
verify(t, tt, tst)
})

t.Run("merging on close", func(t *testing.T) {
t.Skip("the test is flaky due to unpredictable merge loop schedule.")
tmpPath, defFn := test.Space(require.New(t))
fileSystem := fs.NewLocalFileSystem()
defer defFn()

tst, err := newTSTable(fileSystem, tmpPath, common.Position{},
logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting()}, &metrics{})
require.NoError(t, err)
for _, dps := range tt.dpsList {
tst.mustAddDataPoints(dps)
time.Sleep(100 * time.Millisecond)
}
// wait until the introducer is done
if len(tt.dpsList) > 0 {
for {
snp := tst.currentSnapshot()
if snp == nil {
time.Sleep(100 * time.Millisecond)
continue
}
if len(snp.parts) == len(tt.dpsList) {
snp.decRef()
tst.Close()
break
}
snp.decRef()
time.Sleep(100 * time.Millisecond)
}
}
// reopen the table
tst, err = newTSTable(fileSystem, tmpPath, common.Position{},
logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting()}, nil)
require.NoError(t, err)
verify(t, tt, tst)
})
})
}
})
}

var tagProjections = map[int][]model.TagProjection{
Expand Down
2 changes: 1 addition & 1 deletion banyand/metadata/schema/property.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (e *etcdSchemaRegistry) replaceProperty(ctx context.Context, key string, pr

func tagLen(property *propertyv1.Property) (uint32, error) {
tagsCount := len(property.Tags)
if tagsCount < 0 || tagsCount > math.MaxUint32 {
if tagsCount < 0 || uint64(tagsCount) > math.MaxUint32 {
return 0, errors.New("integer overflow: tags count exceeds uint32 range")
}
tagsNum := uint32(tagsCount)
Expand Down
7 changes: 7 additions & 0 deletions banyand/stream/part.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ func (pw *partWrapper) ID() uint64 {
return pw.p.partMetadata.ID
}

func (pw *partWrapper) String() string {
if pw.mp != nil {
return fmt.Sprintf("mem part %v", pw.mp.partMetadata)
}
return fmt.Sprintf("part %v", pw.p.partMetadata)
}

func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
var p part
partPath := partPath(root, id)
Expand Down
Loading

0 comments on commit 0e734c4

Please sign in to comment.