123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519 |
- #!/usr/bin/python
- from .midimessage import *
- from rtmidi import MidiIn
- from rtmidi import MidiOut
- from rtmidi import MidiOut
- import copy
- import time
- from .alsaconnector import AlsaConnector, AlsaPort
- from .options import *
- from .utils import *
# Module-wide connector instance; the port classes below route their
# connect()/disconnect() requests through it.
ALSA = AlsaConnector()
class MIDIPortException(Exception):
    """Error raised by the MIDI port wrappers of this module."""

    def __init__(self, message):
        # The message stays a required argument, exactly as before.
        Exception.__init__(self, message)
def _MidiPort__input_pad_error_cb(err, msg, obj):
    """Forward an error notification to the port object that registered it.

    ``obj`` is the object handed over as callback data; its ``inputError``
    method receives ``err`` and ``msg`` unchanged.

    The function name is deliberately the name-mangled form of
    ``__input_pad_error_cb`` as seen from inside class ``_MidiPort``.
    """
    handler = obj.inputError
    handler(err, msg)
class _MidiPort():
    """Common base of the virtual MIDI input and output ports.

    Holds an rtmidi ``MidiIn``/``MidiOut`` object (``self.port``), the client
    and port names, an optional error callback and, once the virtual port has
    been opened, the matching ALSA port descriptor (``self.alsaPort``).
    """

    # Direction constants accepted by the constructor.  They used to be
    # created inside __init__, i.e. they did not exist before the first
    # instance was built.
    INPUT = 0
    OUTPUT = 1

    _DEFAULT_PARAMS = {
        'port': None,
        'port_name': "Midi Port",
        'client_name': "Simple MIDI",
        'error_callback': None
    }

    def __init__(self, direction, params):
        """Build the wrapper.

        direction -- ``_MidiPort.INPUT`` or ``_MidiPort.OUTPUT``.
        params    -- options merged over ``_DEFAULT_PARAMS`` by initParams().
                     Keys read here: 'port' (existing rtmidi object or None),
                     'port_name', 'client_name' and 'error_callback'
                     (a ``(function, data)`` pair or None).
        """
        param = initParams(_MidiPort._DEFAULT_PARAMS, params)

        self.direction = direction
        self.port = param['port']
        self.portName = param['port_name']
        self.clientName = param['client_name']
        self.errorCb = param['error_callback']

        if self.port is None:
            # No rtmidi object supplied: create one for the direction.
            if direction == _MidiPort.INPUT:
                self.port = MidiIn()
            else:
                self.port = MidiOut()

        # Module-level trampoline.  The original spelling
        # ``__input_pad_error_cb`` is name-mangled inside this class to
        # exactly this identifier, so the same function is registered.
        self.port.set_error_callback(_MidiPort__input_pad_error_cb, self)
        self.alsaPort = None

    def _updateAlsaPort(self):
        """Re-read the ALSA descriptor of this port and refresh portName."""
        if self.alsaPort is None:
            # Nothing has been resolved yet, so there is nothing to refresh.
            return
        cid = self.alsaPort.clientId
        pid = self.alsaPort.portId
        self.alsaPort = AlsaConnector.getPortFromId((cid, pid))
        self.portName = self.alsaPort.name

    def _initClientId(self, cid, pid):
        """Resolve ``self.alsaPort`` through ``AlsaConnector.findMe(cid, pid)``.

        open() passes the client name and the port name as ``cid``/``pid``.
        """
        # The original also instantiated an unused rtmidi object of the
        # opposite direction here; it was never read and has been dropped.
        self.alsaPort = AlsaConnector.findMe(cid, pid)

    def inputError(self, err, msg):
        """Log an error and forward it to the user error callback, if any.

        The callback is invoked as ``function(data, err, msg)``.
        """
        ERR("MIDI Error :"+type(err).__name__+" "+msg)
        if self.errorCb:
            self.errorCb[0](self.errorCb[1], err, msg)

    def __input_pad_error_cb(self, err, msg):
        """Forward an error to the user callback without logging it.

        Not referenced anywhere in this file.  The original definition had
        no ``self`` parameter and therefore could not be called successfully.
        """
        if self.errorCb:
            self.errorCb[0](self.errorCb[1], err, msg)

    def setErrorCallback(self, fct, data):
        """Register ``fct`` (called with ``data, err, msg``) as error callback."""
        self.errorCb = (fct, data)

    def cancelErrorCallback(self):
        """Remove the user error callback."""
        self.errorCb = None

    def setClientName(self, name):
        """Set the client name; every ':' in ``name`` is replaced by a space."""
        self.clientName = name.replace(':', ' ')
        self.port.set_client_name(self.clientName)
        if self.isOpen():
            self._updateAlsaPort()

    def setPortName(self, name):
        """Set the port name; every ':' in ``name`` is replaced by a space."""
        # The sanitised value used to be overwritten by the raw ``name``
        # right after being computed; it is now the value that is kept.
        self.portName = name.replace(':', ' ')
        self.port.set_port_name(self.portName)
        if self.isOpen():
            self._updateAlsaPort()

    def getPortName(self, n=None):
        """Return this port's name, or the name of port number ``n``."""
        if n is None:
            return self.portName
        return self.port.get_port_name(n)

    def isOpen(self):
        """Return whether the underlying rtmidi port is open."""
        return self.port.is_port_open()

    def getPorts(self):
        """Return the port list reported by the underlying rtmidi object."""
        return self.port.get_ports()

    def getPortCount(self):
        """Return the port count reported by the underlying rtmidi object."""
        return self.port.get_port_count()

    def getApi(self):
        """Return the API identifier reported by the underlying rtmidi object."""
        return self.port.get_current_api()

    def open(self, port=None):
        """Open a virtual port named ``port`` (default: ``self.portName``).

        Returns True when the port is open afterwards (including when it was
        already open), False otherwise.
        """
        if self.isOpen():
            return True
        if port is None:
            port = self.portName

        self.setClientName(self.clientName)
        self.port.open_virtual_port(port)
        if not self.isOpen():
            return False

        self._initClientId(self.clientName, port)
        INFO("ALSA Midi port: "+str(self.alsaPort)+" created")
        return True

    def connect(self, to):
        """Connect this port with ``to``.

        ``to`` may be an ALSA global-id tuple, a port name (str), an index
        into getPorts() (int) or an ``AlsaPort``.  Output ports are the
        source of the connection, input ports the destination.  Any other
        type is ignored.
        """
        if isinstance(to, tuple):
            if self.direction == _MidiPort.OUTPUT:
                DEBUG("Connecting '"+str(self.alsaPort)+"' to "+str(to)+" ...")
                ALSA.connect(self.alsaPort.globalId, to)
            else:
                DEBUG("Connecting '"+str(to)+"' to "+str(self.alsaPort)+" ...")
                ALSA.connect(to, self.alsaPort.globalId)
        elif isinstance(to, str):
            port = AlsaPort(to)
            return self.connect(port.globalId)
        elif isinstance(to, int):
            return self.connect(self.getPorts()[to])
        elif isinstance(to, AlsaPort):
            return self.connect(to.globalId)

    def disconnect(self, to):
        """Disconnect this port from ``to`` (passed to ALSA unchanged)."""
        # ``self.portId`` never existed on this class; the ALSA identity of
        # the port is ``self.alsaPort.globalId``, exactly as in connect().
        own = self.alsaPort.globalId
        if self.direction == _MidiPort.INPUT:
            ALSA.disconnect(to, own)
        else:
            ALSA.disconnect(own, to)

    def close(self):
        """Close the rtmidi port; return True when it is closed afterwards."""
        self.port.close_port()
        return not self.isOpen()
