Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
name: Elixir CI

on:
push:
branches:
- master
pull_request:

env:
MIX_ENV: test

jobs:
test:
strategy:
matrix:
include:
- otp_version: 25.3
elixir_version: 1.16

- otp_version: 27.2
elixir_version: 1.18
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- uses: erlef/setup-beam@v1
with:
elixir-version: ${{ matrix.elixir_version }}
otp-version: ${{ matrix.otp_version }}

- name: Install dependencies
run: mix deps.get
- name: Compile dependencies
run: mix deps.compile
- name: Run tests
run: mix test --warnings-as-errors
- name: Run Credo
run: mix credo --strict

format:
runs-on: ubuntu-latest
name: mix format
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
with:
otp-version: 27.2
elixir-version: 1.18
- run: mix format --check-formatted
4 changes: 2 additions & 2 deletions examples/publish_mp4.exs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ defmodule Publisher do

defp init_tag(%{media: :h264} = track) do
<<_::binary-size(8), dcr::binary>> = ExMP4.Box.serialize(track.priv_data)
VideoData.AVC.new(dcr, :sequence_header, 0) |> VideoData.new(:avc, :keyframe)
VideoData.AVC.new(dcr, :sequence_header, 0) |> VideoData.new(:h264, :keyframe)
end

defp init_tag(%{media: :h265} = track) do
Expand All @@ -116,7 +116,7 @@ defmodule Publisher do
defp video_tag(:h264, sample, ct) do
sample.payload
|> VideoData.AVC.new(:nalu, ct)
|> VideoData.new(:avc, if(sample.sync?, do: :keyframe, else: :interframe))
|> VideoData.new(:h264, if(sample.sync?, do: :keyframe, else: :interframe))
end

defp video_tag(:h265, sample, ct) do
Expand Down
6 changes: 3 additions & 3 deletions examples/read_to_flv.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ defmodule StreamWriter do
end

@impl true
def handle_info({:video, pid, _stream_id, {:codec, :avc, dcr}}, %{client: pid} = state) do
def handle_info({:video, pid, _stream_id, {:codec, :h264, dcr}}, %{client: pid} = state) do
payload =
dcr
|> Tag.AVCVideoPacket.new(:sequence_header, 0)
|> Tag.VideoData.new(:avc, :keyframe)
|> Tag.VideoData.new(:h264, :keyframe)

tag = Tag.serialize(%Tag{type: :video, data: payload, timestamp: 0})
IO.binwrite(state.writer, [tag, <<IO.iodata_length(tag)::32>>])
Expand Down Expand Up @@ -73,7 +73,7 @@ defmodule StreamWriter do
payload
|> Enum.map(&<<byte_size(&1)::32, &1::binary>>)
|> Tag.AVCVideoPacket.new(:nalu, pts - dts)
|> Tag.VideoData.new(:avc, if(sync?, do: :keyframe, else: :interframe))
|> Tag.VideoData.new(:h264, if(sync?, do: :keyframe, else: :interframe))

tag = Tag.serialize(%Tag{type: :video, data: payload, timestamp: dts})
IO.binwrite(state.writer, [tag, <<IO.iodata_length(tag)::32>>])
Expand Down
6 changes: 3 additions & 3 deletions examples/send_mp4.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Publisher do
video_reducer: video_reducer,
audio_track: audio_track,
audio_reducer: audio_reducer,
stream_id: opts[:stream_id],
stream_id: opts[:stream_id]
}, {:continue, :send_init_data}}
end

Expand All @@ -55,7 +55,7 @@ defmodule Publisher do

dcr
|> AVCVideoPacket.new(:sequence_header, 0)
|> VideoData.new(:avc, :keyframe)
|> VideoData.new(:h264, :keyframe)
|> ExFLV.Tag.Serializer.serialize()
|> then(&ClientSession.send_video_data(state.rtmp_sender, state.stream_id, 0, &1))

Expand Down Expand Up @@ -83,7 +83,7 @@ defmodule Publisher do
data =
sample.payload
|> AVCVideoPacket.new(:nalu, sample.pts - sample.dts)
|> VideoData.new(:avc, frame_type)
|> VideoData.new(:h264, frame_type)
|> ExFLV.Tag.Serializer.serialize()

ClientSession.send_video_data(state.rtmp_sender, state.stream_id, timestamp, data)
Expand Down
5 changes: 2 additions & 3 deletions lib/ex_rtmp/chunk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ defmodule ExRTMP.Chunk do
@spec parse_header(binary()) :: {:ok, t(), binary()} | :more
def parse_header(data) do
with {:ok, chunk, rest} <- parse_stream_id(data),
{:ok, chunk, rest} <- parse_message_header(chunk, rest),
{:ok, chunk, rest} <- parse_extended_timestamp(chunk, rest) do
{:ok, chunk, rest}
{:ok, chunk, rest} <- parse_message_header(chunk, rest) do
parse_extended_timestamp(chunk, rest)
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/ex_rtmp/chunk_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ defmodule ExRTMP.ChunkParser do

