*Pipe介绍

---Pipe的实现是与系统无关的

---用于在同一个JVM内部的线程之间传输数据:NIO中的Pipe是线程间的通信机制,而Unix的pipe是进程间的通信机制.

---Pipe和一个Selector可以实现在一个线程中从多个channel中收集数据,提升效率.(from nio)

*源码分析

---Pipe的实现方式之一:在本机建立一条loopback(回环)连接,这意味着每个Pipe会占用系统的2个端口(服务端绑定的端口和客户端连接使用的端口).

/*
    2    * Copyright 2002-2004 Sun Microsystems, Inc.  All Rights Reserved.
    3    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
    4    *
    5    * This code is free software; you can redistribute it and/or modify it
    6    * under the terms of the GNU General Public License version 2 only, as
    7    * published by the Free Software Foundation.  Sun designates this
    8    * particular file as subject to the "Classpath" exception as provided
    9    * by Sun in the LICENSE file that accompanied this code.
   10    *
   11    * This code is distributed in the hope that it will be useful, but WITHOUT
   12    * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
   13    * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
   14    * version 2 for more details (a copy is included in the LICENSE file that
   15    * accompanied this code).
   16    *
   17    * You should have received a copy of the GNU General Public License version
   18    * 2 along with this work; if not, write to the Free Software Foundation,
   19    * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
   20    *
   21    * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
   22    * CA 95054 USA or visit www.sun.com if you need additional information or
   23    * have any questions.
   24    */
   25   
   26   
   27   
   28   package sun.nio.ch;
   29   
   30   import java.io.IOException;
   31   import java.net.InetAddress;
   32   import java.net.InetSocketAddress;
   33   import java.nio.*;
   34   import java.nio.channels.*;
   35   import java.nio.channels.spi.*;
   36   import java.security.AccessController;
   37   import java.security.PrivilegedExceptionAction;
   38   import java.security.PrivilegedActionException;
   39   import java.util.Random;
   40   
   41   
   42   /**
   43    * A simple Pipe implementation based on a socket connection.
   44    */
   45   
   46   class PipeImpl
   47       extends Pipe
   48   {
   49   
   50       // Source and sink channels
   51       private SourceChannel source;
   52       private SinkChannel sink;
   53   
   54       // Random object for handshake values
   55       private static final Random rnd;
   56   
   57       static {
   58           Util.load();
   59           byte[] someBytes = new byte[8];
   60           boolean resultOK = IOUtil.randomBytes(someBytes);
   61           if (resultOK) {
   62               rnd = new Random(ByteBuffer.wrap(someBytes).getLong());
   63           } else {
   64               rnd = new Random();
   65           }
   66       }
   67   
   68       private class Initializer
   69           implements PrivilegedExceptionAction
   70       {
   71   
   72           private final SelectorProvider sp;
   73   
   74           private Initializer(SelectorProvider sp) {
   75               this.sp = sp;
   76           }
   77   
   78           public Object run() throws IOException {
   79               ServerSocketChannel ssc = null;
   80               SocketChannel sc1 = null;
   81               SocketChannel sc2 = null;
   82   
   83               try {
   84                   // loopback address
   85                   InetAddress lb = InetAddress.getByName("127.0.0.1");
   86                   assert(lb.isLoopbackAddress());
   87   
   88                   // bind ServerSocketChannel to a port on the loopback address
   89                   ssc = ServerSocketChannel.open();
   90                   ssc.socket().bind(new InetSocketAddress(lb, 0));
   91   
   92                   // Establish connection (assumes connections are eagerly
   93                   // accepted)
   94                   InetSocketAddress sa
   95                       = new InetSocketAddress(lb, ssc.socket().getLocalPort());
   96                   sc1 = SocketChannel.open(sa);
   97   
   98                   ByteBuffer bb = ByteBuffer.allocate(8);
   99                   long secret = rnd.nextLong();
  100                   bb.putLong(secret).flip();
  101                   sc1.write(bb);
  102   
  103                   // Get a connection and verify it is legitimate
  104                   for (;;) {
  105                       sc2 = ssc.accept();
  106                       bb.clear();
  107                       sc2.read(bb);
  108                       bb.rewind();
  109                       if (bb.getLong() == secret)
  110                           break;
  111                       sc2.close();
  112                   }
  113   
  114                   // Create source and sink channels
  115                   source = new SourceChannelImpl(sp, sc1);
  116                   sink = new SinkChannelImpl(sp, sc2);
  117               } catch (IOException e) {
  118                   try {
  119                       if (sc1 != null)
  120                           sc1.close();
  121                       if (sc2 != null)
  122                           sc2.close();
  123                   } catch (IOException e2) { }
  124                   IOException x = new IOException("Unable to establish"
  125                                                   + " loopback connection");
  126                   x.initCause(e);
  127                   throw x;
  128               } finally {
  129                   try {
  130                       if (ssc != null)
  131                           ssc.close();
  132                   } catch (IOException e2) { }
  133               }
  134               return null;
  135           }
  136       }
  137   
  138       PipeImpl(final SelectorProvider sp) throws IOException {
  139           try {
  140               AccessController.doPrivileged(new Initializer(sp));
  141           } catch (PrivilegedActionException x) {
  142               throw (IOException)x.getCause();
  143           }
  144       }
  145   
  146   
  147       public SourceChannel source() {
  148           return source;
  149       }
  150   
  151       public SinkChannel sink() {
  152           return sink;
  153       }
  154   
  155   }

