queuedproto

package module
v0.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2026 License: MIT Imports: 7 Imported by: 0

README

queuedproto

import "github.com/my-mail-ru/go-queuedproto"

Package queuedproto - поддержка протокола queued.

Кроме структур протокола самого queued, поддерживаются форматы перловых очередей. Perl addons (используются во всех современных проектах на perl) поддерживаются, perl extensions (используются только в одном крупном старом проекте) - нет.

Типы/константы названы так же, как в исходниках queued. В перле используется другая терминология.

Index

Constants

Коды команд queued

const (
    CmdAddItem      = uint32(20)
    CmdGetActive    = uint32(21)
    CmdDeleteItems  = uint32(22)
    CmdUpdateItems  = uint32(24)
    CmdGetItems     = uint32(28)
    CmdFullUpdate   = uint32(30)
    CmdAddData      = uint32(34)
    CmdGetQueueStat = uint32(36)
)

Коды ответов queued

const (
    // errors
    RcOK                  = uint8(0)
    RcLogicError          = uint8(1)
    RcWrongItem           = uint8(2)
    RcMemAllocationFailed = uint8(3)
    RcUnknownQueueType    = uint8(4)
    RcReqAlreadyProcessed = uint8(8)
    RcItemLocked          = uint8(9)
    RcWrongRequest        = uint8(10)
    RcBadRequestLength    = uint8(11)
    RcWrongVersion        = uint8(12)

    // warnings
    RcNoActiveItems           = uint8(5)
    RcInsufficientActiveCount = uint8(6)
    RcWrongID                 = uint8(7)
)

Флаги расширений, указываются в поле Version

const (
    FlagEnableExtensions = uint8(0x80)
    FlagEncodeInUTF8     = uint8(0x40)
    FlagEnableAddons     = uint8(0x20)

    VersionMask = ^(FlagEnableExtensions | FlagEncodeInUTF8 | FlagEnableAddons)
)

Типы времени для команды CmdUpdateItems

const (
    TimestampAbsolute      = uint8(0) // указанный UnixTime - абсолютный (секунды с 1970-01-01T00:00:00Z)
    TimestampRelativePlus  = uint8(1) // в Unixtime относительное время, которое необходимо прибавить к текущему серверному времени
    TimestampRelativeMinus = uint8(2) // в Unixtime относительное время, которое необходимо отнять от текущего серверного времени
)

FlagIgnoreWrongItem - флаг команды CmdAddItem

const FlagIgnoreWrongItem = uint32(1)

SkipTimeUpdate - для запроса ReqFullUpdate - magic value для UnixTime, в случае передачи этого значения время не изменяется.

const SkipTimeUpdate = 0xFFFFFFFF

Variables

Протокольные ошибки queued

var (
    ErrQueued              = errors.New("queued error")
    ErrLogic               = fmt.Errorf("%w: logic error", ErrQueued)
    ErrWrongItem           = fmt.Errorf("%w: wrong item", ErrQueued)
    ErrMemAllocationFailed = fmt.Errorf("%w: queued memory allocation failure", ErrQueued)
    ErrUnknownQueueType    = fmt.Errorf("%w: unknown queue type", ErrQueued)
    ErrReqAlreadyProcessed = fmt.Errorf("%w: request has been already processed (duplicate ReqID)", ErrQueued)
    ErrItemLocked          = fmt.Errorf("%w: item locked", ErrQueued)
    ErrWrongRequest        = fmt.Errorf("%w: wrong request", ErrQueued)
    ErrBadRequestLength    = fmt.Errorf("%w: bad request length", ErrQueued)
    ErrWrongVersion        = fmt.Errorf("%w: wrong version", ErrQueued)
)

func ErrorByRetCode

func ErrorByRetCode(rc uint8) error

ErrorByRetCode - возвращает ошибку по коду, для неизвестных ошибок в тексте сообщается код. Для RcOK и всех ворнингов возвращается nil, если ворнинги нужны - их нужно обрабатывать вручную.

func IsSoftError

func IsSoftError(err error) bool

IsSoftError сообщает, является ли ошибка логической - т.е. не связанной с низкоуровневыми проблемами (разрывы соединения, таймауты, блокировки, сбои сервера).

Все неизвестные протокольные ошибки считаются "жёсткими".

Запросы с логическими ошибками повторять не надо, с низкоуровневыми - можно/нужно.

Можно передавать также обёрнутые ошибки.

type EventID

EventID - составной ID с номером шарда. Нужен, т.к. протокол (и сам queued) не поддерживает шардинг. Связка Shard:ID уникальна, тогда как сам по себе ID в списке событий может быть неуникальным, если события пришли из разных шардов. Такое возможно только при автогенерации ID событий на стороне queued.

В структурах запросах этот тип используется только для слайсов (чтобы не перелопачивать слайсы целиком из параметров метода, которому номер шарда должен как-то поступать). Скалярные айдишники надо будет скопировать (только ID, передающийся по прококолу).

type EventID struct {
    Shard uint `iproto:"-"`
    ID    uint64
}

func (EventID) MarshalIProto
func (recvEventID EventID) MarshalIProto(buf []byte) ([]byte, error)

func (EventID) MarshalText
func (eid EventID) MarshalText() ([]byte, error)

MarshalText - для упрощения логгирования списков событий при помощи zerolog.Event.Interface (не нужно переваливать слайс айдишников событий в слайс строк/стрингеров).

func (EventID) String
func (eid EventID) String() string

String возвращает EventID в перловом текстовом формате

func (*EventID) UnmarshalIProto
func (recv_EventID *EventID) UnmarshalIProto(buf []byte) ([]byte, error)

type ItemList

ItemList - список событий

type ItemList struct {
    Items []QueueItem `iproto:"u32"`
}

