diff options
Diffstat (limited to 'server.go')
| -rw-r--r-- | server.go | 385 |
1 files changed, 385 insertions, 0 deletions
diff --git a/server.go b/server.go new file mode 100644 index 0000000..e2d9c75 --- /dev/null +++ b/server.go @@ -0,0 +1,385 @@ +package main + +import ( + "fmt" + "net" + "os" + "path" + "reflect" + _ "runtime/debug" + "time" + + log "github.com/sirupsen/logrus" + "golang.org/x/net/context" + "google.golang.org/grpc" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +const ( + resourceNamePrefix = "deviceplugindemo/" + serverSockPath = pluginapi.DevicePluginPath + // AWS_SN = "F1-Node" +) + +// FPGADevicePluginServer implements the Kubernetes device plugin API +type FPGADevicePluginServer struct { + devType string + devices map[string]Device + socket string + stop chan interface{} + update chan map[string]Device + + server *grpc.Server +} + +type FPGADevicePlugin struct { + devices map[string]map[string]Device + servers map[string]*FPGADevicePluginServer + updateChan chan map[string]map[string]Device +} + +// NewFPGADevicePlugin returns an initialized FPGADevicePlugin +func NewFPGADevicePlugin() *FPGADevicePlugin { + log.Debugf("create FPGA device plugin") + updateChan := make(chan map[string]map[string]Device) + plugin := FPGADevicePlugin{ + devices: make(map[string]map[string]Device), + servers: make(map[string]*FPGADevicePluginServer), + updateChan: updateChan, + } + + go func() { + for { + devices, err := GetDevices() + if err != nil { + time.Sleep(75 * time.Second) + devices, err = GetDevices() + if err != nil { + log.Errorf("Error to get FPGA devices: %v", err) + break + } + } + devMap := make(map[string]map[string]Device) + for _, device := range devices { + + DSAtype := device.index + id := device.deviceID + if subMap, ok := devMap[DSAtype]; ok { + subMap = devMap[DSAtype] + subMap[id] = device + } else { + subMap = make(map[string]Device) + devMap[DSAtype] = subMap + subMap[id] = device + } + } + updateChan <- devMap + time.Sleep(5 * time.Second) + } + close(updateChan) + }() + + return &plugin +} + +func (m *FPGADevicePlugin) checkDeviceUpdate(n map[string]map[string]Device) { + added := make(map[string]map[string]Device) + updated := make(map[string]map[string]Device) + removed := make(map[string]map[string]Device) + + for oDevType, oDevices := range m.devices { + if nDevices, ok := n[oDevType]; ok { + if !reflect.DeepEqual(oDevices, nDevices) { + updated[oDevType] = nDevices + } + delete(n, oDevType) + } else { + removed[oDevType] = oDevices + } + } + for nDevType, nDevices := range n { + added[nDevType] = nDevices + } + + //create new server for added devices + for aDevType, aDevices := range added { + devicePluginServer := m.NewFPGADevicePluginServer(aDevType, aDevices) + m.devices[aDevType] = aDevices + m.servers[aDevType] = devicePluginServer + go func(aDevType string, aDevices map[string]Device, name string) { + if err := m.servers[aDevType].Serve(name); err != nil { + log.Println("Could not contact Kubelet, Exit. Did you enable the device plugin feature gate?") + os.Exit(1) + } + m.servers[aDevType].update <- aDevices + }(aDevType, aDevices, resourceNamePrefix+aDevType) + } + + //stop server for removed devices + for rDevType, rDevices := range removed { + log.Debugf("Remove device %v", rDevices) + m.servers[rDevType].Stop() + delete(m.servers, rDevType) + delete(m.devices, rDevType) + } + + //send update for updated devices + for uDevType, uDevices := range updated { + m.devices[uDevType] = uDevices + m.servers[uDevType].update <- uDevices + } +} + +// NewFPGADevicePluginServer returns an initialized FPGADevicePluginServer +func (m *FPGADevicePlugin) NewFPGADevicePluginServer(devType string, devices map[string]Device) *FPGADevicePluginServer { + return &FPGADevicePluginServer{ + devType: devType, + devices: devices, + socket: path.Join(serverSockPath, devType+"-demodevice.sock"), + stop: make(chan interface{}), + update: make(chan map[string]Device, 1), + } +} + +// waitForServer checks if grpc server is alive +// by making grpc blocking connection to the server socket +func waitForServer(socket string, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + conn, err := grpc.DialContext(ctx, socket, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + }), + ) + if conn != nil { + conn.Close() + } + + if err != nil { + fmt.Errorf("Failed dial context at %s", socket) + return err + } + return nil +} + +func (m *FPGADevicePluginServer) deviceExists(id string) bool { + for k, _ := range m.devices { + if k == id { + return true + } + } + return false +} + +func (m *FPGADevicePluginServer) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { + return nil, fmt.Errorf("PreStartContainer() should not be called") +} + +func (m *FPGADevicePluginServer) GetDevicePluginOptions(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { + fmt.Println("GetDevicePluginOptions: return empty options") + return new(pluginapi.DevicePluginOptions), nil +} + +// Start starts the gRPC server of the device plugin +func (m *FPGADevicePluginServer) Start() error { + err := m.cleanup() + if err != nil { + return err + } + + sock, err := net.Listen("unix", m.socket) + if err != nil { + return err + } + + m.server = grpc.NewServer() + pluginapi.RegisterDevicePluginServer(m.server, m) + + go m.server.Serve(sock) + + // Wait for the server to start + if err = waitForServer(m.socket, 10*time.Second); err != nil { + return err + } + + return nil +} + +// Stop stops the gRPC server +func (m *FPGADevicePluginServer) Stop() error { + if m.server == nil { + return nil + } + + m.server.Stop() + m.server = nil + close(m.stop) + close(m.update) + + return m.cleanup() +} + +func (m *FPGADevicePluginServer) cleanup() error { + if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) { + return err + } + + return nil +} + +// Register registers the device plugin for the given resourceName with Kubelet. +func (m *FPGADevicePluginServer) Register(kubeletEndpoint, resourceName string) error { + conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(), + grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + })) + + if err != nil { + log.Debugf("Cann't connect to kubelet service") + return err + } + defer conn.Close() + + client := pluginapi.NewRegistrationClient(conn) + reqt := &pluginapi.RegisterRequest{ + Version: pluginapi.Version, + Endpoint: path.Base(m.socket), + ResourceName: resourceName, + } + + _, err = client.Register(context.Background(), reqt) + if err != nil { + log.Debugf("Cann't register to kubelet service") + return err + } + return nil +} + +// func IsContain(items []string, item string) bool { +// AWS_SN := "F1-Node" +// for _, eachItem := range items { +// if eachItem == item && strings.EqualFold(item, AWS_SN) != true { +// return true +// } +// } +// return false +// } +func (m *FPGADevicePluginServer) sendDevices(s pluginapi.DevicePlugin_ListAndWatchServer) error { + resp := new(pluginapi.ListAndWatchResponse) + + check_range := m.devices + SerialNums := []string{} + for _, device := range check_range { + if device.SN == "" { + log.Printf("Error, Device %v has empty Serial number", device.deviceID) + } else { + SerialNums = append(SerialNums, device.SN) + tem := &pluginapi.Device{ + ID: device.deviceID, + Health: device.Healthy, + } + resp.Devices = append(resp.Devices, tem) + } + } + log.Printf("Check SeialNums arry: %v", SerialNums) + log.Printf("Sending %d device(s) %v to kubelet", len(resp.Devices), resp.Devices) + if err := s.Send(resp); err != nil { + m.Stop() + log.Debugf("Cannot update device list") + return err + } + return nil +} + +// ListAndWatch lists devices and update that list according to the health status +func (m *FPGADevicePluginServer) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { + log.Debugf("In ListAndWatch(%s): stream: %v", m.devType, s) + //debug.PrintStack() + for m.devices = range m.update { + if err := m.sendDevices(s); err != nil { + return err + } + } + return nil +} + +// Allocate which return list of devices. +func (m *FPGADevicePluginServer) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + log.Debugf("In Allocate()") + response := new(pluginapi.AllocateResponse) + for _, creq := range req.ContainerRequests { + log.Debugf("Request IDs: %v", creq.DevicesIDs) + + cres := new(pluginapi.ContainerAllocateResponse) + + // Check same serial number devices, devices with same serail number "F1-node" will be marked as independent devices + deviceIDs_arry := creq.DevicesIDs + + for _, id := range deviceIDs_arry { + log.Printf("Receiving request %s", id) + dev, ok := m.devices[id] + if !ok { + return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id) + } + if !m.deviceExists(id) { + return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id) + } + fname := path.Join(SysfsDevices, dev.deviceID, DeviceFile) + cres.Mounts = append(cres.Mounts, &pluginapi.Mount{ + HostPath: fname, + ContainerPath: fname, + ReadOnly: false, + }) + response.ContainerResponses = append(response.ContainerResponses, cres) + } + } + return response, nil +} + +// Serve starts the gRPC server and register the device plugin to Kubelet +func (m *FPGADevicePluginServer) Serve(resourceName string) error { + log.Debugf("In Serve(%s)", m.socket) + err := m.Start() + if err != nil { + log.Errorf("Could not start device plugin: %v", err) + return err + } + log.Infof("Starting to serve on %s", m.socket) + + err = m.Register(pluginapi.KubeletSocket, resourceName) + if err != nil { + log.Errorf("Could not register device plugin: %v", err) + m.Stop() + return err + } + log.Infof("Registered device plugin with Kubelet %s", resourceName) + + return nil +} + +func (m *FPGADevicePluginServer) GetPreferredAllocation(ctx context.Context, req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { + response := new(pluginapi.PreferredAllocationResponse) + for _, creq := range req.ContainerRequests { + log.Debugf("Request IDs: %v", creq.AvailableDeviceIDs) + + cres := new(pluginapi.ContainerPreferredAllocationResponse) + + // Check same serial number devices, devices with same serail number "F1-node" will be marked as independent devices + deviceIDs_arry := creq.AvailableDeviceIDs + + for _, id := range deviceIDs_arry { + log.Printf("Receiving request %s", id) + dev, ok := m.devices[id] + if !ok { + return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id) + } + if !m.deviceExists(id) { + return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id) + } + fname := path.Join(SysfsDevices, dev.deviceID, DeviceFile) + cres.DeviceIDs = append(cres.DeviceIDs, fname) + response.ContainerResponses = append(response.ContainerResponses, cres) + } + } + return response, nil +} |