*示例

---来自nio的示例,说明了使用方法

package nio.channel.pipe;

import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Random;

/**
 * Test Pipe objects using a worker thread.
 *
 * The main thread copies everything it reads from the pipe's source
 * channel to stdout while a worker thread writes lines of text into
 * the pipe's sink channel.
 *
 * Created April, 2002
 * @author Ron Hitchens (ron@ronsoft.com)
 */
public class PipeTest
{
	/**
	 * Starts a worker that emits 10 lines and copies its output to
	 * stdout until the worker closes its end of the pipe.
	 *
	 * @param argv ignored
	 * @throws Exception if the pipe cannot be opened or a channel
	 *         operation fails
	 */
	public static void main (String [] argv)
	throws Exception
	{
		// Wrap a channel around stdout
		WritableByteChannel out = Channels.newChannel (System.out);
		// Start worker and get read end of channel
		ReadableByteChannel workerChannel = startWorker (10);
		ByteBuffer buffer = ByteBuffer.allocate (100);
		try {
			// read() returns a negative value once the sink is closed
			while (workerChannel.read (buffer) >= 0) {
				buffer.flip( );
				// a single write may not drain the buffer
				while (buffer.hasRemaining( )) {
					out.write (buffer);
				}
				buffer.clear( );
			}
		} finally {
			// release the read end of the pipe on every path
			workerChannel.close( );
		}
	}

	/**
	 * Opens a pipe, starts a worker writing to its sink, and returns
	 * the source end.
	 * This method could return a SocketChannel or FileChannel
	 * instance just as easily.
	 *
	 * @param reps number of lines the worker writes before closing
	 * @return the readable end of the pipe
	 * @throws Exception if the pipe cannot be opened
	 */
	private static ReadableByteChannel startWorker (int reps)
	throws Exception
	{
		Pipe pipe = Pipe.open( );
		Worker worker = new Worker (pipe.sink( ), reps);
		worker.start();
		return (pipe.source());
	}

	// -----------------------------------------------------------------
	/**
	 * A worker thread object which writes data down a channel.
	 * Note: this object knows nothing about Pipe, uses only a
	 * generic WritableByteChannel.
	 */
	private static class Worker extends Thread
	{
		private final WritableByteChannel channel;
		private final int reps;

		private final String [] products = {
				"No good deed goes unpunished",
				"To be, or what?",
				"No matter where you go, there you are",
				"Just say \"Yo\"",
				"My karma ran over my dogma"
		};

		private final Random rand = new Random( );

		Worker (WritableByteChannel channel, int reps)
		{
			this.channel = channel;
			this.reps = reps;
		}

		// Thread execution begins here
		public void run( )
		{
			ByteBuffer buffer = ByteBuffer.allocate (100);
			try {
				for (int i = 0; i < this.reps; i++) {
					doSomeWork (buffer);
					// channel may not take it all at once; keep going
					// until the buffer is drained rather than stopping
					// at the first zero-byte write
					while (buffer.hasRemaining( )) {
						this.channel.write (buffer);
					}
				}
			} catch (Exception e) {
				// easy way out; this is demo code
				e.printStackTrace( );
			} finally {
				// Always close the channel: the reader only stops when
				// it sees end-of-stream, so skipping the close after a
				// failure would leave it blocked forever.
				try {
					this.channel.close( );
				} catch (Exception e) {
					e.printStackTrace( );
				}
			}
		}

		/**
		 * Fills the buffer with one randomly chosen line followed by
		 * CRLF and leaves it flipped, ready to be written.
		 */
		private void doSomeWork (ByteBuffer buffer)
		{
			int product = rand.nextInt (products.length);
			buffer.clear( );
			buffer.put (products [product].getBytes());
			buffer.put ("\r\n".getBytes( ));
			buffer.flip( );
		}
	}
}

相关文章:

  • 2021-07-29
  • 2021-05-27
  • 2021-04-01
  • 2022-02-28
猜你喜欢
  • 2021-09-05
  • 2022-12-23
  • 2021-09-16
  • 2021-06-25
  • 2021-10-18
  • 2021-10-21
  • 2021-11-17
相关资源
相似解决方案