func (ItemList) MarshalIProto
func (recvItemList ItemList) MarshalIProto(buf []byte) ([]byte, error)

func (*ItemList) UnmarshalIProto
func (recv_ItemList *ItemList) UnmarshalIProto(buf []byte) ([]byte, error)

type PerlAddonData

PerlAddonData - данные аддона. Константы ID и структуры формата данных оперделены в пакете addons

type PerlAddonData struct {
    ID   uint16 // addons.CreationTimeID, addons.RetryID, addons.Retry2ID или addons.ProducerID
    Data []byte `iproto:"u8"`
}

type PerlAddons

PerlAddons - список аддонов

type PerlAddons struct {
    List []PerlAddonData `iproto:"u8"`
}

func (PerlAddons) MarshalIProto
func (recvPerlAddons PerlAddons) MarshalIProto(buf []byte) ([]byte, error)

func (*PerlAddons) UnmarshalIProto
func (recv_PerlAddons *PerlAddons) UnmarshalIProto(buf []byte) ([]byte, error)

type PerlData

PerlData - событие очереди в перловом формате.

Аддоны декодируются, только если установлен флаг FlagEnableAddons, иначе Addons.List устанавливается в nil. При кодировании, если len(Addons.List) != 0, они кодируются, и автоматически устанавливается флаг FlagEnableAddons.

Extensions - аналогично, однако их внутренности (пока) не обрабатываются (просто декодируются, как слайс байтов).

При кодировании флаг FlagEncodeInUTF8 устанавливается автоматически всегда.

type PerlData struct {
    Version    uint8          // версия (содержит флаги FlagEnableExtensions, FlagEncodeInUTF8, FlagEnableAddons)
    Addons     PerlAddons     // список данных аддонов  (если установлен FlagEnableAddons - иначе поле не передаётся!)
    Extensions PerlExtensions // данные расширения (если установлен FlagEnableExtensions - иначе поле не передаётся!)
    Data       []byte         // данные в формате perl pack
}

func (PerlData) MarshalIProto
func (pd PerlData) MarshalIProto(buf []byte) ([]byte, error)

MarshalIProto кодирует данные в перловом формате, передавая необязательные поля, только если они заданы. В этом случае устанавливаются соответствующие им флаги (FlagEnableAddons - для Addons, FlagEnableExtensions - для Extensions).

func (*PerlData) UnmarshalIProto
func (pd *PerlData) UnmarshalIProto(buf []byte) ([]byte, error)

UnmarshalIProto декодирует данные в перловом формате, обрабатывая необязательные поля, только если установлены соответствующие им флаги (FlagEnableAddons - для Addons, FlagEnableExtensions - для Extensions).

Если флаг для необязательного поля не установлен, оно сбрасывается в дефлотное значение, так сделано, чтобы не прилетели данные от предыдущей записи в случае переиспользования объекта.

type PerlExtensions

PerlExtensions - данные расширений (пока не обрабатываются, де/кодируются как слайс байтов)

type PerlExtensions struct {
    Flags uint32
    Data  []byte `iproto:"u32"`
}

func (PerlExtensions) MarshalIProto
func (recvPerlExtensions PerlExtensions) MarshalIProto(buf []byte) ([]byte, error)

func (*PerlExtensions) UnmarshalIProto
func (recv_PerlExtensions *PerlExtensions) UnmarshalIProto(buf []byte) ([]byte, error)

type QueueItem

QueueItem - событие в очереди

type QueueItem struct {
    EventID EventID
    Data    []byte `iproto:"u16"` // Данные (до 4кБ)
}

type ReqAddData

ReqAddData - формат tuple запроса на добавление данных в конец события

type ReqAddData struct {
    StorageType uint16 // номер очереди
    ID          uint64 // ID изменяемого события
    Data        []byte `iproto:"u16"` // дописываемые данные (в сумме с существующими должно быть до 4кБ)
}

func (ReqAddData) Cmd
func (ReqAddData) Cmd() uint32

Cmd - команда queued: CmdAddData (34)

func (ReqAddData) MarshalIProto
func (recvReqAddData ReqAddData) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqAddData) UnmarshalIProto
func (recv_ReqAddData *ReqAddData) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqAddItem

ReqAddItem - формат tuple запроса на создание новой записи в очереди.

type ReqAddItem struct {
    Pid         uint32 // pid клиента
    ReqID       uint32 // номер запроса, в случае таймаута на сервере при перепосылке запроса ReqID 2й попытки должен совпадать с ReqID первой попытки
    StorageType uint16 // номер очереди
    ID          uint64 // ID события, 0 - queued сам назначает ID (автоинкремент)
    UnixTime    uint32 // время активации события
    Data        []byte `iproto:"u16"` // данные события, до 4096 байт
    Flags       uint32 `iproto:"-"`   // FlagIgnoreWrongItem. Поле опциональное (передаётся, только если не 0)
}

func (ReqAddItem) Cmd
func (ReqAddItem) Cmd() uint32

Cmd - команда queued: CmdAddItem (20)

func (ReqAddItem) MarshalIProto
func (req ReqAddItem) MarshalIProto(buf []byte) ([]byte, error)

MarshalIProto кодирует запрос на создание новой записи в очереди.

Кастомный маршалер необходим из-за опциональности поля Flags.

func (*ReqAddItem) UnmarshalIProto
func (req *ReqAddItem) UnmarshalIProto(buf []byte) ([]byte, error)

UnmarshalIProto декодирует запрос на создание новой записи в очереди.

Метод объявлен чисто для симметрии маршалеров, для проектов клиентов/обработчиков очередей он не нужен. Пригодится для тестов, прокси iproto/queued->grpc, ну и для гошной версии queued :)

type ReqDeleteItems

ReqDeleteItems - формат tuple запроса на удаление событий по списку ID

