summaryrefslogtreecommitdiff
path: root/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'server.go')
-rw-r--r--server.go385
1 files changed, 385 insertions, 0 deletions
diff --git a/server.go b/server.go
new file mode 100644
index 0000000..e2d9c75
--- /dev/null
+++ b/server.go
@@ -0,0 +1,385 @@
+package main
+
+import (
+ "fmt"
+ "net"
+ "os"
+ "path"
+ "reflect"
+ _ "runtime/debug"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
+)
+
+const (
+ resourceNamePrefix = "deviceplugindemo/"
+ serverSockPath = pluginapi.DevicePluginPath
+ // AWS_SN = "F1-Node"
+)
+
+// FPGADevicePluginServer implements the Kubernetes device plugin API
+type FPGADevicePluginServer struct {
+ devType string
+ devices map[string]Device
+ socket string
+ stop chan interface{}
+ update chan map[string]Device
+
+ server *grpc.Server
+}
+
+type FPGADevicePlugin struct {
+ devices map[string]map[string]Device
+ servers map[string]*FPGADevicePluginServer
+ updateChan chan map[string]map[string]Device
+}
+
+// NewFPGADevicePlugin returns an initialized FPGADevicePlugin
+func NewFPGADevicePlugin() *FPGADevicePlugin {
+ log.Debugf("create FPGA device plugin")
+ updateChan := make(chan map[string]map[string]Device)
+ plugin := FPGADevicePlugin{
+ devices: make(map[string]map[string]Device),
+ servers: make(map[string]*FPGADevicePluginServer),
+ updateChan: updateChan,
+ }
+
+ go func() {
+ for {
+ devices, err := GetDevices()
+ if err != nil {
+ time.Sleep(75 * time.Second)
+ devices, err = GetDevices()
+ if err != nil {
+ log.Errorf("Error to get FPGA devices: %v", err)
+ break
+ }
+ }
+ devMap := make(map[string]map[string]Device)
+ for _, device := range devices {
+
+ DSAtype := device.index
+ id := device.deviceID
+ if subMap, ok := devMap[DSAtype]; ok {
+ subMap = devMap[DSAtype]
+ subMap[id] = device
+ } else {
+ subMap = make(map[string]Device)
+ devMap[DSAtype] = subMap
+ subMap[id] = device
+ }
+ }
+ updateChan <- devMap
+ time.Sleep(5 * time.Second)
+ }
+ close(updateChan)
+ }()
+
+ return &plugin
+}
+
+func (m *FPGADevicePlugin) checkDeviceUpdate(n map[string]map[string]Device) {
+ added := make(map[string]map[string]Device)
+ updated := make(map[string]map[string]Device)
+ removed := make(map[string]map[string]Device)
+
+ for oDevType, oDevices := range m.devices {
+ if nDevices, ok := n[oDevType]; ok {
+ if !reflect.DeepEqual(oDevices, nDevices) {
+ updated[oDevType] = nDevices
+ }
+ delete(n, oDevType)
+ } else {
+ removed[oDevType] = oDevices
+ }
+ }
+ for nDevType, nDevices := range n {
+ added[nDevType] = nDevices
+ }
+
+ //create new server for added devices
+ for aDevType, aDevices := range added {
+ devicePluginServer := m.NewFPGADevicePluginServer(aDevType, aDevices)
+ m.devices[aDevType] = aDevices
+ m.servers[aDevType] = devicePluginServer
+ go func(aDevType string, aDevices map[string]Device, name string) {
+ if err := m.servers[aDevType].Serve(name); err != nil {
+ log.Println("Could not contact Kubelet, Exit. Did you enable the device plugin feature gate?")
+ os.Exit(1)
+ }
+ m.servers[aDevType].update <- aDevices
+ }(aDevType, aDevices, resourceNamePrefix+aDevType)
+ }
+
+ //stop server for removed devices
+ for rDevType, rDevices := range removed {
+ log.Debugf("Remove device %v", rDevices)
+ m.servers[rDevType].Stop()
+ delete(m.servers, rDevType)
+ delete(m.devices, rDevType)
+ }
+
+ //send update for updated devices
+ for uDevType, uDevices := range updated {
+ m.devices[uDevType] = uDevices
+ m.servers[uDevType].update <- uDevices
+ }
+}
+
+// NewFPGADevicePluginServer returns an initialized FPGADevicePluginServer
+func (m *FPGADevicePlugin) NewFPGADevicePluginServer(devType string, devices map[string]Device) *FPGADevicePluginServer {
+ return &FPGADevicePluginServer{
+ devType: devType,
+ devices: devices,
+ socket: path.Join(serverSockPath, devType+"-demodevice.sock"),
+ stop: make(chan interface{}),
+ update: make(chan map[string]Device, 1),
+ }
+}
+
+// waitForServer checks if grpc server is alive
+// by making grpc blocking connection to the server socket
+func waitForServer(socket string, timeout time.Duration) error {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ conn, err := grpc.DialContext(ctx, socket, grpc.WithInsecure(), grpc.WithBlock(),
+ grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
+ return net.DialTimeout("unix", addr, timeout)
+ }),
+ )
+ if conn != nil {
+ conn.Close()
+ }
+
+ if err != nil {
+ fmt.Errorf("Failed dial context at %s", socket)
+ return err
+ }
+ return nil
+}
+
+func (m *FPGADevicePluginServer) deviceExists(id string) bool {
+ for k, _ := range m.devices {
+ if k == id {
+ return true
+ }
+ }
+ return false
+}
+
+func (m *FPGADevicePluginServer) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
+ return nil, fmt.Errorf("PreStartContainer() should not be called")
+}
+
+func (m *FPGADevicePluginServer) GetDevicePluginOptions(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
+ fmt.Println("GetDevicePluginOptions: return empty options")
+ return new(pluginapi.DevicePluginOptions), nil
+}
+
+// Start starts the gRPC server of the device plugin
+func (m *FPGADevicePluginServer) Start() error {
+ err := m.cleanup()
+ if err != nil {
+ return err
+ }
+
+ sock, err := net.Listen("unix", m.socket)
+ if err != nil {
+ return err
+ }
+
+ m.server = grpc.NewServer()
+ pluginapi.RegisterDevicePluginServer(m.server, m)
+
+ go m.server.Serve(sock)
+
+ // Wait for the server to start
+ if err = waitForServer(m.socket, 10*time.Second); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Stop stops the gRPC server
+func (m *FPGADevicePluginServer) Stop() error {
+ if m.server == nil {
+ return nil
+ }
+
+ m.server.Stop()
+ m.server = nil
+ close(m.stop)
+ close(m.update)
+
+ return m.cleanup()
+}
+
+func (m *FPGADevicePluginServer) cleanup() error {
+ if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) {
+ return err
+ }
+
+ return nil
+}
+
+// Register registers the device plugin for the given resourceName with Kubelet.
+func (m *FPGADevicePluginServer) Register(kubeletEndpoint, resourceName string) error {
+ conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(),
+ grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
+ return net.DialTimeout("unix", addr, timeout)
+ }))
+
+ if err != nil {
+ log.Debugf("Cann't connect to kubelet service")
+ return err
+ }
+ defer conn.Close()
+
+ client := pluginapi.NewRegistrationClient(conn)
+ reqt := &pluginapi.RegisterRequest{
+ Version: pluginapi.Version,
+ Endpoint: path.Base(m.socket),
+ ResourceName: resourceName,
+ }
+
+ _, err = client.Register(context.Background(), reqt)
+ if err != nil {
+ log.Debugf("Cann't register to kubelet service")
+ return err
+ }
+ return nil
+}
+
+// func IsContain(items []string, item string) bool {
+// AWS_SN := "F1-Node"
+// for _, eachItem := range items {
+// if eachItem == item && strings.EqualFold(item, AWS_SN) != true {
+// return true
+// }
+// }
+// return false
+// }
+func (m *FPGADevicePluginServer) sendDevices(s pluginapi.DevicePlugin_ListAndWatchServer) error {
+ resp := new(pluginapi.ListAndWatchResponse)
+
+ check_range := m.devices
+ SerialNums := []string{}
+ for _, device := range check_range {
+ if device.SN == "" {
+ log.Printf("Error, Device %v has empty Serial number", device.deviceID)
+ } else {
+ SerialNums = append(SerialNums, device.SN)
+ tem := &pluginapi.Device{
+ ID: device.deviceID,
+ Health: device.Healthy,
+ }
+ resp.Devices = append(resp.Devices, tem)
+ }
+ }
+ log.Printf("Check SeialNums arry: %v", SerialNums)
+ log.Printf("Sending %d device(s) %v to kubelet", len(resp.Devices), resp.Devices)
+ if err := s.Send(resp); err != nil {
+ m.Stop()
+ log.Debugf("Cannot update device list")
+ return err
+ }
+ return nil
+}
+
+// ListAndWatch lists devices and update that list according to the health status
+func (m *FPGADevicePluginServer) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
+ log.Debugf("In ListAndWatch(%s): stream: %v", m.devType, s)
+ //debug.PrintStack()
+ for m.devices = range m.update {
+ if err := m.sendDevices(s); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Allocate which return list of devices.
+func (m *FPGADevicePluginServer) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
+ log.Debugf("In Allocate()")
+ response := new(pluginapi.AllocateResponse)
+ for _, creq := range req.ContainerRequests {
+ log.Debugf("Request IDs: %v", creq.DevicesIDs)
+
+ cres := new(pluginapi.ContainerAllocateResponse)
+
+ // Check same serial number devices, devices with same serail number "F1-node" will be marked as independent devices
+ deviceIDs_arry := creq.DevicesIDs
+
+ for _, id := range deviceIDs_arry {
+ log.Printf("Receiving request %s", id)
+ dev, ok := m.devices[id]
+ if !ok {
+ return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id)
+ }
+ if !m.deviceExists(id) {
+ return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
+ }
+ fname := path.Join(SysfsDevices, dev.deviceID, DeviceFile)
+ cres.Mounts = append(cres.Mounts, &pluginapi.Mount{
+ HostPath: fname,
+ ContainerPath: fname,
+ ReadOnly: false,
+ })
+ response.ContainerResponses = append(response.ContainerResponses, cres)
+ }
+ }
+ return response, nil
+}
+
+// Serve starts the gRPC server and register the device plugin to Kubelet
+func (m *FPGADevicePluginServer) Serve(resourceName string) error {
+ log.Debugf("In Serve(%s)", m.socket)
+ err := m.Start()
+ if err != nil {
+ log.Errorf("Could not start device plugin: %v", err)
+ return err
+ }
+ log.Infof("Starting to serve on %s", m.socket)
+
+ err = m.Register(pluginapi.KubeletSocket, resourceName)
+ if err != nil {
+ log.Errorf("Could not register device plugin: %v", err)
+ m.Stop()
+ return err
+ }
+ log.Infof("Registered device plugin with Kubelet %s", resourceName)
+
+ return nil
+}
+
+func (m *FPGADevicePluginServer) GetPreferredAllocation(ctx context.Context, req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
+ response := new(pluginapi.PreferredAllocationResponse)
+ for _, creq := range req.ContainerRequests {
+ log.Debugf("Request IDs: %v", creq.AvailableDeviceIDs)
+
+ cres := new(pluginapi.ContainerPreferredAllocationResponse)
+
+ // Check same serial number devices, devices with same serail number "F1-node" will be marked as independent devices
+ deviceIDs_arry := creq.AvailableDeviceIDs
+
+ for _, id := range deviceIDs_arry {
+ log.Printf("Receiving request %s", id)
+ dev, ok := m.devices[id]
+ if !ok {
+ return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id)
+ }
+ if !m.deviceExists(id) {
+ return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
+ }
+ fname := path.Join(SysfsDevices, dev.deviceID, DeviceFile)
+ cres.DeviceIDs = append(cres.DeviceIDs, fname)
+ response.ContainerResponses = append(response.ContainerResponses, cres)
+ }
+ }
+ return response, nil
+}