• Main Page
  • Related Pages
  • Packages
  • Classes
  • Files
  • File List

protocol/RemotePeer.java

Go to the documentation of this file.
00001 
00002 package protocol;
00003 
00004 import java.net.InetAddress;
00005 import java.util.Vector;
00006 
00007 import utils.Log;
00008 import utils.OctetBuffer;
00009 
00010 /**
00011  *  Encapsulates the link between the UDP channel and a CallContext.
00012  *  UDP channel receives packets from all remote peers and dispatches them 
00013  *  to particular RemotePeer handler. RemotePeer might have multiple calls
00014  *  in real PBX, howerever, this implementation allows only single CallContext
00015  *  per RemotePeer.
00016  *  
00017  *  @author Mikica B Kocic
00018  */
00019 public class RemotePeer implements Runnable
00020 {
00021     /** The UDP channel */
00022     private DatagramChannel socket = null;
00023     
00024     /** The call context */
00025     private CallContext call = null;
00026 
00027     /** PBXClient's (chat server's) User ID of the remote peer */
00028     private String remoteUserId = null;
00029     
00030     /** Remote IP address where to send PDUs */
00031     private InetAddress remoteAddr = null;
00032     
00033     /** Remote UDP port where to send PDUs */
00034     private int remotePort = -1;
00035 
00036     /** The queue of incoming PDUs from remote peer via our UDP channel */
00037     protected Vector<byte[]> inboundPDUs = null;
00038     
00039     /** The receiving process thread. */
00040     protected Thread pduReceiverThread = null;
00041 
00042     /** Indicates whether communication with peer is active or not */
00043     private volatile boolean transmitting = false;
00044 
00045     /** The time-stamp of the last received PDU */
00046     private long lastReceiverTimestamp = 0;
00047 
00048     /**
00049      *  Constructor for the RemotePeer object
00050      *
00051      *  @param socket           instance of DatagramChannel used for communication
00052      *  @param remoteUserId     peer's user ID
00053      *  @param remoteAddr       peer's IP address
00054      *  @param remotePort       peer's UDP port listening for ours PDUs       
00055      */
00056     public RemotePeer( DatagramChannel socket, String remoteUserId, 
00057             InetAddress remoteAddr, int remotePort )
00058     {
00059         this.remoteUserId = remoteUserId;
00060         this.remoteAddr   = remoteAddr;
00061         this.remotePort   = remotePort;
00062         
00063         this.call = null;
00064 
00065         this.inboundPDUs = new Vector<byte[]> ();
00066         
00067         synchronized( socket )
00068         {
00069             this.socket = socket;
00070             this.socket.addNewPeer( this );
00071             
00072             if ( this.remoteAddr != null 
00073                     && this.remotePort > 0 && this.remotePort <= 65535 ) 
00074             {
00075                 this.startReceiver ();
00076             }
00077         }
00078     }
00079 
00080     /**
00081      *  Returns remote name (remote userid) of the peer
00082      */
00083     public String getRemoteUserId ()
00084     {
00085         return remoteUserId;
00086     }
00087 
00088     /**
00089      *  This method starts the receiver thread for inbound PDUs.
00090      */
00091     synchronized private void startReceiver () 
00092     {
00093         this.transmitting = true;
00094 
00095         this.lastReceiverTimestamp = System.currentTimeMillis ();
00096             
00097         this.pduReceiverThread = new Thread( this, 
00098                 "Peer-" + remoteAddr.getHostAddress() + ":" + remotePort );
00099         
00100         this.pduReceiverThread.setPriority( Thread.MAX_PRIORITY - 1 );
00101         
00102         if ( this.call != null ) {
00103             this.call.resetClock ();
00104         }
00105         
00106         this.pduReceiverThread.start ();
00107     }
00108 
00109     /**
00110      *  Returns elapsed time since last received packet
00111      */
00112     synchronized public long receiverIdleTime ()
00113     {
00114         return System.currentTimeMillis () - this.lastReceiverTimestamp;
00115     }
00116 
00117     /**
00118      * Adds an incoming PDUs (as bytes) to the PDUs queue. We are
00119      * still on the recv thread; this data was received by binder and
00120      * passed to us via friend. The PDUs are stored in the queue and
00121      * we deal with them on our own thread to relief the recv thread.
00122      * In other words, this is the last thing we do on the recv thread!
00123      *
00124      * @param data  The PDU octets
00125      */
00126     synchronized public void addIncomingPDU( byte[] data ) 
00127     {
00128         if ( ! this.transmitting || data == null ) {
00129             return;
00130         }
00131 
00132         this.lastReceiverTimestamp = System.currentTimeMillis ();
00133         this.inboundPDUs.addElement( data );
00134         this.notifyAll ();
00135     }
00136 
00137     /**
00138      *  Manages the incoming PDUs stored in the PDUs queue.
00139      *  This thread is started by startReceiver() and run separately from the
00140      *  binder's receiver thread.
00141      */
00142     public void run () 
00143     {
00144         Log.trace( "Thread started" );
00145         
00146         while ( this.transmitting )
00147         {
00148             Object[] pdusToSend = new Object[0];
00149             
00150             synchronized( this ) 
00151             {
00152                 try {
00153                     this.wait ();
00154                 }
00155                 catch( InterruptedException e ) {
00156                     /* ignored */
00157                 }
00158                 
00159                 int pduCount = this.inboundPDUs.size ();
00160                 
00161                 // Do some smart stuff here? (limit the take to only 20 PDUs?)
00162                 // You'd hope that normally we'd get 1 or maybe 2 PDUs here.
00163                 //
00164                 if ( pduCount > 0 )
00165                 {
00166                     pdusToSend = new Object[ pduCount ];
00167                     for ( int i = 0; i < pduCount; ++i ) {
00168                         pdusToSend[i] = this.inboundPDUs.elementAt(i);
00169                     }
00170                     this.inboundPDUs.removeAllElements ();
00171                 }
00172             }
00173             
00174             // After releasing the lock, let's deal with the list...
00175             // we are now on the thread of the call, any time we waste is
00176             // our own.
00177             // should really sort these into sequence before we dispose of them.
00178             
00179             for( int i = 0; i < pdusToSend.length; ++i )
00180             {
00181                 try {
00182                     parsePDU( (byte[]) pdusToSend[i] );
00183                 }
00184                 catch( Throwable e ) {
00185                     Log.error( "ParsePDU failed; " + e.toString () );
00186                     Log.where ();
00187                 }
00188             }
00189         }
00190         
00191         this.socket = null;
00192         this.call = null;
00193     }
00194 
00195     /**
00196      *  Deals with newly received PDU octets. This method encapsulates them
00197      *  into a instance of PDU class, deal with internal counters, sends an
00198      *  acknowledgement and notifies the PDU it has arrived.
00199      */
00200     void parsePDU( byte[] octets ) 
00201     {
00202         if ( this.call != null && octets != null ) 
00203         {
00204             /* Parse PDU
00205              */
00206             ProtocolDataUnit pdu = ProtocolDataUnit.create( this.call, octets );
00207 
00208             /* Dispatch PDU if it is tagged with valid call numbers.
00209              * Note: Dispatching is hard-coded here, but this place might be
00210              * a core for real PBX call-id handling.
00211              */
00212             if ( pdu != null && pdu.destinationCallNumber == 0x5926 
00213                              && pdu.sourceCallNumber == 0x3141 )
00214             {
00215                 pdu.onArrivedPDU ();
00216             }
00217             else if ( pdu != null )
00218             {
00219                 Log.warn( "Ignored PDU with destCall# " + pdu.destinationCallNumber
00220                         + ", srcCalL# " + pdu.sourceCallNumber );
00221             }
00222         }
00223     }
00224 
00225     /**
00226      *  Adds the new (not owned) call to the peer.
00227      */
00228     public void addNewCall( CallContext call )
00229     {
00230         this.call = call;
00231         
00232         /* Tag the call with call numbers. 
00233          * Note: Tagging is hard-coded here, but this place might be a core 
00234          * for real PBX call-id handling.
00235          */
00236         this.call.setSourceCallNumber( 0x3141 );
00237         this.call.setDestinationCallNumber( 0x5926 );
00238     }
00239 
00240     /**
00241      *  Stops transmitting and c cleans up resources (local and used by the calls).
00242      */
00243     public void cleanUp ()
00244     {
00245         if ( this.call != null )
00246         {
00247             this.call.cleanUp ();
00248             this.transmitting = false;
00249         }
00250     }
00251 
00252     /**
00253      *  Sends packet to remote peer over datagram channel
00254      */
00255     public void send( OctetBuffer pdu )
00256     {
00257         if ( this.transmitting ) 
00258         {
00259             this.socket.send( pdu, this.remoteAddr, this.remotePort );
00260         }
00261     }
00262 }

Generated on Thu Dec 16 2010 14:44:42 for VoIP Kryptofon by  doxygen 1.7.2