type ReqDeleteItems struct {
    StorageType uint16   // номер очереди
    IDs         []uint64 `iproto:"u32"`
}

func (ReqDeleteItems) Cmd
func (ReqDeleteItems) Cmd() uint32

Cmd - команда queued: CmdDeleteItems (22)

func (ReqDeleteItems) MarshalIProto
func (recvReqDeleteItems ReqDeleteItems) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqDeleteItems) UnmarshalIProto
func (recv_ReqDeleteItems *ReqDeleteItems) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqFullUpdate

ReqFullUpdate - формат tuple запроса на обновление нескольких событий в очереди

type ReqFullUpdate struct {
    StorageType uint16         // номер очереди
    UnixTime    uint32         // новое время активации обновлённых событий, если [SkipTimeUpdate] (0xFFFFFFFF), время не обновляется
    Items       []UpdQueueItem `iproto:"u32"`
}

func (ReqFullUpdate) Cmd
func (ReqFullUpdate) Cmd() uint32

Cmd - команда queued: CmdFullUpdate (30)

func (ReqFullUpdate) MarshalIProto
func (recvReqFullUpdate ReqFullUpdate) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqFullUpdate) UnmarshalIProto
func (recv_ReqFullUpdate *ReqFullUpdate) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqGetActive

ReqGetActive - формат tuple запроса на получение Count активных событий из очереди

type ReqGetActive struct {
    StorageType uint16 // номер очереди
    Count       uint32 // количество событий в ответе
}

func (ReqGetActive) Cmd
func (ReqGetActive) Cmd() uint32

Cmd - команда queued: CmdGetActive (21)

func (ReqGetActive) MarshalIProto
func (recvReqGetActive ReqGetActive) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqGetActive) UnmarshalIProto
func (recv_ReqGetActive *ReqGetActive) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqGetItems

ReqGetItems - формат tuple запроса на получение событий по ID

type ReqGetItems struct {
    StorageType uint16   // номер очереди
    IDs         []uint64 `iproto:"u32"`
}

func (ReqGetItems) Cmd
func (ReqGetItems) Cmd() uint32

Cmd - команда queued: CmdGetItems (28)

func (ReqGetItems) MarshalIProto
func (recvReqGetItems ReqGetItems) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqGetItems) UnmarshalIProto
func (recv_ReqGetItems *ReqGetItems) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqQueueStat

ReqQueueStat - получение статистики

type ReqQueueStat struct {
    Version     uint8
    StorageType uint16
}

func (ReqQueueStat) Cmd
func (ReqQueueStat) Cmd() uint32

Cmd - команда queued: CmdGetQueueStat (36)

func (ReqQueueStat) MarshalIProto
func (recvReqQueueStat ReqQueueStat) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqQueueStat) UnmarshalIProto
func (recv_ReqQueueStat *ReqQueueStat) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqUpdateItems

ReqUpdateItems - формат tuple запроса на обновление времени срабатывания события

type ReqUpdateItems struct {
    StorageType   uint16   // номер очереди
    UnixTime      uint32   // новое время активации для всех указанных событий
    IDs           []uint64 `iproto:"u32"`
    TimestampType uint8    // TimestampAbsolute, TimestampRelativePlus или TimestampRelativeMinus
}

func (ReqUpdateItems) Cmd
func (ReqUpdateItems) Cmd() uint32

Cmd - команда queued: CmdUpdateItems(24)

func (ReqUpdateItems) MarshalIProto
func (recvReqUpdateItems ReqUpdateItems) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqUpdateItems) UnmarshalIProto
func (recv_ReqUpdateItems *ReqUpdateItems) UnmarshalIProto(buf []byte) ([]byte, error)

type Request

Request - структуры команд протокола должны поддерживать этот интерфейс (возвращать код команды методом Cmd)

type Request interface {
    iproto.Marshaler
    Cmd() uint32
}

type RespQueueStat

RespQueueStat - ответ на команду запроса статистики

type RespQueueStat struct {
    ItemsCount  uint32 // кол-во событий в очереди
    ActiveCount uint32 // кол-во активных событий в очереди
    LockedCount uint32 // кол-во заблокированных событий в очереди
}

func (RespQueueStat) MarshalIProto
func (recvRespQueueStat RespQueueStat) MarshalIProto(buf []byte) ([]byte, error)

func (RespQueueStat) String
func (i RespQueueStat) String() string

String возвращает статистику в формате строки

func (*RespQueueStat) UnmarshalIProto
func (recv_RespQueueStat *RespQueueStat) UnmarshalIProto(buf []byte) ([]byte, error)

type UpdQueueItem

UpdQueueItem - аналог структуры QueueItem для запроса ReqFullUpdate. Если Data == nil, данные не обновляются - только время активации. Чтобы передать пустой массив данных, надо указать []byte{}.

type UpdQueueItem struct {
    EventID EventID
    Data    []byte `iproto:"u16"` // Данные (до 4кБ)
}

func (UpdQueueItem) MarshalIProto
func (uqi UpdQueueItem) MarshalIProto(buf []byte) ([]byte, error)

MarshalIProto кодирует структуру UpdQueueItem. Если Data == nil, передаётся длина 0xFFFF.

func (*UpdQueueItem) UnmarshalIProto
func (uqi *UpdQueueItem) UnmarshalIProto(buf []byte) ([]byte, error)

UnmarshalIProto декодирует структуру UpdQueueItem. Если переданæ длина данных 0xFFFF, поле Data будет равняться nil.

addons

import "github.com/my-mail-ru/go-queuedproto/addons"

Package addons - поддержка перловых аддонов.

Использование

Для использования аддонов, нужно встроить в свою структуру события нужные типы аддонов:

type MyEvent struct {
	tp.BaseEvent
	addons.CreationTime
	addons.Producer
	UserID uint32
	MyData string
}

