mirror of
https://github.com/stashapp/stash.git
synced 2025-12-06 08:26:00 +01:00
Fix hang in concurrency tests
This commit is contained in:
parent
f76a440e54
commit
8b59a3b014
1 changed files with 49 additions and 8 deletions
|
|
@ -5,8 +5,10 @@ package sqlite_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stashapp/stash/pkg/models"
|
"github.com/stashapp/stash/pkg/models"
|
||||||
"github.com/stashapp/stash/pkg/txn"
|
"github.com/stashapp/stash/pkg/txn"
|
||||||
|
|
@ -57,6 +59,24 @@ import (
|
||||||
// wg.Wait()
|
// wg.Wait()
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
func signalOtherThread(c chan struct{}) error {
|
||||||
|
select {
|
||||||
|
case c <- struct{}{}:
|
||||||
|
return nil
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
return errors.New("timed out signalling other thread")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForOtherThread(c chan struct{}) error {
|
||||||
|
select {
|
||||||
|
case <-c:
|
||||||
|
return nil
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
return errors.New("timed out waiting for other thread")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestConcurrentReadTxn(t *testing.T) {
|
func TestConcurrentReadTxn(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
@ -76,8 +96,12 @@ func TestConcurrentReadTxn(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for other thread to start
|
// wait for other thread to start
|
||||||
c <- struct{}{}
|
if err := signalOtherThread(c); err != nil {
|
||||||
<-c
|
return err
|
||||||
|
}
|
||||||
|
if err := waitForOtherThread(c); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
|
if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -94,9 +118,15 @@ func TestConcurrentReadTxn(t *testing.T) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
_ = txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
|
_ = txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
|
||||||
// wait for first thread
|
// wait for first thread
|
||||||
<-c
|
if err := waitForOtherThread(c); err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
c <- struct{}{}
|
if err := signalOtherThread(c); err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
scene := &models.Scene{
|
scene := &models.Scene{
|
||||||
|
|
@ -105,6 +135,7 @@ func TestConcurrentReadTxn(t *testing.T) {
|
||||||
|
|
||||||
// expect error when we try to do this, as the other thread has already
|
// expect error when we try to do this, as the other thread has already
|
||||||
// modified this table
|
// modified this table
|
||||||
|
// this takes time to fail, so we need to wait for it
|
||||||
if err := db.Scene.Create(ctx, scene, nil); err != nil {
|
if err := db.Scene.Create(ctx, scene, nil); err != nil {
|
||||||
if !db.IsLocked(err) {
|
if !db.IsLocked(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
|
@ -140,8 +171,12 @@ func TestConcurrentExclusiveAndReadTxn(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for other thread to start
|
// wait for other thread to start
|
||||||
c <- struct{}{}
|
if err := signalOtherThread(c); err != nil {
|
||||||
<-c
|
return err
|
||||||
|
}
|
||||||
|
if err := waitForOtherThread(c); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
|
if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -158,9 +193,15 @@ func TestConcurrentExclusiveAndReadTxn(t *testing.T) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
_ = txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
|
_ = txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
|
||||||
// wait for first thread
|
// wait for first thread
|
||||||
<-c
|
if err := waitForOtherThread(c); err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
c <- struct{}{}
|
if err := signalOtherThread(c); err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if _, err := db.Scene.Find(ctx, sceneIDs[sceneIdx1WithPerformer]); err != nil {
|
if _, err := db.Scene.Find(ctx, sceneIDs[sceneIdx1WithPerformer]); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue