diff --git a/.gitignore b/.gitignore index 8f1fdc779..a705c1129 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,5 @@ target *.log #Ignore Test Output -test-output \ No newline at end of file +test-output +/.checkstyle diff --git a/pom.xml b/pom.xml index b1b6d96a9..5cc227ee3 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,5 @@ - + 4.0.0 @@ -67,7 +68,7 @@ 1.7.21 1.54 - 2015-01-27T15-02-14 + 2.0.4 19.0 @@ -115,11 +116,15 @@ ${jersey.version} - de.gesellix - unix-socket-factory - ${unix-socket-factory.version} + com.kohlschutter.junixsocket + junixsocket-common + ${junixsocket.version} + + + com.kohlschutter.junixsocket + junixsocket-native-common + ${junixsocket.version} - org.apache.commons commons-compress @@ -266,13 +271,6 @@ - - - - - - - @@ -419,7 +417,7 @@ true 1 integration - integration-auth + integration-auth **/*Test.java @@ -483,7 +481,8 @@ true true false - + src/test/resources/checkstyle/checkstyle-config.xml diff --git a/src/main/java/com/github/dockerjava/core/async/ResultCallbackTemplate.java b/src/main/java/com/github/dockerjava/core/async/ResultCallbackTemplate.java index f120012af..fc48f57c3 100644 --- a/src/main/java/com/github/dockerjava/core/async/ResultCallbackTemplate.java +++ b/src/main/java/com/github/dockerjava/core/async/ResultCallbackTemplate.java @@ -75,11 +75,13 @@ public void onComplete() { @Override public void close() throws IOException { - closed = true; - if (stream != null) { - stream.close(); + if (!closed) { + closed = true; + if (stream != null) { + stream.close(); + } + completed.countDown(); } - completed.countDown(); } /** diff --git a/src/main/java/com/github/dockerjava/jaxrs/util/WrappedResponseInputStream.java b/src/main/java/com/github/dockerjava/jaxrs/util/WrappedResponseInputStream.java index afe0dce92..4ec74ddcc 100644 --- a/src/main/java/com/github/dockerjava/jaxrs/util/WrappedResponseInputStream.java +++ b/src/main/java/com/github/dockerjava/jaxrs/util/WrappedResponseInputStream.java @@ -53,9 +53,12 @@ public int available() throws IOException { } public void close() throws IOException { + if (closed) { + return; + } closed = true; - response.close(); delegate.close(); + response.close(); } public void mark(int readlimit) { diff --git a/src/main/java/org/apache/http/impl/io/ChunkedInputStream.java b/src/main/java/org/apache/http/impl/io/ChunkedInputStream.java new file mode 100644 index 000000000..17c339e3e --- /dev/null +++ b/src/main/java/org/apache/http/impl/io/ChunkedInputStream.java @@ -0,0 +1,337 @@ +// Modified version (see https://github.com/docker-java/docker-java/pull/697) +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.http.impl.io; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.http.ConnectionClosedException; +import org.apache.http.Header; +import org.apache.http.HttpException; +import org.apache.http.MalformedChunkCodingException; +import org.apache.http.TruncatedChunkException; +import org.apache.http.config.MessageConstraints; +import org.apache.http.io.BufferInfo; +import org.apache.http.io.SessionInputBuffer; +import org.apache.http.util.Args; +import org.apache.http.util.CharArrayBuffer; + +/** + * Implements chunked transfer coding. The content is received in small chunks. + * Entities transferred using this input stream can be of unlimited length. + * After the stream is read to the end, it provides access to the trailers, + * if any. + *