Высокоуровневая работа со структурами (выделение полей, являющихся аддонами, кеширование всей нужной метаинфы) реализована в пакете go-tp/encodings, здесь реализовано только низкоуровневое кодирование структур.

Некоторые аддоны поддерживают интерфейс Builder (он же - tp.Builder) для автозаполнения полей. Если используется один аддон, произойдёт автоматический проброс этого метода юзерской структуре, но если используется два и более аддонов с таким методом, магии не произойдёт (таковы правила встраивания структур в go), поэтому нужно это реализовать вручную:

func (ev *MyEvent) Build() error {
	ev.CreationTime.Build()    // для совсем простых билдеров ошибку можно не проверять
	return ev.Producer.Build()
}
Детали реализации

Встраивание в качестве механизма реализации аддонов выбрано для упрощения работы разработчика бизнес-логики. Имена типов и полей поможет подсказать IDE.

"Прозрачную" реализацию через рефлексию и автовызов Build (пока?) делать не стал - т.к. это видимый юзеру слой, в отличии от перебора полей, поддерживающих интерфейс PerlAddon (что является костылём изначально, включая перл).

Index

Constants

ID перловых аддонов (queuedproto.PerlAddonData.ID)

const (
    CreationTimeID = uint16(1) // время создания события
    RetryID        = uint16(2) // сохраняет в событии счётчик ретраев, при объявлении очереди указывается лимит ретраев, и задержка
    Retry2ID       = uint16(3) // два независимых счётчика ретраев
    ProducerID     = uint16(4) // в перле: UNKNOWN, UWSGI, TP, SCRIPT. пока при создании событий в go пишем сюда TP-GO, хотя это и некорректно (продьюсят не только обработчики очередей)
    CreatedAtID    = uint16(5) // время создания события в наносекундах (int64)
    MaxAddonID
)

DefaultProducer - Producer по умолчанию (для перлового формата, если подключён этот аддон)

TODO добавить стандартные константы для других источников событий.

const DefaultProducer = "TP-GO"

type Builder

Builder - для автозаполнения структур, используется в tp (копия интерфейса tp.Builder)

type Builder interface {
    Build() error
}

type CreatedAt

CreatedAt - время создания события (заполняется автоматически текущим временем)

type CreatedAt int64

func (CreatedAt) AddonID
func (CreatedAt) AddonID() uint16

AddonID - возвращает CreatedAtID (5)

func (*CreatedAt) Build
func (ca *CreatedAt) Build() error

Build записывает в *ca текущее время в наносекундах, если там дефолтное значение (0).

func (CreatedAt) GetCreatedAt
func (ca CreatedAt) GetCreatedAt() CreatedAt

GetCreatedAt возвращает значение CreatedAt, для которого он вызван. Использовать для получения времени создания из произвольной структуры события со встроенным CreatedAt.

func (CreatedAt) MarshalAddon
func (ca CreatedAt) MarshalAddon() ([]byte, error)

MarshalAddon кодирует данные аддона

func (CreatedAt) MarshalIProto
func (CreatedAt) MarshalIProto(buf []byte) ([]byte, error)

MarshalIProto - no op (чтобы данные аддона не попали в стандартный payload).

TODO выкинуть эти методы, когда библиотека iproto сможет парсить директивы встроенных в структуру типов.

func (*CreatedAt) UnmarshalAddon
func (ca *CreatedAt) UnmarshalAddon(data []byte) error

UnmarshalAddon декодирует данные аддона

func (CreatedAt) UnmarshalIProto
func (CreatedAt) UnmarshalIProto(buf []byte) ([]byte, error)

UnmarshalIProto - no op (не пытаемся читать данные из стандартного payload).

type CreationTime

CreationTime - время создания события (заполняется автоматически текущим временем)

type CreationTime uint32

func (CreationTime) AddonID
func (CreationTime) AddonID() uint16

AddonID - возвращает CreationTimeID (1)

func (*CreationTime) Build
func (ct *CreationTime) Build() error

Build записывает в *ct текущее время, если там дефолтное значение (0).

func (CreationTime) GetCreationTime
func (ct CreationTime) GetCreationTime() CreationTime

GetCreationTime возвращает значение CreationTime, для которого он вызван. Использовать для получения времени создания из произвольной структуры события со встроенным CreationTime.

func (CreationTime) MarshalAddon
func (ct CreationTime) MarshalAddon() ([]byte, error)

MarshalAddon кодирует данные аддона

func (CreationTime) MarshalIProto
func (CreationTime) MarshalIProto(buf []byte) ([]byte, error)

MarshalIProto - no op (чтобы данные аддона не попали в стандартный payload).

TODO выкинуть эти методы, когда библиотека iproto сможет парсить директивы встроенных в структуру типов.

func (*CreationTime) UnmarshalAddon
func (ct *CreationTime) UnmarshalAddon(data []byte) error

UnmarshalAddon декодирует данные аддона

func (CreationTime) UnmarshalIProto
func (CreationTime) UnmarshalIProto(buf []byte) ([]byte, error)

UnmarshalIProto - no op (не пытаемся читать данные из стандартного payload).

type PerlAddon

PerlAddon - интерфейс всех перловых аддонов с поддержкой специфических для аддонов де/кодирующих функций

type PerlAddon interface {
    MarshalAddon() ([]byte, error) // не стал делать encoding.BinaryMarshaler, во избежание проброса на уровень юзерской структуры
    UnmarshalAddon([]byte) error
    AddonID() uint16
}

type Producer

Producer - имя источника события (для статы/метрик, заполняется автоматически значением "TP-GO").

Для отключения автозаполнения Producer, либо задания своего значения, проще всего определить свой тип:

type MyProducer struct {
	addons.Producer
}