defstruct [:unprocessed_data, :messages, :message_first_chunk, chunk_size: 128]

@spec new() :: t()
def new() do
@spec new :: t()
def new do
%__MODULE__{
unprocessed_data: <<>>,
messages: %{},
Expand Down
24 changes: 12 additions & 12 deletions lib/ex_rtmp/client/media_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ defmodule ExRTMP.Client.MediaProcessor do
@type codec ::
AudioData.source_format()
| VideoData.codec_id()
| ExVideoData.fourcc()
| ExAudioData.fourcc()
| ExVideoData.codec_id()
| ExAudioData.codec_id()

@type track :: {:codec, codec(), binary()}
@type video_sample ::
Expand All @@ -31,8 +31,8 @@ defmodule ExRTMP.Client.MediaProcessor do

defstruct [:nalu_prefix_size, video?: false, audio?: false]

@spec new() :: t()
def new(), do: %__MODULE__{}
@spec new :: t()
def new, do: %__MODULE__{}

@spec push_video(Message.t(), t()) :: {video_return(), t()}
def push_video(message, processor) do
Expand All @@ -56,7 +56,7 @@ defmodule ExRTMP.Client.MediaProcessor do
defp parse_audio_tag(<<9::4, _::bitstring>> = data), do: ExAudioData.parse!(data)
defp parse_audio_tag(data), do: AudioData.parse!(data)

defp handle_video_tag(%VideoData{codec_id: :avc} = tag, timestamp, processor) do
defp handle_video_tag(%VideoData{codec_id: :h264} = tag, timestamp, processor) do
packet_type = tag.data.packet_type

cond do
Expand All @@ -67,11 +67,11 @@ defmodule ExRTMP.Client.MediaProcessor do
packet_type == :sequence_header ->
processor = %{
processor
| nalu_prefix_size: nalu_prefix_size(:avc, tag.data.data),
| nalu_prefix_size: nalu_prefix_size(:h264, tag.data.data),
video?: true
}

{{:codec, :avc, tag.data.data}, processor}
{{:codec, :h264, tag.data.data}, processor}

packet_type == :end_of_sequence ->
{nil, processor}
Expand Down Expand Up @@ -101,11 +101,11 @@ defmodule ExRTMP.Client.MediaProcessor do
packet_type == :sequence_start ->
processor = %{
processor
| nalu_prefix_size: nalu_prefix_size(tag.fourcc, tag.data),
| nalu_prefix_size: nalu_prefix_size(tag.codec_id, tag.data),
video?: true
}

{{:codec, tag.fourcc, tag.data}, processor}
{{:codec, tag.codec_id, tag.data}, processor}

packet_type in [:sequence_end, :metadata] ->
{nil, processor}
Expand Down Expand Up @@ -149,7 +149,7 @@ defmodule ExRTMP.Client.MediaProcessor do
{{:sample, tag.data, timestamp}, processor}

tag.packet_type == :sequence_start ->
{{:codec, tag.fourcc, tag.data}, %{processor | audio?: true}}
{{:codec, tag.codec_id, tag.data}, %{processor | audio?: true}}

true ->
{nil, processor}
Expand All @@ -165,12 +165,12 @@ defmodule ExRTMP.Client.MediaProcessor do
{{:sample, tag.data, timestamp}, processor}
end

defp nalu_prefix_size(:hvc1, <<_::binary-size(21), _::6, nalu_prefix_size::2, _::binary>>) do
defp nalu_prefix_size(:h265, <<_::binary-size(21), _::6, nalu_prefix_size::2, _::binary>>) do
nalu_prefix_size + 1
end

defp nalu_prefix_size(codec, <<_::38, nalu_prefix_size::2, _::binary>>)
when codec == :avc or codec == :avc1 do
when codec == :h264 or codec == :h265 do
nalu_prefix_size + 1
end

Expand Down
82 changes: 44 additions & 38 deletions lib/ex_rtmp/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule ExRTMP.Message do
alias __MODULE__.Command.NetStream.{DeleteStream, FCPublish, OnStatus, Play, Publish}
alias __MODULE__.Metadata
alias __MODULE__.UserControl.Event
alias ExRTMP.Chunk
alias ExRTMP.{Chunk, Message}

@type stream_id :: non_neg_integer()

Expand Down Expand Up @@ -149,7 +149,7 @@ defmodule ExRTMP.Message do

payload =
if is_struct(message.payload),
do: ExRTMP.Message.Serializer.serialize(message.payload),
do: Message.Serializer.serialize(message.payload),
else: message.payload

payload = IO.iodata_to_binary(payload)
Expand Down Expand Up @@ -221,51 +221,57 @@ defmodule ExRTMP.Message do
@doc false
defp parse_payload(%__MODULE__{type: 20, payload: payload} = msg) do
payload =
case ExRTMP.AMF0.parse(IO.iodata_to_binary(payload)) do
["connect", transaction_id, properties | _rest] ->
%Connect{transaction_id: transaction_id, properties: properties}

["createStream", transaction_id | _rest] ->
%CreateStream{transaction_id: transaction_id}

[result, transaction_id, command_object, data] when result in ["_result", "_error"] ->
%Response{
result: result,
transaction_id: trunc(transaction_id),
command_object: command_object,
data: data
}
payload
|> IO.iodata_to_binary()
|> ExRTMP.AMF0.parse()
|> handle_message_payload()

["publish", _txid, nil, name, type] ->
Publish.new(name, type)
%{msg | payload: payload}
end

["FCPublish", transaction_id, nil, name] ->
FCPublish.new(transaction_id, name)
defp parse_payload(msg), do: msg

["deleteStream", _txid, nil, stream_id] ->
DeleteStream.new(stream_id)
defp handle_message_payload(["connect", transaction_id, properties | _rest]) do
%Connect{transaction_id: transaction_id, properties: properties}
end

["play", _txid, nil, stream_name | opts] ->
play_opts =
case opts do
[] -> []
[start] -> [start: start]
[start, duration] -> [start: start, duration: duration]
[start, duration, reset] -> [start: start, duration: duration, reset: reset]
end
defp handle_message_payload(["createStream", transaction_id | _rest]) do
%CreateStream{transaction_id: transaction_id}
end

Play.new(stream_name, play_opts)
defp handle_message_payload([result, transaction_id, command_object, data])
when result in ["_result", "_error"] do
%Response{
result: result,
transaction_id: trunc(transaction_id),
command_object: command_object,
data: data
}
end

["onStatus", _ts_id, nil, info] ->
%OnStatus{info: info}
defp handle_message_payload(["publish", _txid, nil, name, type]), do: Publish.new(name, type)
defp handle_message_payload(["onStatus", _txid, nil, info]), do: %OnStatus{info: info}

other ->
Logger.warning("Unknown command: #{inspect(List.first(other))}")
payload
defp handle_message_payload(["play", _txid, nil, name | opts]) do
play_opts =
case opts do
[] -> []
[start] -> [start: start]
[start, duration] -> [start: start, duration: duration]
[start, duration, reset] -> [start: start, duration: duration, reset: reset]
end

%{msg | payload: payload}
Play.new(name, play_opts)
end

defp parse_payload(msg), do: msg
defp handle_message_payload(["deleteStream", _txid, nil, stream_id]),
do: DeleteStream.new(stream_id)

defp handle_message_payload(["FCPublish", transaction_id, nil, name]),
do: FCPublish.new(transaction_id, name)

defp handle_message_payload(other) do
Logger.warning("Unknown command: #{inspect(other)}")
other
end
end
16 changes: 8 additions & 8 deletions lib/ex_rtmp/message/command/net_stream/on_status.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ defmodule ExRTMP.Message.Command.NetStream.OnStatus do
%__MODULE__{info: info}
end

@spec publish_ok() :: t()
def publish_ok(), do: new("NetStream.Publish.Start")
@spec publish_ok :: t()
def publish_ok, do: new("NetStream.Publish.Start")

@spec publish_bad_stream() :: t()
def publish_bad_stream() do
@spec publish_bad_stream :: t()
def publish_bad_stream do
new("NetStream.Publish.BadStream", :error, "Unknown stream")
end

Expand All @@ -31,11 +31,11 @@ defmodule ExRTMP.Message.Command.NetStream.OnStatus do
new("NetStream.Publish.Failed", :error, reason)
end

@spec play_ok() :: t()
def play_ok(), do: new("NetStream.Play.Start")
@spec play_ok :: t()
def play_ok, do: new("NetStream.Play.Start")

@spec play_bad_stream() :: t()
def play_bad_stream() do
@spec play_bad_stream :: t()
def play_bad_stream do
new("NetStream.Play.BadStream", :error, "Unknown stream")
end

Expand Down
2 changes: 1 addition & 1 deletion lib/ex_rtmp/server/client_session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule ExRTMP.Server.ClientSession do
alias ExRTMP.Message
alias ExRTMP.Message.Command.NetConnection
alias ExRTMP.Message.Command.NetConnection.{CreateStream, Response}
alias ExRTMP.Message.Command.NetStream.{DeleteStream, FCPublish, Play, Publish, OnStatus}
alias ExRTMP.Message.Command.NetStream.{DeleteStream, FCPublish, OnStatus, Play, Publish}
alias ExRTMP.Message.Metadata

@default_acknowledgement_size 3_000_000
Expand Down
5 changes: 3 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule ExRTMP.MixProject do
[
app: :ex_rtmp,
version: @version,
elixir: "~> 1.15",
elixir: "~> 1.16",
start_permanent: Mix.env() == :prod,
deps: deps(),
elixirc_paths: elixirc_paths(Mix.env()),
Expand All @@ -32,8 +32,9 @@ defmodule ExRTMP.MixProject do

defp deps do
[
{:ex_flv, "~> 0.3.0"},
{:ex_flv, "~> 0.4.0"},
{:ex_doc, "~> 0.38", only: :dev, runtime: false},
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:media_codecs, "~> 0.8.0", only: :test}
]
end
Expand Down
Loading