-
- """
- #
- # Input
- #
- #
- """
def _on_input_callback(data, obj):
    """Hand an incoming-message notification to the owning port object.

    ``obj`` is the object registered as callback data; ``data`` is passed to
    its ``_on_input_callback`` method untouched.
    """
    receiver = obj._on_input_callback
    receiver(data)
-
class MidiInputPort(_MidiPort):
    """Virtual MIDI input port.

    Incoming messages are either stored in a queue (``MODE_QUEUE``) and read
    with get()/getSync(), or dispatched to user callbacks (``MODE_CALLBACK``).
    """

    # Reception modes.  They used to be created inside __init__ only.
    MODE_QUEUE = 0
    MODE_CALLBACK = 1

    _DEFAULT_PARAMS = dictAssign(_MidiPort._DEFAULT_PARAMS, {
        'mode': 0,
        'ignore_midi_type': {},
        'separate_callback': {},
        'input_callback': None
    })

    @staticmethod
    def fromParams(param):
        """Build a MidiInputPort from ``param`` when no 'port' is supplied.

        NOTE(review): when 'port' is set, the merged parameter dict itself is
        returned rather than a port object. This looks unintended, but it is
        kept as is because callers may rely on it -- confirm.
        """
        param = initParams(MidiInputPort._DEFAULT_PARAMS, param)
        if param['port'] is None:
            return MidiInputPort(param)
        return param

    def __init__(self, params=None):
        """Create the input port.

        params -- options merged over ``_DEFAULT_PARAMS``; besides the base
                  class keys: 'mode', 'ignore_midi_type', 'separate_callback'
                  and 'input_callback'.  May be omitted.
        """
        # Assumes initParams() treats an empty dict as "use the defaults".
        if params is None:
            params = {}
        param = initParams(MidiInputPort._DEFAULT_PARAMS, params)
        _MidiPort.__init__(self, 0, param)

        # Set up all receiving state before the callback is registered.
        self.queue = Queue()
        self.inputCb = param['input_callback']
        # Copy the dicts so instances never share (and mutate) the mutable
        # defaults stored in _DEFAULT_PARAMS.
        self.separatesCB = dict(param['separate_callback'])
        self.mode = param['mode']
        self.ignore = dict(param['ignore_midi_type'])

        self.port.set_callback(_on_input_callback, self)
        self.port.ignore_types(False, False, False)

    #
    # Modes
    #
    def setMode(self, mode):
        """Select the reception mode.

        Raises MIDIPortException when ``mode`` is neither MODE_QUEUE nor
        MODE_CALLBACK.
        """
        # The check used to test the *current* mode instead of the new one,
        # so any value was accepted.
        if mode not in (MidiInputPort.MODE_QUEUE, MidiInputPort.MODE_CALLBACK):
            raise MIDIPortException("Bad input mode ("+str(mode)+")")
        self.mode = mode

    def getMode(self):
        """Return the current reception mode."""
        return self.mode

    #
    # Callbacks
    #
    def setInputCallback(self, fct, data, typ=None):
        """Register ``fct`` and switch the port to MODE_CALLBACK.

        Without ``typ`` the callback receives every message; with ``typ`` it
        is only invoked for messages of that type.  ``data`` is stored with
        the callback but is not passed to it on dispatch.
        """
        self.mode = MidiInputPort.MODE_CALLBACK
        if typ is None:
            self.inputCb = (fct, data)
        else:
            self.separatesCB[typ] = (fct, data)

    def cancelInputCallback(self, typ=None):
        """Remove the global callback, or the one registered for ``typ``.

        Falls back to MODE_QUEUE once no callback is left.
        """
        if typ is None:
            self.inputCb = None
        else:
            # Storing None under the key kept the dict non-empty forever and
            # made the dispatcher crash on ``None[0]``; drop the entry.
            self.separatesCB.pop(typ, None)
        if not self.separatesCB and self.inputCb is None:
            self.mode = MidiInputPort.MODE_QUEUE

    def cancelAllCallbacks(self):
        """Remove every callback and return to MODE_QUEUE."""
        self.inputCb = None
        self.separatesCB = {}
        self.mode = MidiInputPort.MODE_QUEUE

    def _on_input_callback(self, data):
        """Handle one incoming notification.

        ``data[0]`` is parsed with MidiMessage.parse() and ``data[1]`` is
        kept as the time value ``t``.  In queue mode the pair ``(t, message)``
        is enqueued unless the message type is flagged in the ignore table;
        in callback mode the callbacks are called as ``fct(t, message)``.
        """
        t = data[1]
        obj = MidiMessage.parse(data[0])
        typ = obj.getType()
        if self.mode == MidiInputPort.MODE_QUEUE:
            # Only an entry equal to True suppresses the message.
            if self.ignore.get(typ) == True:  # noqa: E712
                return
            self.queue.enqueue((t, obj))
        elif self.mode == MidiInputPort.MODE_CALLBACK:
            if self.inputCb and self.inputCb[0]:
                self.inputCb[0](t, obj)
            cb = self.separatesCB.get(typ)
            # ``cb`` may be None when an entry was cleared or never set.
            if cb and cb[0]:
                cb[0](t, obj)

    #
    # Others
    #
    def get(self, sync=False):
        """Return the next queued ``(t, message)`` pair.

        With ``sync`` true the call is delegated to getSync().  Otherwise the
        head of the queue is returned (and removed) if there is one.
        Raises MIDIPortException when the port is not in MODE_QUEUE.
        """
        if sync:
            return self.getSync()
        if self.mode != MidiInputPort.MODE_QUEUE:
            raise MIDIPortException("Bad mode for : get")
        x = self.queue.peek()
        if x:
            self.queue.dequeue()
        return x

    def getSync(self):
        """Return the result of ``self.queue.dequeue()``."""
        return self.queue.dequeue()

    def ignoreMessages(self, data):
        """Merge ``data`` (message type -> bool) into the ignore table."""
        self.ignore.update(data)
