local

package
v0.0.0-...-6ad7dde
Warning

This package is not in the latest version of its module.

Published: May 18, 2026 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var LocationRecordArrayFactory lossymap.RecordArrayFactory[object.FlatReference, uint64, EpochIDResolver] = locationRecordArrayFactory{}

LocationRecordArrayFactory is a factory for creating backing stores of the reference-location map.

Functions

func NewStore

func NewStore(
	lock *sync.RWMutex,
	referenceLocationMap ReferenceLocationMap,
	locationBlobMap LocationBlobMap,
	epochList EpochList,
	currentRegionSizeBytes uint64,
	newRegionSizeBytes uint64,
) object.Store[object.FlatReference, struct{}]

NewStore creates an object store that uses locally connected disks as its backing store.

func NewStoreFromConfiguration

func NewStoreFromConfiguration(terminationGroup program.Group, configuration *configuration_pb.StoreConfiguration) (object.Store[object.FlatReference, struct{}], error)

NewStoreFromConfiguration creates a new local object store that uses the block devices and parameters specified in a Protobuf configuration message.

Types

type DataSyncer

type DataSyncer func() error

DataSyncer is a callback that PeriodicSyncer.ProcessLocationsChanged() invokes to request that the contents of objects be synchronized to disk.

Synchronizing these is required to ensure that the ReferenceLocationMap does not reference objects that are only partially written.

type EpochIDResolver

type EpochIDResolver interface {
	GetEpochStateForEpochID(epochID uint32) (EpochState, bool)
	GetCurrentEpochState() (EpochState, uint32)
}

EpochIDResolver is used by implementations of ReferenceLocationRecordArray to determine whether the entries they store still point to valid data in the location-blob map.
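The following sketch illustrates how a record array might consult an EpochIDResolver when reading an entry. The record type, recordIsValid, and the map-backed mapResolver are hypothetical stand-ins. The assumptions that an object spans [location, location+sizeBytes) and that MaximumLocation is exclusive are ours, not the package's; in the package itself, this range check is what EpochState.IsValidLocation is documented to perform.

```go
package main

// EpochState and EpochIDResolver mirror the types documented in this
// package; they are re-declared so that the sketch is self-contained.
type EpochState struct {
	HashSeed        uint64
	MinimumLocation uint64
	MaximumLocation uint64
}

type EpochIDResolver interface {
	GetEpochStateForEpochID(epochID uint32) (EpochState, bool)
	GetCurrentEpochState() (EpochState, uint32)
}

// record is a hypothetical, simplified reference-location record.
type record struct {
	epochID   uint32
	location  uint64
	sizeBytes int
}

// recordIsValid reports whether r still points to valid data. It
// assumes an object occupies [location, location+sizeBytes) and that
// MaximumLocation is an exclusive upper bound (hypothetical semantics).
func recordIsValid(r record, resolver EpochIDResolver) bool {
	state, ok := resolver.GetEpochStateForEpochID(r.epochID)
	if !ok {
		// The epoch is unknown, e.g. because it has been discarded.
		return false
	}
	end := r.location + uint64(r.sizeBytes)
	return r.sizeBytes >= 0 && r.location >= state.MinimumLocation && end <= state.MaximumLocation
}

// mapResolver is a hypothetical resolver backed by a Go map.
type mapResolver struct {
	epochs  map[uint32]EpochState
	current uint32
}

func (m mapResolver) GetEpochStateForEpochID(epochID uint32) (EpochState, bool) {
	state, ok := m.epochs[epochID]
	return state, ok
}

func (m mapResolver) GetCurrentEpochState() (EpochState, uint32) {
	return m.epochs[m.current], m.current
}
```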

type EpochList

type EpochList interface {
	EpochIDResolver

	// Indicate that a write of an object to the location-blob map
	// ending at a given location has completed, and that a
	// reference-location map entry is about to be written.
	FinalizeWriteUpToLocation(location uint64) error

	// Indicate that an object ending at a given location was read
	// from the location-blob map, but that its contents were
	// invalid. This means that either data corruption has occurred,
	// or that the object got overwritten while being read.
	DiscardUpToLocation(location uint64)
}

EpochList is used by the local object store to manage the creation and lifetime of epochs. Epochs are assigned to reference-location map entries and act as a way of ensuring consistency between the reference-location map and location-blob map, even if unclean shutdowns are performed.
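The comments on FinalizeWriteUpToLocation suggest a write path along the lines of the sketch below: an object is first stored in the location-blob map, the epoch list is then told that writes up to the end of that object have completed, and only afterwards is a reference-location map entry produced, tagged with the current epoch ID. It assumes that "ending at a given location" means location + len(contents). The narrow interfaces, the entry type, and the two fakes are hypothetical, and locking is omitted.

```go
package main

// EpochState mirrors the struct documented in this package.
type EpochState struct {
	HashSeed        uint64
	MinimumLocation uint64
	MaximumLocation uint64
}

// blobPutter and epochFinalizer are hypothetical subsets of
// LocationBlobMap and EpochList used by this sketch.
type blobPutter interface {
	Put([]byte) (uint64, error)
}

type epochFinalizer interface {
	FinalizeWriteUpToLocation(location uint64) error
	GetCurrentEpochState() (EpochState, uint32)
}

// entry is a hypothetical reference-location map entry.
type entry struct {
	location  uint64
	sizeBytes int
	epochID   uint32
}

// storeObject writes contents to the location-blob map, finalizes the
// write, and returns the entry that would be inserted into the
// reference-location map. Locking is omitted for brevity.
func storeObject(blobs blobPutter, epochs epochFinalizer, contents []byte) (entry, error) {
	location, err := blobs.Put(contents)
	if err != nil {
		return entry{}, err
	}
	// Assumption: the object ends at location+len(contents).
	if err := epochs.FinalizeWriteUpToLocation(location + uint64(len(contents))); err != nil {
		return entry{}, err
	}
	_, epochID := epochs.GetCurrentEpochState()
	return entry{location: location, sizeBytes: len(contents), epochID: epochID}, nil
}

// appendOnlyBlobs is a hypothetical fake that never wraps around.
type appendOnlyBlobs struct{ data []byte }

func (b *appendOnlyBlobs) Put(contents []byte) (uint64, error) {
	location := uint64(len(b.data))
	b.data = append(b.data, contents...)
	return location, nil
}

// fakeEpochs is a hypothetical fake that records the finalized location.
type fakeEpochs struct {
	currentID     uint32
	finalizedUpTo uint64
}

func (e *fakeEpochs) FinalizeWriteUpToLocation(location uint64) error {
	e.finalizedUpTo = location
	return nil
}

func (e *fakeEpochs) GetCurrentEpochState() (EpochState, uint32) {
	return EpochState{}, e.currentID
}
```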

func NewVolatileEpochList

func NewVolatileEpochList(maximumLocationSpan uint64, randomNumberGenerator random.SingleThreadedGenerator) EpochList

NewVolatileEpochList creates a simple EpochList that does not support any persistence. This is sufficient for setups where losing data upon restart is acceptable (e.g., worker-level caches).

type EpochState

type EpochState struct {
	HashSeed        uint64
	MinimumLocation uint64
	MaximumLocation uint64
}

EpochState describes, for a given epoch, the range of locations that are valid. This allows implementations of ReferenceLocationRecordArray to suppress and overwrite entries that are no longer valid.

EpochState also contains a hash seed that may be used by ReferenceLocationRecordArray when computing hashes of entries. This is needed to ensure that entries written before an earlier unclean shutdown are suppressed.

func (*EpochState) IsValidLocation

func (s *EpochState) IsValidLocation(location uint64, sizeBytes int) bool

IsValidLocation returns true if a given location and object size are valid within the current epoch. This is used to ensure that entries in the ReferenceLocationRecordArray are suppressed if they refer to locations on the block device that have in the meantime been overwritten.

type LocationBlobMap

type LocationBlobMap interface {
	Get(location uint64, sizeBytes int) ([]byte, error)
	Put([]byte) (uint64, error)
	GetNextPutLocation() uint64
}

LocationBlobMap maps object locations to their contents. Every time an object is stored, it is assigned a location. Locations are expected to increase monotonically. As storage space is finite, implementations of LocationBlobMap are expected to behave like ring buffers: once space is exhausted, the objects with the lowest locations are expected to be overwritten.
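To make the ring-buffer behaviour concrete, here is a toy in-memory LocationBlobMap. It is not the implementation behind NewInMemoryLocationBlobMap; ringBlobMap and newRingBlobMap are hypothetical, and the sketch assumes an object is readable only while all of its bytes lie within the most recent len(buf) bytes written.

```go
package main

import "fmt"

// ringBlobMap is a hypothetical in-memory LocationBlobMap that treats a
// fixed-size byte slice as a ring buffer addressed by monotonically
// increasing locations.
type ringBlobMap struct {
	buf          []byte
	nextLocation uint64
}

func newRingBlobMap(sizeBytes int) *ringBlobMap {
	return &ringBlobMap{buf: make([]byte, sizeBytes)}
}

// GetNextPutLocation returns the location the next object will be assigned.
func (m *ringBlobMap) GetNextPutLocation() uint64 {
	return m.nextLocation
}

// Put copies data into the ring, wrapping around at the end of buf, and
// returns the location assigned to it.
func (m *ringBlobMap) Put(data []byte) (uint64, error) {
	if len(data) > len(m.buf) {
		return 0, fmt.Errorf("object of %d bytes exceeds capacity of %d bytes", len(data), len(m.buf))
	}
	location := m.nextLocation
	size := uint64(len(m.buf))
	for i, b := range data {
		m.buf[(location+uint64(i))%size] = b
	}
	m.nextLocation += uint64(len(data))
	return location, nil
}

// Get returns the object at location, provided that none of its bytes
// have been overwritten by later calls to Put.
func (m *ringBlobMap) Get(location uint64, sizeBytes int) ([]byte, error) {
	size := uint64(len(m.buf))
	if sizeBytes < 0 ||
		location+uint64(sizeBytes) > m.nextLocation ||
		location+size < m.nextLocation {
		return nil, fmt.Errorf("object at location %d with size %d is not available", location, sizeBytes)
	}
	data := make([]byte, sizeBytes)
	for i := range data {
		data[i] = m.buf[(location+uint64(i))%size]
	}
	return data, nil
}
```

With an 8-byte buffer, storing "abcd", "efgh" and then "ij" makes the first object unreadable, because its first two bytes have been overwritten, while the other two remain available.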

func NewBlockDeviceBackedLocationBlobMap

func NewBlockDeviceBackedLocationBlobMap(blockDevice blockdevice.BlockDevice, sectorSizeBytes int, sectorCount int64, initialLocation uint64) LocationBlobMap

NewBlockDeviceBackedLocationBlobMap creates a location-blob map that is backed by a block device. This implementation is the one that is most suitable for production workloads.

func NewInMemoryLocationBlobMap

func NewInMemoryLocationBlobMap(sizeBytes int) LocationBlobMap

NewInMemoryLocationBlobMap creates a location-blob map that is backed by a simple fixed-size byte slice residing in memory. This is unlikely to be useful for most production setups, as they need to store more data than fits in memory.

type PeriodicSyncer

type PeriodicSyncer struct {
	// contains filtered or unexported fields
}

PeriodicSyncer can be used to monitor a PersistentEpochList for allocations made to store objects. When such allocations occur, the state of the PersistentEpochList is extracted and written to disk. This allows its contents to be recovered after a restart.

func NewPeriodicSyncer

func NewPeriodicSyncer(
	source PersistentStateSource,
	sourceLock *sync.RWMutex,
	store PersistentStateStore,
	clock clock.Clock,
	errorLogger util.ErrorLogger,
	errorRetryInterval,
	minimumEpochInterval time.Duration,
	referenceLocationMapHashInitialization uint64,
	dataSyncer DataSyncer,
) *PeriodicSyncer

NewPeriodicSyncer creates a new PeriodicSyncer according to the arguments provided.

func (*PeriodicSyncer) ProcessLocationsChanged

func (ps *PeriodicSyncer) ProcessLocationsChanged(ctx context.Context) bool

ProcessLocationsChanged waits for allocations of locations for storing object contents to be made against a PersistentEpochList. It causes data on the underlying block device to be synchronized after a certain amount of time, followed by updating the persistent state stored on disk.

This function must generally be called in a loop in a separate goroutine, so that the persistent state is updated continuously. The return value denotes whether the caller must continue calling this method. A return value of false indicates that the provided context was cancelled because a shutdown was requested.

type PersistentEpochList

type PersistentEpochList struct {
	// contains filtered or unexported fields
}

PersistentEpochList is an implementation of EpochList whose internal state can be extracted and persisted. This allows data contained in the local object store to be accessed after restarts.

func NewPersistentEpochList

func NewPersistentEpochList(
	maximumLocationSpan uint64,
	randomNumberGenerator random.SingleThreadedGenerator,
	minimumEpochID uint32,
	minimumLocation uint64,
	epochs []*pb.EpochState,
) *PersistentEpochList

NewPersistentEpochList creates an instance of PersistentEpochList, initialized with the state of epochs that have been reloaded from a persistent state file.

func (*PersistentEpochList) DiscardUpToLocation

func (el *PersistentEpochList) DiscardUpToLocation(location uint64)

DiscardUpToLocation discards epochs up to a given location. This method is invoked when data corruption is detected.

func (*PersistentEpochList) FinalizeWriteUpToLocation

func (el *PersistentEpochList) FinalizeWriteUpToLocation(location uint64) error

FinalizeWriteUpToLocation is called after writing an object to storage has finished. This either causes a new epoch to be started, or the maximum location covered by the latest epoch to be raised. It may also cause old epochs to be discarded, if it is known that all objects covered by those epochs have in the meantime been overwritten.

func (*PersistentEpochList) GetCurrentEpochState

func (el *PersistentEpochList) GetCurrentEpochState() (EpochState, uint32)

GetCurrentEpochState returns the state of the current epoch, including the hash seed to use when writing new entries into the reference-location map, along with the ID of that epoch.

func (*PersistentEpochList) GetEpochStateForEpochID

func (el *PersistentEpochList) GetEpochStateForEpochID(epochID uint32) (EpochState, bool)

GetEpochStateForEpochID returns a hash seed that was used by a given epoch ID, and the minimum and maximum locations covered by this epoch. This can be used to suppress reference-location map entries that refer to overwritten or corrupted data.

func (*PersistentEpochList) GetLocationsChangedWakeup

func (el *PersistentEpochList) GetLocationsChangedWakeup() <-chan struct{}

GetLocationsChangedWakeup returns a channel that triggers when data has been stored in the location-blob map since the last persistent state was written to disk.
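One common Go idiom for a wakeup channel with these semantics is a channel that is closed on the first change and replaced once the change has been persisted, so that every receiver observes it without a send per waiter. The sketch below is a hypothetical illustration of that idiom, not PersistentEpochList's implementation; wakeupNotifier and its methods are invented names, and it guards its state with its own mutex, whereas the package documents that callers hold a lock on the EpochList.

```go
package main

import "sync"

// wakeupNotifier is a hypothetical helper: Wakeup returns a channel
// that is closed once data has been stored since the last Reset.
type wakeupNotifier struct {
	mu      sync.Mutex
	ch      chan struct{}
	pending bool
}

func newWakeupNotifier() *wakeupNotifier {
	return &wakeupNotifier{ch: make(chan struct{})}
}

// Wakeup returns the current channel. Receiving from it blocks until
// NotifyStored has been called.
func (n *wakeupNotifier) Wakeup() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.ch
}

// NotifyStored closes the channel on the first call after a reset;
// later calls are no-ops, so the channel is never closed twice.
func (n *wakeupNotifier) NotifyStored() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if !n.pending {
		close(n.ch)
		n.pending = true
	}
}

// Reset installs a fresh channel once the persistent state has been
// written, so that subsequent waiters block again.
func (n *wakeupNotifier) Reset() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.pending {
		n.ch = make(chan struct{})
		n.pending = false
	}
}
```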

func (*PersistentEpochList) GetPersistentState

func (el *PersistentEpochList) GetPersistentState() (uint32, uint64, []*pb.EpochState)

GetPersistentState returns information that needs to be persisted to disk to be able to restore the layout of the EpochList after a restart.

func (*PersistentEpochList) NotifySyncCompleted

func (el *PersistentEpochList) NotifySyncCompleted()

NotifySyncCompleted needs to be called right after the data on the storage medium underneath the LocationBlobMap is synchronized. This causes the next call to GetPersistentState() to return information on the newly synchronized data.

func (*PersistentEpochList) NotifySyncStarting

func (el *PersistentEpochList) NotifySyncStarting(isFinalSync bool)

NotifySyncStarting needs to be called right before the data on the storage medium underneath the LocationBlobMap is synchronized. This causes the epoch ID to be increased when the next blob is stored.

type PersistentStateSource

type PersistentStateSource interface {
	// GetLocationsChangedWakeup returns a channel that triggers if
	// allocations for storing object contents have been made
	// against an EpochList, or if data is being discarded. This can
	// be used by PeriodicSyncer to synchronize data to storage.
	// PeriodicSyncer may apply a short delay before actually
	// synchronizing data, so that multiple writes are batched.
	//
	// This function must be called while holding a read lock on the
	// EpochList.
	GetLocationsChangedWakeup() <-chan struct{}

	// NotifySyncStarting instructs the EpochList that
	// PeriodicSyncer is about to synchronize data to storage.
	// Successive writes to the EpochList should use a new epoch ID,
	// as there is no guarantee their data is synchronized as part
	// of the current epoch.
	//
	// This function must be called while holding a write lock on
	// the EpochList.
	NotifySyncStarting(isFinalSync bool)

	// NotifySyncCompleted instructs the EpochList that the
	// synchronization performed after the last call to
	// NotifySyncStarting was successful.
	//
	// Future calls to GetPersistentState may now return information
	// about epochs that were created before the previous
	// NotifySyncStarting call.
	//
	// Calling this function may cause the next channel returned by
	// GetLocationsChangedWakeup to block once again.
	//
	// This function must be called while holding a write lock on
	// the EpochList.
	NotifySyncCompleted()

	// GetPersistentState returns information about all epochs that
	// are managed by the EpochList and have been synchronized to
	// storage successfully.
	//
	// This function must be called while holding a read lock on the
	// EpochList.
	GetPersistentState() (minimumEpochID uint32, minimumLocation uint64, epochs []*pb.EpochState)
}

PersistentStateSource is used by PeriodicSyncer to determine whether the persistent state file needs to be updated, and if so, which contents it needs to hold.

type PersistentStateStore

type PersistentStateStore interface {
	ReadPersistentState() (*pb.PersistentState, error)
	WritePersistentState(persistentState *pb.PersistentState) error
}

PersistentStateStore is used by PeriodicSyncer to write PersistentEpochList's state to disk. This state can be reloaded on startup to make it possible to access data that was written in the past.

func NewDirectoryBackedPersistentStateStore

func NewDirectoryBackedPersistentStateStore(directory filesystem.Directory) PersistentStateStore

NewDirectoryBackedPersistentStateStore creates a PersistentStateStore that writes PersistentState Protobuf messages to a file named "state" stored inside a filesystem.Directory.

type ReferenceLocationMap

type ReferenceLocationMap = lossymap.Map[object.FlatReference, uint64, EpochIDResolver]

ReferenceLocationMap is used by the local object store to keep track of the location on disk where a given object is stored.

type ReferenceLocationRecord

type ReferenceLocationRecord = lossymap.Record[object.FlatReference, uint64]

ReferenceLocationRecord represents a record that is stored in a ReferenceLocationRecordArray.

type ReferenceLocationRecordArray

type ReferenceLocationRecordArray = lossymap.RecordArray[object.FlatReference, uint64, EpochIDResolver]

ReferenceLocationRecordArray is an array of ReferenceLocationRecord objects. Implementations of this type are used if a ReferenceLocationMap is backed by a hash table.

func NewBlockDeviceBackedReferenceLocationRecordArray

func NewBlockDeviceBackedReferenceLocationRecordArray(device blockdevice.BlockDevice) ReferenceLocationRecordArray

NewBlockDeviceBackedReferenceLocationRecordArray creates a persistent ReferenceLocationRecordArray. It works by using a block device as an array-like structure, writing serialized ReferenceLocationRecords next to each other.
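Storing fixed-size serialized records next to each other means that record i lives at byte offset i * recordSizeBytes, so lookups need no index. The sketch below shows that idea. The 24-byte little-endian layout, the record fields, and the in-memory memDevice standing in for a block device are all hypothetical; the actual record format used by NewBlockDeviceBackedReferenceLocationRecordArray is not documented here.

```go
package main

import (
	"encoding/binary"
	"io"
)

// recordSizeBytes is the size of one serialized record in this
// hypothetical layout.
const recordSizeBytes = 24

// record is a hypothetical fixed-size record.
type record struct {
	KeyHash   uint64
	Location  uint64
	EpochID   uint32
	SizeBytes uint32
}

// device is the subset of a block device's behaviour used here.
type device interface {
	io.ReaderAt
	io.WriterAt
}

// recordArray stores record i at byte offset i*recordSizeBytes.
type recordArray struct {
	dev device
}

func (a recordArray) Put(index int, r record) error {
	var buf [recordSizeBytes]byte
	binary.LittleEndian.PutUint64(buf[0:8], r.KeyHash)
	binary.LittleEndian.PutUint64(buf[8:16], r.Location)
	binary.LittleEndian.PutUint32(buf[16:20], r.EpochID)
	binary.LittleEndian.PutUint32(buf[20:24], r.SizeBytes)
	_, err := a.dev.WriteAt(buf[:], int64(index)*recordSizeBytes)
	return err
}

func (a recordArray) Get(index int) (record, error) {
	var buf [recordSizeBytes]byte
	if _, err := a.dev.ReadAt(buf[:], int64(index)*recordSizeBytes); err != nil {
		return record{}, err
	}
	return record{
		KeyHash:   binary.LittleEndian.Uint64(buf[0:8]),
		Location:  binary.LittleEndian.Uint64(buf[8:16]),
		EpochID:   binary.LittleEndian.Uint32(buf[16:20]),
		SizeBytes: binary.LittleEndian.Uint32(buf[20:24]),
	}, nil
}

// memDevice is a hypothetical in-memory stand-in for a block device.
type memDevice struct {
	data []byte
}

func (d *memDevice) ReadAt(p []byte, off int64) (int, error) {
	if off < 0 || off+int64(len(p)) > int64(len(d.data)) {
		return 0, io.ErrUnexpectedEOF
	}
	return copy(p, d.data[off:]), nil
}

func (d *memDevice) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 || off+int64(len(p)) > int64(len(d.data)) {
		return 0, io.ErrShortWrite
	}
	return copy(d.data[off:], p), nil
}
```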

func NewInMemoryReferenceLocationRecordArray

func NewInMemoryReferenceLocationRecordArray(size int) ReferenceLocationRecordArray

NewInMemoryReferenceLocationRecordArray creates a ReferenceLocationRecordArray that stores its data in memory. This is sufficient for setups where persistence across restarts is not needed, either because the data store is only used for local caching (e.g., on workers), or because storage nodes use mirroring and the loss of a single replica is tolerated.

type ReferenceLocationRecordKey

type ReferenceLocationRecordKey = lossymap.RecordKey[object.FlatReference]

ReferenceLocationRecordKey represents the key portion of a record that is stored in a ReferenceLocationRecordArray.