Skip to content

Commit

Permalink
feat: support health checker for heartbeat
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye committed Feb 21, 2024
1 parent 739bb75 commit 667fc5f
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 597 deletions.
45 changes: 8 additions & 37 deletions cmd/yurt-iot-dock/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,6 @@ func Run(opts *options.YurtIoTDockOptions, stopCh <-chan struct{}) {
setupLog.Error(err, "unable to create controller", "controller", "DeviceProfile")
os.Exit(1)
}
// dfs, err := controllers.NewDeviceProfileSyncer(mgr.GetClient(), opts, edgexdock)
// if err != nil {
// setupLog.Error(err, "unable to create syncer", "syncer", "DeviceProfile")
// os.Exit(1)
// }
// err = mgr.Add(dfs.NewDeviceProfileSyncerRunnable())
// if err != nil {
// setupLog.Error(err, "unable to create syncer runnable", "syncer", "DeviceProfile")
// os.Exit(1)
// }

// setup the Device Reconciler and Syncer
if err = (&controllers.DeviceReconciler{
Expand All @@ -145,18 +135,17 @@ func Run(opts *options.YurtIoTDockOptions, stopCh <-chan struct{}) {
setupLog.Error(err, "unable to create controller", "controller", "Device")
os.Exit(1)
}
ds, err := controllers.NewDeviceSyncer(mgr.GetClient(), opts, edgexdock)
if err != nil {
setupLog.Error(err, "unable to create syncer", "controller", "Device")
os.Exit(1)
}
err = mgr.Add(ds.NewDeviceSyncerRunnable())
if err != nil {
setupLog.Error(err, "unable to create syncer runnable", "syncer", "Device")

// setup the DeviceService Reconciler and Syncer
if err = (&controllers.DeviceServiceReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr, opts, edgexdock); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DeviceService")
os.Exit(1)
}

setupLog.Info("[NewPulsimeter] run the pulsimeter1205")
setupLog.Info("[NewPulsimeter] run the pulsimeter controller")
pm, err := controllers.NewPulsimeter(mgr.GetClient(), opts, edgexdock)
if err != nil {
setupLog.Error(err, "unable to create plusimeter", "controller", "Device")
Expand All @@ -168,24 +157,6 @@ func Run(opts *options.YurtIoTDockOptions, stopCh <-chan struct{}) {
os.Exit(1)
}

// setup the DeviceService Reconciler and Syncer
if err = (&controllers.DeviceServiceReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr, opts, edgexdock); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DeviceService")
os.Exit(1)
}
// dss, err := controllers.NewDeviceServiceSyncer(mgr.GetClient(), opts, edgexdock)
// if err != nil {
// setupLog.Error(err, "unable to create syncer", "syncer", "DeviceService")
// os.Exit(1)
// }
// err = mgr.Add(dss.NewDeviceServiceSyncerRunnable())
// if err != nil {
// setupLog.Error(err, "unable to create syncer runnable", "syncer", "DeviceService")
// os.Exit(1)
// }
// +kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
Expand Down
37 changes: 29 additions & 8 deletions cmd/yurt-iot-dock/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,24 @@ type YurtIoTDockOptions struct {
CoreDataAddr string
CoreMetadataAddr string
CoreCommandAddr string
RedisAddr string
RedisPort uint
MessageBusOptions MessageBusOptions
EdgeSyncPeriod uint
}

type MessageBusOptions struct {
// Host is the hostname or IP address of the messaging broker, if applicable.
Host string
// Port defines the port on which to access the message queue.
Port int
// Protocol indicates the protocol to use when accessing the message queue.
Protocol string
// Type indicates the message queue platform being used. eg. "redis" for Redis Pub/Sub
Type string
HeartbeatInterval int
// Name is the name of the message bus instance.
Name string
}

func NewYurtIoTDockOptions() *YurtIoTDockOptions {
return &YurtIoTDockOptions{
MetricsAddr: ":8080",
Expand All @@ -50,9 +63,13 @@ func NewYurtIoTDockOptions() *YurtIoTDockOptions {
CoreDataAddr: "edgex-core-data:59880",
CoreMetadataAddr: "edgex-core-metadata:59881",
CoreCommandAddr: "edgex-core-command:59882",
RedisAddr: "edgex-redis",
RedisPort: 6379,
EdgeSyncPeriod: 5,
MessageBusOptions: MessageBusOptions{
Host: "edgex-redis",
Port: 6379,
Protocol: "redis",
Type: "redis",
},
EdgeSyncPeriod: 120,
}
}

Expand All @@ -73,9 +90,13 @@ func (o *YurtIoTDockOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.CoreDataAddr, "core-data-address", "edgex-core-data:59880", "The address of edge core-data service.")
fs.StringVar(&o.CoreMetadataAddr, "core-metadata-address", "edgex-core-metadata:59881", "The address of edge core-metadata service.")
fs.StringVar(&o.CoreCommandAddr, "core-command-address", "edgex-core-command:59882", "The address of edge core-command service.")
fs.StringVar(&o.RedisAddr, "edgex-redis-address", "edgex-redis", "The address of edge database service.")
fs.UintVar(&o.RedisPort, "edgex-redis-port", 6379, "The port of the redis service.")
fs.UintVar(&o.EdgeSyncPeriod, "edge-sync-period", 5, "The period of the device management platform synchronizing the device status to the cloud.(in seconds,not less than 5 seconds)")
fs.StringVar(&o.MessageBusOptions.Host, "message-bus-host", "edgex-redis", "The hostname or IP address of the messaging broker, if applicable.")
fs.IntVar(&o.MessageBusOptions.Port, "message-bus-port", 6379, "The port on which to access the message queue.")
fs.StringVar(&o.MessageBusOptions.Protocol, "message-bus-protocol", "redis", "The protocol to use when accessing the message queue.")
fs.StringVar(&o.MessageBusOptions.Type, "message-bus-type", "redis", "The message queue platform being used. eg. \"redis\" for Redis Pub/Sub")
fs.IntVar(&o.MessageBusOptions.HeartbeatInterval, "message-bus-heartbeat-interval", 30, "The heartbeat interval for iot-dock to checker the connection with message bus.(in seconds,not less than 30 seconds)")
fs.StringVar(&o.MessageBusOptions.Name, "message-bus-name", "edgex-redis", "The name of the message bus instance.")
fs.UintVar(&o.EdgeSyncPeriod, "edge-sync-period", 2*60, "The period of the device management platform synchronizing the device status to the cloud.(in seconds,not less than 2 minutes)")
}

func ValidateEdgePlatformAddress(options *YurtIoTDockOptions) error {
Expand Down
10 changes: 0 additions & 10 deletions pkg/yurtiotdock/clients/edgex-foundry/v3/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ func toEdgeXProfileProperty(pp iotv1alpha1.ResourceProperties) dtos.ResourceProp
}
}

func ToKubeDeviceService(ds dtos.DeviceService, namespace string) iotv1alpha1.DeviceService {
return toKubeDeviceService(ds, namespace)
}

func toKubeDeviceService(ds dtos.DeviceService, namespace string) iotv1alpha1.DeviceService {
return iotv1alpha1.DeviceService{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -202,9 +198,6 @@ func toEdgeXOperatingState(os iotv1alpha1.OperatingState) models.OperatingState
}
return models.Unknown
}
func ToKubeDevice(ed dtos.Device, namespace string) iotv1alpha1.Device {
return toKubeDevice(ed, namespace)
}

// toKubeDevice serialize the EdgeX Device to the corresponding Kubernetes Device
func toKubeDevice(ed dtos.Device, namespace string) iotv1alpha1.Device {
Expand Down Expand Up @@ -268,9 +261,6 @@ func toKubeProtocols(
}
return ret
}
func ToKubeDeviceProfile(dp dtos.DeviceProfile, namespace string) iotv1alpha1.DeviceProfile {
return toKubeDeviceProfile(&dp, namespace)
}

// toKubeDeviceProfile create DeviceProfile in cloud according to devicProfile in edge
func toKubeDeviceProfile(dp *dtos.DeviceProfile, namespace string) iotv1alpha1.DeviceProfile {
Expand Down
Loading

0 comments on commit 667fc5f

Please sign in to comment.