- """
- MidiInputPort.ALL_IGNORED={ # True: the event is skipped; False or undefined: the event passes
- MidiType.NOTE_ON:True,
- MidiType.NOTE_OFF:True,
- MidiType.KEY_PRESSURE:True,
- MidiType.CONTROL_CHANGE:True,
- MidiType.PROGRAM_CHANGE:True,
- MidiType.CHANNEL_PRESSURE:True,
- MidiType.PITCH_WHEEL_CHANGE:True,
- MidiType.SYSTEM_EXCLUSIVE:True,
- MidiType.SONG_POSITION_POINTER:True,
- MidiType.SONG_SELECT:True,
- MidiType.TUNE_REQUEST:True,
- MidiType.END_OF_EXCUSIVE:True,
- MidiType.TIMING_CLOCK:True,
- MidiType.START:True,
- MidiType.CONTINUE:True,
- MidiType.STOP:True,
- MidiType.ACTIVE_SENSING:True,
- MidiType.META:True,
- MidiType.RESET:True,
- MidiType.META_SEQ:True,
- MidiType.META_TEXT:True,
- MidiType.META_COPYRIGHT:True,
- MidiType.META_TRACK_NAME:True,
- MidiType.META_INSTRUMENT_NAME:True,
- MidiType.META_LYRICS:True,
- MidiType.META_TEXT_MARKER:True,
- MidiType.META_CUE_POINT:True,
- MidiType.META_CHANNEL_PREFIX:True,
- MidiType.META_END_OF_TRACK:True,
- MidiType.META_SET_TEMPO:True,
- MidiType.META_SMPTE_OFFSET:True,
- MidiType.META_TIME_SIGNATURE:True
- }
- """
- """
- #
- # Output
- #
- #
- """
class MidiOutputPort(_MidiPort):
    """Virtual MIDI output port with one helper method per message kind."""

    _DEFAULT_PARAMS = _MidiPort._DEFAULT_PARAMS

    @staticmethod
    def fromParams(param):
        """Build a MidiOutputPort from ``param`` when no 'port' is supplied.

        NOTE(review): when 'port' is set, the merged parameter dict itself is
        returned rather than a port object. This looks unintended, but it is
        kept as is because callers may rely on it -- confirm.
        """
        param = initParams(MidiOutputPort._DEFAULT_PARAMS, param)
        if param['port'] is None:
            return MidiOutputPort(param)
        return param

    def __init__(self, params=None):
        """Create the output port; ``params`` may be omitted."""
        # Assumes initParams() treats an empty dict as "use the defaults".
        if params is None:
            params = {}
        _MidiPort.__init__(self, 1, params)

    def send(self, param):
        """Send ``param`` through the port.

        ``bytes`` and ``list`` values are sent as they are; any other object
        is converted with its ``bytes()`` method first.
        """
        payload = param if isinstance(param, (bytes, list)) else param.bytes()
        send_message(self.port, payload)

    def noteOn(self, ch, key, vel):
        """Send a NoteOn message."""
        self.send(NoteOn.new(ch, key, vel).bytes())

    def noteOff(self, ch, key, vel):
        """Send a NoteOff message."""
        self.send(NoteOff.new(ch, key, vel).bytes())

    def keyPressure(self, ch, key, pressure):
        """Send a KeyPressure message."""
        self.send(KeyPressure.new(ch, key, pressure).bytes())

    def controlChange(self, ch, key, value):
        """Send a ControlChange message."""
        self.send(ControlChange.new(ch, key, value).bytes())

    def programChange(self, ch, key):
        """Send a ProgramChange message."""
        self.send(ProgramChange.new(ch, key).bytes())

    def channelPressure(self, ch, pressure):
        """Send a ChannelPressure message."""
        self.send(ChannelPressure.new(ch, pressure).bytes())

    def pitchWheelChange(self, ch, pressure):
        """Send a PitchWheelChange message."""
        self.send(PitchWheelChange.new(ch, pressure).bytes())

    def systemExclusive(self, data):
        """Send a SystemExclusive message."""
        self.send(SystemExclusive.new(data).bytes())

    def songPointerPosition(self, beats):
        """Send a SongPointerPosition message."""
        self.send(SongPointerPosition.new(beats).bytes())

    def songSelect(self, song):
        """Send a SongSelect message."""
        self.send(SongSelect.new(song).bytes())

    def tuneRequest(self):
        """Send a TuneRequest message."""
        self.send(TuneRequest.new().bytes())

    def timingClock(self):
        """Send a TimingClock message."""
        self.send(TimingClock.new().bytes())

    def start(self):
        """Send a Start message."""
        self.send(Start.new().bytes())

    def continuer(self):
        """Send a Continue message."""
        self.send(Continue.new().bytes())

    def stop(self):
        """Send a Stop message."""
        self.send(Stop.new().bytes())

    def activeSensing(self):
        """Send an ActiveSensing message."""
        self.send(ActiveSensing.new().bytes())

    def endOfTrack(self):
        """Send an EndOfTrack message."""
        self.send(EndOfTrack.new().bytes())

    def sequenceNumber(self):
        """Send a SequenceNumber message."""
        self.send(SequenceNumber.new().bytes())

    def textEvent(self, text):
        """Send a TextEvent message."""
        # This method was defined twice with identical bodies; one copy kept.
        self.send(TextEvent.new(text).bytes())

    def copyright(self, text):
        """Send a Copyright message."""
        self.send(Copyright.new(text).bytes())

    def trackName(self, text):
        """Send a TrackName message."""
        self.send(TrackName.new(text).bytes())

    def instrumentName(self, text):
        """Send an InstrumentName message."""
        self.send(InstrumentName.new(text).bytes())

    def textLyrics(self, text):
        """Send a TextLyrics message."""
        self.send(TextLyrics.new(text).bytes())

    def textMarker(self, text):
        """Send a TextMarker message."""
        self.send(TextMarker.new(text).bytes())

    def cuePoint(self, text):
        """Send a CuePoint message."""
        self.send(CuePoint.new(text).bytes())

    def channelPrefix(self, cc):
        """Send a ChannelPrefix message."""
        self.send(ChannelPrefix.new(cc).bytes())

    def setTempo(self, tempo):
        """Send a SetTempo message."""
        self.send(SetTempo.new(tempo).bytes())

    def SMPTEOffset(self, hh, mm, ss, fr, ff):
        """Send an SMPTEOffset message."""
        self.send(SMPTEOffset.new(hh, mm, ss, fr, ff).bytes())

    def timeSignature(self, numerator, denominator, clic, notes):
        """Send a TimeSignature message."""
        # Was SMPTEOffset.new(...) with four arguments -- a copy-paste slip.
        # Assumes the message module exports TimeSignature like its siblings
        # -- TODO confirm.
        self.send(TimeSignature.new(numerator, denominator, clic, notes).bytes())
