import json import os import socket import time import subprocess from ctypes import Union from gi.overrides import Gtk import config PORT=8081 class SPResponse: def __init__(self, code, msg, log): self.code=code self.message=msg self.log=log def __bool__(self): return self.code!=0 class Socket(socket.socket): def __init__(self): super().__init__(socket.AF_INET, socket.SOCK_STREAM) def readc(self, blocking=True): c=b'' while not len(c): c=self.recv(1) return c def readline(self): buffer = bytes() ok=False while not ok: c = self.readc() if c[0]==10: #new line ok=True else: buffer+=bytes(c) self.readc() self.readc() return buffer.decode("utf-8") def response(self): x=self.readline() _status = x.split(" ") code = int(_status[0]) msg = " ".join(_status[1:]) log = "" ok=False delim=">".encode() while not ok: c=self.readc() if c!=delim: log+=self.readline()+"\n" else: self.readc() break return SPResponse(code, msg, log) def write(self, string): if not "\n" in string: string += "\r\n" b = string.encode() l = len(b) i = 0 while i < l: i += self.send(b[i:]) def connect(self, address) -> None: x=super().connect(address) self.readc() self.readc() self.readc() def set_pad(self, pad): if isinstance(pad, str): self.write('pad select "%s"\n'%pad) else: self.write('pad select %d\n'%pad) return self.response() def set_configuration(self, path): self.write('pad load "%s"\n' % path) return self.response() def clear(self): self.write('pad clear\n') return self.response() class Response: def __init__(self, sock): self.code=0 self.message="" self.log="" class DisconnectException(Exception): def __init__(self, string=""): super().__init__(string) class SimpleMidi: TEMP_FILE=".simple_midi_exchange" NB_TRIES=200 def __init__(self, connect=None): self.connect=connect self.pid=-1 self.socket=None self.process=None def run(self): if not self.connect and not self.is_alive(): padsdir = os.path.normpath(os.path.join(os.getcwd(), "../pads")) args=(os.path.join(os.getcwd(), config.PAD_ROUTER_BIN), "--command", "inet", "8081", "-k", "--pads-dir", padsdir) self.process=subprocess.Popen(args) """if (self.pid>0): os.kill(self.pid, 9) os.wait() "ui/" + config.PAD_ROUTER_BIN, ".", "--command", "inet", "8081", "-k", "--pads-dir", padsdir) self.pid = os.fork() if not self.pid: Gtk.main_quit() print("Start: ", config.PAD_ROUTER_BIN + " --pads-dir " + SimpleMidi.TEMP_FILE) os.chdir("..") padsdir = os.path.normpath(os.path.join(os.getcwd(), "pads")) args=("ui/" + config.PAD_ROUTER_BIN, ".", "--command", "inet", "8081", "-k", "--pads-dir", padsdir) print(" ".join(args)) os.execl(*args)""" self.socket = Socket() for i in range(SimpleMidi.NB_TRIES): try: self.socket.connect(self.connect if self.connect else("127.0.0.1", PORT) ) return except: print("Impossible de se connecter, essai %d/%d"%(i+1, SimpleMidi.NB_TRIES)) time.sleep(0.5) if not self.is_alive(): raise DisconnectException() raise DisconnectException() def clear(self): if not self.is_alive(): raise DisconnectException() self.socket.clear() def set_pad(self, pad): if isinstance(pad, dict): if "pad" in pad: pad=pad["pad"] elif "name" in pad: pad=pad["name"] if not self.is_alive(): raise DisconnectException() self.socket.set_pad(pad) def set_configuration_data(self, padconf): if not self.is_alive(): self.run() with open(SimpleMidi.TEMP_FILE, "w") as f: f.write(json.dumps(padconf.json())) self.socket.set_configuration("ui/"+SimpleMidi.TEMP_FILE) def set_configuration_file(self, padconf): if not self.is_alive(): self.run() self.socket.set_configuration(padconf) def is_alive(self): return self.process and self.process.poll()==None if __name__ == "__main__": sm = SimpleMidi() sm.run() sm.set_pad(0) sm.clear() while True: time.sleep(5) sm.set_configuration_file("ui/conf2.conf") time.sleep(5) sm.set_configuration_file("ui/conf3.conf")