refactor: add root store (#1153)

This commit is contained in:
Jonas Plum
2025-09-21 11:47:29 +02:00
committed by GitHub
parent d9f759c879
commit 9da90e7cc8
15 changed files with 713 additions and 26 deletions

290
app/rootstore/rootstore.go Normal file
View File

@@ -0,0 +1,290 @@
package rootstore
import (
"context"
"crypto/rand"
"encoding/json"
"errors"
"io"
"io/fs"
"net/http"
"os"
"path/filepath"
"github.com/tus/tusd/v2/pkg/handler"
)
// Default permissions for files and directories created by the store.
// The effective permissions on disk may be further restricted by the
// process's umask.
var (
	defaultFilePerm      = os.FileMode(0o664)
	defaultDirectoryPerm = os.FileMode(0o754)
)
const (
	// StorageKeyPath is the key of the path of uploaded file in handler.FileInfo.Storage.
	StorageKeyPath = "Path"
	// StorageKeyInfoPath is the key of the path of .info file in handler.FileInfo.Storage.
	StorageKeyInfoPath = "InfoPath"
)
// RootStore is a file system based data store for tusd. All paths are
// resolved relative to the given os.Root, which confines file access to
// that directory (see TestFSStoreCustomAbsolutePath for the enforcement).
type RootStore struct {
	// root is the directory all uploads (binary and .info files) live in.
	root *os.Root
}
// New creates a RootStore that stores all uploads inside the given root.
// The caller retains ownership of root and is responsible for closing it.
func New(root *os.Root) RootStore {
	return RootStore{root: root}
}
// UseIn sets this store as the core data store in the passed composer and adds
// all possible extension to it: termination, concatenation, deferred length,
// and content serving.
func (store RootStore) UseIn(composer *handler.StoreComposer) {
	composer.UseCore(store)
	composer.UseTerminater(store)
	composer.UseConcater(store)
	composer.UseLengthDeferrer(store)
	composer.UseContentServer(store)
}
// NewUpload creates a new upload identified by info.ID, generating a random
// ID via rand.Text if none is provided. It creates an empty binary file and
// persists the upload's metadata to a .info file inside the store's root.
// The binary file's location can be customized up front via
// info.Storage[StorageKeyPath] (e.g. by a pre-create hook); otherwise it
// defaults to the upload ID itself.
func (store RootStore) NewUpload(_ context.Context, info handler.FileInfo) (handler.Upload, error) {
	if info.ID == "" {
		info.ID = rand.Text()
	}
	// The .info file's location can directly be deduced from the upload ID
	infoPath := store.infoPath(info.ID)
	// The binary file's location might be modified by the pre-create hook.
	var binPath string
	if info.Storage != nil && info.Storage[StorageKeyPath] != "" {
		binPath = info.Storage[StorageKeyPath]
	} else {
		binPath = store.defaultBinPath(info.ID)
	}
	info.Storage = map[string]string{
		"Type":             "rootstore",
		StorageKeyPath:     binPath,
		StorageKeyInfoPath: infoPath,
	}
	// Best effort: if directory creation fails, the WriteFile call below
	// surfaces a meaningful error anyway.
	_ = store.root.MkdirAll(filepath.Dir(binPath), defaultDirectoryPerm)
	// Create binary file with no content
	if err := store.root.WriteFile(binPath, nil, defaultFilePerm); err != nil {
		return nil, err
	}
	upload := &fileUpload{
		root:     store.root,
		info:     info,
		infoPath: infoPath,
		binPath:  binPath,
	}
	// writeInfo creates the file by itself if necessary
	if err := upload.writeInfo(); err != nil {
		return nil, err
	}
	return upload, nil
}
// GetUpload restores the state of an upload from its .info file inside the
// store's root. It returns handler.ErrNotFound if the .info file or the
// binary file does not exist. The upload's offset is derived from the
// current size of the binary file on disk.
func (store RootStore) GetUpload(_ context.Context, id string) (handler.Upload, error) {
	infoPath := store.infoPath(id)
	data, err := fs.ReadFile(store.root.FS(), filepath.ToSlash(infoPath))
	if err != nil {
		// Use errors.Is instead of the legacy os.IsNotExist for consistency
		// with Terminate and to match wrapped errors as well.
		if errors.Is(err, fs.ErrNotExist) {
			// Interpret fs.ErrNotExist as 404 Not Found
			err = handler.ErrNotFound
		}
		return nil, err
	}
	var info handler.FileInfo
	if err := json.Unmarshal(data, &info); err != nil {
		return nil, err
	}
	// If the info file contains a custom path to the binary file, we use that. If not, we
	// fall back to the default value (although the Path property should always be set in recent
	// tusd versions).
	var binPath string
	if info.Storage != nil && info.Storage[StorageKeyPath] != "" {
		// No filepath.Join here because the joining already happened in NewUpload. Duplicate joining
		// with relative paths lead to incorrect paths
		binPath = info.Storage[StorageKeyPath]
	} else {
		binPath = store.defaultBinPath(info.ID)
	}
	stat, err := store.root.Stat(binPath)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			// Interpret fs.ErrNotExist as 404 Not Found
			err = handler.ErrNotFound
		}
		return nil, err
	}
	info.Offset = stat.Size()
	return &fileUpload{
		root:     store.root,
		info:     info,
		binPath:  binPath,
		infoPath: infoPath,
	}, nil
}
// AsTerminatableUpload casts the upload to its concrete *fileUpload type,
// which implements handler.TerminatableUpload.
func (store RootStore) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
	return upload.(*fileUpload) //nolint:forcetypeassert
}

