Index: src/squeezeplay/share/jive/net/SlimProto.lua =================================================================== --- src/squeezeplay/share/jive/net/SlimProto.lua (revision 9552) +++ src/squeezeplay/share/jive/net/SlimProto.lua (working copy) @@ -269,7 +269,7 @@ transitionType = string.sub(packet, 15, 15), flags = unpackNumber(packet, 16, 1), outputThreshold = unpackNumber(packet, 17, 1), - -- reserved = unpackNumber(packet, 18, 1), + slaves = unpackNumber(packet, 18, 1), replayGain = unpackNumber(packet, 19, 4), serverPort = unpackNumber(packet, 23, 2), serverIp = unpackNumber(packet, 25, 4), Index: src/squeezeplay/src/audio/streambuf.c =================================================================== --- src/squeezeplay/src/audio/streambuf.c (revision 9552) +++ src/squeezeplay/src/audio/streambuf.c (working copy) @@ -50,6 +50,37 @@ static u32_t icy_meta_interval; static s32_t icy_meta_remaining; +struct chunk { + u8_t *buf; + size_t len; +}; + +static void proxy_chunk (u8_t *buf, size_t size, lua_State *L) +{ + if (L && size) { + struct chunk *chunk; + /* + * Send chunk to proxy clients + * + * Relies on this being sent before wrap-around occurs + * which is ensured by Playback.lua not scheduling any more reads + * on the stream until the queued chunk (or, initially, chunks) + * has been written to all proxy clients. + * + * At the start of a stream, there may be up to 3 queued chunks: + * 1. the header + * 2. the remains of the initial read up until fifo wrap-around + * 3. 
the remains of the initial read after fifo wrap-around + */ + /* call _proxyQueueSegment(obj, chunk) on the object at Lua stack index 2; NOTE(review): callers appear to hold the fifo lock here and lua_call may raise - confirm */ + lua_getfield(L, 2, "_proxyQueueSegment"); + lua_pushvalue(L, 2); + chunk = lua_newuserdata(L, sizeof(*chunk)); + chunk->buf = buf; + chunk->len = size; + lua_call(L, 2, 0); + } +} size_t streambuf_get_size(void) { return STREAMBUF_SIZE; @@ -127,7 +158,7 @@ } -void streambuf_feed(u8_t *buf, size_t size) { +static void streambuf_feedL(u8_t *buf, size_t size, lua_State *L) { size_t n; fifo_lock(&streambuf_fifo); @@ -144,6 +175,9 @@ } memcpy(streambuf_buf + streambuf_fifo.wptr, buf, n); + + proxy_chunk(streambuf_buf + streambuf_fifo.wptr, n, L); + fifo_wptr_incby(&streambuf_fifo, n); size -= n; } @@ -151,8 +185,11 @@ fifo_unlock(&streambuf_fifo); } +void streambuf_feed(u8_t *buf, size_t size) { + streambuf_feedL(buf, size, 0); +} -ssize_t streambuf_feed_fd(int fd) { +ssize_t streambuf_feed_fd(int fd, lua_State *L) { ssize_t n, size; fifo_lock(&streambuf_fifo); @@ -181,6 +218,8 @@ streambuf_streaming = FALSE; } else { + proxy_chunk(streambuf_buf + streambuf_fifo.wptr, n, L); + fifo_wptr_incby(&streambuf_fifo, n); streambuf_bytes_received += n; @@ -581,7 +620,7 @@ /* shortcut, just read to streambuf */ if (stream->num_crlf == 4) { - n = streambuf_feed_fd(stream->fd); + n = streambuf_feed_fd(stream->fd, L); if (n == 0) { /* closed */ lua_pushboolean(L, FALSE); @@ -653,7 +692,7 @@ n--; if (stream->num_crlf == 4) { - header_len = body_ptr - stream->body - 1; + header_len = body_ptr - stream->body; //LOG_DEBUG(log_audio_decode, "headers %d %*s\n", header_len, header_len, stream->body); @@ -663,9 +702,12 @@ lua_pushlstring(L, (char *)stream->body, header_len); lua_call(L, 2, 0); - free(stream->body); - stream->body = NULL; - stream->body_len = 0; + /* do not free the header here - leave it to disconnect - + * so that it can be used by the proxy code + */ + + /* Send headers to proxy clients */ + proxy_chunk(stream->body, header_len, L); break; } @@ -676,7 +718,7 @@ streambuf_lptr = streambuf_fifo.wptr; /* 
feed remaining buffer */ - streambuf_feed(buf_ptr, n); + streambuf_feedL(buf_ptr, n, L); lua_pushboolean(L, TRUE); return 1; @@ -726,6 +768,42 @@ return 1; } +static int stream_proxyWriteL(lua_State *L) { + struct stream *stream; + struct chunk *chunk; + ssize_t n; + size_t len, offset; + + /* + * 1: Stream (self) + * 2: Proxy stream - NOTE(review): Playback.lua passes the socket returned by accept(), so reading stream->fd assumes that userdata is layout-compatible with struct stream; confirm + * 3: chunk + * 4: offset (bytes of the chunk already sent) + */ + + stream = lua_touserdata(L, 2); + chunk = lua_touserdata(L, 3); + offset = lua_tointeger(L, 4); + + len = chunk->len - offset; + n = send(stream->fd, chunk->buf + offset, len, MSG_NOSIGNAL); + if (n < 0) { + if (errno != EAGAIN) { + lua_pushnil(L); + lua_pushstring(L, strerror(SOCKETERROR)); + return 2; + } + } else if ((size_t)n < len) { + offset += n; + } else { + /* wrote it all - return nil so the caller drops the chunk */ + lua_pushnil(L); + return 1; + } + lua_pushinteger(L, offset); + return 1; +} + /* feed data from a lua string into the streambuf fifo */ static int stream_feedfromL(lua_State *L) { @@ -865,6 +943,7 @@ { "loadLoop", stream_load_loopL }, { "markLoop", stream_mark_loopL }, { "icyMetaInterval", stream_icy_metaintervalL }, + { "proxyWrite", stream_proxyWriteL }, { NULL, NULL } }; Index: src/squeezeplay/share/jive/audio/Playback.lua =================================================================== --- src/squeezeplay/share/jive/audio/Playback.lua (revision 9552) +++ src/squeezeplay/share/jive/audio/Playback.lua (working copy) @@ -10,6 +10,7 @@ local hasDecode, decode = pcall(require, "squeezeplay.decode") local hasSprivate, spprivate = pcall(require, "spprivate") local Stream = require("squeezeplay.stream") +local socket = require("socket") -- for proxy streams local SlimProto = require("jive.net.SlimProto") local Player = require("jive.slim.Player") @@ -16,6 +17,7 @@ local Task = require("jive.ui.Task") local Timer = require("jive.ui.Timer") local Framework = require("jive.ui.Framework") +local Networking = require("jive.net.Networking") local debug = require("jive.utils.debug") local log = 
require("jive.utils.log").logger("audio.decode") @@ -41,10 +43,14 @@ local TCP_CLOSE_LOCAL_TIMEOUT = 4 --- Do NOT set a read timeout, the stream may be paused indefinately +-- Do NOT set a read timeout, the stream may be paused indefinitely local STREAM_READ_TIMEOUT = 0 local STREAM_WRITE_TIMEOUT = 5 +local PROXY_WRITE_TIMEOUT = 0 -- no write timeout (0), because the stream may be paused +local PROXY_CONNECT_TIMEOUT = STREAM_WRITE_TIMEOUT + 1 +local PROXY_LISTEN_PORT = 9001 + local LOCAL_PAUSE_STOP_TIMEOUT = 400 -- Handlers to allow applets to extend playback capabilties via spdr:// urls @@ -123,6 +129,8 @@ -- signal we are Rtmp capable, but don't load module until used slimproto:capability("Rtmp", 2) + + slimproto:capability("Proxy", tostring(_getIPAddress()) .. ":" .. tostring(PROXY_LISTEN_PORT)) self.mode = 0 self.threshold = 0 @@ -138,6 +146,9 @@ self.sentAudioUnderrunEvent = false self.ignoreStream = false self.decodeThreshold = 2048 + + self.proxy = nil + self.proxyListener = nil return obj end @@ -266,7 +277,7 @@ -- enable stream reads when decode buffer is not full if status.decodeFull < status.decodeSize and self.stream then - self.jnt:t_addRead(self.stream, self.rtask, STREAM_READ_TIMEOUT) + self:_proxyAndStream(true) end if status.decodeState & DECODE_UNDERRUN ~= 0 or @@ -443,7 +454,7 @@ end -function _streamConnect(self, serverIp, serverPort, reader, writer) +function _streamConnect(self, serverIp, serverPort, reader, writer, slaves) log:info("connect ", _ipstring(serverIp), ":", serverPort, " ", string.match(self.header, "(.-)\n")) if serverIp ~= self.slimproto:getServerIp() then @@ -483,6 +494,8 @@ m.read = m._streamRead m.write = m._streamWrite end + + self:_proxyInit(slaves, self.stream) local wtask = Task("streambufW", self, _streamWrite, nil, Task.PRIORITY_AUDIO) self.jnt:t_addWrite(self.stream, wtask, STREAM_WRITE_TIMEOUT) @@ -488,7 +501,7 @@ self.jnt:t_addWrite(self.stream, wtask, STREAM_WRITE_TIMEOUT) self.rtask = Task("streambufR", self, _streamRead, nil, 
Task.PRIORITY_AUDIO) - self.jnt:t_addRead(self.stream, self.rtask, STREAM_READ_TIMEOUT) + self:_proxyAndStream(true) self.slimproto:sendStatus('STMc') end @@ -493,6 +506,229 @@ self.slimproto:sendStatus('STMc') end +function _proxyQueueSegment(self, chunk) + if self.proxy then + table.insert(self.proxy.q, chunk) + end +end + +function _proxyConnClose(self, conn, err, leaveConnectionTable) + log:info("Proxy connection closed: from ", conn.ip, ':', conn.port, '; ', err or '') + self.jnt:t_removeWrite(conn.stream) + self.jnt:t_removeRead(conn.stream) + conn.stream:close() + conn.chunk = nil + if self.proxy and not leaveConnectionTable then + for i, c in ipairs(self.proxy.connections) do + if c == conn then + table.remove(self.proxy.connections, i) + break + end + end + end +end + +function _proxyWrite(self, conn, networkErr) + if networkErr then + self:_proxyConnClose(conn, networkErr) + return + end + + while true do + local n, err + if conn.chunk then + n, err = Stream:proxyWrite(conn.stream, conn.chunk, conn.chunkOffset) + if n then -- partial write: n is the new offset into the chunk + conn.chunkOffset = n + else + conn.chunk = nil + self.jnt:t_removeWrite(conn.stream) + -- Let reading be restarted by the status timer + -- once the decoder is running. 
+ -- This prevents the streambuf starving the cpu + self:_proxyAndStream(not self.sentResumeDecoder) + end + end + + if err then + self:_proxyConnClose(conn, err) + break + end + + _, networkErr = Task:yield(false) + end +end + +function _proxyRead(self, conn, networkErr) + if networkErr then + log:info("proxyRead: ", networkErr) + return + end + + while true do + local n, err = conn.stream:receive(10000) + if err then + log:info("proxyRead: ", err) + break + elseif n then + log:info("proxyRead received ", #n); + end + _, networkErr = Task:yield(false) + end + self.jnt:t_removeRead(conn.stream) +end + +function _proxyAccept(self, networkErr) + -- XXX check error: the networkErr passed on entry is ignored and overwritten by accept() + + while true do + local stream + stream, networkErr = self.proxyListener:accept() + + if networkErr then + log:info("proxyAccept: ", networkErr) + break + end + + if stream then + local conn = {} + conn.ip, conn.port = stream:getpeername() + log:info("Proxy connection accepted: from ", conn.ip, ':', conn.port) + conn.stream = stream + stream:settimeout(0) + conn.wtask = Task("proxyW", self, + function (self, networkErr) self:_proxyWrite(conn, networkErr) end, + nil, Task.PRIORITY_AUDIO) + conn.rtask = Task("proxyR", self, + function (self, networkErr) self:_proxyRead(conn, networkErr) end, + nil, Task.PRIORITY_AUDIO) + self.jnt:t_addRead(conn.stream, conn.rtask, STREAM_WRITE_TIMEOUT) -- read and discard the request + table.insert(self.proxy.connections, conn) + + self.proxy.expected = self.proxy.expected - 1 + if self.proxy.expected <= 0 then -- we have them all + log:info("All proxy connections active") + self.jnt:t_removeRead(self.proxyListener) + self.proxy.listenTask = nil + self:_proxyAndStream(true) + return; + end + end + + _, networkErr = Task:yield(false) + end + + self.jnt:t_removeRead(self.proxyListener) + self.proxy.listenTask = nil + + if self.proxy.expected > 0 then + log:warn('Not all proxy connections accepted: ', networkErr) + if self.stream and self.stream == self.proxy.stream then 
+ self:_streamDisconnect(TCP_CLOSE_LOCAL_TIMEOUT) + end + end +end + +function _getIPAddress() + local ip_address, ip_subnet + local ifObj = Networking:activeInterface() + + if ifObj then + ip_address, ip_subnet = ifObj:getIPAddressAndSubnet() + end + + return ip_address or "?.?.?.?" +end + +function _proxyCleanup(self) + if self.proxy then + -- close existing connections, etc. + self.jnt:t_removeRead(self.proxyListener) + self.proxy.listenTask = nil + + for i, c in ipairs(self.proxy.connections) do + self:_proxyConnClose(c, nil, true) + end + + self.proxy = nil + end +end + +function _proxyInit(self, expected, stream) + + if self.proxy then + if self.proxy.close and not self.proxy.listenTask then + -- let them drain; expected may be nil, it is compared with 0 once drained + self.proxy.expected = expected or 0 + self.proxy.stream = stream + return + else + self:_proxyCleanup() + end + end + + if expected and expected > 0 then + log:info("Proxy: connections expected = ", expected) + local proxy = {} + proxy.expected = expected + proxy.stream = stream + proxy.q = {} + proxy.connections = {} + + if not self.proxyListener then + self.proxyListener = socket.bind(0, PROXY_LISTEN_PORT) + self.proxyListener:settimeout(0) + end + + proxy.listenTask = Task("proxyListen", self, _proxyAccept, nil, Task.PRIORITY_AUDIO) + self.jnt:t_addRead(self.proxyListener, proxy.listenTask, PROXY_CONNECT_TIMEOUT) + + self.proxy = proxy + end +end + +function _proxyAndStream(self, canRead) + if not canRead then + self.jnt:t_removeRead(self.stream) + end + + if self.proxy then + if self.proxy.listenTask then + return + end + + for i, c in ipairs(self.proxy.connections) do + if c.chunk then return end + end + + if #self.proxy.q > 0 then + local chunk = table.remove(self.proxy.q, 1) + for i, c in ipairs(self.proxy.connections) do + c.chunk, c.chunkOffset = chunk, 0 + self.jnt:t_addWrite(c.stream, c.wtask, PROXY_WRITE_TIMEOUT) + end + return + end + + if self.proxy.close then + for i, c in ipairs(self.proxy.connections) do + self:_proxyConnClose(c, nil, true) 
+ end + self.proxy.close = false + self.proxy.connections = {} + if self.proxy.expected > 0 and self.stream == self.proxy.stream then + -- next connection now + self:_proxyInit(self.proxy.expected, self.proxy.stream) + end + end + end + + if canRead and self.stream then + self.jnt:t_addRead(self.stream, self.rtask, STREAM_READ_TIMEOUT) + end +end + + function _streamDisconnect(self, reason, flush) if not self.stream then @@ -511,6 +747,20 @@ self.stream:disconnect() self.stream = nil + + if self.proxy then + if reason and reason ~= TCP_CLOSE_FIN then + self:_proxyCleanup() + else + if self.proxy.listenTask then + self.proxy.listenTask = nil + self.jnt:t_removeRead(self.proxyListener) + end + -- Close any proxy connections as soon as they have drained + self.proxy.close = true + self:_proxyAndStream(false) + end + end -- Notify SqueezeCenter the stream is closed if (flush) then @@ -553,9 +803,7 @@ -- stop reading if the decoder is running. the socket will -- be added again by the status timer. this prevents the -- streambuf starving the cpu - if self.sentResumeDecoder then - self.jnt:t_removeRead(self.stream) - end + self:_proxyAndStream(not self.sentResumeDecoder) _, networkErr = Task:yield(false) @@ -701,7 +949,7 @@ string.byte(data.pcmChannels), string.byte(data.pcmEndianness) ) - self:_streamConnect(serverIp, data.serverPort) + self:_streamConnect(serverIp, data.serverPort, nil, nil, data.slaves) end elseif data.command == 'q' then @@ -757,6 +1005,8 @@ _setSource(self, "off") end self:_streamDisconnect(nil, true) + + self:_proxyCleanup() self.tracksStarted = 0 end Index: src/squeezeplay/src/audio/streambuf.h =================================================================== --- src/squeezeplay/src/audio/streambuf.h (revision 9552) +++ src/squeezeplay/src/audio/streambuf.h (working copy) @@ -44,7 +44,7 @@ extern size_t streambuf_read(u8_t *buf, size_t min, size_t max, bool_t *streaming); -extern ssize_t streambuf_feed_fd(int fd); +extern ssize_t streambuf_feed_fd(int fd, 
lua_State *L); extern bool_t streambuf_is_copyright();