"""TCP server that sets LED colors through sysfs "color" files.

Protocol, as implemented below: a client sends lines of three
space-separated integers ("R G B"), terminated by an empty line
("\n\n").  The line at position N of the message is applied to
``ledNames[N]``; lines that are blank after stripping are skipped but
still consume their position.  The server answers "OK" or "error".
"""

import socket
import sys

# LED names used to build the sysfs path; the position in this list is the
# line index inside a received message.
ledNames = [
    "power",
    "lan0",
    "lan1",
    "lan2",
    "lan3",
    "lan4",
    "wan",
    "pci1",
    "pci2",
    "pci3",
    "user1",
    "user2",
]

# Directory holding the "omnia-led:<name>" entries.
LED_ROOT = "/sys/class/leds"

# Byte sequence that ends one message on the wire.
MESSAGE_TERMINATOR = b"\n\n"


def eprint(*args, **kwargs):
    """Print to stderr; takes the same arguments as ``print`` except ``file``."""
    print(*args, file=sys.stderr, **kwargs)


def setLed(index, color, led_root=LED_ROOT):
    """Write one RGB triple to the color file of the LED at ``index``.

    Args:
        index: Position of the LED in ``ledNames``.
        color: Sequence whose first three items are convertible to ``int``
            (red, green, blue).  Extra items are ignored.
        led_root: Directory containing the ``omnia-led:<name>`` entries.

    Raises:
        Exception: "Wrong color values" when a component is missing or is
            not an integer, or when ``index`` is not a valid LED position.
        OSError: When the color file cannot be opened or written.
    """
    try:
        red = int(color[0])
        green = int(color[1])
        blue = int(color[2])
        name = ledNames[index]
    except (ValueError, IndexError, KeyError) as error:
        # Sequences raise IndexError (not KeyError) for a missing component
        # or an out-of-range LED index; KeyError is kept for mapping inputs.
        eprint(error)
        raise Exception("Wrong color values") from error

    # Open the file only after validation so there is never an unbound or
    # leaked file handle to clean up on the error path.
    with open("{}/omnia-led:{}/color".format(led_root, name), "w") as led_file:
        led_file.write("{} {} {}".format(red, green, blue))
    eprint("Red: {}, Green: {}, Blue: {}".format(red, green, blue))


def handleData(data, led_root=LED_ROOT):
    """Apply every non-blank line of ``data`` to the LED at that line index.

    Processing stops at the first line that fails.

    Args:
        data: Message text, one "R G B" triple per line.
        led_root: Directory containing the ``omnia-led:<name>`` entries.

    Returns:
        0 when every non-blank line was applied, 1 on the first failure.
    """
    try:
        for index, raw_line in enumerate(data.split("\n")):
            line = raw_line.strip()
            if not line:
                # Blank lines are skipped but still occupy their position.
                continue
            setLed(index, line.split(" "), led_root)
    except Exception as error:
        # Boundary of the protocol handler: report the failure to the
        # caller as a status code, but do not hide the cause.
        eprint("failed to apply LED data: {}".format(error))
        return 1
    return 0


def setAutonomous(data):
    """Placeholder; currently does nothing with ``data``."""
    pass


def _receiveMessage(conn):
    """Read from ``conn`` until the terminator arrives or the peer closes.

    Bytes are accumulated and decoded once, so a multi-byte character split
    across two ``recv`` calls cannot raise; undecodable bytes are replaced
    and later rejected by the integer parsing.

    Returns:
        A ``(text, peer_closed)`` tuple.  ``text`` holds everything received
        up to and including the chunk that contained the terminator.
    """
    received = bytearray()
    peer_closed = False
    while MESSAGE_TERMINATOR not in received:
        chunk = conn.recv(1024)
        if not chunk:
            # recv() returning b"" means the peer closed its side.
            peer_closed = True
            break
        received += chunk
    return received.decode("utf-8", errors="replace"), peer_closed


def _serveClient(conn, client_address):
    """Handle messages from one client until it disconnects."""
    eprint('connection from {}'.format(client_address))
    while True:
        allData, peer_closed = _receiveMessage(conn)
        if peer_closed and not allData:
            # Nothing pending and the peer is gone: stop instead of
            # answering empty messages in a busy loop.
            return
        eprint('all data: {}'.format(allData))
        reply = "OK" if handleData(allData) == 0 else "error"
        conn.sendall(reply.encode('utf-8'))
        if peer_closed:
            return


def main():
    """Bind to port 10000 (falling back to 10001) and serve clients forever."""
    # Create a TCP/IP socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Bind the socket to the port
        server_address = ('0.0.0.0', 10000)
        try:
            sock.bind(server_address)
        except OSError:
            server_address = ('0.0.0.0', 10001)
            sock.bind(server_address)
        eprint("Server started on {} on port {}".format(server_address[0], server_address[1]))

        # Listen for incoming connections
        sock.listen(1)

        while True:
            # Wait for a connection
            eprint('waiting for connection')
            conn, client_address = sock.accept()
            # The context manager closes the connection exactly once,
            # whichever way the client handler exits.
            with conn:
                try:
                    _serveClient(conn, client_address)
                except BrokenPipeError:
                    eprint("connection from {} has broken a pipe.".format(client_address))
                except ConnectionResetError:
                    eprint("connection from {} reset.".format(client_address))


if __name__ == "__main__":
    main()