+ * Note that this class NEVER closes the underlying stream, even when close + * gets called. Instead, it will read until the "end" of its chunking on + * close, which allows for the seamless execution of subsequent HTTP 1.1 + * requests, while not requiring the client to remember to read the entire + * contents of the response. + * + * + * @since 4.0 + * + */ +public class ChunkedInputStream extends InputStream { + + private static final int CHUNK_LEN = 1; + private static final int CHUNK_DATA = 2; + private static final int CHUNK_CRLF = 3; + private static final int CHUNK_INVALID = Integer.MAX_VALUE; + + private static final int BUFFER_SIZE = 2048; + + /** The session input buffer */ + private final SessionInputBuffer in; + private final CharArrayBuffer buffer; + private final MessageConstraints constraints; + + private int state; + + /** The chunk size */ + private long chunkSize; + + /** The current position within the current chunk */ + private long pos; + + /** True if we've reached the end of stream */ + private boolean eof = false; + + /** True if this stream is closed */ + private boolean closed = false; + + private Header[] footers = new Header[] {}; + + /** + * Wraps session input stream and reads chunk coded input. + * + * @param in The session input buffer + * @param constraints Message constraints. If {@code null} + * {@link MessageConstraints#DEFAULT} will be used. + * + * @since 4.4 + */ + public ChunkedInputStream(final SessionInputBuffer in, final MessageConstraints constraints) { + super(); + this.in = Args.notNull(in, "Session input buffer"); + this.pos = 0L; + this.buffer = new CharArrayBuffer(16); + this.constraints = constraints != null ? constraints : MessageConstraints.DEFAULT; + this.state = CHUNK_LEN; + } + + /** + * Wraps session input stream and reads chunk coded input. + * + * @param in The session input buffer + */ + public ChunkedInputStream(final SessionInputBuffer in) { + this(in, null); + } + + @Override + public int available() throws IOException { + if (this.in instanceof BufferInfo) { + final int len = ((BufferInfo) this.in).length(); + return (int) Math.min(len, this.chunkSize - this.pos); + } else { + return 0; + } + } + + /** + *

Returns all the data in a chunked stream in coalesced form. A chunk + * is followed by a CRLF. The method returns -1 as soon as a chunksize of 0 + * is detected.

+ * + *

Trailer headers are read automatically at the end of the stream and + * can be obtained with the getResponseFooters() method.

+ * + * @return -1 of the end of the stream has been reached or the next data + * byte + * @throws IOException in case of an I/O error + */ + @Override + public int read() throws IOException { + if (this.closed) { + throw new IOException("Attempted read from closed stream."); + } + if (this.eof) { + return -1; + } + if (state != CHUNK_DATA) { + nextChunk(); + if (this.eof) { + return -1; + } + } + final int b = in.read(); + if (b != -1) { + pos++; + if (pos >= chunkSize) { + state = CHUNK_CRLF; + } + } + return b; + } + + /** + * Read some bytes from the stream. + * @param b The byte array that will hold the contents from the stream. + * @param off The offset into the byte array at which bytes will start to be + * placed. + * @param len the maximum number of bytes that can be returned. + * @return The number of bytes returned or -1 if the end of stream has been + * reached. + * @throws IOException in case of an I/O error + */ + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + + if (closed) { + throw new IOException("Attempted read from closed stream."); + } + + if (eof) { + return -1; + } + if (state != CHUNK_DATA) { + nextChunk(); + if (eof) { + return -1; + } + } + final int bytesRead = in.read(b, off, (int) Math.min(len, chunkSize - pos)); + if (bytesRead != -1) { + pos += bytesRead; + if (pos >= chunkSize) { + state = CHUNK_CRLF; + } + return bytesRead; + } else { + eof = true; + throw new TruncatedChunkException("Truncated chunk " + + "( expected size: " + chunkSize + + "; actual size: " + pos + ")"); + } + } + + /** + * Read some bytes from the stream. + * @param b The byte array that will hold the contents from the stream. + * @return The number of bytes returned or -1 if the end of stream has been + * reached. + * @throws IOException in case of an I/O error + */ + @Override + public int read(final byte[] b) throws IOException { + return read(b, 0, b.length); + } + + /** + * Read the next chunk. + * @throws IOException in case of an I/O error + */ + private void nextChunk() throws IOException { + if (state == CHUNK_INVALID) { + throw new MalformedChunkCodingException("Corrupt data stream"); + } + try { + chunkSize = getChunkSize(); + if (chunkSize < 0L) { + throw new MalformedChunkCodingException("Negative chunk size"); + } + state = CHUNK_DATA; + pos = 0L; + if (chunkSize == 0L) { + eof = true; + parseTrailerHeaders(); + } + } catch (MalformedChunkCodingException ex) { + state = CHUNK_INVALID; + throw ex; + } + } + + /** + * Expects the stream to start with a chunksize in hex with optional + * comments after a semicolon. The line must end with a CRLF: "a3; some + * comment\r\n" Positions the stream at the start of the next line. + */ + private long getChunkSize() throws IOException { + final int st = this.state; + switch (st) { + case CHUNK_CRLF: + this.buffer.clear(); + final int bytesRead1 = this.in.readLine(this.buffer); + if (bytesRead1 == -1) { + throw new MalformedChunkCodingException( + "CRLF expected at end of chunk"); + } + if (!this.buffer.isEmpty()) { + throw new MalformedChunkCodingException( + "Unexpected content at the end of chunk"); + } + state = CHUNK_LEN; + //$FALL-THROUGH$ + case CHUNK_LEN: + this.buffer.clear(); + final int bytesRead2 = this.in.readLine(this.buffer); + if (bytesRead2 == -1) { + throw new ConnectionClosedException("Premature end of chunk coded message body: " + + "closing chunk expected"); + } + int separator = this.buffer.indexOf(';'); + if (separator < 0) { + separator = this.buffer.length(); + } + final String s = this.buffer.substringTrimmed(0, separator); + try { + return Long.parseLong(s, 16); + } catch (final NumberFormatException e) { + throw new MalformedChunkCodingException("Bad chunk header: " + s); + } + default: + throw new IllegalStateException("Inconsistent codec state"); + } + } + + /** + * Reads and stores the Trailer headers. + * @throws IOException in case of an I/O error + */ + private void parseTrailerHeaders() throws IOException { + try { + this.footers = AbstractMessageParser.parseHeaders(in, + constraints.getMaxHeaderCount(), + constraints.getMaxLineLength(), + null); + } catch (final HttpException ex) { + final IOException ioe = new MalformedChunkCodingException("Invalid footer: " + + ex.getMessage()); + ioe.initCause(ex); + throw ioe; + } + } + + /** + * Upon close, this reads the remainder of the chunked message, + * leaving the underlying socket at a position to start reading the + * next response without scanning. + * @throws IOException in case of an I/O error + */ + @Override + public void close() throws IOException { + if (!closed) { + try { + if (!eof && state != CHUNK_INVALID) { + // read and discard the remainder of the message + final byte[] buff = new byte[BUFFER_SIZE]; + try { + while (read(buff) >= 0) { + continue; + } + } catch (ConnectionClosedException e) { + // just ignore + } catch (TruncatedChunkException e) { + // just ignore + } + } + } finally { + eof = true; + closed = true; + } + } + } + + public Header[] getFooters() { + return this.footers.clone(); + } + +} diff --git a/src/main/java/org/newsclub/net/unix/AFUNIXSocketImpl.java b/src/main/java/org/newsclub/net/unix/AFUNIXSocketImpl.java new file mode 100644 index 000000000..869d987f2 --- /dev/null +++ b/src/main/java/org/newsclub/net/unix/AFUNIXSocketImpl.java @@ -0,0 +1,415 @@ +// Modified version (see https://github.com/docker-java/docker-java/pull/697) +/** + * junixsocket + * + * Copyright (c) 2009,2014 Christian Kohlschütter + * + * The author licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.newsclub.net.unix; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.SocketImpl; +import java.net.SocketOptions; + +/** + * The Java-part of the {@link AFUNIXSocket} implementation. + * + * @author Christian Kohlschütter + */ +class AFUNIXSocketImpl extends SocketImpl { + private static final int SHUT_RD = 0; + private static final int SHUT_WR = 1; + private static final int SHUT_RD_WR = 2; + + private String socketFile; + private boolean closed = false; + private boolean bound = false; + private boolean connected = false; + + private boolean closedInputStream = false; + private boolean closedOutputStream = false; + + private final AFUNIXInputStream in = new AFUNIXInputStream(); + private final AFUNIXOutputStream out = new AFUNIXOutputStream(); + + AFUNIXSocketImpl() { + super(); + this.fd = new FileDescriptor(); + } + + FileDescriptor getFD() { + return fd; + } + + @Override + protected void accept(SocketImpl socket) throws IOException { + final AFUNIXSocketImpl si = (AFUNIXSocketImpl) socket; + NativeUnixSocket.accept(socketFile, fd, si.fd); + si.socketFile = socketFile; + si.connected = true; + } + + @Override + protected int available() throws IOException { + return NativeUnixSocket.available(fd); + } + + protected void bind(SocketAddress addr) throws IOException { + bind(0, addr); + } + + protected void bind(int backlog, SocketAddress addr) throws IOException { + if (!(addr instanceof AFUNIXSocketAddress)) { + throw new SocketException("Cannot bind to this type of address: " + addr.getClass()); + } + final AFUNIXSocketAddress socketAddress = (AFUNIXSocketAddress) addr; + socketFile = socketAddress.getSocketFile(); + NativeUnixSocket.bind(socketFile, fd, backlog); + bound = true; + this.localport = socketAddress.getPort(); + } + + @Override + @SuppressWarnings("hiding") + protected void bind(InetAddress host, int port) throws IOException { + throw new SocketException("Cannot bind to this type of address: " + InetAddress.class); + } + + private void checkClose() throws IOException { + //if (closedInputStream && closedOutputStream) { + // close(); + //} + } + + @Override + protected synchronized void close() throws IOException { + if (closed) { + return; + } + closed = true; + if (fd.valid()) { + NativeUnixSocket.shutdown(fd, SHUT_RD_WR); + NativeUnixSocket.close(fd); + } + if (bound) { + NativeUnixSocket.unlink(socketFile); + } + connected = false; + } + + @Override + @SuppressWarnings("hiding") + protected void connect(String host, int port) throws IOException { + throw new SocketException("Cannot bind to this type of address: " + InetAddress.class); + } + + @Override + @SuppressWarnings("hiding") + protected void connect(InetAddress address, int port) throws IOException { + throw new SocketException("Cannot bind to this type of address: " + InetAddress.class); + } + + @Override + protected void connect(SocketAddress addr, int timeout) throws IOException { + if (!(addr instanceof AFUNIXSocketAddress)) { + throw new SocketException("Cannot bind to this type of address: " + addr.getClass()); + } + final AFUNIXSocketAddress socketAddress = (AFUNIXSocketAddress) addr; + socketFile = socketAddress.getSocketFile(); + NativeUnixSocket.connect(socketFile, fd); + this.address = socketAddress.getAddress(); + this.port = socketAddress.getPort(); + this.localport = 0; + this.connected = true; + } + + @Override + protected void create(boolean stream) throws IOException { + } + + @Override + protected InputStream getInputStream() throws IOException { + if (!connected && !bound) { + throw new IOException("Not connected/not bound"); + } + return in; + } + + @Override + protected OutputStream getOutputStream() throws IOException { + if (!connected && !bound) { + throw new IOException("Not connected/not bound"); + } + return out; + } + + @Override + protected void listen(int backlog) throws IOException { + NativeUnixSocket.listen(fd, backlog); + } + + @Override + protected void sendUrgentData(int data) throws IOException { + NativeUnixSocket.write(fd, new byte[] {(byte) (data & 0xFF)}, 0, 1); + } + + private final class AFUNIXInputStream extends InputStream { + private boolean streamClosed = false; + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + if (streamClosed) { + throw new IOException("This InputStream has already been closed."); + } + if (len == 0) { + return 0; + } + if (closed) { + return -1; + } + int maxRead = buf.length - off; + if (len > maxRead) { + len = maxRead; + } + try { + return NativeUnixSocket.read(fd, buf, off, len); + } catch (final IOException e) { + throw (IOException) new IOException(e.getMessage() + " at " + + AFUNIXSocketImpl.this.toString()).initCause(e); + } + } + + @Override + public int read() throws IOException { + final byte[] buf1 = new byte[1]; + final int numRead = read(buf1, 0, 1); + if (numRead <= 0) { + return -1; + } else { + return buf1[0] & 0xFF; + } + } + + @Override + public void close() throws IOException { + if (streamClosed) { + return; + } + streamClosed = true; + if (fd.valid()) { + NativeUnixSocket.shutdown(fd, SHUT_RD); + } + + closedInputStream = true; + checkClose(); + } + + @Override + public int available() throws IOException { + final int av = NativeUnixSocket.available(fd); + return av; + } + } + + private final class AFUNIXOutputStream extends OutputStream { + private boolean streamClosed = false; + + @Override + public void write(int oneByte) throws IOException { + final byte[] buf1 = new byte[] {(byte) oneByte}; + write(buf1, 0, 1); + } + + @Override + public void write(byte[] buf, int off, int len) throws IOException { + if (streamClosed) { + throw new AFUNIXSocketException("This OutputStream has already been closed."); + } + if (len > buf.length - off) { + throw new IndexOutOfBoundsException(); + } + try { + while (len > 0 && !Thread.interrupted()) { + final int written = NativeUnixSocket.write(fd, buf, off, len); + if (written == -1) { + throw new IOException("Unspecific error while writing"); + } + len -= written; + off += written; + } + } catch (final IOException e) { + throw (IOException) new IOException(e.getMessage() + " at " + + AFUNIXSocketImpl.this.toString()).initCause(e); + } + } + + @Override + public void close() throws IOException { + if (streamClosed) { + return; + } + streamClosed = true; + if (fd.valid()) { + NativeUnixSocket.shutdown(fd, SHUT_WR); + } + closedOutputStream = true; + checkClose(); + } + } + + @Override + public String toString() { + return super.toString() + "[fd=" + fd + "; file=" + this.socketFile + "; connected=" + + connected + "; bound=" + bound + "]"; + } + + private static int expectInteger(Object value) throws SocketException { + try { + return (Integer) value; + } catch (final ClassCastException e) { + throw new AFUNIXSocketException("Unsupported value: " + value, e); + } catch (final NullPointerException e) { + throw new AFUNIXSocketException("Value must not be null", e); + } + } + + private static int expectBoolean(Object value) throws SocketException { + try { + return ((Boolean) value).booleanValue() ? 1 : 0; + } catch (final ClassCastException e) { + throw new AFUNIXSocketException("Unsupported value: " + value, e); + } catch (final NullPointerException e) { + throw new AFUNIXSocketException("Value must not be null", e); + } + } + + @Override + public Object getOption(int optID) throws SocketException { + try { + switch (optID) { + case SocketOptions.SO_KEEPALIVE: + case SocketOptions.TCP_NODELAY: + return NativeUnixSocket.getSocketOptionInt(fd, optID) != 0 ? true : false; + case SocketOptions.SO_LINGER: + case SocketOptions.SO_TIMEOUT: + case SocketOptions.SO_RCVBUF: + case SocketOptions.SO_SNDBUF: + return NativeUnixSocket.getSocketOptionInt(fd, optID); + default: + throw new AFUNIXSocketException("Unsupported option: " + optID); + } + } catch (final AFUNIXSocketException e) { + throw e; + } catch (final Exception e) { + throw new AFUNIXSocketException("Error while getting option", e); + } + } + + @Override + public void setOption(int optID, Object value) throws SocketException { + try { + switch (optID) { + case SocketOptions.SO_LINGER: + + if (value instanceof Boolean) { + final boolean b = (Boolean) value; + if (b) { + throw new SocketException("Only accepting Boolean.FALSE here"); + } + NativeUnixSocket.setSocketOptionInt(fd, optID, -1); + return; + } + NativeUnixSocket.setSocketOptionInt(fd, optID, expectInteger(value)); + return; + case SocketOptions.SO_RCVBUF: + case SocketOptions.SO_SNDBUF: + case SocketOptions.SO_TIMEOUT: + NativeUnixSocket.setSocketOptionInt(fd, optID, expectInteger(value)); + return; + case SocketOptions.SO_KEEPALIVE: + case SocketOptions.TCP_NODELAY: + NativeUnixSocket.setSocketOptionInt(fd, optID, expectBoolean(value)); + return; + default: + throw new AFUNIXSocketException("Unsupported option: " + optID); + } + } catch (final AFUNIXSocketException e) { + throw e; + } catch (final Exception e) { + throw new AFUNIXSocketException("Error while setting option", e); + } + } + + @Override + protected void shutdownInput() throws IOException { + if (!closed && fd.valid()) { + NativeUnixSocket.shutdown(fd, SHUT_RD); + } + } + + @Override + protected void shutdownOutput() throws IOException { + if (!closed && fd.valid()) { + NativeUnixSocket.shutdown(fd, SHUT_WR); + } + } + + /** + * Changes the behavior to be somewhat lenient with respect to the specification. + * + * In particular, we ignore calls to {@link Socket#getTcpNoDelay()} and + * {@link Socket#setTcpNoDelay(boolean)}. + */ + static class Lenient extends AFUNIXSocketImpl { + Lenient() { + super(); + } + + @Override + public void setOption(int optID, Object value) throws SocketException { + try { + super.setOption(optID, value); + } catch (SocketException e) { + switch (optID) { + case SocketOptions.TCP_NODELAY: + return; + default: + throw e; + } + } + } + + @Override + public Object getOption(int optID) throws SocketException { + try { + return super.getOption(optID); + } catch (SocketException e) { + switch (optID) { + case SocketOptions.TCP_NODELAY: + case SocketOptions.SO_KEEPALIVE: + return false; + default: + throw e; + } + } + } + } +} diff --git a/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java index 939f95087..d05327a06 100644 --- a/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java +++ b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java @@ -17,11 +17,14 @@ import com.github.dockerjava.api.model.Event; import com.github.dockerjava.client.AbstractDockerClientTest; +/* + * NOTE: These tests may fail if there is a difference between local and daemon time + * (this is especially a problem when using boot2docker as time may not in sync + * with the virtualbox host system) + */ @Test(groups = "integration") public class EventsCmdImplTest extends AbstractDockerClientTest { - private static int KNOWN_NUM_EVENTS = 4; - private static String getEpochTime() { return String.valueOf(System.currentTimeMillis() / 1000); } @@ -46,9 +49,6 @@ public void afterMethod(ITestResult result) { super.afterMethod(result); } - /* - * This specific test may fail with boot2docker as time may not in sync with host system - */ @Test public void testEventStreamTimeBound() throws Exception { // Don't include other tests events @@ -58,82 +58,84 @@ public void testEventStreamTimeBound() throws Exception { int expectedEvents = generateEvents(); String endTime = getEpochTime(); - CountDownLatch countDownLatch = new CountDownLatch(expectedEvents); - EventsTestCallback eventCallback = new EventsTestCallback(countDownLatch); - - dockerClient.eventsCmd().withSince(startTime).withUntil(endTime).exec(eventCallback); + EventsTestCallback eventCallback = new EventsTestCallback(expectedEvents); - Boolean zeroCount = countDownLatch.await(10, TimeUnit.SECONDS); + dockerClient.eventsCmd() + .withSince(startTime) + .withUntil(endTime) + .exec(eventCallback); - eventCallback.close(); + List events = eventCallback.awaitExpectedEvents(3, TimeUnit.MINUTES); - assertTrue(zeroCount, "Received only: " + eventCallback.getEvents()); + // we may receive more events as expected + assertTrue(events.size() >= expectedEvents, "Received events: " + events); } @Test - public void testEventStreaming1() throws Exception { - // Don't include other tests events - TimeUnit.SECONDS.sleep(1); - - CountDownLatch countDownLatch = new CountDownLatch(KNOWN_NUM_EVENTS); - EventsTestCallback eventCallback = new EventsTestCallback(countDownLatch); - - dockerClient.eventsCmd().withSince(getEpochTime()).exec(eventCallback); - - generateEvents(); - - Boolean zeroCount = countDownLatch.await(10, TimeUnit.SECONDS); - - eventCallback.close(); - assertTrue(zeroCount, "Received only: " + eventCallback.getEvents()); - } - - @Test - public void testEventStreaming2() throws Exception { + public void testEventStreaming() throws Exception { // Don't include other tests events TimeUnit.SECONDS.sleep(1); + + String startTime = getEpochTime(); + int expectedEvents = generateEvents(); - CountDownLatch countDownLatch = new CountDownLatch(KNOWN_NUM_EVENTS); - EventsTestCallback eventCallback = new EventsTestCallback(countDownLatch); + EventsTestCallback eventCallback = new EventsTestCallback(expectedEvents); - dockerClient.eventsCmd().withSince(getEpochTime()).exec(eventCallback); + dockerClient.eventsCmd() + .withSince(startTime) + .exec(eventCallback); generateEvents(); - Boolean zeroCount = countDownLatch.await(10, TimeUnit.SECONDS); - - eventCallback.close(); - assertTrue(zeroCount, "Received only: " + eventCallback.getEvents()); + List events = eventCallback.awaitExpectedEvents(3, TimeUnit.MINUTES); + + // we may receive more events as expected + assertTrue(events.size() >= expectedEvents, "Received events: " + events); } + public void testEventStreamingWithFilter() throws Exception { // Don't include other tests events TimeUnit.SECONDS.sleep(1); + + String startTime = getEpochTime(); + int expectedEvents = 1; - CountDownLatch countDownLatch = new CountDownLatch(1); - EventsTestCallback eventCallback = dockerClient.eventsCmd().withEventFilter("start") - .exec(new EventsTestCallback(countDownLatch)); + EventsTestCallback eventCallback = new EventsTestCallback(expectedEvents); + + dockerClient.eventsCmd() + .withSince(startTime) + .withEventFilter("start") + .exec(eventCallback); generateEvents(); - Boolean zeroCount = countDownLatch.await(10, TimeUnit.SECONDS); - - eventCallback.close(); - assertTrue(zeroCount, "Received only: " + eventCallback.getEvents()); + List events = eventCallback.awaitExpectedEvents(3, TimeUnit.MINUTES); + + // we should get exactly one "start" event here + assertEquals(events.size(), expectedEvents, "Received events: " + events); } /** - * This method generates {#link KNOWN_NUM_EVENTS} events + * This method generates some events and returns the number of events being generated */ private int generateEvents() throws Exception { - String testImage = "busybox"; + String testImage = "busybox:latest"; dockerClient.pullImageCmd(testImage).exec(new PullImageResultCallback()).awaitSuccess(); - CreateContainerResponse container = dockerClient.createContainerCmd(testImage).withCmd("sleep", "9999").exec(); dockerClient.startContainerCmd(container.getId()).exec(); - dockerClient.stopContainerCmd(container.getId()).exec(); - return KNOWN_NUM_EVENTS; + dockerClient.stopContainerCmd(container.getId()).withTimeout(1).exec(); + + // generates 5 events with remote api 1.24: + + // Event[status=pull,id=busybox:latest,from=,node=,type=IMAGE,action=pull,actor=com.github.dockerjava.api.model.EventActor@417db6d7[id=busybox:latest,attributes={name=busybox}],time=1473455186,timeNano=1473455186436681587] + // Event[status=create,id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,from=busybox:latest,node=,type=CONTAINER,action=create,actor=com.github.dockerjava.api.model.EventActor@40bcec[id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,attributes={image=busybox:latest, name=sick_lamport}],time=1473455186,timeNano=1473455186470713257] + // Event[status=,id=,from=,node=,type=NETWORK,action=connect,actor=com.github.dockerjava.api.model.EventActor@318a1b01[id=10870ceb13abb7cf841ea68868472da881b33c8ed08d2cde7dbb39d7c24d1d27,attributes={container=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c, name=bridge, type=bridge}],time=1473455186,timeNano=1473455186544318466] + // Event[status=start,id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,from=busybox:latest,node=,type=CONTAINER,action=start,actor=com.github.dockerjava.api.model.EventActor@606f43a3[id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,attributes={image=busybox:latest, name=sick_lamport}],time=1473455186,timeNano=1473455186786163819] + // Event[status=kill,id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,from=busybox:latest,node=,type=CONTAINER,action=kill,actor=com.github.dockerjava.api.model.EventActor@72a9ffcf[id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,attributes={image=busybox:latest, name=sick_lamport, signal=15}],time=1473455186,timeNano=1473455186792963392] + + return 5; } private class EventsTestCallback extends EventsResultCallback { @@ -142,18 +144,24 @@ private class EventsTestCallback extends EventsResultCallback { private final List events = new ArrayList(); - public EventsTestCallback(CountDownLatch countDownLatch) { - this.countDownLatch = countDownLatch; + public EventsTestCallback(int expextedEvents) { + this.countDownLatch = new CountDownLatch(expextedEvents); } public void onNext(Event event) { LOG.info("Received event #{}: {}", countDownLatch.getCount(), event); - countDownLatch.countDown(); events.add(event); + countDownLatch.countDown(); } - - public List getEvents() { - return new ArrayList(events); + + public List awaitExpectedEvents(long timeout, TimeUnit unit ) { + try { + countDownLatch.await(timeout, unit); + close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return new ArrayList(events); } } } diff --git a/src/test/java/com/github/dockerjava/core/command/StopContainerCmdImplTest.java b/src/test/java/com/github/dockerjava/core/command/StopContainerCmdImplTest.java index 382e5b29c..7ae6a09cc 100644 --- a/src/test/java/com/github/dockerjava/core/command/StopContainerCmdImplTest.java +++ b/src/test/java/com/github/dockerjava/core/command/StopContainerCmdImplTest.java @@ -69,11 +69,9 @@ public void testStopContainer() throws DockerException { assertThat(inspectContainerResponse.getState().getRunning(), is(equalTo(false))); final Integer exitCode = inspectContainerResponse.getState().getExitCode(); - if (apiVersion.equals(VERSION_1_22)) { - assertThat(exitCode, is(0)); - } else { - assertThat(exitCode, not(0)); - } + + assertThat(exitCode, is(137)); + } @Test(expectedExceptions = NotFoundException.class) diff --git a/src/test/java/com/github/dockerjava/core/command/UpdateContainerCmdImplTest.java b/src/test/java/com/github/dockerjava/core/command/UpdateContainerCmdImplTest.java index 3c7a9fd47..df53d671b 100644 --- a/src/test/java/com/github/dockerjava/core/command/UpdateContainerCmdImplTest.java +++ b/src/test/java/com/github/dockerjava/core/command/UpdateContainerCmdImplTest.java @@ -79,7 +79,7 @@ public void updateContainer() throws DockerException, IOException { // .withCpusetCpus("0") // depends on env .withCpusetMems("0") .withMemory(314572800L) - .withMemorySwap(514288000L) +// .withMemorySwap(514288000L) Your kernel does not support swap limit capabilities, memory limited without swap. .withMemoryReservation(209715200L) // .withKernelMemory(52428800) Can not update kernel memory to a running container, please stop it first. .exec(); @@ -102,7 +102,7 @@ public void updateContainer() throws DockerException, IOException { assertThat(afterHostConfig.getCpusetMems(), is("0")); assertThat(afterHostConfig.getMemoryReservation(), is(209715200L)); - assertThat(afterHostConfig.getMemorySwap(), is(514288000L)); +// assertThat(afterHostConfig.getMemorySwap(), is(514288000L)); } diff --git a/src/test/java/com/github/dockerjava/netty/exec/EventsCmdExecTest.java b/src/test/java/com/github/dockerjava/netty/exec/EventsCmdExecTest.java index a634e2562..2a446569d 100644 --- a/src/test/java/com/github/dockerjava/netty/exec/EventsCmdExecTest.java +++ b/src/test/java/com/github/dockerjava/netty/exec/EventsCmdExecTest.java @@ -22,8 +22,6 @@ @Test(groups = "integration") public class EventsCmdExecTest extends AbstractNettyDockerClientTest { - private static int KNOWN_NUM_EVENTS = 4; - private static String getEpochTime() { return String.valueOf(System.currentTimeMillis() / 1000); } @@ -48,9 +46,6 @@ public void afterMethod(ITestResult result) { super.afterMethod(result); } - /* - * This specific test may fail with boot2docker as time may not in sync with host system - */ @Test public void testEventStreamTimeBound() throws Exception { // Don't include other tests events @@ -60,82 +55,84 @@ public void testEventStreamTimeBound() throws Exception { int expectedEvents = generateEvents(); String endTime = getEpochTime(); - CountDownLatch countDownLatch = new CountDownLatch(expectedEvents); - EventsTestCallback eventCallback = new EventsTestCallback(countDownLatch); - - dockerClient.eventsCmd().withSince(startTime).withUntil(endTime).exec(eventCallback); - - Boolean zeroCount = countDownLatch.await(10, TimeUnit.SECONDS); - - eventCallback.close(); - - assertTrue(zeroCount, "Received only: " + eventCallback.getEvents()); - } - - @Test - public void testEventStreaming1() throws Exception { - // Don't include other tests events - TimeUnit.SECONDS.sleep(1); - - CountDownLatch countDownLatch = new CountDownLatch(KNOWN_NUM_EVENTS); - EventsTestCallback eventCallback = new EventsTestCallback(countDownLatch); + EventsTestCallback eventCallback = new EventsTestCallback(expectedEvents); - dockerClient.eventsCmd().withSince(getEpochTime()).exec(eventCallback); + dockerClient.eventsCmd() + .withSince(startTime) + .withUntil(endTime) + .exec(eventCallback); - generateEvents(); - - Boolean zeroCount = countDownLatch.await(10, TimeUnit.SECONDS); + List events = eventCallback.awaitExpectedEvents(3, TimeUnit.MINUTES); - eventCallback.close(); - assertTrue(zeroCount, "Received only: " + eventCallback.getEvents()); + // we may receive more events as expected + assertTrue(events.size() >= expectedEvents, "Received events: " + events); } @Test - public void testEventStreaming2() throws Exception { + public void testEventStreaming() throws Exception { // Don't include other tests events TimeUnit.SECONDS.sleep(1); + + String startTime = getEpochTime(); + int expectedEvents = generateEvents(); - CountDownLatch countDownLatch = new CountDownLatch(KNOWN_NUM_EVENTS); - EventsTestCallback eventCallback = new EventsTestCallback(countDownLatch); + EventsTestCallback eventCallback = new EventsTestCallback(expectedEvents); - dockerClient.eventsCmd().withSince(getEpochTime()).exec(eventCallback); + dockerClient.eventsCmd() + .withSince(startTime) + .exec(eventCallback); generateEvents(); - Boolean zeroCount = countDownLatch.await(10, TimeUnit.SECONDS); - - eventCallback.close(); - assertTrue(zeroCount, "Received only: " + eventCallback.getEvents()); + List events = eventCallback.awaitExpectedEvents(3, TimeUnit.MINUTES); + + // we may receive more events as expected + assertTrue(events.size() >= expectedEvents, "Received events: " + events); } + public void testEventStreamingWithFilter() throws Exception { // Don't include other tests events TimeUnit.SECONDS.sleep(1); + + String startTime = getEpochTime(); + int expectedEvents = 1; - CountDownLatch countDownLatch = new CountDownLatch(1); - EventsTestCallback eventCallback = dockerClient.eventsCmd().withEventFilter("start") - .exec(new EventsTestCallback(countDownLatch)); + EventsTestCallback eventCallback = new EventsTestCallback(expectedEvents); + + dockerClient.eventsCmd() + .withSince(startTime) + .withEventFilter("start") + .exec(eventCallback); generateEvents(); - Boolean zeroCount = countDownLatch.await(10, TimeUnit.SECONDS); - - eventCallback.close(); - assertTrue(zeroCount, "Received only: " + eventCallback.getEvents()); + List events = eventCallback.awaitExpectedEvents(3, TimeUnit.MINUTES); + + // we should get exactly one "start" event here + assertEquals(events.size(), expectedEvents, "Received events: " + events); } /** - * This method generates {#link KNOWN_NUM_EVENTS} events + * This method generates some events and returns the number of events being generated */ private int generateEvents() throws Exception { - String testImage = "busybox"; + String testImage = "busybox:latest"; dockerClient.pullImageCmd(testImage).exec(new PullImageResultCallback()).awaitSuccess(); - CreateContainerResponse container = dockerClient.createContainerCmd(testImage).withCmd("sleep", "9999").exec(); dockerClient.startContainerCmd(container.getId()).exec(); - dockerClient.stopContainerCmd(container.getId()).exec(); - return KNOWN_NUM_EVENTS; + dockerClient.stopContainerCmd(container.getId()).withTimeout(1).exec(); + + // generates 5 events with remote api 1.24: + + // Event[status=pull,id=busybox:latest,from=,node=,type=IMAGE,action=pull,actor=com.github.dockerjava.api.model.EventActor@417db6d7[id=busybox:latest,attributes={name=busybox}],time=1473455186,timeNano=1473455186436681587] + // Event[status=create,id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,from=busybox:latest,node=,type=CONTAINER,action=create,actor=com.github.dockerjava.api.model.EventActor@40bcec[id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,attributes={image=busybox:latest, name=sick_lamport}],time=1473455186,timeNano=1473455186470713257] + // Event[status=,id=,from=,node=,type=NETWORK,action=connect,actor=com.github.dockerjava.api.model.EventActor@318a1b01[id=10870ceb13abb7cf841ea68868472da881b33c8ed08d2cde7dbb39d7c24d1d27,attributes={container=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c, name=bridge, type=bridge}],time=1473455186,timeNano=1473455186544318466] + // Event[status=start,id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,from=busybox:latest,node=,type=CONTAINER,action=start,actor=com.github.dockerjava.api.model.EventActor@606f43a3[id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,attributes={image=busybox:latest, name=sick_lamport}],time=1473455186,timeNano=1473455186786163819] + // Event[status=kill,id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,from=busybox:latest,node=,type=CONTAINER,action=kill,actor=com.github.dockerjava.api.model.EventActor@72a9ffcf[id=6ec10182cde227040bfead8547b63105e6bbc4e94b99f6098bfad6e158ce0d3c,attributes={image=busybox:latest, name=sick_lamport, signal=15}],time=1473455186,timeNano=1473455186792963392] + + return 5; } private class EventsTestCallback extends EventsResultCallback { @@ -144,18 +141,25 @@ private class EventsTestCallback extends EventsResultCallback { private final List events = new ArrayList(); - public EventsTestCallback(CountDownLatch countDownLatch) { - this.countDownLatch = countDownLatch; + public EventsTestCallback(int expextedEvents) { + this.countDownLatch = new CountDownLatch(expextedEvents); } public void onNext(Event event) { LOG.info("Received event #{}: {}", countDownLatch.getCount(), event); - countDownLatch.countDown(); events.add(event); + countDownLatch.countDown(); } - - public List getEvents() { - return new ArrayList(events); + + public List awaitExpectedEvents(long timeout, TimeUnit unit ) { + try { + countDownLatch.await(timeout, unit); + close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return new ArrayList(events); } } } + diff --git a/src/test/java/com/github/dockerjava/netty/exec/StopContainerCmdExecTest.java b/src/test/java/com/github/dockerjava/netty/exec/StopContainerCmdExecTest.java index 1dca12890..063bcee30 100644 --- a/src/test/java/com/github/dockerjava/netty/exec/StopContainerCmdExecTest.java +++ b/src/test/java/com/github/dockerjava/netty/exec/StopContainerCmdExecTest.java @@ -61,7 +61,7 @@ public void testStopContainer() throws DockerException { dockerClient.startContainerCmd(container.getId()).exec(); LOG.info("Stopping container: {}", container.getId()); - dockerClient.stopContainerCmd(container.getId()).withTimeout(2).exec(); + dockerClient.stopContainerCmd(container.getId()).withTimeout(10).exec(); InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(container.getId()).exec(); LOG.info("Container Inspect: {}", inspectContainerResponse.toString()); @@ -69,17 +69,15 @@ public void testStopContainer() throws DockerException { assertThat(inspectContainerResponse.getState().getRunning(), is(equalTo(false))); final Integer exitCode = inspectContainerResponse.getState().getExitCode(); - if (apiVersion.equals(VERSION_1_22)) { - assertThat(exitCode, is(0)); - } else { - assertThat(exitCode, not(0)); - } + + assertThat(exitCode, is(137)); + } @Test(expectedExceptions = NotFoundException.class) public void testStopNonExistingContainer() throws DockerException { - dockerClient.stopContainerCmd("non-existing").withTimeout(2).exec(); + dockerClient.stopContainerCmd("non-existing").withTimeout(10).exec(); } } diff --git a/src/test/java/com/github/dockerjava/netty/exec/UpdateContainerCmdExecTest.java b/src/test/java/com/github/dockerjava/netty/exec/UpdateContainerCmdExecTest.java index 1134afd6d..681839c17 100644 --- a/src/test/java/com/github/dockerjava/netty/exec/UpdateContainerCmdExecTest.java +++ b/src/test/java/com/github/dockerjava/netty/exec/UpdateContainerCmdExecTest.java @@ -78,7 +78,7 @@ public void updateContainer() throws DockerException, IOException { // .withCpusetCpus("0") // depends on env .withCpusetMems("0") .withMemory(314572800L) - .withMemorySwap(514288000L) +// .withMemorySwap(514288000L) Your kernel does not support swap limit capabilities, memory limited without swap. .withMemoryReservation(209715200L) // .withKernelMemory(52428800) Can not update kernel memory to a running container, please stop it first. .exec(); @@ -101,7 +101,7 @@ public void updateContainer() throws DockerException, IOException { assertThat(afterHostConfig.getCpusetMems(), is("0")); assertThat(afterHostConfig.getMemoryReservation(), is(209715200L)); - assertThat(afterHostConfig.getMemorySwap(), is(514288000L)); +// assertThat(afterHostConfig.getMemorySwap(), is(514288000L)); } }