#!/usr/bin/python
"""Thin wrappers around rtmidi ``MidiIn`` / ``MidiOut`` virtual ports.

The module exposes :class:`MidiInputPort` and :class:`MidiOutputPort`, both
built on the private :class:`_MidiPort` base, and uses ``AlsaConnector`` to
look up and connect the ALSA ports that back them.
"""

from .midimessage import *
from rtmidi import MidiIn
from rtmidi import MidiOut
import copy
import time
from .alsaconnector import AlsaConnector, AlsaPort
from .options import *
from .utils import *

ALSA = AlsaConnector()


class MIDIPortException(Exception):
    """Raised by the port classes on invalid mode or usage."""

    def __init__(self, message):
        super().__init__(message)


def _MidiPort__input_pad_error_cb(err, msg, obj):
    """Module-level trampoline registered as the rtmidi error callback.

    ``obj`` is the ``_MidiPort`` instance passed as callback data; the error
    is forwarded to its ``inputError`` method.
    """
    obj.inputError(err, msg)


class _MidiPort():
    """Common behaviour shared by the input and output port wrappers."""

    # Direction constants. Defined at class level so they exist before the
    # first instance is created.
    INPUT = 0
    OUTPUT = 1

    _DEFAULT_PARAMS = {
        'port': None,
        'port_name': "Midi Port",
        'client_name': "Simple MIDI",
        'error_callback': None
    }

    def __init__(self, direction, params):
        """Create the wrapper, instantiating an rtmidi port if none is given.

        direction -- ``_MidiPort.INPUT`` or ``_MidiPort.OUTPUT``.
        params    -- overrides for ``_DEFAULT_PARAMS`` (merged by ``initParams``).
        """
        param = initParams(_MidiPort._DEFAULT_PARAMS, params)
        self.direction = direction
        self.port = param['port']
        self.portName = param['port_name']
        self.clientName = param['client_name']
        # Expected to be None or a (function, data) pair, see inputError().
        self.errorCb = param['error_callback']
        if self.port is None:
            if direction == _MidiPort.INPUT:
                self.port = MidiIn()
            else:
                self.port = MidiOut()
        # The original code wrote ``__input_pad_error_cb`` here and relied on
        # private-name mangling to reach the module-level function; the
        # explicit name refers to the same object without the indirection.
        self.port.set_error_callback(_MidiPort__input_pad_error_cb, self)
        self.alsaPort = None

    def _updateAlsaPort(self):
        """Refresh ``alsaPort`` and ``portName`` from the ALSA connector."""
        if self.alsaPort is None:
            # Nothing to refresh yet (e.g. an already-open port was supplied
            # through params and open() never ran).
            return
        cid = self.alsaPort.clientId
        pid = self.alsaPort.portId
        self.alsaPort = AlsaConnector.getPortFromId((cid, pid))
        self.portName = self.alsaPort.name

    def _initClientId(self, cid, pid):
        """Locate this port in ALSA by client name and port name."""
        # NOTE(review): the original creates an rtmidi object of the opposite
        # direction and never uses it. It is kept because constructing it may
        # have side effects on the ALSA sequencer - confirm whether
        # AlsaConnector.findMe depends on it before removing.
        if self.direction == _MidiPort.INPUT:
            _unused = MidiOut()
        else:
            _unused = MidiIn()
        self.alsaPort = AlsaConnector.findMe(cid, pid)

    def inputError(self, err, msg):
        """Log an rtmidi error and forward it to the user error callback."""
        ERR("MIDI Error :" + type(err).__name__ + " " + msg)
        if self.errorCb:
            self.errorCb[0](self.errorCb[1], err, msg)

    def __input_pad_error_cb(self, err, msg):
        """Forward an error to the user error callback without logging.

        The original definition lacked ``self`` and could not run.
        """
        if self.errorCb:
            self.errorCb[0](self.errorCb[1], err, msg)

    def setErrorCallback(self, fct, data):
        """Register ``fct``; it is called as ``fct(data, err, msg)``."""
        self.errorCb = (fct, data)

    def cancelErrorCallback(self):
        """Remove the user error callback."""
        self.errorCb = None

    def setClientName(self, name):
        """Set the client name, replacing ':' with a space."""
        self.clientName = name.replace(':', ' ')
        self.port.set_client_name(self.clientName)
        if self.isOpen():
            self._updateAlsaPort()

    def setPortName(self, name):
        """Set the port name, replacing ':' with a space.

        The original overwrote the sanitised name with the raw one; the
        sanitised value is now kept, consistent with setClientName().
        """
        self.portName = name.replace(':', ' ')
        self.port.set_port_name(self.portName)
        if self.isOpen():
            self._updateAlsaPort()

    def getPortName(self, n=None):
        """Return this port's name, or the name of rtmidi port ``n``."""
        if n is None:
            return self.portName
        return self.port.get_port_name(n)

    def isOpen(self):
        """Return whether the underlying rtmidi port is open."""
        return self.port.is_port_open()

    def getPorts(self):
        """Return the port list reported by rtmidi."""
        return self.port.get_ports()

    def getPortCount(self):
        """Return the port count reported by rtmidi."""
        return self.port.get_port_count()

    def getApi(self):
        """Return the API currently used by rtmidi."""
        return self.port.get_current_api()

    def open(self, port=None):
        """Open a virtual port named ``port`` (default: the stored port name).

        Returns True if the port is open afterwards, False otherwise.
        """
        if self.isOpen():
            return True
        if port is None:
            port = self.portName
        self.setClientName(self.clientName)
        self.port.open_virtual_port(port)
        if self.isOpen():
            self._initClientId(self.clientName, port)
            INFO("ALSA Midi port: " + str(self.alsaPort) + " created")
            return True
        return False

    def connect(self, to):
        """Connect this port to ``to``.

        ``to`` may be a tuple id, a port name string, an index into
        getPorts(), or an ``AlsaPort``; every form is reduced to the tuple
        case. Output ports connect to ``to``; input ports are connected from it.
        """
        if isinstance(to, tuple):
            if self.direction == _MidiPort.OUTPUT:
                DEBUG("Connecting '" + str(self.alsaPort) + "' to " + str(to) + " ...")
                ALSA.connect(self.alsaPort.globalId, to)
            else:
                DEBUG("Connecting '" + str(to) + "' to " + str(self.alsaPort) + " ...")
                ALSA.connect(to, self.alsaPort.globalId)
        elif isinstance(to, str):
            port = AlsaPort(to)
            return self.connect(port.globalId)
        elif isinstance(to, int):
            return self.connect(self.getPorts()[to])
        elif isinstance(to, AlsaPort):
            return self.connect(to.globalId)

    def disconnect(self, to):
        """Disconnect this port from ``to``.

        The original read ``self.portId``, which is never assigned; the ALSA
        global id is used instead, mirroring connect().
        NOTE(review): assumes ALSA.disconnect takes the same id tuples as
        ALSA.connect - confirm against AlsaConnector.
        """
        if self.direction == _MidiPort.INPUT:
            ALSA.disconnect(to, self.alsaPort.globalId)
        else:
            ALSA.disconnect(self.alsaPort.globalId, to)

    def close(self):
        """Close the rtmidi port; return True if it is closed afterwards."""
        self.port.close_port()
        return not self.isOpen()


