Skip to content

refactor(core): improve multihasher seek error handling and close behavior #882

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 114 additions & 2 deletions core/internal/core_tests/multihasher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (f *failingSeeker) Read(p []byte) (n int, err error) {
}

func (f *failingSeeker) Seek(offset int64, whence int) (int64, error) {
return 0, errors.New("seek error")
return 0, io.ErrUnexpectedEOF
}

func TestNewMultiHasher_CreateMode(t *testing.T) {
Expand Down Expand Up @@ -617,6 +617,113 @@ func TestMultiHasher_Close(t *testing.T) {
mockHash1.AssertExpectations(t)
}

func TestMultiHasher_Close_WithClosableSource(t *testing.T) {
core.ResetState()
defer core.ResetState()

// Register mock hash algorithm
mockAlgo := core.HashAlgorithm{Type: 0x12, Name: "SHA-256", Priority: 100, Protocol: "test"}
registry := core.GetHashRegistry()
err := registry.RegisterHashAlgorithm(mockAlgo)
require.NoError(t, err)

// Create a buffer wrapped in a NopCloser to implement io.ReadCloser
testData := []byte("test data")
source := bytes.NewBuffer(testData)
closer := io.NopCloser(source)

// Create hasher with closable source
hasher, err := core.NewMultiHasherFromReader(closer)
require.NoError(t, err)

err = hasher.Close()
assert.NoError(t, err)

// Verify buffer is still intact
assert.Equal(t, testData, source.Bytes())
}

func TestMultiHasher_Seek(t *testing.T) {
core.ResetState()
defer core.ResetState()

// Register mock hash algorithm
mockAlgo := core.HashAlgorithm{Type: 0x12, Name: "SHA-256", Priority: 100, Protocol: "test"}
registry := core.GetHashRegistry()
err := registry.RegisterHashAlgorithm(mockAlgo)
require.NoError(t, err)

// Create mock seekable source
testData := []byte("test data for seeking")
source := bytes.NewReader(testData)

// Create hasher with seekable source
hasher, err := core.NewMultiHasherFromReader(source)
assert.NoError(t, err)

// Test seeking to start
pos, err := hasher.Seek(0, io.SeekStart)
assert.NoError(t, err)
assert.Equal(t, int64(0), pos)

// Test seeking to end
pos, err = hasher.Seek(0, io.SeekEnd)
assert.NoError(t, err)
assert.Equal(t, int64(len(testData)), pos)

// Test seeking to middle
pos, err = hasher.Seek(5, io.SeekStart)
assert.NoError(t, err)
assert.Equal(t, int64(5), pos)

// Verify hashers were reset after seek
for _, h := range hasher.GetHashes() {
if h != nil {
mockHash, ok := h.(*mocks.MockTestingHashWithProof)
if ok {
mockHash.AssertCalled(t, "Reset")
}
}
}
}

func TestMultiHasher_Seek_ErrorCases(t *testing.T) {
core.ResetState()
defer core.ResetState()

t.Run("NonSeekableSource", func(t *testing.T) {
// Register mock hash algorithm
mockAlgo := core.HashAlgorithm{Type: 0x12, Name: "SHA-256", Priority: 100, Protocol: "test"}
registry := core.GetHashRegistry()
err := registry.RegisterHashAlgorithm(mockAlgo)
require.NoError(t, err)

// Create hasher with non-seekable source
source := bytes.NewBufferString("test data")
hasher, err := core.NewMultiHasherFromReader(source)
require.NoError(t, err)

_, err = hasher.Seek(0, io.SeekStart)
assert.Error(t, err)
assert.Equal(t, errors.New("source does not support seeking"), err)
})

t.Run("SeekError", func(t *testing.T) {
// Register mock hash algorithm
mockAlgo := core.HashAlgorithm{Type: 0x12, Name: "SHA-256", Priority: 100, Protocol: "test"}
registry := core.GetHashRegistry()
err := registry.RegisterHashAlgorithm(mockAlgo)
require.NoError(t, err)

// Create buffer with failing seek
failingSource := &failingSeeker{buf: bytes.NewBufferString("test data")}

_, err = core.NewMultiHasherFromReader(failingSource)
assert.Error(t, err)
assert.Equal(t, err, io.ErrUnexpectedEOF)
})
}

func TestMultiHasher_EmptyInput(t *testing.T) {
core.ResetState()
defer core.ResetState()
Expand Down Expand Up @@ -773,7 +880,12 @@ func TestNewMultiHasherFromReader_BasicReader(t *testing.T) {
// Create hasher from reader
hasher, err := core.NewMultiHasherFromReader(reader)
require.NoError(t, err)
defer hasher.Close()
defer func(hasher *core.MultiHasher) {
err = hasher.Close()
if err != nil {
require.NoError(t, err)
}
}(hasher)

// Read all data through the hasher
readBuf := make([]byte, len(testData))
Expand Down
69 changes: 60 additions & 9 deletions core/multihasher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"errors"
"github.com/gammazero/workerpool"
mh "github.com/multiformats/go-multihash"
mhcore "github.com/multiformats/go-multihash/core" // Import the core package
Expand Down Expand Up @@ -30,16 +31,22 @@ func (f DefaultHashFactory) GetVariableHasher(code uint64, sizeHint int) (hash.H
return mhcore.GetVariableHasher(code, sizeHint)
}

var (
_ io.Reader = (*MultiHasher)(nil)
_ io.ReadCloser = (*MultiHasher)(nil)
_ io.Seeker = (*MultiHasher)(nil)
)

type MultiHasher struct {
writers []io.Writer
hashes []hash.Hash
algos []HashAlgorithm
pool *workerpool.WorkerPool
mutex sync.Mutex // Protects access to hashes during concurrent writes
verifying bool // If true, we're in verify mode
mutex sync.Mutex // Protects access to hashes during concurrent writes
verifying bool // If true, we're in verify mode
verifyReqs []*VerifyRequest // Store verify requests for Sums()
hashFactory HashFactory // Injected dependency for creating hashers
source io.Reader // Source reader for passthrough mode
hashFactory HashFactory // Injected dependency for creating hashers
source io.Reader // Source reader for passthrough mode
}

type HashResult struct {
Expand Down Expand Up @@ -68,18 +75,21 @@ func NewMultiHasher(verifyReqs ...*VerifyRequest) *MultiHasher {
// The returned MultiHasher implements io.Reader and can be used as a passthrough.
func NewMultiHasherFromReader(r io.Reader) (*MultiHasher, error) {
hasher := NewMultiHasher()

// If the input is seekable, ensure we start from the beginning
if seeker, ok := r.(io.ReadSeeker); ok {
if _, err := seeker.Seek(0, io.SeekStart); err != nil {
hasher.Close()
err2 := hasher.Close()
if err2 != nil {
return nil, err
}
return nil, err
}
}

// Store the source reader
hasher.source = r

return hasher, nil
}

Expand Down Expand Up @@ -224,7 +234,7 @@ func (m *MultiHasher) Sums() []HashResult {
}
if matchingReq != nil {
proof = matchingReq.Proof

// Set proof right before verification if the hasher supports it
if provider, ok := h.(HashProofProvider); ok {
if err := provider.SetProof(proof); err != nil {
Expand Down Expand Up @@ -279,9 +289,50 @@ func (m *MultiHasher) Read(p []byte) (n int, err error) {
return n, err
}

func (m *MultiHasher) Close() {
func (m *MultiHasher) Seek(offset int64, whence int) (int64, error) {
if m.source == nil {
return 0, io.ErrUnexpectedEOF
}

seeker, ok := m.source.(io.Seeker)
if !ok {
return 0, errors.New("source does not support seeking")
}

// Seek the underlying reader
newPos, err := seeker.Seek(offset, whence)
if err != nil {
return newPos, err
}

// Reset all hashers since we're changing position
for _, h := range m.hashes {
if h != nil {
h.Reset()
}
}

return newPos, nil
}

func (m *MultiHasher) Close() error {
// StopWait waits for all submitted tasks to complete.
m.pool.StopWait()

// Close the source reader if it implements io.Closer
if closer, ok := m.source.(io.Closer); ok {
return closer.Close()
}
return nil
}

// GetHashes returns the internal hash implementations.
// This method is primarily intended for testing purposes to inspect the hashers.
// It should not be used in production code.
func (m *MultiHasher) GetHashes() []hash.Hash {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.hashes
}

// SetHashes sets the internal hash implementations.
Expand Down
Loading