func (mp *MyProducer) Build() error {
	mp.Producer = "GO-REST"
	return nil
}

type MyEvent struct {
	MyProducer
	...
}
type Producer string

func (Producer) AddonID
func (Producer) AddonID() uint16

AddonID - возвращает ProducerID (4)

func (*Producer) Build
func (p *Producer) Build() error

Build записывает в *p значение DefaultProducer, если в *p дефолтное значение (пустая строка).

func (Producer) MarshalAddon
func (p Producer) MarshalAddon() ([]byte, error)

MarshalAddon кодирует данные аддона

func (Producer) MarshalIProto
func (Producer) MarshalIProto(buf []byte) ([]byte, error)

MarshalIProto - no op (чтобы данные аддона не попали в стандартный payload).

TODO выкинуть эти методы, когда библиотека iproto сможет парсить директивы встроенных в структуру типов.

func (*Producer) UnmarshalAddon
func (p *Producer) UnmarshalAddon(data []byte) error

UnmarshalAddon декодирует данные аддона

func (Producer) UnmarshalIProto
func (Producer) UnmarshalIProto(buf []byte) ([]byte, error)

UnmarshalIProto - no op (не пытаемся читать данные из стандартного payload).

type Retryable

Retryable - счётчик повторных попыток обработки события.

type Retryable struct {
    // contains filtered or unexported fields
}

func (Retryable) AddonID
func (Retryable) AddonID() uint16

AddonID - возвращает RetryID (2)

func (Retryable) GetRetryCount
func (r Retryable) GetRetryCount() uint32

GetRetryCount возвращает счётчик повторных попыток обработки. Для объектов, не полученных при помощи Retryable.UnmarshalAddon, возвращает 0.

func (Retryable) IncRetryCount
func (r Retryable) IncRetryCount()

IncRetryCount увеличивает счётчик повторных обработок. Для использования из tp. Не следует вызывать этот метод из кода обработчиков. Для объектов, не полученных при помощи Retryable.UnmarshalAddon, не делает ничего.

func (Retryable) MarshalAddon
func (r Retryable) MarshalAddon() ([]byte, error)

MarshalAddon кодирует данные аддона.

func (Retryable) NeedRetry
func (r Retryable) NeedRetry() bool

NeedRetry возвращает признак необходимости повторной обработки. Для объектов, не полученных при помощи Retryable.UnmarshalAddon, возвращает false.

func (Retryable) Retry
func (r Retryable) Retry()

Retry помечает событие подлежащим повторной обработке. Вызывать из обработчиков очередей в случае возникновения ошибки с ограниченным кол-вом повторов. Для объектов, не полученных при помощи Retryable.UnmarshalAddon, не делает ничего.

func (*Retryable) UnmarshalAddon
func (r *Retryable) UnmarshalAddon(data []byte) error

UnmarshalAddon декодирует данные аддона.

type Retryable2

Retryable2 - два независимых счётчика повторных попыток обработки события.

type Retryable2 struct {
    // contains filtered or unexported fields
}

func (Retryable2) AddonID
func (Retryable2) AddonID() uint16

AddonID - возвращает Retry2ID (3)

func (Retryable2) GetRetry1Count
func (r Retryable2) GetRetry1Count() uint32

GetRetry1Count возвращает первый счётчик повторных попыток обработки. Для объектов, не полученных при помощи Retryable2.UnmarshalAddon, возвращает 0.

func (Retryable2) GetRetry2Count
func (r Retryable2) GetRetry2Count() uint32

GetRetry2Count возвращает второй счётчик повторных попыток обработки. Для объектов, не полученных при помощи Retryable2.UnmarshalAddon, возвращает 0.

func (Retryable2) IncRetry1Count
func (r Retryable2) IncRetry1Count()

IncRetry1Count увеличивает первый счётчик повторных попыток обработки. Для использования из tp. Не следует вызывать этот метод из кода обработчиков. Для объектов, не полученных при помощи Retryable2.UnmarshalAddon, не делает ничего.

func (Retryable2) IncRetry2Count
func (r Retryable2) IncRetry2Count()

IncRetry2Count увеличивает второй счётчик повторных попыток обработки. Для использования из tp. Не следует вызывать этот метод из кода обработчиков. Для объектов, не полученных при помощи Retryable2.UnmarshalAddon, не делает ничего.

func (Retryable2) MarshalAddon
func (r Retryable2) MarshalAddon() ([]byte, error)

MarshalAddon кодирует данные аддона.

func (Retryable2) NeedRetry1
func (r Retryable2) NeedRetry1() bool

NeedRetry1 возвращает первый признак необходимости повторной обработки. Для объектов, не полученных при помощи Retryable2.UnmarshalAddon, возвращает false.

func (Retryable2) NeedRetry2
func (r Retryable2) NeedRetry2() bool

NeedRetry2 возвращает второй признак необходимости повторной обработки. Для объектов, не полученных при помощи Retryable2.UnmarshalAddon, возвращает false.

func (Retryable2) Retry1
func (r Retryable2) Retry1()

Retry1 помечает событие подлежащим повторной обработке с учётом первого счётчика. Вызывать из обработчиков очередей в случае возникновения ошибки с ограниченным кол-вом повторов. Для объектов, не полученных при помощи Retryable2.UnmarshalAddon, не делает ничего.

func (Retryable2) Retry2
func (r Retryable2) Retry2()

Retry2 помечает событие подлежащим повторной обработке с учётом второго счётчика. Вызывать из обработчиков очередей в случае возникновения ошибки с ограниченным кол-вом повторов. Для объектов, не полученных при помощи Retryable2.UnmarshalAddon, не делает ничего.

func (*Retryable2) UnmarshalAddon
func (r *Retryable2) UnmarshalAddon(data []byte) error

