diff --git a/core/integration/trg/ctp_status_reader.go b/core/integration/trg/ctp_status_reader.go new file mode 100644 index 00000000..37de809c --- /dev/null +++ b/core/integration/trg/ctp_status_reader.go @@ -0,0 +1,72 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2026 CERN and copyright holders of ALICE O². + * Author: Piotr Konopka + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package trg + +import ( + "context" + "fmt" + + trgpb "github.com/AliceO2Group/Control/core/integration/trg/protos" + "github.com/segmentio/kafka-go" + "github.com/spf13/viper" + "google.golang.org/protobuf/proto" +) + +// CtpStatusReader reads CTP status messages from Kafka +type CtpStatusReader struct { + *kafka.Reader +} + +// NewCtpStatusReader creates a new reader for CTP status messages +func NewCtpStatusReader(topic string, groupID string) *CtpStatusReader { + cfg := kafka.ReaderConfig{ + Brokers: viper.GetStringSlice("kafkaEndpoints"), + Topic: topic, + GroupID: groupID, + MinBytes: 1, + MaxBytes: 10e7, + } + + return &CtpStatusReader{ + Reader: kafka.NewReader(cfg), + } +} + +// Next reads the next CTP status message from Kafka +func (r *CtpStatusReader) Next(ctx context.Context) (*trgpb.Status, error) { + if r == nil { + return nil, fmt.Errorf("nil reader") + } + msg, err := r.ReadMessage(ctx) + if err != nil { + return nil, err + } + + var status trgpb.Status + if err := proto.Unmarshal(msg.Value, &status); err != nil { + return nil, fmt.Errorf("failed to unmarshal CTP status message: %w", err) + } + return &status, nil +} diff --git a/core/integration/trg/plugin.go b/core/integration/trg/plugin.go index 556706f9..e1e80fbf 100644 --- a/core/integration/trg/plugin.go +++ b/core/integration/trg/plugin.go @@ -24,6 +24,7 @@ */ //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_out=require_unimplemented_servers=false:. protos/ctpecs.proto +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_out=require_unimplemented_servers=false:. protos/ctp_status.proto // Package trg provides integration with the ALICE trigger system. package trg @@ -31,7 +32,9 @@ package trg import ( "context" "encoding/json" + "errors" "fmt" + "io" "net/url" "strconv" "strings" @@ -64,25 +67,37 @@ const ( TOPIC = topic.IntegratedService + topic.Separator + "trg" ) +var ctpStatusTopic topic.Topic = "ctp.status" + type Plugin struct { trgHost string trgPort int - trgClient *RpcClient + trgClient *RpcClient + trgClientCancel context.CancelFunc + + kafkaReader *CtpStatusReader + kafkaReaderCtx context.Context + kafkaReaderCancel context.CancelFunc pendingRunStops map[string] /*envId*/ int64 pendingRunUnloads map[string] /*envId*/ int64 - cachedStatus *TrgStatus - cachedStatusMu sync.RWMutex - cachedStatusCancelFunc context.CancelFunc + cachedStatus *TrgStatus + cachedStatusMu sync.RWMutex } type TrgStatus struct { + // Fields from TRG RunList queries RunCount int `json:"runCount,omitempty"` Lines []string `json:"lines,omitempty"` Structured Runs `json:"structured,omitempty"` EnvMap map[uid.ID]Run `json:"envMap,omitempty"` + + // Fields from CTP status Kafka messages + Clock string `json:"clock,omitempty"` + DetectorsInHoldover []string `json:"detectorsInHoldover,omitempty"` + ClockTransitionTimeExpected int64 `json:"clockTransitionTimeExpected,omitempty"` // nanoseconds since epoch, 0 if not expected } func NewPlugin(endpoint string) integration.Plugin { @@ -102,6 +117,7 @@ func NewPlugin(endpoint string) integration.Plugin { trgClient: nil, pendingRunStops: make(map[string]int64), pendingRunUnloads: make(map[string]int64), + cachedStatus: &TrgStatus{}, } } @@ -189,15 +205,11 @@ func (p *Plugin) queryRunList() { } } - out := &TrgStatus{ - RunCount: int(runReply.Rc), - Lines: strings.Split(runReply.Msg, "\n"), - Structured: structured, - EnvMap: envMap, - } - p.cachedStatusMu.Lock() - p.cachedStatus = out + p.cachedStatus.RunCount = int(runReply.Rc) + p.cachedStatus.Lines = strings.Split(runReply.Msg, "\n") + p.cachedStatus.Structured = structured + p.cachedStatus.EnvMap = envMap p.cachedStatusMu.Unlock() } @@ -231,10 +243,6 @@ func (p *Plugin) GetEnvironmentsData(envIds []uid.ID) map[uid.ID]string { defer p.cachedStatusMu.RUnlock() out := make(map[uid.ID]string) - - if p.cachedStatus == nil { - return nil - } envMap := p.cachedStatus.EnvMap for _, envId := range envIds { if run, ok := envMap[envId]; !ok { @@ -267,7 +275,7 @@ func (p *Plugin) Init(instanceId string) error { } var ctx context.Context - ctx, p.cachedStatusCancelFunc = context.WithCancel(context.Background()) + ctx, p.trgClientCancel = context.WithCancel(context.Background()) trgPollingInterval := viper.GetDuration("trgPollingInterval") @@ -283,6 +291,21 @@ func (p *Plugin) Init(instanceId string) error { } } }() + + // Initialize CTP status reader + p.kafkaReaderCtx, p.kafkaReaderCancel = context.WithCancel(context.Background()) + p.kafkaReader = NewCtpStatusReader(string(ctpStatusTopic), "o2-aliecs-core.trg") + if p.kafkaReader != nil { + log.WithField("level", infologger.IL_Devel).Info("TRG plugin: draining CTP status backlog") + p.drainCtpStatusBacklog(2 * time.Second) + + // Start reading CTP status updates + go p.readCtpStatusUpdates() + } else { + log.WithField("level", infologger.IL_Support). + Warn("could not create CTP status reader, CTP status monitoring disabled") + } + return nil } @@ -1434,7 +1457,104 @@ func (p *Plugin) parseDetectors(ctsDetectorsParam string) (detectors string, err return } +func (p *Plugin) drainCtpStatusBacklog(timeout time.Duration) { + drainCtx, cancel := context.WithTimeout(p.kafkaReaderCtx, timeout) + defer cancel() + for { + ctpStatus, err := p.kafkaReader.Next(drainCtx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + break + } + // transient error: small sleep and continue until timeout + time.Sleep(50 * time.Millisecond) + continue + } + if ctpStatus == nil { + continue + } + + detectorsInHoldover := extractDetectorsInHoldover(ctpStatus) + clockTransitionTimeExpected := extractClockTransitionExpected(ctpStatus) + currentClock := ctpStatus.Clock.String() + + p.cachedStatusMu.Lock() + p.cachedStatus.Clock = currentClock + p.cachedStatus.DetectorsInHoldover = detectorsInHoldover + p.cachedStatus.ClockTransitionTimeExpected = clockTransitionTimeExpected + p.cachedStatusMu.Unlock() + } +} + +func extractClockTransitionExpected(ctpStatus *trgpb.Status) int64 { + var clockTransitionTimeExpected int64 + if ctpStatus.ClockTransitionExpected > 0 { + clockTransitionTimeExpected = int64(ctpStatus.TimestampNano) + int64(ctpStatus.ClockTransitionExpected)*1e9 + } + return clockTransitionTimeExpected +} + +func extractDetectorsInHoldover(ctpStatus *trgpb.Status) []string { + var detectorsInHoldover []string + for _, detInfo := range ctpStatus.DetectorInfo { + if detInfo.HoldoverOngoing { + detectorsInHoldover = append(detectorsInHoldover, detInfo.Detector.String()) + } + } + return detectorsInHoldover +} + +func (p *Plugin) readCtpStatusUpdates() { + for { + ctpStatus, err := p.kafkaReader.Next(p.kafkaReaderCtx) + if errors.Is(err, io.EOF) { + log.WithField(infologger.Level, infologger.IL_Support). + Debug("received EOF from CTP status reader, likely reading was cancelled, exiting") + break + } + if err != nil { + log.WithField(infologger.Level, infologger.IL_Support). + WithError(err). + Error("error while reading CTP status from Kafka") + time.Sleep(time.Second * 1) // throttle to avoid spamming infologger + continue + } + if ctpStatus == nil { + continue + } + + detectorsInHoldover := extractDetectorsInHoldover(ctpStatus) + clockTransitionTimeExpected := extractClockTransitionExpected(ctpStatus) + currentClock := ctpStatus.Clock.String() + + p.cachedStatusMu.Lock() + previousClockTransitionTime := p.cachedStatus.ClockTransitionTimeExpected + p.cachedStatus.Clock = currentClock + p.cachedStatus.DetectorsInHoldover = detectorsInHoldover + p.cachedStatus.ClockTransitionTimeExpected = clockTransitionTimeExpected + p.cachedStatusMu.Unlock() + + if ctpStatus.ClockTransitionExpected > 0 && len(detectorsInHoldover) > 0 { + detectorsStr := strings.Join(detectorsInHoldover, ", ") + log.WithField(infologger.Level, infologger.IL_Ops). + Warnf("CTP clock transition expected in less than %ds for the detectors: %s. Starting a triggered run with these detectors may cause instabilities", + ctpStatus.ClockTransitionExpected, detectorsStr) + } else if ctpStatus.ClockTransitionExpected == 0 && previousClockTransitionTime > 0 { + log.WithField(infologger.Level, infologger.IL_Ops). + Info("CTP clock transition has just been performed") + } + } +} + func (p *Plugin) Destroy() error { - p.cachedStatusCancelFunc() + p.trgClientCancel() + + if p.kafkaReaderCancel != nil { + p.kafkaReaderCancel() + } + if p.kafkaReader != nil { + _ = p.kafkaReader.Close() + } + return p.trgClient.Close() } diff --git a/core/integration/trg/protos/ctp_status.pb.go b/core/integration/trg/protos/ctp_status.pb.go new file mode 100644 index 00000000..39bd8c3d --- /dev/null +++ b/core/integration/trg/protos/ctp_status.pb.go @@ -0,0 +1,387 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.1 +// protoc v5.29.3 +// source: protos/ctp_status.proto + +package ctpecs + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Detector int32 + +const ( + Detector_NULL_DETECTOR Detector = 0 + Detector_ITS Detector = 6 + Detector_MCH Detector = 8 + Detector_MFT Detector = 9 + Detector_TPC Detector = 13 +) + +// Enum value maps for Detector. +var ( + Detector_name = map[int32]string{ + 0: "NULL_DETECTOR", + 6: "ITS", + 8: "MCH", + 9: "MFT", + 13: "TPC", + } + Detector_value = map[string]int32{ + "NULL_DETECTOR": 0, + "ITS": 6, + "MCH": 8, + "MFT": 9, + "TPC": 13, + } +) + +func (x Detector) Enum() *Detector { + p := new(Detector) + *p = x + return p +} + +func (x Detector) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Detector) Descriptor() protoreflect.EnumDescriptor { + return file_protos_ctp_status_proto_enumTypes[0].Descriptor() +} + +func (Detector) Type() protoreflect.EnumType { + return &file_protos_ctp_status_proto_enumTypes[0] +} + +func (x Detector) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Detector.Descriptor instead. +func (Detector) EnumDescriptor() ([]byte, []int) { + return file_protos_ctp_status_proto_rawDescGZIP(), []int{0} +} + +type Clock int32 + +const ( + Clock_NULL Clock = 0 + Clock_LOCAL Clock = 1 + Clock_BEAM1 Clock = 2 +) + +// Enum value maps for Clock. +var ( + Clock_name = map[int32]string{ + 0: "NULL", + 1: "LOCAL", + 2: "BEAM1", + } + Clock_value = map[string]int32{ + "NULL": 0, + "LOCAL": 1, + "BEAM1": 2, + } +) + +func (x Clock) Enum() *Clock { + p := new(Clock) + *p = x + return p +} + +func (x Clock) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Clock) Descriptor() protoreflect.EnumDescriptor { + return file_protos_ctp_status_proto_enumTypes[1].Descriptor() +} + +func (Clock) Type() protoreflect.EnumType { + return &file_protos_ctp_status_proto_enumTypes[1] +} + +func (x Clock) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Clock.Descriptor instead. +func (Clock) EnumDescriptor() ([]byte, []int) { + return file_protos_ctp_status_proto_rawDescGZIP(), []int{1} +} + +type DetectorInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Detector Detector `protobuf:"varint,1,opt,name=detector,proto3,enum=ctp_status.Detector" json:"detector,omitempty"` + HoldoverOngoing bool `protobuf:"varint,2,opt,name=holdover_ongoing,json=holdoverOngoing,proto3" json:"holdover_ongoing,omitempty"` // detector:holdover, holdover=True if holdover ON or holdover OFF not finished, holdover=False if holdover procedure finished +} + +func (x *DetectorInfo) Reset() { + *x = DetectorInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_ctp_status_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DetectorInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DetectorInfo) ProtoMessage() {} + +func (x *DetectorInfo) ProtoReflect() protoreflect.Message { + mi := &file_protos_ctp_status_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DetectorInfo.ProtoReflect.Descriptor instead. +func (*DetectorInfo) Descriptor() ([]byte, []int) { + return file_protos_ctp_status_proto_rawDescGZIP(), []int{0} +} + +func (x *DetectorInfo) GetDetector() Detector { + if x != nil { + return x.Detector + } + return Detector_NULL_DETECTOR +} + +func (x *DetectorInfo) GetHoldoverOngoing() bool { + if x != nil { + return x.HoldoverOngoing + } + return false +} + +type Status struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TimestampNano uint64 `protobuf:"varint,1,opt,name=timestamp_nano,json=timestampNano,proto3" json:"timestamp_nano,omitempty"` // nanoseconds since epoch or whatever precision you can offer + DetectorInfo []*DetectorInfo `protobuf:"bytes,2,rep,name=detector_info,json=detectorInfo,proto3" json:"detector_info,omitempty"` + Clock Clock `protobuf:"varint,3,opt,name=clock,proto3,enum=ctp_status.Clock" json:"clock,omitempty"` + ClockTransitionExpected uint32 `protobuf:"varint,4,opt,name=clock_transition_expected,json=clockTransitionExpected,proto3" json:"clock_transition_expected,omitempty"` // Integer, values only 90,60,30,0 + Ready bool `protobuf:"varint,5,opt,name=ready,proto3" json:"ready,omitempty"` // True if no holdover for any detector and clocktransition=0, False otherwise +} + +func (x *Status) Reset() { + *x = Status{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_ctp_status_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Status) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Status) ProtoMessage() {} + +func (x *Status) ProtoReflect() protoreflect.Message { + mi := &file_protos_ctp_status_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Status.ProtoReflect.Descriptor instead. +func (*Status) Descriptor() ([]byte, []int) { + return file_protos_ctp_status_proto_rawDescGZIP(), []int{1} +} + +func (x *Status) GetTimestampNano() uint64 { + if x != nil { + return x.TimestampNano + } + return 0 +} + +func (x *Status) GetDetectorInfo() []*DetectorInfo { + if x != nil { + return x.DetectorInfo + } + return nil +} + +func (x *Status) GetClock() Clock { + if x != nil { + return x.Clock + } + return Clock_NULL +} + +func (x *Status) GetClockTransitionExpected() uint32 { + if x != nil { + return x.ClockTransitionExpected + } + return 0 +} + +func (x *Status) GetReady() bool { + if x != nil { + return x.Ready + } + return false +} + +var File_protos_ctp_status_proto protoreflect.FileDescriptor + +var file_protos_ctp_status_proto_rawDesc = []byte{ + 0x0a, 0x17, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x63, 0x74, 0x70, 0x5f, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x63, 0x74, 0x70, 0x5f, 0x73, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x6b, 0x0a, 0x0c, 0x44, 0x65, 0x74, 0x65, 0x63, 0x74, 0x6f, + 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x30, 0x0a, 0x08, 0x64, 0x65, 0x74, 0x65, 0x63, 0x74, 0x6f, + 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x63, 0x74, 0x70, 0x5f, 0x73, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x2e, 0x44, 0x65, 0x74, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x08, 0x64, + 0x65, 0x74, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x10, 0x68, 0x6f, 0x6c, 0x64, 0x6f, + 0x76, 0x65, 0x72, 0x5f, 0x6f, 0x6e, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x0f, 0x68, 0x6f, 0x6c, 0x64, 0x6f, 0x76, 0x65, 0x72, 0x4f, 0x6e, 0x67, 0x6f, 0x69, + 0x6e, 0x67, 0x22, 0xe9, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x25, 0x0a, + 0x0e, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x6e, 0x61, 0x6e, 0x6f, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x4e, 0x61, 0x6e, 0x6f, 0x12, 0x3d, 0x0a, 0x0d, 0x64, 0x65, 0x74, 0x65, 0x63, 0x74, 0x6f, 0x72, + 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x63, 0x74, + 0x70, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x44, 0x65, 0x74, 0x65, 0x63, 0x74, 0x6f, + 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0c, 0x64, 0x65, 0x74, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x49, + 0x6e, 0x66, 0x6f, 0x12, 0x27, 0x0a, 0x05, 0x63, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x11, 0x2e, 0x63, 0x74, 0x70, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, + 0x43, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x05, 0x63, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x3a, 0x0a, 0x19, + 0x63, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x65, 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x17, 0x63, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x45, 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, + 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x2a, 0x83, + 0x01, 0x0a, 0x08, 0x44, 0x65, 0x74, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x11, 0x0a, 0x0d, 0x4e, + 0x55, 0x4c, 0x4c, 0x5f, 0x44, 0x45, 0x54, 0x45, 0x43, 0x54, 0x4f, 0x52, 0x10, 0x00, 0x12, 0x07, + 0x0a, 0x03, 0x49, 0x54, 0x53, 0x10, 0x06, 0x12, 0x07, 0x0a, 0x03, 0x4d, 0x43, 0x48, 0x10, 0x08, + 0x12, 0x07, 0x0a, 0x03, 0x4d, 0x46, 0x54, 0x10, 0x09, 0x12, 0x07, 0x0a, 0x03, 0x54, 0x50, 0x43, + 0x10, 0x0d, 0x22, 0x04, 0x08, 0x01, 0x10, 0x01, 0x22, 0x04, 0x08, 0x02, 0x10, 0x02, 0x22, 0x04, + 0x08, 0x03, 0x10, 0x03, 0x22, 0x04, 0x08, 0x04, 0x10, 0x04, 0x22, 0x04, 0x08, 0x05, 0x10, 0x05, + 0x22, 0x04, 0x08, 0x07, 0x10, 0x07, 0x22, 0x04, 0x08, 0x0a, 0x10, 0x0a, 0x22, 0x04, 0x08, 0x0b, + 0x10, 0x0b, 0x22, 0x04, 0x08, 0x0c, 0x10, 0x0c, 0x22, 0x04, 0x08, 0x0e, 0x10, 0x0e, 0x22, 0x04, + 0x08, 0x0f, 0x10, 0x0f, 0x2a, 0x27, 0x0a, 0x05, 0x43, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x08, 0x0a, + 0x04, 0x4e, 0x55, 0x4c, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x4c, 0x4f, 0x43, 0x41, 0x4c, + 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x45, 0x41, 0x4d, 0x31, 0x10, 0x02, 0x42, 0x44, 0x5a, + 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x41, 0x6c, 0x69, 0x63, + 0x65, 0x4f, 0x32, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x2f, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, + 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2f, 0x74, 0x72, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x3b, 0x63, 0x74, 0x70, + 0x65, 0x63, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_protos_ctp_status_proto_rawDescOnce sync.Once + file_protos_ctp_status_proto_rawDescData = file_protos_ctp_status_proto_rawDesc +) + +func file_protos_ctp_status_proto_rawDescGZIP() []byte { + file_protos_ctp_status_proto_rawDescOnce.Do(func() { + file_protos_ctp_status_proto_rawDescData = protoimpl.X.CompressGZIP(file_protos_ctp_status_proto_rawDescData) + }) + return file_protos_ctp_status_proto_rawDescData +} + +var file_protos_ctp_status_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_protos_ctp_status_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_protos_ctp_status_proto_goTypes = []interface{}{ + (Detector)(0), // 0: ctp_status.Detector + (Clock)(0), // 1: ctp_status.Clock + (*DetectorInfo)(nil), // 2: ctp_status.DetectorInfo + (*Status)(nil), // 3: ctp_status.Status +} +var file_protos_ctp_status_proto_depIdxs = []int32{ + 0, // 0: ctp_status.DetectorInfo.detector:type_name -> ctp_status.Detector + 2, // 1: ctp_status.Status.detector_info:type_name -> ctp_status.DetectorInfo + 1, // 2: ctp_status.Status.clock:type_name -> ctp_status.Clock + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_protos_ctp_status_proto_init() } +func file_protos_ctp_status_proto_init() { + if File_protos_ctp_status_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_protos_ctp_status_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DetectorInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_ctp_status_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Status); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_protos_ctp_status_proto_rawDesc, + NumEnums: 2, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_protos_ctp_status_proto_goTypes, + DependencyIndexes: file_protos_ctp_status_proto_depIdxs, + EnumInfos: file_protos_ctp_status_proto_enumTypes, + MessageInfos: file_protos_ctp_status_proto_msgTypes, + }.Build() + File_protos_ctp_status_proto = out.File + file_protos_ctp_status_proto_rawDesc = nil + file_protos_ctp_status_proto_goTypes = nil + file_protos_ctp_status_proto_depIdxs = nil +} diff --git a/core/integration/trg/protos/ctp_status.proto b/core/integration/trg/protos/ctp_status.proto new file mode 100644 index 00000000..78b153d3 --- /dev/null +++ b/core/integration/trg/protos/ctp_status.proto @@ -0,0 +1,46 @@ +syntax="proto3"; + +package ctp_status; +option go_package = "github.com/AliceO2Group/Control/core/integration/trg/protos;ctpecs"; + + +enum Detector { +NULL_DETECTOR = 0; +reserved 1; // CPV was removed during YETS 2024-2025 +reserved 2; // EMC not using holdover +reserved 3; // FDD not using holdover +reserved 4; // FT0 not using holdover +reserved 5; // FV0 not using holdover +ITS = 6; +reserved 7; // HMP not using holdover +MCH = 8; +MFT = 9; +reserved 10; // MID not using holdover +reserved 11; // PHS was removed during YETS 2024-2025 +reserved 12; // TOF not using holdover +TPC = 13; +reserved 14; // TRD not using holdover +reserved 15; // ZDC not using holdover +} + + +message DetectorInfo { +Detector detector = 1; +bool holdover_ongoing = 2; // detector:holdover, holdover=True if holdover ON or holdover OFF not finished, holdover=False if holdover procedure finished +} + + +enum Clock { +NULL = 0; +LOCAL = 1; +BEAM1 = 2; +} + +message Status { // General status message for kafka + +uint64 timestamp_nano = 1; // nanoseconds since epoch or whatever precision you can offer +repeated DetectorInfo detector_info = 2; +Clock clock = 3; +uint32 clock_transition_expected = 4; // Integer, values only 90,60,30,0 +bool ready = 5; // True if no holdover for any detector and clocktransition=0, False otherwise +} \ No newline at end of file