#
# Input
#

def _on_input_callback(data, obj):
    """Module-level trampoline registered as the rtmidi input callback."""
    obj._on_input_callback(data)


class MidiInputPort(_MidiPort):
    """Input port delivering messages through a queue or through callbacks."""

    # Delivery modes. Defined at class level so they exist before the first
    # instance is created.
    MODE_QUEUE = 0
    MODE_CALLBACK = 1

    _DEFAULT_PARAMS = dictAssign(_MidiPort._DEFAULT_PARAMS, {
        'mode': 0,
        'ignore_midi_type': {},
        'separate_callback': {},
        'input_callback': None
    })

    @staticmethod
    def fromParams(param):
        """Build a MidiInputPort from ``param`` when no 'port' entry is set.

        NOTE(review): when 'port' is set the merged params dict itself is
        returned, not a port object - confirm this is what callers expect.
        """
        param = initParams(MidiInputPort._DEFAULT_PARAMS, param)
        if param['port'] is None:
            return MidiInputPort(param)
        else:
            return param

    def __init__(self, params=None):
        """Create the input port and register the rtmidi input callback.

        params -- overrides for ``_DEFAULT_PARAMS``; may be omitted.
        """
        # presumably initParams accepts an empty dict as "no overrides" -
        # verify against .utils.
        param = initParams(MidiInputPort._DEFAULT_PARAMS,
                           {} if params is None else params)
        _MidiPort.__init__(self, _MidiPort.INPUT, param)
        self.port.set_callback(_on_input_callback, self)
        self.port.ignore_types(False, False, False)
        self.queue = Queue()
        self.inputCb = param['input_callback']
        # Copy the dicts so instances never share the mutable defaults.
        self.separatesCB = dict(param['separate_callback'] or {})
        self.mode = param['mode']
        self.ignore = dict(param['ignore_midi_type'] or {})

    # Modes

    def setMode(self, mode):
        """Select MODE_QUEUE or MODE_CALLBACK.

        Raises MIDIPortException for any other value. The original validated
        the current mode instead of the requested one.
        """
        if mode != MidiInputPort.MODE_QUEUE and \
           mode != MidiInputPort.MODE_CALLBACK:
            raise MIDIPortException("Bad input mode (" + str(mode) + ")")
        self.mode = mode

    def getMode(self):
        """Return the current delivery mode."""
        return self.mode

    # Callbacks

    def setInputCallback(self, fct, data, typ=None):
        """Register a callback and switch to callback mode.

        With ``typ`` None the callback receives every message; otherwise it
        is registered for that message type only.
        NOTE(review): ``data`` is stored but callbacks are invoked as
        ``fct(timestamp, message)`` - it is never passed to them.
        """
        self.mode = MidiInputPort.MODE_CALLBACK
        if typ is None:
            self.inputCb = (fct, data)
        else:
            self.separatesCB[typ] = (fct, data)

    def cancelInputCallback(self, typ=None):
        """Remove the general callback, or the one registered for ``typ``.

        Falls back to queue mode once no callback is left.
        """
        if typ is None:
            self.inputCb = None
        else:
            # Remove the key rather than storing None, so the dict can become
            # empty and dispatch never finds a None entry.
            self.separatesCB.pop(typ, None)
        if not self.separatesCB and self.inputCb is None:
            self.mode = MidiInputPort.MODE_QUEUE

    def cancelAllCallbacks(self):
        """Remove every callback and return to queue mode."""
        self.inputCb = None
        self.separatesCB = {}
        self.mode = MidiInputPort.MODE_QUEUE

    def _on_input_callback(self, data):
        """Parse one incoming rtmidi event and queue or dispatch it.

        ``data[0]`` is handed to MidiMessage.parse; ``data[1]`` is passed
        along unchanged as the first element/argument.
        """
        t = data[1]
        obj = MidiMessage.parse(data[0])
        typ = obj.getType()
        if self.mode == MidiInputPort.MODE_QUEUE:
            # A type is skipped only when its ignore entry equals True.
            if not (typ in self.ignore and self.ignore[typ] == True):
                self.queue.enqueue((t, obj))
        elif self.mode == MidiInputPort.MODE_CALLBACK:
            if self.inputCb and self.inputCb[0]:
                self.inputCb[0](t, obj)
            cb = self.separatesCB.get(typ)
            if cb and cb[0]:
                cb[0](t, obj)

    # Others

    def get(self, sync=False):
        """Return the next queued ``(timestamp, message)`` entry.

        With ``sync`` true this delegates to getSync(). Otherwise the head of
        the queue is peeked and removed only if truthy; the peeked value is
        returned either way. Raises MIDIPortException outside queue mode.
        """
        if sync:
            return self.getSync()
        if self.mode == MidiInputPort.MODE_QUEUE:
            x = self.queue.peek()
            if x:
                self.queue.dequeue()
            return x
        else:
            raise MIDIPortException("Bad mode for : get")

    def getSync(self):
        """Return ``self.queue.dequeue()`` directly."""
        return self.queue.dequeue()

    def ignoreMessages(self, data):
        """Merge ``data`` (message type -> bool) into the ignore table."""
        self.ignore.update(data)