// AsLengthDeclarableUpload casts the upload to its concrete *fileUpload type,
// which implements handler.LengthDeclarableUpload.
func (store RootStore) AsLengthDeclarableUpload(upload handler.Upload) handler.LengthDeclarableUpload {
	return upload.(*fileUpload) //nolint:forcetypeassert
}

// AsConcatableUpload casts the upload to its concrete *fileUpload type,
// which implements handler.ConcatableUpload.
func (store RootStore) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload {
	return upload.(*fileUpload) //nolint:forcetypeassert
}

// AsServableUpload casts the upload to its concrete *fileUpload type,
// which implements handler.ServableUpload.
func (store RootStore) AsServableUpload(upload handler.Upload) handler.ServableUpload {
	return upload.(*fileUpload) //nolint:forcetypeassert
}
// defaultBinPath returns the path to the file storing the binary data, if it is
// not customized using the pre-create hook. The path is relative to the
// store's root.
func (store RootStore) defaultBinPath(id string) string {
	return id
}

// infoPath returns the path to the .info file storing the file's info. The
// path is relative to the store's root.
func (store RootStore) infoPath(id string) string {
	return id + ".info"
}
// fileUpload is the handler.Upload implementation backing RootStore. All
// paths are relative to root.
type fileUpload struct {
	root *os.Root
	// info stores the current information about the upload
	info handler.FileInfo
	// infoPath is the path to the .info file
	infoPath string
	// binPath is the path to the binary file (which has no extension)
	binPath string
}
// GetInfo returns the in-memory information about the upload. It does not
// re-read the .info file from disk.
func (upload *fileUpload) GetInfo(_ context.Context) (handler.FileInfo, error) {
	return upload.info, nil
}
// WriteChunk appends the data from src to the binary file and advances the
// in-memory offset by the number of bytes copied. The offset argument is
// ignored; data is always appended at the end of the file (O_APPEND).
func (upload *fileUpload) WriteChunk(_ context.Context, _ int64, src io.Reader) (int64, error) {
	file, err := upload.root.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
	if err != nil {
		return 0, err
	}
	// Avoid the use of defer file.Close() here to ensure no errors are lost
	// See https://github.com/tus/tusd/issues/698.
	n, err := io.Copy(file, src)
	// The offset is advanced even on a partial copy, matching what actually
	// reached the file.
	upload.info.Offset += n
	if err != nil {
		file.Close()
		return n, err
	}
	return n, file.Close()
}
// GetReader returns a reader for the upload's binary file. The caller is
// responsible for closing it.
func (upload *fileUpload) GetReader(_ context.Context) (io.ReadCloser, error) {
	return upload.root.Open(upload.binPath)
}
// Terminate removes both the binary file and the .info file of the upload.
// We ignore errors indicating that the files cannot be found because we want
// to delete them anyways. The files might be removed by a cron job for cleaning up
// or some file might have been removed when tusd crashed during the termination.
func (upload *fileUpload) Terminate(_ context.Context) error {
	for _, path := range []string{upload.binPath, upload.infoPath} {
		if err := upload.root.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
			return err
		}
	}
	return nil
}
// ConcatUploads appends the binary content of the given partial uploads, in
// order, to this upload's binary file. All entries must be *fileUpload
// values created by this store, otherwise the type assertion panics.
func (upload *fileUpload) ConcatUploads(_ context.Context, uploads []handler.Upload) (err error) {
	file, err := upload.root.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
	if err != nil {
		return err
	}
	defer func() {
		// Ensure that close error is propagated, if it occurs.
		// See https://github.com/tus/tusd/issues/698.
		cerr := file.Close()
		if err == nil {
			err = cerr
		}
	}()
	for _, partialUpload := range uploads {
		if err := partialUpload.(*fileUpload).appendTo(file); err != nil { //nolint:forcetypeassert
			return err
		}
	}
	return
}
// appendTo copies this upload's binary content to the end of file. A close
// error on the source is only reported when the copy itself succeeded, so
// no error is lost (defer is avoided for the same reason as elsewhere, see
// https://github.com/tus/tusd/issues/698).
func (upload *fileUpload) appendTo(file *os.File) error {
	reader, err := upload.root.Open(upload.binPath)
	if err != nil {
		return err
	}
	_, copyErr := io.Copy(file, reader)
	closeErr := reader.Close()
	if copyErr != nil {
		return copyErr
	}
	return closeErr
}
// DeclareLength sets the upload's final size, marks the size as no longer
// deferred, and persists the updated information to the .info file.
func (upload *fileUpload) DeclareLength(_ context.Context, length int64) error {
	upload.info.Size = length
	upload.info.SizeIsDeferred = false
	return upload.writeInfo()
}
// writeInfo updates the entire information. Everything will be overwritten.
func (upload *fileUpload) writeInfo() error {
	data, err := json.Marshal(upload.info)
	if err != nil {
		return err
	}
	// Best effort: a directory-creation failure surfaces as an error from
	// WriteFile below.
	_ = upload.root.MkdirAll(filepath.Dir(upload.infoPath), defaultDirectoryPerm)
	return upload.root.WriteFile(upload.infoPath, data, defaultFilePerm)
}
// FinishUpload is a no-op for this store: the binary data and the .info file
// are already persisted by the time the final chunk has been written.
func (upload *fileUpload) FinishUpload(_ context.Context) error {
	return nil
}
// ServeContent serves the upload's binary file over HTTP via
// http.ServeFileFS, which handles Range requests and conditional headers.
func (upload *fileUpload) ServeContent(_ context.Context, w http.ResponseWriter, r *http.Request) error {
	http.ServeFileFS(w, r, upload.root.FS(), filepath.ToSlash(upload.binPath))
	return nil
}