-
if __name__ == "__main__":
    # Manual smoke test: open two virtual inputs and two virtual outputs
    # under the same client name, then keep the process alive for a while.
    # The constructors take a parameter mapping; an empty one is passed
    # explicitly because calling them without arguments raised TypeError.
    ipp = MidiInputPort({})
    ipp.setClientName("Test 1")
    ipp.open("d")
    ippp = MidiInputPort({})
    ippp.setClientName("Test 1")
    ippp.open("c")

    op = MidiOutputPort({})
    op.setClientName("Test 1")
    op.open("b")
    opp = MidiOutputPort({})
    opp.setClientName("Test 1")
    opp.open("a")

    # Presumably long enough to inspect the ports with an external tool.
    time.sleep(100)
- """
- [
- ('System', 0,
- [
- ('Timer', 0, ([], [])),
- ('Announce', 1,
- (
- [
- (128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0}),
- (130, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})
- ],
- []
- )
- )
- ]
- )
- ,('Midi Through', 14,
- [
- ('Midi Through Port-0', 0,
- (
- [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1})],
- [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})]
- )
- )
- ]
- ), ('APC MINI', 28,
- [
- ('APC MINI MIDI 1', 0,
- (
- [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1})],
- [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})]
- )
- )
- ]
- ), ('a2jmidid', 128,
- [
- ('port', 0,
- (
- [(14, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0}), (28, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0}), (129, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})],
- [(0, 1, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0}), (14, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1}), (28, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1}), (129, 1, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1})]
- )
- )
- ]
- ), ('mididings', 129,
- [
- ('in_1', 0,
- (
- [],
- [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})]
- )
- ),
- ('out_1', 1,
- (
- [(128, 0, {'queue': 0, 'exclusive': 0, 'time_update': 1, 'time_real': 1})],
- []
- )
- )
- ]
- ), ('Patchage', 130,
- [
- ('System Announcement Reciever', 0,
- (
- [],
- [(0, 1, {'queue': 0, 'exclusive': 0, 'time_update': 0, 'time_real': 0})]
- )
- )
- ]
- ), ('pyalsa-10554', 131,
- [
- ]
- )
- ]
- """
|