# Disabled reference table, kept verbatim from the original source.
# True: the event is skipped; False or undefined: the event passes.
#
# MIDIInputPort.ALL_IGNORED={
#     MidiType.NOTE_ON:True,
#     MidiType.NOTE_OFF:True,
#     MidiType.KEY_PRESSURE:True,
#     MidiType.CONTROL_CHANGE:True,
#     MidiType.PROGRAM_CHANGE:True,
#     MidiType.CHANNEL_PRESSURE:True,
#     MidiType.PITCH_WHEEL_CHANGE:True,
#     MidiType.SYSTEM_EXCLUSIVE:True,
#     MidiType.SONG_POSITION_POINTER:True,
#     MidiType.SONG_SELECT:True,
#     MidiType.TUNE_REQUEST:True,
#     MidiType.END_OF_EXCUSIVE:True,
#     MidiType.TIMING_CLOCK:True,
#     MidiType.START:True,
#     MidiType.CONTINUE:True,
#     MidiType.STOP:True,
#     MidiType.ACTIVE_SENSING:True,
#     MidiType.META:True,
#     MidiType.RESET:True,
#     MidiType.META_SEQ:True,
#     MidiType.META_TEXT:True,
#     MidiType.META_COPYRIGHT:True,
#     MidiType.META_TRACK_NAME:True,
#     MidiType.META_INSTRUMENT_NAME:True,
#     MidiType.META_LYRICS:True,
#     MidiType.META_TEXT_MARKER:True,
#     MidiType.META_CUE_POINT:True,
#     MidiType.META_CHANNEL_PREFIX:True,
#     MidiType.META_END_OF_TRACK:True,
#     MidiType.META_SET_TEMPO:True,
#     MidiType.META_SMPTE_OFFSET:True,
#     MidiType.META_TIME_SIGNATURE:True
# }


