123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- import json
- import os
- import socket
- import time
- import subprocess
- from ctypes import Union
- from gi.overrides import Gtk
- import config
- PORT=8081
- class SPResponse:
- def __init__(self, code, msg, log):
- self.code=code
- self.message=msg
- self.log=log
- def __bool__(self):
- return self.code!=0
- class Socket(socket.socket):
- def __init__(self):
- super().__init__(socket.AF_INET, socket.SOCK_STREAM)
- def readc(self, blocking=True):
- c=b''
- while not len(c):
- c=self.recv(1)
- return c
- def readline(self):
- buffer = bytes()
- ok=False
- while not ok:
- c = self.readc()
- if c[0]==10: #new line
- ok=True
- else:
- buffer+=bytes(c)
- self.readc()
- self.readc()
- return buffer.decode("utf-8")
- def response(self):
- x=self.readline()
- _status = x.split(" ")
- code = int(_status[0])
- msg = " ".join(_status[1:])
- log = ""
- ok=False
- delim=">".encode()
- while not ok:
- c=self.readc()
- if c!=delim:
- log+=self.readline()+"\n"
- else:
- self.readc()
- break
- return SPResponse(code, msg, log)
- def write(self, string):
- if not "\n" in string: string += "\r\n"
- b = string.encode()
- l = len(b)
- i = 0
- while i < l:
- i += self.send(b[i:])
- def connect(self, address) -> None:
- x=super().connect(address)
- self.readc()
- self.readc()
- self.readc()
- def set_pad(self, pad):
- if isinstance(pad, str):
- self.write('pad select "%s"\n'%pad)
- else:
- self.write('pad select %d\n'%pad)
- return self.response()
- def set_configuration(self, path):
- self.write('pad load "%s"\n' % path)
- return self.response()
- def clear(self):
- self.write('pad clear\n')
- return self.response()
- class Response:
- def __init__(self, sock):
- self.code=0
- self.message=""
- self.log=""
- class DisconnectException(Exception):
- def __init__(self, string=""):
- super().__init__(string)
- class SimpleMidi:
- TEMP_FILE=".simple_midi_exchange"
- NB_TRIES=200
- def __init__(self, connect=None):
- self.connect=connect
- self.pid=-1
- self.socket=None
- self.process=None
- def run(self):
- if not self.connect and not self.is_alive():
- padsdir = os.path.normpath(os.path.join(os.getcwd(), "../pads"))
- args=(os.path.join(os.getcwd(), config.PAD_ROUTER_BIN), "--command", "inet", "8081", "-k", "--pads-dir", padsdir)
- fd = open("_stderr", "wb")
- self.process=subprocess.Popen(args, stderr=fd)
- """if (self.pid>0):
- os.kill(self.pid, 9)
- os.wait()
- "ui/" + config.PAD_ROUTER_BIN, ".", "--command", "inet", "8081", "-k", "--pads-dir", padsdir)
- self.pid = os.fork()
- if not self.pid:
- Gtk.main_quit()
- print("Start: ", config.PAD_ROUTER_BIN + " --pads-dir " + SimpleMidi.TEMP_FILE)
- os.chdir("..")
- padsdir = os.path.normpath(os.path.join(os.getcwd(), "pads"))
- args=("ui/" + config.PAD_ROUTER_BIN, ".", "--command", "inet", "8081", "-k", "--pads-dir", padsdir)
- print(" ".join(args))
- os.execl(*args)"""
- self.socket = Socket()
- for i in range(SimpleMidi.NB_TRIES):
- try:
- self.socket.connect(self.connect if self.connect else("127.0.0.1", PORT) )
- return
- except:
- print("Impossible de se connecter, essai %d/%d"%(i+1, SimpleMidi.NB_TRIES))
- time.sleep(0.5)
- if not self.is_alive():
- raise DisconnectException()
- raise DisconnectException()
- def clear(self):
- if not self.is_alive():
- raise DisconnectException()
- self.socket.clear()
- def set_pad(self, pad):
- if isinstance(pad, dict):
- if "pad" in pad:
- pad=pad["pad"]
- elif "name" in pad:
- pad=pad["name"]
- if not self.is_alive():
- raise DisconnectException()
- self.socket.set_pad(pad)
- def set_configuration_data(self, padconf):
- if not self.is_alive():
- self.run()
- with open(SimpleMidi.TEMP_FILE, "w") as f:
- f.write(json.dumps(padconf.json()))
- self.socket.set_configuration("ui/"+SimpleMidi.TEMP_FILE)
- time.sleep(0.1)
- os.remove(SimpleMidi.TEMP_FILE)
- def set_configuration_file(self, padconf):
- if not self.is_alive():
- self.run()
- self.socket.set_configuration(padconf)
- def is_alive(self):
- return self.process and self.process.poll()==None
- if __name__ == "__main__":
- sm = SimpleMidi()
- sm.run()
- sm.set_pad(0)
- sm.clear()
- while True:
- time.sleep(5)
- sm.set_configuration_file("ui/conf2.conf")
- time.sleep(5)
- sm.set_configuration_file("ui/conf3.conf")
|