UnmarshalAddon декодирует данные аддона.

Generated by gomarkdoc

Documentation

Overview

Package queuedproto - поддержка протокола queued.

Кроме структур протокола самого queued, поддерживаются форматы перловых очередей. Perl addons (используются во всех современных проектах на perl) поддерживаются, perl extensions (используются только в одном крупном старом проекте) - нет.

Типы/константы названы так же, как в исходниках queued. В перле используется другая терминология.

Index

Constants

View Source
const (
	CmdAddItem      = uint32(20)
	CmdGetActive    = uint32(21)
	CmdDeleteItems  = uint32(22)
	CmdUpdateItems  = uint32(24)
	CmdGetItems     = uint32(28)
	CmdFullUpdate   = uint32(30)
	CmdAddData      = uint32(34)
	CmdGetQueueStat = uint32(36)
)

Коды команд queued

View Source
const (
	// errors
	RcOK                  = uint8(0)
	RcLogicError          = uint8(1)
	RcWrongItem           = uint8(2)
	RcMemAllocationFailed = uint8(3)
	RcUnknownQueueType    = uint8(4)
	RcReqAlreadyProcessed = uint8(8)
	RcItemLocked          = uint8(9)
	RcWrongRequest        = uint8(10)
	RcBadRequestLength    = uint8(11)
	RcWrongVersion        = uint8(12)

	// warnings
	RcNoActiveItems           = uint8(5)
	RcInsufficientActiveCount = uint8(6)
	RcWrongID                 = uint8(7)
)

Коды ответов queued

View Source
const (
	FlagEnableExtensions = uint8(0x80)
	FlagEncodeInUTF8     = uint8(0x40)
	FlagEnableAddons     = uint8(0x20)

	VersionMask = ^(FlagEnableExtensions | FlagEncodeInUTF8 | FlagEnableAddons)
)

Флаги расширений, указываются в поле Version

View Source
const (
	TimestampAbsolute      = uint8(0) // указанный UnixTime - абсолютный (секунды с 1970-01-01T00:00:00Z)
	TimestampRelativePlus  = uint8(1) // в Unixtime относительное время, которое необходимо прибавить к текущему серверному времени
	TimestampRelativeMinus = uint8(2) // в Unixtime относительное время, которое необходимо отнять от текущего серверного времени
)

Типы времени для команды CmdUpdateItems

View Source
const FlagIgnoreWrongItem = uint32(1)

FlagIgnoreWrongItem - флаг команды CmdAddItem

View Source
const SkipTimeUpdate = 0xFFFFFFFF

SkipTimeUpdate - для запроса ReqFullUpdate - magic value для UnixTime, в случае передачи этого значения время не изменяется.

Variables

View Source
var (
	ErrQueued              = errors.New("queued error")
	ErrLogic               = fmt.Errorf("%w: logic error", ErrQueued)
	ErrWrongItem           = fmt.Errorf("%w: wrong item", ErrQueued)
	ErrMemAllocationFailed = fmt.Errorf("%w: queued memory allocation failure", ErrQueued)
	ErrUnknownQueueType    = fmt.Errorf("%w: unknown queue type", ErrQueued)
	ErrReqAlreadyProcessed = fmt.Errorf("%w: request has been already processed (duplicate ReqID)", ErrQueued)
	ErrItemLocked          = fmt.Errorf("%w: item locked", ErrQueued)
	ErrWrongRequest        = fmt.Errorf("%w: wrong request", ErrQueued)
	ErrBadRequestLength    = fmt.Errorf("%w: bad request length", ErrQueued)
	ErrWrongVersion        = fmt.Errorf("%w: wrong version", ErrQueued)
)

Протокольные ошибки queued

Functions

func ErrorByRetCode

func ErrorByRetCode(rc uint8) error

ErrorByRetCode - возвращает ошибку по коду, для неизвестных ошибок в тексте сообщается код. Для RcOK и всех ворнингов возвращается nil, если ворнинги нужны - их нужно обрабатывать вручную.

func IsSoftError

func IsSoftError(err error) bool

IsSoftError сообщает, является ли ошибка логической - т.е. не связанной с низкоуровневыми проблемами (разрывы соединения, таймауты, блокировки, сбои сервера).

Все неизвестные протокольные ошибки считаются "жёсткими".

Запросы с логическими ошибками повторять не надо, с низкоуровневыми - можно/нужно.

Можно передавать также обёрнутые ошибки.

Types

type EventID

type EventID struct {
	Shard uint `iproto:"-"`
	ID    uint64
}

EventID - составной ID с номером шарда. Нужен, т.к. протокол (и сам queued) не поддерживает шардинг. Связка Shard:ID уникальна, тогда как сам по себе ID в списке событий может быть неуникальным, если события пришли из разных шардов. Такое возможно только при автогенерации ID событий на стороне queued.

В структурах запросах этот тип используется только для слайсов (чтобы не перелопачивать слайсы целиком из параметров метода, которому номер шарда должен как-то поступать). Скалярные айдишники надо будет скопировать (только ID, передающийся по прококолу).

func (EventID) MarshalIProto

func (recvEventID EventID) MarshalIProto(buf []byte) ([]byte, error)

func (EventID) MarshalText

func (eid EventID) MarshalText() ([]byte, error)

MarshalText - для упрощения логгирования списков событий при помощи zerolog.Event.Interface (не нужно переваливать слайс айдишников событий в слайс строк/стрингеров).

func (EventID) String

func (eid EventID) String() string

String возвращает EventID в перловом текстовом формате

func (*EventID) UnmarshalIProto

func (recv_EventID *EventID) UnmarshalIProto(buf []byte) ([]byte, error)

type ItemList

type ItemList struct {
	Items []QueueItem `iproto:"u32"`
}

ItemList - список событий