app/rootstore/rootstore_test.go Normal file
View File

@@ -0,0 +1,391 @@
package rootstore
import (
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tus/tusd/v2/pkg/handler"
)
// Compile-time checks that RootStore implements the data store interfaces
// registered by UseIn (the comment previously referenced FSStore, from which
// this store was derived).
var (
	_ handler.DataStore               = RootStore{}
	_ handler.TerminaterDataStore     = RootStore{}
	_ handler.ConcaterDataStore       = RootStore{}
	_ handler.LengthDeferrerDataStore = RootStore{}
	// UseIn also calls UseContentServer, so assert that extension too.
	_ handler.ContentServerDataStore = RootStore{}
)
// TestFSStore exercises the full upload lifecycle: creation, info
// inspection, chunk writing, reading, serving a byte range over HTTP, and
// termination.
func TestFSStore(t *testing.T) {
	t.Parallel()
	root, err := os.OpenRoot(t.TempDir())
	require.NoError(t, err)
	t.Cleanup(func() { root.Close() })
	store := New(root)
	ctx := t.Context()
	// Create new upload
	upload, err := store.NewUpload(ctx, handler.FileInfo{
		Size: 42,
		MetaData: map[string]string{
			"hello": "world",
		},
	})
	require.NoError(t, err)
	assert.NotNil(t, upload)
	// Check info without writing
	info, err := upload.GetInfo(ctx)
	require.NoError(t, err)
	assert.EqualValues(t, 42, info.Size)
	assert.EqualValues(t, 0, info.Offset)
	assert.Equal(t, handler.MetaData{"hello": "world"}, info.MetaData)
	assert.Len(t, info.Storage, 3)
	assert.Equal(t, "rootstore", info.Storage["Type"])
	assert.Equal(t, info.ID, info.Storage["Path"])
	assert.Equal(t, info.ID+".info", info.Storage["InfoPath"])
	// Write data to upload
	bytesWritten, err := upload.WriteChunk(ctx, 0, strings.NewReader("hello world"))
	require.NoError(t, err)
	assert.EqualValues(t, len("hello world"), bytesWritten)
	// Check new offset
	info, err = upload.GetInfo(ctx)
	require.NoError(t, err)
	assert.EqualValues(t, 42, info.Size)
	assert.EqualValues(t, 11, info.Offset)
	// Read content
	reader, err := upload.GetReader(ctx)
	require.NoError(t, err)
	content, err := io.ReadAll(reader)
	require.NoError(t, err)
	assert.Equal(t, "hello world", string(content))
	reader.Close()
	// Serve content: a Range request must yield a 206 with the right slice.
	w := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodGet, "/", nil)
	r.Header.Set("Range", "bytes=0-4")
	err = store.AsServableUpload(upload).ServeContent(t.Context(), w, r)
	require.NoError(t, err)
	assert.Equal(t, http.StatusPartialContent, w.Code)
	assert.Equal(t, "5", w.Header().Get("Content-Length"))
	assert.Equal(t, "text/plain; charset=utf-8", w.Header().Get("Content-Type"))
	assert.Equal(t, "bytes 0-4/11", w.Header().Get("Content-Range"))
	assert.NotEmpty(t, w.Header().Get("Last-Modified"))
	assert.Equal(t, "hello", w.Body.String())
	// Terminate upload
	require.NoError(t, store.AsTerminatableUpload(upload).Terminate(ctx))
	// Test if upload is deleted
	upload, err = store.GetUpload(ctx, info.ID)
	assert.Nil(t, upload)
	assert.Equal(t, handler.ErrNotFound, err)
}
// TestFSStoreCreateDirectories tests whether an upload with a slash in its
// ID causes the correct directories to be created.
func TestFSStoreCreateDirectories(t *testing.T) {
	t.Parallel()
	tmp := t.TempDir()
	root, err := os.OpenRoot(tmp)
	require.NoError(t, err)
	t.Cleanup(func() { root.Close() })
	store := New(root)
	ctx := t.Context()
	// Create new upload
	upload, err := store.NewUpload(ctx, handler.FileInfo{
		ID:   "hello/world/123",
		Size: 42,
		MetaData: map[string]string{
			"hello": "world",
		},
	})
	require.NoError(t, err)
	assert.NotNil(t, upload)
	// Check info without writing
	info, err := upload.GetInfo(ctx)
	require.NoError(t, err)
	assert.EqualValues(t, 42, info.Size)
	assert.EqualValues(t, 0, info.Offset)
	assert.Equal(t, handler.MetaData{"hello": "world"}, info.MetaData)
	assert.Len(t, info.Storage, 3)
	assert.Equal(t, "rootstore", info.Storage["Type"])
	assert.Equal(t, filepath.FromSlash(info.ID), info.Storage["Path"])
	assert.Equal(t, filepath.FromSlash(info.ID+".info"), info.Storage["InfoPath"])
	// Write data to upload
	bytesWritten, err := upload.WriteChunk(ctx, 0, strings.NewReader("hello world"))
	require.NoError(t, err)
	assert.EqualValues(t, len("hello world"), bytesWritten)
	// Check new offset
	info, err = upload.GetInfo(ctx)
	require.NoError(t, err)
	assert.EqualValues(t, 42, info.Size)
	assert.EqualValues(t, 11, info.Offset)
	// Read content
	reader, err := upload.GetReader(ctx)
	require.NoError(t, err)
	content, err := io.ReadAll(reader)
	require.NoError(t, err)
	assert.Equal(t, "hello world", string(content))
	reader.Close()
	// Check that the file and directory exists on disk
	statInfo, err := os.Stat(filepath.Join(tmp, "hello/world/123"))
	require.NoError(t, err)
	assert.True(t, statInfo.Mode().IsRegular())
	assert.EqualValues(t, 11, statInfo.Size())
	statInfo, err = os.Stat(filepath.Join(tmp, "hello/world/"))
	require.NoError(t, err)
	assert.True(t, statInfo.Mode().IsDir())
	// Terminate upload
	require.NoError(t, store.AsTerminatableUpload(upload).Terminate(ctx))
	// Test if upload is deleted
	upload, err = store.GetUpload(ctx, info.ID)
	assert.Nil(t, upload)
	assert.Equal(t, handler.ErrNotFound, err)
}
// TestFSStoreNotFound verifies that looking up an upload ID that was never
// created yields handler.ErrNotFound and a nil upload.
func TestFSStoreNotFound(t *testing.T) {
	t.Parallel()
	rootDir, err := os.OpenRoot(t.TempDir())
	require.NoError(t, err)
	t.Cleanup(func() { rootDir.Close() })
	upload, err := New(rootDir).GetUpload(t.Context(), "upload-that-does-not-exist")
	require.Error(t, err)
	assert.Equal(t, handler.ErrNotFound, err)
	assert.Nil(t, upload)
}
// TestFSStoreConcatUploads verifies that three partial uploads can be
// concatenated into a final upload and that the combined offset and content
// are correct afterwards.
func TestFSStoreConcatUploads(t *testing.T) {
	t.Parallel()
	tmp := t.TempDir()
	root, err := os.OpenRoot(tmp)
	require.NoError(t, err)
	t.Cleanup(func() { root.Close() })
	store := New(root)
	ctx := t.Context()
	// Create new upload to hold concatenated upload
	finUpload, err := store.NewUpload(ctx, handler.FileInfo{Size: 9})
	require.NoError(t, err)
	assert.NotNil(t, finUpload)
	finInfo, err := finUpload.GetInfo(ctx)
	require.NoError(t, err)
	finID := finInfo.ID
	// Create three uploads for concatenating
	partialUploads := make([]handler.Upload, 3)
	contents := []string{
		"abc",
		"def",
		"ghi",
	}
	for i := range 3 {
		upload, err := store.NewUpload(ctx, handler.FileInfo{Size: 3})
		require.NoError(t, err)
		n, err := upload.WriteChunk(ctx, 0, strings.NewReader(contents[i]))
		require.NoError(t, err)
		assert.EqualValues(t, 3, n)
		partialUploads[i] = upload
	}
	err = store.AsConcatableUpload(finUpload).ConcatUploads(ctx, partialUploads)
	require.NoError(t, err)
	// Check offset
	finUpload, err = store.GetUpload(ctx, finID)
	require.NoError(t, err)
	info, err := finUpload.GetInfo(ctx)
	require.NoError(t, err)
	assert.EqualValues(t, 9, info.Size)
	assert.EqualValues(t, 9, info.Offset)
	// Read content
	reader, err := finUpload.GetReader(ctx)
	require.NoError(t, err)
	content, err := io.ReadAll(reader)
	require.NoError(t, err)
	assert.Equal(t, "abcdefghi", string(content))
	reader.Close()
}
// TestFSStoreDeclareLength verifies that an upload created with a deferred
// size can later have its length declared, clearing the SizeIsDeferred flag.
func TestFSStoreDeclareLength(t *testing.T) {
	t.Parallel()
	tmp := t.TempDir()
	root, err := os.OpenRoot(tmp)
	require.NoError(t, err)
	t.Cleanup(func() { root.Close() })
	store := New(root)
	ctx := t.Context()
	upload, err := store.NewUpload(ctx, handler.FileInfo{
		Size:           0,
		SizeIsDeferred: true,
	})
	require.NoError(t, err)
	assert.NotNil(t, upload)
	info, err := upload.GetInfo(ctx)
	require.NoError(t, err)
	assert.EqualValues(t, 0, info.Size)
	assert.True(t, info.SizeIsDeferred)
	err = store.AsLengthDeclarableUpload(upload).DeclareLength(ctx, 100)
	require.NoError(t, err)
	updatedInfo, err := upload.GetInfo(ctx)
	require.NoError(t, err)
	assert.EqualValues(t, 100, updatedInfo.Size)
	assert.False(t, updatedInfo.SizeIsDeferred)
}
// TestFSStoreCustomRelativePath tests whether the upload's destination can
// be customized relative to the storage directory.
func TestFSStoreCustomRelativePath(t *testing.T) {
	t.Parallel()
	tmp := t.TempDir()
	root, err := os.OpenRoot(tmp)
	require.NoError(t, err)
	t.Cleanup(func() { root.Close() })
	store := New(root)
	ctx := t.Context()
	// Create new upload with a custom binary path supplied via Storage,
	// distinct from the .info path derived from the ID.
	upload, err := store.NewUpload(ctx, handler.FileInfo{
		ID:   "folder1/info",
		Size: 42,
		Storage: map[string]string{
			"Path": "./folder2/bin",
		},
	})
	require.NoError(t, err)
	assert.NotNil(t, upload)
	// Check info without writing
	info, err := upload.GetInfo(ctx)
	require.NoError(t, err)
	assert.EqualValues(t, 42, info.Size)
	assert.EqualValues(t, 0, info.Offset)
	assert.Len(t, info.Storage, 3)
	assert.Equal(t, "rootstore", info.Storage["Type"])
	assert.Equal(t, filepath.FromSlash("./folder2/bin"), info.Storage["Path"])
	assert.Equal(t, filepath.FromSlash("folder1/info.info"), info.Storage["InfoPath"])
	// Write data to upload
	bytesWritten, err := upload.WriteChunk(ctx, 0, strings.NewReader("hello world"))
	require.NoError(t, err)
	assert.EqualValues(t, len("hello world"), bytesWritten)
	// Check new offset
	info, err = upload.GetInfo(ctx)
	require.NoError(t, err)
	assert.EqualValues(t, 42, info.Size)
	assert.EqualValues(t, 11, info.Offset)
	// Read content
	reader, err := upload.GetReader(ctx)
	require.NoError(t, err)
	content, err := io.ReadAll(reader)
	require.NoError(t, err)
	assert.Equal(t, "hello world", string(content))
	reader.Close()
	// Check that the output file and info file exist on disk
	statInfo, err := os.Stat(filepath.Join(tmp, "folder2/bin"))
	require.NoError(t, err)
	assert.True(t, statInfo.Mode().IsRegular())
	assert.EqualValues(t, 11, statInfo.Size())
	statInfo, err = os.Stat(filepath.Join(tmp, "folder1/info.info"))
	require.NoError(t, err)
	assert.True(t, statInfo.Mode().IsRegular())
	// Terminate upload
	require.NoError(t, store.AsTerminatableUpload(upload).Terminate(ctx))
	// Test if upload is deleted
	upload, err = store.GetUpload(ctx, info.ID)
	assert.Nil(t, upload)
	assert.Equal(t, handler.ErrNotFound, err)
}
// TestFSStoreCustomAbsolutePath verifies that a custom Path pointing outside
// the store's root is rejected and no file is created there (os.Root
// confines all file operations to its directory).
func TestFSStoreCustomAbsolutePath(t *testing.T) {
	t.Parallel()
	root, err := os.OpenRoot(t.TempDir())
	require.NoError(t, err)
	t.Cleanup(func() { root.Close() })
	store := New(root)
	// Create new upload, but the Path property points to a directory
	// outside of the directory given to FSStore
	binPath := filepath.Join(t.TempDir(), "dir/my-upload.bin")
	_, err = store.NewUpload(t.Context(), handler.FileInfo{
		ID:   "my-upload",
		Size: 42,
		Storage: map[string]string{
			"Path": binPath,
		},
	})
	require.Error(t, err)
	_, err = os.Stat(binPath)
	require.Error(t, err)
}