PipedInputStream类与PipedOutputStream类用于在应用程序中创建管道通信.一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道.PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据.这两个类主要用来完成线程之间的通信.一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据.
ps:使用这组I/O流必须在多线程环境下.
首先简单的介绍一下这两个类的实现原理,PipedInputStream和PipedOutputStream的实现原理类似于"生产者-消费者"原理,PipedOutputStream是生产者,PipedInputStream是消费者,在PipedInputStream中有一个buffer字节数组,默认大小为1024,作为缓冲区,存放"生产者"生产出来的东东.还有两个变量,in,out,in是用来记录"生产者"生产了多少,out是用来记录"消费者"消费了多少,in为-1表示消费完了,in==out表示生产满了.当消费者没东西可消费的时候,也就是当in为-1的时候,消费者会一直等待,直到有东西可消费.
因为生产和消费的方法都是synchronized的,所以肯定是生产者先生产出一定数量的东西,消费者才可以开始消费,所以在生产的时候发现in==out,那一定是满了,同理,在消费的时候发现in==out,那一定是消费完了,因为生产的东西永远要比消费来得早,消费者最多可以消费和生产的数量相等的东西,而不会超出.
好了,介绍完之后,看看SUN高手是怎么实现这些功能的.由于buffer(存放产品的通道)这个关键变量在PipedInputStream消费者这个类中,所以要想对buffer操作,只能通过PipedInputStream来操作,因此将产品放入通道的操作是在PipedInputStream中.
存放产品的行为:
protected synchronized void receive(int b) throws IOException {// 这里好像有些问题,因为这个方法是在PipedOutputStream类中调用的,而这个方法是protected的,下面另一个receive方法就不是protected,可能是我的源码有些问题,也请大家帮我看看
checkStateForReceive();// 检测通道是否连接,准备好接收产品
writeSide = Thread.currentThread();// 当前线程是生产者
if (in == out)
awaitSpace();// 发现通道满了,没地方放东西啦,等吧~~
if (in < 0) {// in<0,表示通道是空的,将生产和消费的位置都置成第一个位置
in = 0;
out = 0;
}
buffer[in++] = (byte) (b & 0xFF);
if (in >= buffer.length) {// 如果生产位置到达了通道的末尾,为了循环利用通道,将in置成0
in = 0;
}
}
synchronized void receive(byte b[], int off, int len) throws IOException {// 看,这个方法不是protected的!
checkStateForReceive();
writeSide = Thread.currentThread();
int bytesToTransfer = len;// 需要接收多少产品的数量
while (bytesToTransfer > 0) {
if (in == out)
awaitSpace();
int nextTransferAmount = 0;// 本次实际可以接收的数量
if (out < in) {
nextTransferAmount = buffer.length - in;// 如果消费的当前位置<生产的当前位置,则还可以再生产buffer.length-in这么多
} else if (in < out) {
if (in == -1) {
in = out = 0;// 如果已经消费完,则将in,out置成0,从头开始接收
nextTransferAmount = buffer.length - in;
} else {
nextTransferAmount = out - in;// 如果消费的当前位置>生产的当前位置,而且还没消费完,那么至少还可以再生产out-in这么多,注意,这种情况是因为通道被重复利用而产生的!
}
}
if (nextTransferAmount > bytesToTransfer)// 如果本次实际可以接收的数量要大于当前传过来的数量,
nextTransferAmount = bytesToTransfer;// 那么本次实际只能接收当前传过来的这么多了
assert (nextTransferAmount > 0);
System.arraycopy(b, off, buffer, in, nextTransferAmount);// 把本次实际接收的数量放进通道
bytesToTransfer -= nextTransferAmount;// 算出还剩多少需要放进通道
off += nextTransferAmount;
in += nextTransferAmount;
if (in >= buffer.length) {// 到末尾了,该从头开始了
in = 0;
}
}
}
消费产品的行为:
public synchronized int read() throws IOException {// 消费单个产品
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
&& (in < 0)) {
throw new IOException("Write end dead");
}
readSide = Thread.currentThread();
int trials = 2;
while (in < 0) {// in<0,表示通道是空的,等待生产者生产
if (closedByWriter) {
/**//* closed by writer, return EOF */
return -1;// 返回-1表示生产者已经不再生产产品了,closedByWriter为true表示是由生产者将通道关闭的
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/**//* might be a writer waiting */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & 0xFF;
if (out >= buffer.length) {
out = 0;// 如果消费到通道的末尾了,从通道头开始继续循环消费
}
if (in == out) {
/**//* now empty */
in = -1;// 消费的位置和生产的位置重合了,表示消费完了,需要生产者生产,in置为-1
}
return ret;
}
public synchronized int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0)
|| ((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
/**//* possibly wait on the first character */
int c = read();// 利用消费单个产品来检测通道是否连接,并且通道中是否有东西可消费
if (c < 0) {
return -1;// 返回-1表示生产者生产完了,消费者也消费完了,消费者可以关闭通道了
}
b[off] = (byte) c;
int rlen = 1;
// 这里没有采用receive(byte [], int ,
// int)方法中System.arrayCopy()的方法,其实用System.arrayCopy()的方法也可以实现
/**//*
* 这是用System.arrayCopy()实现的方法 int bytesToConsume = len - 1; while
* (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
* in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
* b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
* buffer.length - out; }
*
* if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
* bytesToConsume; assert (nextConsumeAmount > 0);
* System.arraycopy(buffer, out, b, off, nextConsumeAmount);
* bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
* nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
* buffer.length) { out = 0; } if(in == out) { in = -1; } }
*/
while ((in >= 0) && (--len > 0)) {
b[off + rlen] = buffer[out++];
rlen++;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/**//* now empty */
in = -1;// in==out,表示满了,将in置成-1
}
}
return rlen;
}
分享到:
相关推荐
PipedInputStream和PipedOutputStream_动力节点Java学院整理
主要为大家详细介绍了管道PipedInputStream和PipedOutputStream,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
针对java中的管道流的应用的解析,包括PipedInputStream和PipedOutputStream。
FileWriter、FileReader、CharArrayReader、CharArrayWriter、CharSequence、OutputStreamWriter、FileOutputStream、InputStreamReader、...PipedReader、PipedWriter、PipedInputStream、PipedOutputStream...
|--PipedInputStream/:管道流。管道读取流可以读取管道写入流的数据。注意:需要加多线程技术,否则会发生死锁。read方法是阻塞式的。 | |--ByteArrayInputStream/:字节数组输入流。操作的都是内存中的数组,所以...
PipedInputStream(PipedOutputStream out)直接连接到输出流 4)SequenceInputStream:这个类可以将几个输入流串联在一起合并为一个输入流,构造函数有: SequenceInputStream(Enumeration e)枚举类型e中包含了若干...
PipedInputStream pipedIS = new PipedInputStream(); PipedOutputStream pipedOS = new PipedOutputStream(); try { pipedOS.connect(pipedIS); } catch(IOException e) { System.err.println("连接失败"); ...
4) PipedInputStream:实现了pipe的概念,主要在线程中使用 5) SequenceInputStream:把多个InputStream合并为一个InputStream 2) Out stream 1) ByteArrayOutputStream:把信息存入内存中的一个缓冲区中 2) ...
3) PipedOutputStream:实现了pipe的概念,主要在线程中使用 4) SequenceOutputStream:把多个OutStream合并为一个OutStream 1.2 以Unicode字符为导向的stream 以Unicode字符为导向的stream,表示以Unicode字符为...
4) PipedInputStream:实现了pipe的概念,主要在线程中使用 5) SequenceInputStream:把多个InputStream合并为一个InputStream 2. Out stream 1) ByteArrayOutputStream:把信息存入内存中的一个缓冲区中 2) ...
这是一个最小的工作示例,用于证明PipedInputStream包含一个错误,该错误导致相应的PipedOutputStream等待最多一秒钟以等待可用空间写入,而实际上该空间更快可用。 对于某些应用程序,一秒钟可能相当长。问题描述...
javaIO流原代码,刚刚开始学习java的同志们可以看看.有问题留言.
Java中的PipedWriter、PipedReader类管道的读写依赖于PipedOutputStream、PipedInputStream两个管道输入输出类,这里我们将来举例讲解Java中Piped管道输入输出流的线程通信控制:
包含了Java里面大部分的 流类的小实例Propertity FileReader FileWriter FileInputStream PipedInputStream..........
PipedOutputStream 可以将管道输出流连接到管道输入流来创建通信管道。 PipedReader 传送的字符输入流。 PipedWriter 传送的字符输出流。 PrintStream PrintStream 为其他输出流添加了功能,使它们能够方便地打印...
集合I/O多线程NIOjava基础知识点整理jvmspring 相关MYSQL分布式存储检索java源码学习集合Linkedlist详解Vector详解Stack详解Map构架HashMap详解HashMap...OPipedOutputStream and PipedInputStream 详解BufferInputStream...
PipedInputStream 是从与其它线程共用的管道中读取数据,与Piped 相关的知识后续单独介绍。 ObjectInputStream 和所有FilterInputStream 的子类都是装饰流(装饰器模式的主角)。 2.输出字节流OutputStream IO 中...