#
# Output
#

class MidiOutputPort(_MidiPort):
    """Output port with one convenience sender per MIDI message class."""

    _DEFAULT_PARAMS = _MidiPort._DEFAULT_PARAMS

    @staticmethod
    def fromParams(param):
        """Build a MidiOutputPort from ``param`` when no 'port' entry is set.

        NOTE(review): when 'port' is set the merged params dict itself is
        returned, not a port object - confirm this is what callers expect.
        """
        param = initParams(MidiOutputPort._DEFAULT_PARAMS, param)
        if param['port'] is None:
            return MidiOutputPort(param)
        else:
            return param

    def __init__(self, params=None):
        """Create the output port; ``params`` may be omitted."""
        # presumably initParams accepts an empty dict as "no overrides" -
        # verify against .utils.
        _MidiPort.__init__(self, _MidiPort.OUTPUT,
                           {} if params is None else params)

    def send(self, param):
        """Send raw data (bytes or list) or an object exposing ``bytes()``.

        Delivery goes through the ``send_message`` helper, not through
        ``self.port.send_message`` directly.
        """
        if isinstance(param, (bytes, list)):
            send_message(self.port, param)
        else:
            send_message(self.port, param.bytes())

    def noteOn(self, ch, key, vel):
        self.send(NoteOn.new(ch, key, vel).bytes())

    def noteOff(self, ch, key, vel):
        self.send(NoteOff.new(ch, key, vel).bytes())

    def keyPressure(self, ch, key, pressure):
        self.send(KeyPressure.new(ch, key, pressure).bytes())

    def controlChange(self, ch, key, value):
        self.send(ControlChange.new(ch, key, value).bytes())

    def programChange(self, ch, key):
        self.send(ProgramChange.new(ch, key).bytes())

    def channelPressure(self, ch, pressure):
        self.send(ChannelPressure.new(ch, pressure).bytes())

    def pitchWheelChange(self, ch, pressure):
        self.send(PitchWheelChange.new(ch, pressure).bytes())

    def systemExclusive(self, data):
        self.send(SystemExclusive.new(data).bytes())

    def songPointerPosition(self, beats):
        self.send(SongPointerPosition.new(beats).bytes())

    def songSelect(self, song):
        self.send(SongSelect.new(song).bytes())

    def tuneRequest(self):
        self.send(TuneRequest.new().bytes())

    def timingClock(self):
        self.send(TimingClock.new().bytes())

    def start(self):
        self.send(Start.new().bytes())

    def continuer(self):
        self.send(Continue.new().bytes())

    def stop(self):
        self.send(Stop.new().bytes())

    def activeSensing(self):
        self.send(ActiveSensing.new().bytes())

    def endOfTrack(self):
        self.send(EndOfTrack.new().bytes())

    def sequenceNumber(self):
        self.send(SequenceNumber.new().bytes())

    # textEvent was defined twice with identical bodies; one copy is kept.
    def textEvent(self, text):
        self.send(TextEvent.new(text).bytes())

    def copyright(self, text):
        self.send(Copyright.new(text).bytes())

    def trackName(self, text):
        self.send(TrackName.new(text).bytes())

    def instrumentName(self, text):
        self.send(InstrumentName.new(text).bytes())

    def textLyrics(self, text):
        self.send(TextLyrics.new(text).bytes())

    def textMarker(self, text):
        self.send(TextMarker.new(text).bytes())

    def cuePoint(self, text):
        self.send(CuePoint.new(text).bytes())

    def channelPrefix(self, cc):
        self.send(ChannelPrefix.new(cc).bytes())

    def setTempo(self, tempo):
        self.send(SetTempo.new(tempo).bytes())

    def SMPTEOffset(self, hh, mm, ss, fr, ff):
        self.send(SMPTEOffset.new(hh, mm, ss, fr, ff).bytes())

    def timeSignature(self, numerator, denominator, clic, notes):
        # The original called SMPTEOffset.new here with four arguments, a
        # copy-paste slip from the method above.
        # NOTE(review): assumes .midimessage exports TimeSignature, following
        # the naming of the other message classes - confirm.
        self.send(TimeSignature.new(numerator, denominator, clic, notes).bytes())


