Documentation
¶
Overview ¶
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2025-2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package xact provides core functionality for the AIStore eXtended Actions (xactions).
- Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
Index ¶
- Constants
- Variables
- func CheckValidKind(kind string) (err error)
- func CheckValidUUID(id string) (err error)
- func Cname(kind, uuid string) string
- func CompareRebIDs(someID, fltID string) int
- func GetCtxVlabs(ctx context.Context) map[string]string
- func GetKindName(kindOrName string) (kind, name string)
- func GetSimilar(kindOrName string) (simKind, simName string)
- func GoRunW(xctn core.Xact)
- func IdlesBeforeFinishing(kindOrName string) bool
- func IsErrRecvAbortWI(err error) bool
- func IsErrRecvAbortXact(err error) bool
- func IsSameScope(kindOrName string, scs ...int) bool
- func IsValidKind(kind string) bool
- func IsValidRebID(id string) (valid bool)
- func IsValidUUID(id string) bool
- func ListDisplayNames(onlyStartable bool) (names []string)
- func NewCtxVlabs(vlabs map[string]string) context.Context
- func ParseCname(cname string) (xactKind, xactID string, _ error)
- func RebID2S(id int64) string
- func RefcntQuiCB(refc *atomic.Int32, maxTimeout, totalSoFar time.Duration) core.QuiRes
- func S2RebID(id string) (int64, error)
- func TuneNumWorkers(xname string, numWorkers, numMpaths int) (int, error)
- type ArgsMsg
- type Base
- func (xctn *Base) Abort(err error) bool
- func (xctn *Base) AbortErr() error
- func (xctn *Base) AbortedAfter(d time.Duration) (err error)
- func (xctn *Base) AddErr(err error, logExtra ...int)
- func (xctn *Base) AddNotif(n core.Notif)
- func (xctn *Base) BcastCtrl(smap *meta.Smap, wid, opcode string, err error)
- func (xctn *Base) Bck() *meta.Bck
- func (xctn *Base) Bytes() int64
- func (xctn *Base) ChanAbort() <-chan error
- func (xctn *Base) Cname() string
- func (xctn *Base) EndTime() time.Time
- func (xctn *Base) Err() (err error)
- func (xctn *Base) ErrCnt() int
- func (xctn *Base) Finish()
- func (*Base) FromTo() (*meta.Bck, *meta.Bck)
- func (xctn *Base) ID() string
- func (xctn *Base) InBytes() int64
- func (xctn *Base) InObjs() int64
- func (xctn *Base) InObjsAdd(cnt int, size int64)
- func (xctn *Base) InitBase(id, kind string, bck *meta.Bck)
- func (xctn *Base) IsAborted() bool
- func (xctn *Base) IsDone() bool
- func (xctn *Base) IsIdle() bool
- func (xctn *Base) IsRunning() (yes bool)
- func (xctn *Base) JoinErr() (int, error)
- func (xctn *Base) Kind() string
- func (xctn *Base) LomAdd(lom *core.LOM)
- func (xctn *Base) Name() string
- func (xctn *Base) NewErrRecvAbortWI(tid, wid, errCause string) error
- func (xctn *Base) NewErrRecvAbortXact(tid, errCause string) error
- func (xctn *Base) NewSnap(self core.Xact) (snap *core.Snap)
- func (xctn *Base) Objs() int64
- func (xctn *Base) ObjsAdd(cnt int, size int64)
- func (xctn *Base) OutBytes() int64
- func (xctn *Base) OutObjs() int64
- func (xctn *Base) OutObjsAdd(cnt int, size int64)
- func (xctn *Base) Quiesce(d time.Duration, cb core.QuiCB) core.QuiRes
- func (xctn *Base) SendCtrl(tsi *meta.Snode, wid, opcode string, body []byte) error
- func (xctn *Base) SetStopping()
- func (xctn *Base) StartTime() time.Time
- func (xctn *Base) String() string
- func (xctn *Base) ToStats(stats *core.Stats)
- type BckJog
- type BckJogRunner
- type BckJogRunnerOpts
- type Demand
- type DemandBase
- func (r *DemandBase) Abort(err error) (ok bool)
- func (r *DemandBase) DecPending()
- func (r *DemandBase) IdleTimer() <-chan struct{}
- func (r *DemandBase) IncPending()
- func (r *DemandBase) Init(uuid, kind string, bck *meta.Bck, idleDur time.Duration, hkcb ...hk.HKCB)
- func (r *DemandBase) IsIdle() bool
- func (r *DemandBase) Pending() (cnt int64)
- func (r *DemandBase) Reset(idleTime time.Duration)
- func (r *DemandBase) Stop()
- func (r *DemandBase) SubPending(n int)
- type Descriptor
- type Marked
- type MultiSnap
- func (xs MultiSnap) AggregateState(xid string) (aborted, running, notstarted bool)
- func (xs MultiSnap) ByteCounts(xid string) (locBytes, outBytes, inBytes int64)
- func (xs MultiSnap) GetUUIDs() []string
- func (xs MultiSnap) ObjCounts(xid string) (locObjs, outObjs, inObjs int64)
- func (xs MultiSnap) RunningTarget(xid string) (string, *core.Snap, error)
- func (xs MultiSnap) ToJSON(tid string, indent bool) ([]byte, error)
- func (xs MultiSnap) TotalRunningTime(xid string) (time.Duration, error)
- type NotifXact
- type NotifXactListener
- type QueryMsg
- type SnapsCond
Constants ¶
const ( PrefixDnlID = "dnl-" // http downloader (not to confuse with blob-downloader) PrefixEtlID = "etl-" // ETL PrefixTcoID = "tco-" // transform/copy objects PrefixInvID = "inv-" // create bucket inventory (NBI) PrefixGbtID = "gbt-" // get-batch (internally, x-moss) PrefixSrtID = "srt-" // conditional linkage (build tag "dsort") // evict PrefixEvictKeepID = "kpmd-" PrefixEvictRemoveID = "rmmd-" )
const ( SepaID = "," LeftID = "[" RightID = "]" )
const ( ScopeG = iota + 1 // cluster ScopeB // bucket ScopeGB // (one bucket) | (all buckets) ScopeT // target )
const ( DefWaitTimeShort = time.Minute // zero `ArgsMsg.Timeout` defaults to DefWaitTimeLong = 7 * 24 * time.Hour // when `ArgsMsg.Timeout` is negative MaxProbingFreq = 30 * time.Second // as the name implies MinPollTime = 2 * time.Second // ditto MaxPollTime = 2 * time.Minute // can grow up to )
global waiting tunables
const ( FlagZeroSize = 1 << iota // usage: x-cleanup (apc.ActStoreCleanup) to remove zero size objects FlagLatestVer FlagSync // the flag overrides the default space-cleanup job operation making it NOT to remove (ie, keep) // misplaced objects FlagKeepMisplaced // makes global rebalance run in special cleanup mode, // safely removing misplaced objects FlagRemoveMisplaced )
ArgsMsg.Flags. Note: for simplicity, all custom x-flags are kept in one place, as a single global enum, for now.
const ( NwpBurstMult = 48 // channel size = burst * num-workers NwpBurstMax = 8192 // upper bound on (shared) workCh size )
shared work channel sizing
const ( NwpNone = -1 // no workers: iterated LOMs executed by the iterating goroutine NwpMin = 2 // throttled minimum NwpDflt = 0 // resolve at runtime via defaultNW() )
num-workers specials
const ( OpcodeStartedWI = "startedWI" OpcodeAbortXact = "abortXact" OpcodeAbortWI = "abortWI" )
const (
IdleDefault = time.Minute // hk -> idle tick
)
const NoneWID = "-" // wid is optional, may not be present
const NumT2TCtrlItems = 5
const (
T2TCtrl = "t2tctrl" // see also: proxyToContTCO
)
POST /v1/xactions/{one of the constants above}
Variables ¶
var IncFinished func()
var Table = map[string]Descriptor{ apc.ActElection: {DisplayName: "elect-primary", Scope: ScopeG, Startable: false}, apc.ActRebalance: {Scope: ScopeG, Startable: true, Metasync: true, Rebalance: true}, apc.ActETLInline: {Scope: ScopeG, Startable: false, AbortByReb: true}, apc.ActLRU: {DisplayName: "lru-eviction", Scope: ScopeGB, Startable: true}, apc.ActStoreCleanup: {DisplayName: "cleanup", Scope: ScopeGB, Startable: true, ConflictRebRes: true}, apc.ActSummaryBck: { DisplayName: "summary", Scope: ScopeGB, Access: apc.AceObjLIST | apc.AceBckHEAD, Startable: false, Metasync: false, }, apc.ActResilver: {Scope: ScopeT, Startable: true, Resilver: true}, apc.ActRechunk: {Scope: ScopeB, Startable: true, RefreshCap: true, ConflictRebRes: true, AbortByReb: true}, apc.ActIndexShard: {Scope: ScopeB, Startable: true, RefreshCap: false, ConflictRebRes: true, AbortByReb: false}, apc.ActECGet: {Scope: ScopeB, Startable: false, Idles: true, ExtendedStats: true}, apc.ActECPut: {Scope: ScopeB, Startable: false, RefreshCap: true, Idles: true, ExtendedStats: true}, apc.ActECRespond: {Scope: ScopeB, Startable: false, Idles: true}, apc.ActPutCopies: {Scope: ScopeB, Startable: false, RefreshCap: true, Idles: true}, apc.ActArchive: {Scope: ScopeB, Access: apc.AccessRW, Startable: false, RefreshCap: true, Idles: true}, apc.ActCopyObjects: { DisplayName: "copy-objects", Scope: ScopeB, Access: apc.AccessRW, Startable: false, RefreshCap: true, Idles: true, ConflictRebRes: true, AbortByReb: true, }, apc.ActETLObjects: { DisplayName: "etl-objects", Scope: ScopeB, Access: apc.AccessRW, Startable: false, RefreshCap: true, Idles: true, ConflictRebRes: true, AbortByReb: true, }, apc.ActBlobDl: {Access: apc.AccessRW, Scope: ScopeB, Startable: true, AbortByReb: true, RefreshCap: true}, apc.ActDownload: {Access: apc.AccessRW, Scope: ScopeG, Startable: false, Idles: true, AbortByReb: true}, apc.ActDsort: { DisplayName: "dsort", Scope: ScopeB, Access: apc.AccessRW, Startable: false, RefreshCap: true, 
ExtendedStats: true, ConflictRebRes: true, AbortByReb: true, }, apc.ActPromote: { DisplayName: "promote-files", Scope: ScopeB, Access: apc.AcePromote, Startable: false, RefreshCap: true, }, apc.ActEvictObjects: { DisplayName: "evict-objects", Scope: ScopeB, Access: apc.AceObjDELETE, Startable: false, RefreshCap: true, }, apc.ActEvictRemoteBck: { DisplayName: "evict-remote-bucket", Scope: ScopeB, Access: apc.AceObjDELETE, Startable: false, RefreshCap: true, }, apc.ActDeleteObjects: { DisplayName: "delete-objects", Scope: ScopeB, Access: apc.AceObjDELETE, Startable: false, RefreshCap: true, }, apc.ActPrefetchObjects: { DisplayName: "prefetch-objects", Scope: ScopeB, Access: apc.AccessRW, Startable: true, RefreshCap: true, }, apc.ActECEncode: { DisplayName: "ec-bucket", Scope: ScopeB, Access: apc.AccessRW, Startable: true, Metasync: true, RefreshCap: true, ConflictRebRes: true, AbortByReb: true, }, apc.ActMakeNCopies: { DisplayName: "mirror", Scope: ScopeB, Access: apc.AccessRW, Startable: true, Metasync: true, RefreshCap: true, }, apc.ActMoveBck: { DisplayName: "rename-bucket", Scope: ScopeB, Access: apc.AceMoveBucket, Startable: false, Metasync: true, Rebalance: true, ConflictRebRes: true, AbortByReb: true, }, apc.ActCopyBck: { DisplayName: "copy-bucket", Scope: ScopeB, Access: apc.AccessRW, Startable: false, Metasync: true, RefreshCap: true, ConflictRebRes: true, AbortByReb: true, }, apc.ActETLBck: { DisplayName: "etl-bucket", Scope: ScopeB, Access: apc.AccessRW, Startable: false, Metasync: true, RefreshCap: true, ConflictRebRes: true, AbortByReb: true, }, apc.ActList: {Scope: ScopeB, Access: apc.AceObjLIST, Startable: false, Metasync: false, Idles: true, QuietBrief: true}, apc.ActGetBatch: {Scope: ScopeGB, Startable: false, Metasync: false, ConflictRebRes: true, AbortByReb: true, Idles: true, QuietBrief: true}, apc.ActCreateNBI: {Scope: ScopeB, Startable: false, Metasync: false, ConflictRebRes: true, AbortByReb: true, Idles: false}, apc.ActLoadLomCache: 
{DisplayName: "warm-up-metadata", Scope: ScopeB, Startable: true}, }
Functions ¶
func CheckValidKind ¶ added in v1.3.26
func CheckValidUUID ¶ added in v1.3.26
func CompareRebIDs ¶
func GetKindName ¶ added in v1.3.16
func GetSimilar ¶ added in v1.3.26
func IdlesBeforeFinishing ¶ added in v1.3.16
func IsErrRecvAbortWI ¶ added in v1.4.5
func IsErrRecvAbortXact ¶ added in v1.4.5
func IsSameScope ¶ added in v1.3.16
func IsValidKind ¶
func IsValidRebID ¶
func IsValidUUID ¶ added in v1.3.16
func ListDisplayNames ¶ added in v1.3.16
func ParseCname ¶ added in v1.3.22
func RefcntQuiCB ¶
common ref-counted quiescence
func TuneNumWorkers ¶ added in v1.4.5
TuneNumWorkers estimates the number of workers to run, given:
a) the user-specified number, if any;
b) the current load; and
c) the media type (NVMe, etc.).
Used by all list-range type jobs, tcb, and blob-download. Parameters:
- xname: xaction name (for logging)
- numWorkers (when non-zero): requested number of workers
- numMpaths: number of available mountpaths
Types ¶
type ArgsMsg ¶ added in v1.3.16
type ArgsMsg struct {
ID string // xaction UUID
Kind string // xaction kind _or_ name (see `xact.Table`)
DaemonID string // node that runs this xaction
Bck cmn.Bck // bucket
Buckets []cmn.Bck // list of buckets (e.g., copy-bucket, lru-evict, etc.)
Timeout time.Duration // max time to wait
Flags uint32 `json:"flags,omitempty"` // enum (FlagZeroSize, ...) bitwise
Force bool // force
OnlyRunning bool // only for running xactions
}
Either the xaction ID or Kind must be specified. ArgsMsg is passed via ActMsg.Value and extracted using MorphMarshal.
func (*ArgsMsg) NotRunning ¶ added in v1.4.2
type Base ¶
type Base struct {
// contains filtered or unexported fields
}
func (*Base) BcastCtrl ¶ added in v1.4.5
asynchronous broadcast with bounded launch parallelism (note: NOT waiting for completion)
func (*Base) IsIdle ¶ added in v1.3.16
NOTE on existing legacy behavior:
* by default, idle == not-running
* Demand-based xactions override as needed
* Python SDK's Job.wait_for_idle() and, potentially, other callers currently expect this behavior
* this MAY change in the future
func (*Base) NewErrRecvAbortWI ¶ added in v1.4.5
func (*Base) NewErrRecvAbortXact ¶ added in v1.4.5
func (*Base) OutObjsAdd ¶
func (*Base) Quiesce ¶
count all the way to duration; reset and adjust every time activity is detected
func (*Base) SetStopping ¶ added in v1.4.1
func (xctn *Base) SetStopping()
mark the xaction as stopping (and finishing soon); EndTime remains zero until Finish()
type BckJogRunner ¶ added in v1.4.5
type BckJogRunner struct {
BckJog
// contains filtered or unexported fields
}
BckJogRunner extends BckJog with a managed worker pool. The caller provides a single CbObj callback; BckJogRunner owns dispatch, channel management, slab allocation, and worker lifecycle.
func (*BckJogRunner) Init ¶ added in v1.4.5
func (r *BckJogRunner) Init(id, kind string, bck *meta.Bck, opts BckJogRunnerOpts, config *cmn.Config) error
Init initializes BckJogRunner from opts. If opts.NumWorkers != NwpNone, it auto-tunes worker count (media + load) and sets up the internal worker pool. Returns an error only if the system is under such extreme pressure that starting workers is unsafe.
func (*BckJogRunner) NumWorkers ¶ added in v1.4.5
func (r *BckJogRunner) NumWorkers() int
NumWorkers returns the configured worker count (0 if no pool).
func (*BckJogRunner) Run ¶ added in v1.4.5
func (r *BckJogRunner) Run()
Run launches workers (if any) then starts the mountpath joggers.
func (*BckJogRunner) Wait ¶ added in v1.4.5
func (r *BckJogRunner) Wait() error
Wait waits for joggers to finish then drains the worker pool.
func (*BckJogRunner) WorkChanFull ¶ added in v1.4.5
func (r *BckJogRunner) WorkChanFull() int64
WorkChanFull returns the accumulated count of work-channel-full events.
type BckJogRunnerOpts ¶ added in v1.4.5
type BckJogRunnerOpts struct {
// CbObj is called for each visited object.
CbObj func(*core.LOM, []byte) error
// WalkBck, when non-nil, overrides the traversal bucket.
// Use when the xaction's logical bucket differs from the walk source
// (e.g. copy-bucket registers the destination but walks the source).
WalkBck *meta.Bck
// Prefix filters visited objects by name prefix.
Prefix string
// RW must be true when the xaction modifies the local filesystem.
RW bool
// NumWorkers controls worker pool size:
// NwpNone (-1): no pool, objects processed inline
// NwpDflt (0): auto-tune based on media type and current load
// > 0: treat as a ceiling; still throttled under load
NumWorkers int
// Burst sets the work-channel lower bound (minimum capacity).
// Zero means use cmn.XactBurstDflt. Callers with their own burst config
// (e.g. TCB uses config.TCB.Burst) should pass it explicitly.
Burst int
}
BckJogRunnerOpts configures BckJogRunner initialization.
type Demand ¶
type Demand interface {
core.Xact
IdleTimer() <-chan struct{}
IncPending()
DecPending()
SubPending(n int)
}
Demand is an xaction that self-terminates after staying idle for a while, with the added capability to renew itself and ref-count its pending work.
type DemandBase ¶
type DemandBase struct {
Base
// contains filtered or unexported fields
}
func (*DemandBase) Abort ¶
func (r *DemandBase) Abort(err error) (ok bool)
func (*DemandBase) DecPending ¶
func (r *DemandBase) DecPending()
func (*DemandBase) IdleTimer ¶
func (r *DemandBase) IdleTimer() <-chan struct{}
func (*DemandBase) IncPending ¶
func (r *DemandBase) IncPending()
func (*DemandBase) IsIdle ¶ added in v1.3.16
func (r *DemandBase) IsIdle() bool
NOTE: override `Base.IsIdle`
func (*DemandBase) Pending ¶
func (r *DemandBase) Pending() (cnt int64)
func (*DemandBase) Reset ¶ added in v1.3.22
func (r *DemandBase) Reset(idleTime time.Duration)
(e.g. usage: listed last page)
func (*DemandBase) Stop ¶
func (r *DemandBase) Stop()
func (*DemandBase) SubPending ¶
func (r *DemandBase) SubPending(n int)
type Descriptor ¶
type Descriptor struct {
DisplayName string // as implied
Access apc.AccessAttrs // default access permissions; ais/proxy does most of the checking wo/ relying on these defaults
Scope int // ScopeG (global), etc. - the enum above
Startable bool // true if user can start this xaction (e.g., via `api.StartXaction`)
Metasync bool // true if this xaction changes (and metasyncs) cluster metadata
RefreshCap bool // refresh capacity stats upon completion
// see xreg for "limited coexistence"
Rebalance bool // moves data between nodes
Resilver bool // moves data between mountpaths
ConflictRebRes bool // starting this job would conflict with rebalance or resilver that's currently in progress
AbortByReb bool // gets aborted upon rebalance (coincides with ConflictRebRes with very few exceptions)
// xaction has an intermediate `idle` state whereby it "idles" between requests
// (see related: xact/demand.go)
Idles bool
// xaction returns extended xaction-specific stats
// (see related: `Snap.Ext` in core/xaction.go)
ExtendedStats bool
// suppress verbose per-state log records and keep only hk.OldAgeXshort (1m)
// in registry history
QuietBrief bool
}
func GetDescriptor ¶ added in v1.3.16
func GetDescriptor(kindOrName string) (string, Descriptor, error)
type MultiSnap ¶ added in v1.3.16
`api.QueryXactionSnaps` control structure
func (MultiSnap) AggregateState ¶ added in v1.4.2
AggregateState returns:
- `aborted`: any selected xaction aborted on any target
- `running`: any selected xaction currently running on any target
- `notstarted`: selected xaction not yet visible / not started on any target

Selection:
- xid != "": the specified xaction UUID (all targets)
- xid == "": all UUIDs present in this MultiSnap (all targets)
func (MultiSnap) ByteCounts ¶ added in v1.3.16
func (MultiSnap) RunningTarget ¶ added in v1.3.16
type NotifXactListener ¶
type NotifXactListener struct {
nl.ListenerBase
}
func (*NotifXactListener) QueryArgs ¶
func (nxb *NotifXactListener) QueryArgs() cmn.HreqArgs
func (*NotifXactListener) UnmarshalStats ¶
func (*NotifXactListener) UnmarshalStats(rawMsg []byte) (stats any, finished, aborted bool, err error)
func (*NotifXactListener) WithCause ¶ added in v1.3.21
func (nxb *NotifXactListener) WithCause(cause string) *NotifXactListener
type QueryMsg ¶
type QueryMsg struct {
OnlyRunning *bool `json:"show_active"`
Bck cmn.Bck `json:"bck"`
ID string `json:"id"`
Kind string `json:"kind"`
DaemonID string `json:"node,omitempty"`
Buckets []cmn.Bck `json:"buckets,omitempty"`
}
simplified JSON-tagged version of the ArgsMsg (internal use)
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| xreg | Package xreg provides registry and (renew, find) functions for AIS eXtended Actions (xactions). |
| xs | Package xs is a collection of eXtended actions (xactions), including multi-object operations, list-objects, (cluster) rebalance and (target) resilver, ETL, and more. |