From f5aebe5f3138567e1b1e6b459ace35a7a2716637 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 14 Jan 2026 22:46:41 +0000 Subject: [PATCH 1/3] feat: Support dynamic query parameter on reconnect --- .rubocop.yml | 4 +- lib/ld-eventsource/client.rb | 58 ++++++- spec/client_spec.rb | 313 +++++++++++++++++++++++++++++++++++ 3 files changed, 371 insertions(+), 4 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index 43d65d5..0d93de4 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1,4 +1,4 @@ -require: +plugins: - rubocop-performance AllCops: @@ -289,7 +289,7 @@ Style/PerlBackrefs: StyleGuide: "https://github.com/bbatsov/ruby-style-guide#no-perl-regexp-last-matchers" Enabled: false -Naming/PredicateName: +Naming/PredicatePrefix: Description: "Check the names of predicate methods." StyleGuide: "https://github.com/bbatsov/ruby-style-guide#bool-methods-qmark" ForbiddenPrefixes: diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index f398b16..f226f32 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -164,6 +164,7 @@ def initialize(uri, @on = { event: ->(_) {}, error: ->(_) {} } @last_id = last_event_id + @query_params_callback = nil yield self if block_given? @@ -206,6 +207,36 @@ def on_error(&action) @on[:error] = action end + # + # Specifies a block or Proc to generate query parameters dynamically. This will be called before + # each connection attempt (both initial connection and reconnections), allowing you to update + # query parameters based on the current client state. + # + # The block should return a Hash with string keys and string values, which will be merged with + # any existing query parameters in the base URI. If the callback raises an exception, it will be + # logged and the connection will proceed with the base URI's query parameters (or no query + # parameters if none were present). + # + # This is useful for scenarios where query parameters need to reflect the current state of the + # client, such as sending a "basis" parameter that represents what data the client already has. + # + # @example Using dynamic query parameters + # client = SSE::Client.new(base_uri) do |c| + # c.query_params do + # { + # "basis" => (selector.state if selector.defined?), + # "filter" => filter_key + # }.compact + # end + # c.on_event { |event| handle_event(event) } + # end + # + # @yieldreturn [Hash] a hash of query parameter names to values + # + def query_params(&action) + @query_params_callback = action + end + # # Permanently shuts down the client and its connection. No further events will be dispatched. This # has no effect if called a second time. @@ -289,8 +320,9 @@ def connect end cxn = nil begin - @logger.info { "Connecting to event stream at #{@uri}" } - cxn = @http_client.request(@method, @uri, build_opts) + uri = build_uri_with_query_params + @logger.info { "Connecting to event stream at #{uri}" } + cxn = @http_client.request(@method, uri, build_opts) if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") @@ -397,5 +429,27 @@ def build_opts {headers: build_headers, body: resolved_payload.to_s} end end + + def build_uri_with_query_params + uri = @uri.dup + + if @query_params_callback + begin + dynamic_params = @query_params_callback.call + if dynamic_params.is_a?(Hash) && !dynamic_params.empty? + existing_params = uri.query ? URI.decode_www_form(uri.query).to_h : {} + merged_params = existing_params.merge(dynamic_params) + uri.query = URI.encode_www_form(merged_params) + elsif !dynamic_params.is_a?(Hash) + @logger.warn { "query_params callback returned non-Hash value: #{dynamic_params.class}, ignoring" } + end + rescue StandardError => e + @logger.warn { "query_params callback raised an exception: #{e.inspect}, proceeding with base URI" } + @logger.debug { "Exception trace: #{e.backtrace}" } + end + end + + uri + end end end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 62c4968..d15c338 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -203,6 +203,28 @@ def send_stream_content(res, content, keep_open:) end end + it "invokes error handler when server returns 204 No Content" do + with_server do |server| + server.setup_response("/") do |req,res| + res.status = 204 + res.body = "" + res.keep_alive = false + end + + error_sink = Queue.new + client = subject.new(server.base_uri) do |c| + c.on_error { |error| error_sink << error } + end + + with_client(client) do |c| + error = error_sink.pop + expect(error).to be_a(SSE::Errors::HTTPStatusError) + expect(error.status).to eq(204) + expect(error.message).to eq("") + end + end + end + it "reconnects after read timeout" do events_body = simple_event_1_text with_server do |server| @@ -972,4 +994,295 @@ def test_object.to_s end end end + + describe "dynamic query parameters" do + it "sends query parameters from callback on initial connection" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri) do |c| + c.query_params do + {"basis" => "p:ABC:123", "filter" => "test"} + end + end) do |client| + received_req = requests.pop + expect(received_req.query_string).to include("basis=p%3AABC%3A123") + expect(received_req.query_string).to include("filter=test") + end + end + end + + it "updates query parameters on reconnection" do + with_server do |server| + requests = Queue.new + attempt = 0 + server.setup_response("/") do |req,res| + requests << req + attempt += 1 + if attempt == 1 + send_stream_content(res, "", keep_open: false) # Close to trigger reconnect + else + send_stream_content(res, "", keep_open: true) + end + end + + counter = 0 + with_client(subject.new(server.base_uri, reconnect_time: reconnect_asap) do |c| + c.query_params do + counter += 1 + {"request_id" => counter.to_s} + end + end) do |client| + req1 = requests.pop + expect(req1.query_string).to eq("request_id=1") + + req2 = requests.pop + expect(req2.query_string).to eq("request_id=2") + expect(attempt).to eq(2) + end + end + end + + it "merges dynamic params with existing query params in URI" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?static=value" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + {"dynamic" => "param"} + end + end) do |client| + received_req = requests.pop + expect(received_req.query_string).to include("static=value") + expect(received_req.query_string).to include("dynamic=param") + end + end + end + + it "allows dynamic params to override existing query params" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?key=original" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + {"key" => "overridden"} + end + end) do |client| + received_req = requests.pop + # Dynamic params should override static ones + expect(received_req.query_string).to eq("key=overridden") + end + end + end + + it "works without query_params callback (backward compatibility)" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri)) do |client| + received_req = requests.pop + expect(received_req.query_string).to be_nil + end + end + end + + it "preserves existing query params when no callback is set" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?existing=param" + with_client(subject.new(base_uri_with_params)) do |client| + received_req = requests.pop + expect(received_req.query_string).to eq("existing=param") + end + end + end + + it "handles callback returning empty hash" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?existing=param" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + {} + end + end) do |client| + received_req = requests.pop + # Empty hash should preserve existing params (consistent with Python behavior) + expect(received_req.query_string).to eq("existing=param") + end + end + end + + it "handles callback returning nil gracefully" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri) do |c| + c.query_params do + nil + end + end) do |client| + received_req = requests.pop + # Should proceed with base URI (no query params) + expect(received_req.query_string).to be_nil + end + end + end + + it "handles callback raising exception gracefully" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?fallback=value" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + raise "Test exception" + end + end) do |client| + received_req = requests.pop + # Should fall back to base URI params + expect(received_req.query_string).to eq("fallback=value") + end + end + end + + it "handles callback returning non-Hash value gracefully" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?fallback=value" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + "not a hash" + end + end) do |client| + received_req = requests.pop + # Should fall back to base URI params + expect(received_req.query_string).to eq("fallback=value") + end + end + end + + it "updates query parameters on each reconnection attempt" do + with_server do |server| + requests = Queue.new + attempt = 0 + server.setup_response("/") do |req,res| + requests << req + attempt += 1 + if attempt <= 2 + res.status = 500 + res.body = "error" + res.keep_alive = false + else + send_stream_content(res, "", keep_open: true) + end + end + + connection_count = 0 + with_client(subject.new(server.base_uri, reconnect_time: reconnect_asap) do |c| + c.query_params do + connection_count += 1 + {"connection" => connection_count.to_s} + end + end) do |client| + req1 = requests.pop + expect(req1.query_string).to eq("connection=1") + + req2 = requests.pop + expect(req2.query_string).to eq("connection=2") + + req3 = requests.pop + expect(req3.query_string).to eq("connection=3") + expect(attempt).to eq(3) + end + end + end + + it "handles URL-encoded query parameter values" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri) do |c| + c.query_params do + {"basis" => "p:ABC:123", "filter" => "test value with spaces"} + end + end) do |client| + received_req = requests.pop + expect(received_req.query_string).to include("basis=p%3AABC%3A123") + expect(received_req.query_string).to include("filter=test+value+with+spaces") + end + end + end + + it "works with multiple query parameters" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri) do |c| + c.query_params do + { + "param1" => "value1", + "param2" => "value2", + "param3" => "value3", + } + end + end) do |client| + received_req = requests.pop + query_string = received_req.query_string + expect(query_string).to include("param1=value1") + expect(query_string).to include("param2=value2") + expect(query_string).to include("param3=value3") + end + end + end + end end From bbf8b1de8621300a9e5506870672abffaed458fa Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Thu, 15 Jan 2026 18:53:40 +0000 Subject: [PATCH 2/3] apply flaky test fix to new tests --- spec/client_spec.rb | 77 ++++++++++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 32 deletions(-) diff --git a/spec/client_spec.rb b/spec/client_spec.rb index d422125..fe61a52 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -1021,7 +1021,8 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end @@ -1031,8 +1032,8 @@ def test_object.to_s end end) do |client| received_req = requests.pop - expect(received_req.query_string).to include("basis=p%3AABC%3A123") - expect(received_req.query_string).to include("filter=test") + expect(received_req[:query_string]).to include("basis=p%3AABC%3A123") + expect(received_req[:query_string]).to include("filter=test") end end end @@ -1042,7 +1043,8 @@ def test_object.to_s requests = Queue.new attempt = 0 server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data attempt += 1 if attempt == 1 send_stream_content(res, "", keep_open: false) # Close to trigger reconnect @@ -1059,10 +1061,10 @@ def test_object.to_s end end) do |client| req1 = requests.pop - expect(req1.query_string).to eq("request_id=1") + expect(req1[:query_string]).to eq("request_id=1") req2 = requests.pop - expect(req2.query_string).to eq("request_id=2") + expect(req2[:query_string]).to eq("request_id=2") expect(attempt).to eq(2) end end @@ -1072,7 +1074,8 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end @@ -1083,8 +1086,8 @@ def test_object.to_s end end) do |client| received_req = requests.pop - expect(received_req.query_string).to include("static=value") - expect(received_req.query_string).to include("dynamic=param") + expect(received_req[:query_string]).to include("static=value") + expect(received_req[:query_string]).to include("dynamic=param") end end end @@ -1093,7 +1096,8 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end @@ -1105,7 +1109,7 @@ def test_object.to_s end) do |client| received_req = requests.pop # Dynamic params should override static ones - expect(received_req.query_string).to eq("key=overridden") + expect(received_req[:query_string]).to eq("key=overridden") end end end @@ -1114,13 +1118,14 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end with_client(subject.new(server.base_uri)) do |client| received_req = requests.pop - expect(received_req.query_string).to be_nil + expect(received_req[:query_string]).to be_nil end end end @@ -1129,14 +1134,15 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end base_uri_with_params = "#{server.base_uri}?existing=param" with_client(subject.new(base_uri_with_params)) do |client| received_req = requests.pop - expect(received_req.query_string).to eq("existing=param") + expect(received_req[:query_string]).to eq("existing=param") end end end @@ -1145,7 +1151,8 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end @@ -1157,7 +1164,7 @@ def test_object.to_s end) do |client| received_req = requests.pop # Empty hash should preserve existing params (consistent with Python behavior) - expect(received_req.query_string).to eq("existing=param") + expect(received_req[:query_string]).to eq("existing=param") end end end @@ -1166,7 +1173,8 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end @@ -1177,7 +1185,7 @@ def test_object.to_s end) do |client| received_req = requests.pop # Should proceed with base URI (no query params) - expect(received_req.query_string).to be_nil + expect(received_req[:query_string]).to be_nil end end end @@ -1186,7 +1194,8 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end @@ -1198,7 +1207,7 @@ def test_object.to_s end) do |client| received_req = requests.pop # Should fall back to base URI params - expect(received_req.query_string).to eq("fallback=value") + expect(received_req[:query_string]).to eq("fallback=value") end end end @@ -1207,7 +1216,8 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end @@ -1219,7 +1229,7 @@ def test_object.to_s end) do |client| received_req = requests.pop # Should fall back to base URI params - expect(received_req.query_string).to eq("fallback=value") + expect(received_req[:query_string]).to eq("fallback=value") end end end @@ -1229,7 +1239,8 @@ def test_object.to_s requests = Queue.new attempt = 0 server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data attempt += 1 if attempt <= 2 res.status = 500 @@ -1248,13 +1259,13 @@ def test_object.to_s end end) do |client| req1 = requests.pop - expect(req1.query_string).to eq("connection=1") + expect(req1[:query_string]).to eq("connection=1") req2 = requests.pop - expect(req2.query_string).to eq("connection=2") + expect(req2[:query_string]).to eq("connection=2") req3 = requests.pop - expect(req3.query_string).to eq("connection=3") + expect(req3[:query_string]).to eq("connection=3") expect(attempt).to eq(3) end end @@ -1264,7 +1275,8 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end @@ -1274,8 +1286,8 @@ def test_object.to_s end end) do |client| received_req = requests.pop - expect(received_req.query_string).to include("basis=p%3AABC%3A123") - expect(received_req.query_string).to include("filter=test+value+with+spaces") + expect(received_req[:query_string]).to include("basis=p%3AABC%3A123") + expect(received_req[:query_string]).to include("filter=test+value+with+spaces") end end end @@ -1284,7 +1296,8 @@ def test_object.to_s with_server do |server| requests = Queue.new server.setup_response("/") do |req,res| - requests << req + request_data = { query_string: req.query_string } + requests << request_data send_stream_content(res, "", keep_open: true) end @@ -1298,7 +1311,7 @@ def test_object.to_s end end) do |client| received_req = requests.pop - query_string = received_req.query_string + query_string = received_req[:query_string] expect(query_string).to include("param1=value1") expect(query_string).to include("param2=value2") expect(query_string).to include("param3=value3") From f5cd3fc6cdff7437d50394af3661737dcc59a5dc Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Thu, 15 Jan 2026 19:30:24 +0000 Subject: [PATCH 3/3] remove bad test --- spec/client_spec.rb | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/spec/client_spec.rb b/spec/client_spec.rb index fe61a52..43eaa32 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -205,28 +205,6 @@ def send_stream_content(res, content, keep_open:) end end - it "invokes error handler when server returns 204 No Content" do - with_server do |server| - server.setup_response("/") do |req,res| - res.status = 204 - res.body = "" - res.keep_alive = false - end - - error_sink = Queue.new - client = subject.new(server.base_uri) do |c| - c.on_error { |error| error_sink << error } - end - - with_client(client) do |c| - error = error_sink.pop - expect(error).to be_a(SSE::Errors::HTTPStatusError) - expect(error.status).to eq(204) - expect(error.message).to eq("") - end - end - end - it "reconnects after read timeout" do events_body = simple_event_1_text with_server do |server|