if __name__ == "__main__":
    ipp = MidiInputPort()
    ipp.setClientName("Test 1")
    ipp.open("d")

    ippp = MidiInputPort()
    ippp.setClientName("Test 1")
    ippp.open("c")

    op = MidiOutputPort()
    op.setClientName("Test 1")
    op.open("b")

    opp = MidiOutputPort()
    opp.setClientName("Test 1")
    opp.open("a")

    time.sleep(100)

# The string below is an inert reference dump kept from the original source
# (it looks like an ALSA client/port/connection listing).
"""
[
    ('System', 0, [
        ('Timer', 0, ([], [])),
        ('Announce', 1, (
            [
                (128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0}),
                (130, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})
            ],
            []
            )
        )
        ]
    )
    ,('Midi Through', 14, [
        ('Midi Through Port-0', 0, (
            [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1})],
            [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})]
            )
        )
        ]
    ),
    ('APC MINI', 28, [
        ('APC MINI MIDI 1', 0, (
            [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1})],
            [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})]
            )
        )
        ]
    ),
    ('a2jmidid', 128, [
        ('port', 0, (
            [(14, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0}),
             (28, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0}),
             (129, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})],
            [(0, 1, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0}),
             (14, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1}),
             (28, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1}),
             (129, 1, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1})]
            )
        )
        ]
    ),
    ('mididings', 129, [
        ('in_1', 0, (
            [],
            [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})]
            )
        ),
        ('out_1', 1, (
            [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1})],
            []
            )
        )
        ]
    ),
    ('Patchage', 130, [
        ('System Announcement Reciever', 0, (
            [],
            [(0, 1, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})]
            )
        )
        ]
    ),
    ('pyalsa-10554', 131, [ ] )
]
"""