func (ItemList) MarshalIProto

func (recvItemList ItemList) MarshalIProto(buf []byte) ([]byte, error)

func (*ItemList) UnmarshalIProto

func (recv_ItemList *ItemList) UnmarshalIProto(buf []byte) ([]byte, error)

type PerlAddonData

type PerlAddonData struct {
	ID   uint16 // addons.CreationTimeID, addons.RetryID, addons.Retry2ID или addons.ProducerID
	Data []byte `iproto:"u8"`
}

PerlAddonData - данные аддона. Константы ID и структуры формата данных оперделены в пакете addons

type PerlAddons

type PerlAddons struct {
	List []PerlAddonData `iproto:"u8"`
}

PerlAddons - список аддонов

func (PerlAddons) MarshalIProto

func (recvPerlAddons PerlAddons) MarshalIProto(buf []byte) ([]byte, error)

func (*PerlAddons) UnmarshalIProto

func (recv_PerlAddons *PerlAddons) UnmarshalIProto(buf []byte) ([]byte, error)

type PerlData

type PerlData struct {
	Version    uint8          // версия (содержит флаги FlagEnableExtensions, FlagEncodeInUTF8, FlagEnableAddons)
	Addons     PerlAddons     // список данных аддонов  (если установлен FlagEnableAddons - иначе поле не передаётся!)
	Extensions PerlExtensions // данные расширения (если установлен FlagEnableExtensions - иначе поле не передаётся!)
	Data       []byte         // данные в формате perl pack
}

PerlData - событие очереди в перловом формате.

Аддоны декодируются, только если установлен флаг FlagEnableAddons, иначе Addons.List устанавливается в nil. При кодировании, если len(Addons.List) != 0, они кодируются, и автоматически устанавливается флаг FlagEnableAddons.

Extensions - аналогично, однако их внутренности (пока) не обрабатываются (просто декодируются, как слайс байтов).

При кодировании флаг FlagEncodeInUTF8 устанавливается автоматически всегда.

func (PerlData) MarshalIProto

func (pd PerlData) MarshalIProto(buf []byte) ([]byte, error)

MarshalIProto кодирует данные в перловом формате, передавая необязательные поля, только если они заданы. В этом случае устанавливаются соответствующие им флаги (FlagEnableAddons - для Addons, FlagEnableExtensions - для Extensions).

func (*PerlData) UnmarshalIProto

func (pd *PerlData) UnmarshalIProto(buf []byte) ([]byte, error)

UnmarshalIProto декодирует данные в перловом формате, обрабатывая необязательные поля, только если установлены соответствующие им флаги (FlagEnableAddons - для Addons, FlagEnableExtensions - для Extensions).

Если флаг для необязательного поля не установлен, оно сбрасывается в дефлотное значение, так сделано, чтобы не прилетели данные от предыдущей записи в случае переиспользования объекта.

type PerlExtensions

type PerlExtensions struct {
	Flags uint32
	Data  []byte `iproto:"u32"`
}

PerlExtensions - данные расширений (пока не обрабатываются, де/кодируются как слайс байтов)

func (PerlExtensions) MarshalIProto

func (recvPerlExtensions PerlExtensions) MarshalIProto(buf []byte) ([]byte, error)

func (*PerlExtensions) UnmarshalIProto

func (recv_PerlExtensions *PerlExtensions) UnmarshalIProto(buf []byte) ([]byte, error)

type QueueItem

type QueueItem struct {
	EventID EventID
	Data    []byte `iproto:"u16"` // Данные (до 4кБ)
}

QueueItem - событие в очереди

type ReqAddData

type ReqAddData struct {
	StorageType uint16 // номер очереди
	ID          uint64 // ID изменяемого события
	Data        []byte `iproto:"u16"` // дописываемые данные (в сумме с существующими должно быть до 4кБ)
}

ReqAddData - формат tuple запроса на добавление данных в конец события

func (ReqAddData) Cmd

func (ReqAddData) Cmd() uint32

Cmd - команда queued: CmdAddData (34)

func (ReqAddData) MarshalIProto

func (recvReqAddData ReqAddData) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqAddData) UnmarshalIProto

func (recv_ReqAddData *ReqAddData) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqAddItem

type ReqAddItem struct {
	Pid         uint32 // pid клиента
	ReqID       uint32 // номер запроса, в случае таймаута на сервере при перепосылке запроса ReqID 2й попытки должен совпадать с ReqID первой попытки
	StorageType uint16 // номер очереди
	ID          uint64 // ID события, 0 - queued сам назначает ID (автоинкремент)
	UnixTime    uint32 // время активации события
	Data        []byte `iproto:"u16"` // данные события, до 4096 байт
	Flags       uint32 `iproto:"-"`   // FlagIgnoreWrongItem. Поле опциональное (передаётся, только если не 0)
}

ReqAddItem - формат tuple запроса на создание новой записи в очереди.

func (ReqAddItem) Cmd

func (ReqAddItem) Cmd() uint32

Cmd - команда queued: CmdAddItem (20)

func (ReqAddItem) MarshalIProto

func (req ReqAddItem) MarshalIProto(buf []byte) ([]byte, error)

MarshalIProto кодирует запрос на создание новой записи в очереди.

Кастомный маршалер необходим из-за опциональности поля Flags.

func (*ReqAddItem) UnmarshalIProto

func (req *ReqAddItem) UnmarshalIProto(buf []byte) ([]byte, error)

UnmarshalIProto декодирует запрос на создание новой записи в очереди.

Метод объявлен чисто для симметрии маршалеров, для проектов клиентов/обработчиков очередей он не нужен. Пригодится для тестов, прокси iproto/queued->grpc, ну и для гошной версии queued :)

type ReqDeleteItems

