00001 00002 package protocol; 00003 00004 import java.net.InetAddress; 00005 import java.util.Vector; 00006 00007 import utils.Log; 00008 import utils.OctetBuffer; 00009 00010 /** 00011 * Encapsulates the link between the UDP channel and a CallContext. 00012 * UDP channel receives packets from all remote peers and dispatches them 00013 * to particular RemotePeer handler. RemotePeer might have multiple calls 00014 * in real PBX, howerever, this implementation allows only single CallContext 00015 * per RemotePeer. 00016 * 00017 * @author Mikica B Kocic 00018 */ 00019 public class RemotePeer implements Runnable 00020 { 00021 /** The UDP channel */ 00022 private DatagramChannel socket = null; 00023 00024 /** The call context */ 00025 private CallContext call = null; 00026 00027 /** PBXClient's (chat server's) User ID of the remote peer */ 00028 private String remoteUserId = null; 00029 00030 /** Remote IP address where to send PDUs */ 00031 private InetAddress remoteAddr = null; 00032 00033 /** Remote UDP port where to send PDUs */ 00034 private int remotePort = -1; 00035 00036 /** The queue of incoming PDUs from remote peer via our UDP channel */ 00037 protected Vector<byte[]> inboundPDUs = null; 00038 00039 /** The receiving process thread. */ 00040 protected Thread pduReceiverThread = null; 00041 00042 /** Indicates whether communication with peer is active or not */ 00043 private volatile boolean transmitting = false; 00044 00045 /** The time-stamp of the last received PDU */ 00046 private long lastReceiverTimestamp = 0; 00047 00048 /** 00049 * Constructor for the RemotePeer object 00050 * 00051 * @param socket instance of DatagramChannel used for communication 00052 * @param remoteUserId peer's user ID 00053 * @param remoteAddr peer's IP address 00054 * @param remotePort peer's UDP port listening for ours PDUs 00055 */ 00056 public RemotePeer( DatagramChannel socket, String remoteUserId, 00057 InetAddress remoteAddr, int remotePort ) 00058 { 00059 this.remoteUserId = remoteUserId; 00060 this.remoteAddr = remoteAddr; 00061 this.remotePort = remotePort; 00062 00063 this.call = null; 00064 00065 this.inboundPDUs = new Vector<byte[]> (); 00066 00067 synchronized( socket ) 00068 { 00069 this.socket = socket; 00070 this.socket.addNewPeer( this ); 00071 00072 if ( this.remoteAddr != null 00073 && this.remotePort > 0 && this.remotePort <= 65535 ) 00074 { 00075 this.startReceiver (); 00076 } 00077 } 00078 } 00079 00080 /** 00081 * Returns remote name (remote userid) of the peer 00082 */ 00083 public String getRemoteUserId () 00084 { 00085 return remoteUserId; 00086 } 00087 00088 /** 00089 * This method starts the receiver thread for inbound PDUs. 00090 */ 00091 synchronized private void startReceiver () 00092 { 00093 this.transmitting = true; 00094 00095 this.lastReceiverTimestamp = System.currentTimeMillis (); 00096 00097 this.pduReceiverThread = new Thread( this, 00098 "Peer-" + remoteAddr.getHostAddress() + ":" + remotePort ); 00099 00100 this.pduReceiverThread.setPriority( Thread.MAX_PRIORITY - 1 ); 00101 00102 if ( this.call != null ) { 00103 this.call.resetClock (); 00104 } 00105 00106 this.pduReceiverThread.start (); 00107 } 00108 00109 /** 00110 * Returns elapsed time since last received packet 00111 */ 00112 synchronized public long receiverIdleTime () 00113 { 00114 return System.currentTimeMillis () - this.lastReceiverTimestamp; 00115 } 00116 00117 /** 00118 * Adds an incoming PDUs (as bytes) to the PDUs queue. We are 00119 * still on the recv thread; this data was received by binder and 00120 * passed to us via friend. The PDUs are stored in the queue and 00121 * we deal with them on our own thread to relief the recv thread. 00122 * In other words, this is the last thing we do on the recv thread! 00123 * 00124 * @param data The PDU octets 00125 */ 00126 synchronized public void addIncomingPDU( byte[] data ) 00127 { 00128 if ( ! this.transmitting || data == null ) { 00129 return; 00130 } 00131 00132 this.lastReceiverTimestamp = System.currentTimeMillis (); 00133 this.inboundPDUs.addElement( data ); 00134 this.notifyAll (); 00135 } 00136 00137 /** 00138 * Manages the incoming PDUs stored in the PDUs queue. 00139 * This thread is started by startReceiver() and run separately from the 00140 * binder's receiver thread. 00141 */ 00142 public void run () 00143 { 00144 Log.trace( "Thread started" ); 00145 00146 while ( this.transmitting ) 00147 { 00148 Object[] pdusToSend = new Object[0]; 00149 00150 synchronized( this ) 00151 { 00152 try { 00153 this.wait (); 00154 } 00155 catch( InterruptedException e ) { 00156 /* ignored */ 00157 } 00158 00159 int pduCount = this.inboundPDUs.size (); 00160 00161 // Do some smart stuff here? (limit the take to only 20 PDUs?) 00162 // You'd hope that normally we'd get 1 or maybe 2 PDUs here. 00163 // 00164 if ( pduCount > 0 ) 00165 { 00166 pdusToSend = new Object[ pduCount ]; 00167 for ( int i = 0; i < pduCount; ++i ) { 00168 pdusToSend[i] = this.inboundPDUs.elementAt(i); 00169 } 00170 this.inboundPDUs.removeAllElements (); 00171 } 00172 } 00173 00174 // After releasing the lock, let's deal with the list... 00175 // we are now on the thread of the call, any time we waste is 00176 // our own. 00177 // should really sort these into sequence before we dispose of them. 00178 00179 for( int i = 0; i < pdusToSend.length; ++i ) 00180 { 00181 try { 00182 parsePDU( (byte[]) pdusToSend[i] ); 00183 } 00184 catch( Throwable e ) { 00185 Log.error( "ParsePDU failed; " + e.toString () ); 00186 Log.where (); 00187 } 00188 } 00189 } 00190 00191 this.socket = null; 00192 this.call = null; 00193 } 00194 00195 /** 00196 * Deals with newly received PDU octets. This method encapsulates them 00197 * into a instance of PDU class, deal with internal counters, sends an 00198 * acknowledgement and notifies the PDU it has arrived. 00199 */ 00200 void parsePDU( byte[] octets ) 00201 { 00202 if ( this.call != null && octets != null ) 00203 { 00204 /* Parse PDU 00205 */ 00206 ProtocolDataUnit pdu = ProtocolDataUnit.create( this.call, octets ); 00207 00208 /* Dispatch PDU if it is tagged with valid call numbers. 00209 * Note: Dispatching is hard-coded here, but this place might be 00210 * a core for real PBX call-id handling. 00211 */ 00212 if ( pdu != null && pdu.destinationCallNumber == 0x5926 00213 && pdu.sourceCallNumber == 0x3141 ) 00214 { 00215 pdu.onArrivedPDU (); 00216 } 00217 else if ( pdu != null ) 00218 { 00219 Log.warn( "Ignored PDU with destCall# " + pdu.destinationCallNumber 00220 + ", srcCalL# " + pdu.sourceCallNumber ); 00221 } 00222 } 00223 } 00224 00225 /** 00226 * Adds the new (not owned) call to the peer. 00227 */ 00228 public void addNewCall( CallContext call ) 00229 { 00230 this.call = call; 00231 00232 /* Tag the call with call numbers. 00233 * Note: Tagging is hard-coded here, but this place might be a core 00234 * for real PBX call-id handling. 00235 */ 00236 this.call.setSourceCallNumber( 0x3141 ); 00237 this.call.setDestinationCallNumber( 0x5926 ); 00238 } 00239 00240 /** 00241 * Stops transmitting and c cleans up resources (local and used by the calls). 00242 */ 00243 public void cleanUp () 00244 { 00245 if ( this.call != null ) 00246 { 00247 this.call.cleanUp (); 00248 this.transmitting = false; 00249 } 00250 } 00251 00252 /** 00253 * Sends packet to remote peer over datagram channel 00254 */ 00255 public void send( OctetBuffer pdu ) 00256 { 00257 if ( this.transmitting ) 00258 { 00259 this.socket.send( pdu, this.remoteAddr, this.remotePort ); 00260 } 00261 } 00262 }