1111import java .util .List ;
1212import java .util .concurrent .BlockingQueue ;
1313import java .util .concurrent .LinkedBlockingQueue ;
14+ import java .util .concurrent .atomic .AtomicLong ;
1415
1516import org .java_websocket .drafts .Draft ;
1617import org .java_websocket .drafts .Draft .CloseHandshakeType ;
@@ -57,6 +58,8 @@ public enum Role {
5758 public static final int READY_STATE_CLOSING = 2 ;
5859 public static final int READY_STATE_CLOSED = 3 ;
5960
61+ public static int BUFFERSIZE = 512 ;
62+
6063 /**
6164 * The default port of WebSockets, as defined in the spec. If the nullary
6265 * constructor is used, DEFAULT_PORT will be the port the WebSocketServer
@@ -92,11 +95,12 @@ public enum Role {
9295 * Queue of buffers that need to be sent to the client.
9396 */
9497 private BlockingQueue <ByteBuffer > bufferQueue ;
98+
9599 /**
96100 * The amount of bytes still in queue to be sent, at every given time.
97101 * It's updated at every send/sent operation.
98102 */
99- private Long bufferQueueTotalAmount = ( long ) 0 ;
103+ private AtomicLong bufferQueueTotalAmount = new AtomicLong ( 0l ) ;
100104
101105 private Draft draft = null ;
102106
@@ -143,7 +147,7 @@ public WebSocket( WebSocketListener listener , List<Draft> drafts , SocketChanne
143147 private void init ( WebSocketListener listener , Draft draft , SocketChannel socketchannel ) {
144148 this .sockchannel = socketchannel ;
145149 this .bufferQueue = new LinkedBlockingQueue <ByteBuffer >( 3 );
146- this .socketBuffer = ByteBuffer .allocate ( 65558 );
150+ this .socketBuffer = ByteBuffer .allocate ( BUFFERSIZE );
147151 socketBuffer .flip ();
148152 this .wsl = listener ;
149153 this .role = Role .CLIENT ;
@@ -485,7 +489,7 @@ public void close( InvalidDataException e ) {
485489 * @throws InterruptedException
486490 * @throws NotYetConnectedException
487491 */
488- public void send ( String text ) throws IllegalArgumentException , NotYetConnectedException , InterruptedException {
492+ public void send ( String text ) throws NotYetConnectedException , InterruptedException {
489493 if ( text == null )
490494 throw new IllegalArgumentException ( "Cannot send 'null' data to a WebSocket." );
491495 send ( draft .createFrames ( text , role == Role .CLIENT ) );
@@ -498,12 +502,16 @@ public void send( String text ) throws IllegalArgumentException , NotYetConnecte
498502 * @throws InterruptedException
499503 * @throws NotYetConnectedException
500504 */
501- public void send ( byte [] bytes ) throws IllegalArgumentException , NotYetConnectedException , InterruptedException {
505+ public void send ( ByteBuffer bytes ) throws IllegalArgumentException , NotYetConnectedException , InterruptedException {
502506 if ( bytes == null )
503507 throw new IllegalArgumentException ( "Cannot send 'null' data to a WebSocket." );
504508 send ( draft .createFrames ( bytes , role == Role .CLIENT ) );
505509 }
506510
511+ public void send ( byte [] bytes ) throws IllegalArgumentException , NotYetConnectedException , InterruptedException {
512+ send ( ByteBuffer .wrap ( bytes ) );
513+ }
514+
507515 private void send ( Collection <Framedata > frames ) throws InterruptedException {
508516 if ( !this .handshakeComplete )
509517 throw new NotYetConnectedException ();
@@ -534,7 +542,7 @@ boolean hasBufferedData() {
534542 * @return Amount of Data still in Queue and not sent yet of the socket
535543 */
536544 long bufferedDataAmount () {
537- return bufferQueueTotalAmount ;
545+ return bufferQueueTotalAmount . get () ;
538546 }
539547
540548 /**
@@ -545,16 +553,13 @@ public void flush() throws IOException {
545553 ByteBuffer buffer = this .bufferQueue .peek ();
546554
547555 while ( buffer != null ) {
548- sockchannel .write ( buffer );
556+ int written = sockchannel .write ( buffer );
549557 if ( buffer .remaining () > 0 ) {
550558 // there is still stuff to send for this buffer
551559 continue ;
552560 } else {
553- // nothing left for this buffer
554- synchronized ( bufferQueueTotalAmount ) {
555- // subtract this amount of data from the total queued (synchronized over this object)
556- bufferQueueTotalAmount -= buffer .limit ();
557- }
561+ // subtract this amount of data from the total queued (synchronized over this object)
562+ bufferQueueTotalAmount .addAndGet ( -written );
558563
559564 // timing callback
560565 if ( isConnecting () ) {
@@ -608,13 +613,11 @@ public void startHandshake( ClientHandshakeBuilder handshakedata ) throws Invali
608613
609614 private void channelWrite ( ByteBuffer buf ) throws InterruptedException {
610615 if ( DEBUG )
611- System .out .println ( "write(" + buf .limit () + "): {" + ( buf .limit () > 1000 ? "too big to display" : new String ( buf .array () ) ) + "}" );
616+ System .out .println ( "write(" + buf .remaining () + "): {" + ( buf .remaining () > 1000 ? "too big to display" : new String ( buf .array () ) ) + "}" );
617+
618+ // add up the number of bytes to the total queued (synchronized over this object)
619+ bufferQueueTotalAmount .addAndGet ( buf .remaining () );
612620
613- buf .rewind (); // TODO rewinding should not be nessesary
614- synchronized ( bufferQueueTotalAmount ) {
615- // add up the number of bytes to the total queued (synchronized over this object)
616- bufferQueueTotalAmount += buf .limit ();
617- }
618621 if ( !bufferQueue .offer ( buf ) ) {
619622 try {
620623 flush ();
0 commit comments