type ReqDeleteItems struct {
	StorageType uint16   // номер очереди
	IDs         []uint64 `iproto:"u32"`
}

ReqDeleteItems - формат tuple запроса на удаление событий по списку ID

func (ReqDeleteItems) Cmd

func (ReqDeleteItems) Cmd() uint32

Cmd - команда queued: CmdDeleteItems (22)

func (ReqDeleteItems) MarshalIProto

func (recvReqDeleteItems ReqDeleteItems) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqDeleteItems) UnmarshalIProto

func (recv_ReqDeleteItems *ReqDeleteItems) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqFullUpdate

type ReqFullUpdate struct {
	StorageType uint16         // номер очереди
	UnixTime    uint32         // новое время активации обновлённых событий, если [SkipTimeUpdate] (0xFFFFFFFF), время не обновляется
	Items       []UpdQueueItem `iproto:"u32"`
}

ReqFullUpdate - формат tuple запроса на обновление нескольких событий в очереди

func (ReqFullUpdate) Cmd

func (ReqFullUpdate) Cmd() uint32

Cmd - команда queued: CmdFullUpdate (30)

func (ReqFullUpdate) MarshalIProto

func (recvReqFullUpdate ReqFullUpdate) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqFullUpdate) UnmarshalIProto

func (recv_ReqFullUpdate *ReqFullUpdate) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqGetActive

type ReqGetActive struct {
	StorageType uint16 // номер очереди
	Count       uint32 // количество событий в ответе
}

ReqGetActive - формат tuple запроса на получение Count активных событий из очереди

func (ReqGetActive) Cmd

func (ReqGetActive) Cmd() uint32

Cmd - команда queued: CmdGetActive (21)

func (ReqGetActive) MarshalIProto

func (recvReqGetActive ReqGetActive) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqGetActive) UnmarshalIProto

func (recv_ReqGetActive *ReqGetActive) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqGetItems

type ReqGetItems struct {
	StorageType uint16   // номер очереди
	IDs         []uint64 `iproto:"u32"`
}

ReqGetItems - формат tuple запроса на получение событий по ID

func (ReqGetItems) Cmd

func (ReqGetItems) Cmd() uint32

Cmd - команда queued: CmdGetItems (28)

func (ReqGetItems) MarshalIProto

func (recvReqGetItems ReqGetItems) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqGetItems) UnmarshalIProto

func (recv_ReqGetItems *ReqGetItems) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqQueueStat

type ReqQueueStat struct {
	Version     uint8
	StorageType uint16
}

ReqQueueStat - получение статистики

func (ReqQueueStat) Cmd

func (ReqQueueStat) Cmd() uint32

Cmd - команда queued: CmdGetQueueStat (36)

func (ReqQueueStat) MarshalIProto

func (recvReqQueueStat ReqQueueStat) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqQueueStat) UnmarshalIProto

func (recv_ReqQueueStat *ReqQueueStat) UnmarshalIProto(buf []byte) ([]byte, error)

type ReqUpdateItems

type ReqUpdateItems struct {
	StorageType   uint16   // номер очереди
	UnixTime      uint32   // новое время активации для всех указанных событий
	IDs           []uint64 `iproto:"u32"`
	TimestampType uint8    // TimestampAbsolute, TimestampRelativePlus или TimestampRelativeMinus
}

ReqUpdateItems - формат tuple запроса на обновление времени срабатывания события

func (ReqUpdateItems) Cmd

func (ReqUpdateItems) Cmd() uint32

Cmd - команда queued: CmdUpdateItems(24)

func (ReqUpdateItems) MarshalIProto

func (recvReqUpdateItems ReqUpdateItems) MarshalIProto(buf []byte) ([]byte, error)

func (*ReqUpdateItems) UnmarshalIProto

func (recv_ReqUpdateItems *ReqUpdateItems) UnmarshalIProto(buf []byte) ([]byte, error)

type Request

type Request interface {
	iproto.Marshaler
	Cmd() uint32
}

Request - структуры команд протокола должны поддерживать этот интерфейс (возвращать код команды методом Cmd)

type RespQueueStat

type RespQueueStat struct {
	ItemsCount  uint32 // кол-во событий в очереди
	ActiveCount uint32 // кол-во активных событий в очереди
	LockedCount uint32 // кол-во заблокированных событий в очереди
}

RespQueueStat - ответ на команду запроса статистики

func (RespQueueStat) MarshalIProto

func (recvRespQueueStat RespQueueStat) MarshalIProto(buf []byte) ([]byte, error)

func (RespQueueStat) String

func (i RespQueueStat) String() string

String возвращает статистику в формате строки

func (*RespQueueStat) UnmarshalIProto

func (recv_RespQueueStat *RespQueueStat) UnmarshalIProto(buf []byte) ([]byte, error)

type UpdQueueItem

type UpdQueueItem struct {
	EventID EventID
	Data    []byte `iproto:"u16"` // Данные (до 4кБ)
}

UpdQueueItem - аналог структуры QueueItem для запроса ReqFullUpdate. Если Data == nil, данные не обновляются - только время активации. Чтобы передать пустой массив данных, надо указать []byte{}.

func (UpdQueueItem) MarshalIProto

func (uqi UpdQueueItem) MarshalIProto(buf []byte) ([]byte, error)

MarshalIProto кодирует структуру UpdQueueItem. Если Data == nil, передаётся длина 0xFFFF.

func (*UpdQueueItem) UnmarshalIProto

func (uqi *UpdQueueItem) UnmarshalIProto(buf []byte) ([]byte, error)

UnmarshalIProto декодирует структуру UpdQueueItem. Если переданæ длина данных 0xFFFF, поле Data будет равняться nil.

Directories

Path Synopsis
Package addons - поддержка перловых аддонов.
Package addons - поддержка перловых аддонов.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL