diff --git a/lib/async/io/generic.rb b/lib/async/io/generic.rb index 8a20137..a4076f2 100644 --- a/lib/async/io/generic.rb +++ b/lib/async/io/generic.rb @@ -192,6 +192,10 @@ def connected? !@io.closed? end + def readable? + @io.readable? + end + attr_accessor :timeout protected diff --git a/lib/async/io/shared_endpoint.rb b/lib/async/io/shared_endpoint.rb index a929459..a259359 100644 --- a/lib/async/io/shared_endpoint.rb +++ b/lib/async/io/shared_endpoint.rb @@ -24,7 +24,10 @@ def self.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false, **opt end server.close_on_exec = close_on_exec - server.reactor = nil + + if server.respond_to?(:reactor=) + server.reactor = nil + end end return self.new(endpoint, wrappers) @@ -35,7 +38,10 @@ def self.connected(endpoint, close_on_exec: false) wrapper = endpoint.connect wrapper.close_on_exec = close_on_exec - wrapper.reactor = nil + + if wrapper.respond_to?(:reactor=) + wrapper.reactor = nil + end return self.new(endpoint, [wrapper]) end diff --git a/lib/async/io/ssl_socket.rb b/lib/async/io/ssl_socket.rb index bba1840..00f5ba1 100644 --- a/lib/async/io/ssl_socket.rb +++ b/lib/async/io/ssl_socket.rb @@ -51,15 +51,21 @@ def initialize(socket, context) super else io = self.class.wrapped_klass.new(socket.to_io, context) - super(io, socket.reactor) - - # We detach the socket from the reactor, otherwise it's possible to add the file descriptor to the selector twice, which is bad. - socket.reactor = nil + if socket.respond_to?(:reactor) + super(io, socket.reactor) + + # We detach the socket from the reactor, otherwise it's possible to add the file descriptor to the selector twice, which is bad. + socket.reactor = nil + else + super(io) + end # This ensures that when the internal IO is closed, it also closes the internal socket: io.sync_close = true - @timeout = socket.timeout + if socket.respond_to?(:timeout) + @timeout = socket.timeout + end end end @@ -112,8 +118,12 @@ def listen(*args) @server.listen(*args) end - def accept(task: Task.current, **options) - peer, address = @server.accept(**options) + def accept(task: Task.current, timeout: nil) + peer, address = @server.accept + + if timeout and peer.respond_to?(:timeout=) + peer.timeout = timeout + end wrapper = SSLSocket.new(peer, @context) diff --git a/lib/async/io/stream.rb b/lib/async/io/stream.rb index baeebc8..56a35e0 100644 --- a/lib/async/io/stream.rb +++ b/lib/async/io/stream.rb @@ -197,6 +197,10 @@ def connected? @io.connected? end + def readable? + @io.readable? + end + def closed? @io.closed? end @@ -246,6 +250,21 @@ def eof! private + def sysread(size, buffer) + while true + result = @io.read_nonblock(size, buffer, exception: false) + + case result + when :wait_readable + @io.wait_readable + when :wait_writable + @io.wait_writable + else + return result + end + end + end + # Fills the buffer from the underlying stream. def fill_read_buffer(size = @block_size) # We impose a limit because the underlying `read` system call can fail if we request too much data in one go. @@ -257,12 +276,12 @@ def fill_read_buffer(size = @block_size) flush if @read_buffer.empty? - if @io.read_nonblock(size, @read_buffer, exception: false) + if sysread(size, @read_buffer) # Console.logger.debug(self, name: "read") {@read_buffer.inspect} return true end else - if chunk = @io.read_nonblock(size, @input_buffer, exception: false) + if chunk = sysread(size, @input_buffer) @read_buffer << chunk # Console.logger.debug(self, name: "read") {@read_buffer.inspect} diff --git a/lib/async/io/version.rb b/lib/async/io/version.rb index adaf433..3dd45b0 100644 --- a/lib/async/io/version.rb +++ b/lib/async/io/version.rb @@ -5,6 +5,6 @@ module Async module IO - VERSION = "1.42.1" + VERSION = "1.43.2" end end diff --git a/readme.md b/readme.md index 4805fad..1b41e2c 100644 --- a/readme.md +++ b/readme.md @@ -1,5 +1,8 @@ # Async::IO +> [!CAUTION] +> This library is deprecated and should not be used in new projects. Instead, you should use for the endpoint-related functionality, and for stream/buffering functionality. + Async::IO provides builds on [async](https://github.com/socketry/async) and provides asynchronous wrappers for `IO`, `Socket`, and related classes. [![Development Status](https://github.com/socketry/async-io/workflows/Test/badge.svg)](https://github.com/socketry/async-io/actions?workflow=Test) diff --git a/spec/async/io/stream_spec.rb b/spec/async/io/stream_spec.rb index 9b5f18c..b19bfdc 100644 --- a/spec/async/io/stream_spec.rb +++ b/spec/async/io/stream_spec.rb @@ -23,6 +23,26 @@ end end + context "native I/O", if: RUBY_VERSION >= "3.1" do + let(:sockets) do + @sockets = ::Socket.pair(::Socket::AF_UNIX, ::Socket::SOCK_STREAM) + end + + after do + @sockets.each(&:close) + end + + let(:io) {sockets.first} + subject {described_class.new(sockets.last)} + + it "can read data" do + io.write("Hello World") + io.close_write + + expect(subject.read).to be == "Hello World" + end + end + context "socket I/O" do let(:sockets) do @sockets = Async::IO::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM) diff --git a/tea.yaml b/tea.yaml new file mode 100644 index 0000000..cfafe79 --- /dev/null +++ b/tea.yaml @@ -0,0 +1,6 @@ +# https://tea.xyz/what-is-this-file +--- +version: 1.0.0 +codeOwners: + - '0x03d7E2c0cf7813867DDb318674B66CC53B8497dA' +quorum: 1