HEX
Server: Apache
System: Linux opal14.opalstack.com 3.10.0-1160.108.1.el7.x86_64 #1 SMP Thu Jan 25 16:17:31 UTC 2024 x86_64
User: curbgloabal_opal (1234)
PHP: 8.1.29
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //usr/lib/erlang/lib/common_test-1.23/src/ct_netconfc.erl
%%----------------------------------------------------------------------
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2012-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%----------------------------------------------------------------------
%% File: ct_netconfc.erl
%%
%% Description:
%%    This file contains the Netconf client interface
%%
%% Netconf servers can be configured by adding the following statement
%% to a configuration file:
%%
%% {server_id(), [option()]}.
%%
%% The server_id() or an associated ct:target_name() shall then be
%% used in calls to open/2 and connect/2.
%%
%% If no configuration exists for a server, use open/1 and connect/1.
%%
%% == Logging ==
%%
%% The netconf server uses the `error_logger' for logging of netconf
%% traffic. A special purpose error handler is implemented in
%% `ct_conn_log_h'. To use this error handler, add the `cth_conn_log'
%% hook in your test suite, e.g.
%%
%% suite() ->
%%     [{ct_hooks, [{cth_conn_log, [{ct:conn_log_mod(),ct:conn_log_options()}]}]}].
%%
%% For example:
%%
%% suite() ->
%%     [{ct_hooks,
%%         [{cth_conn_log,[{ct_netconfc,[{log_type,pretty},
%%                                       {hosts,[my_configured_server]}]}]}]}].
%%
%% == Notifications ==
%%
%% The netconf client is also compliant with RFC5277 NETCONF Event
%% Notifications, which defines a mechanism for an asynchronous
%% message notification delivery service for the netconf protocol.
%% Functions supporting this are create_subscription/3 and
%% get_event_streams/3.
%%
%%----------------------------------------------------------------------
-module(ct_netconfc).

-dialyzer(no_improper_lists).

-include("ct_netconfc.hrl").
-include("ct_util.hrl").
-include_lib("xmerl/include/xmerl.hrl").

%%----------------------------------------------------------------------
%% External exports
%%----------------------------------------------------------------------
-export([connect/1,
         connect/2,
         disconnect/1,
         session/1,
         session/2,
         session/3,
         open/1,
	 open/2,
	 only_open/1,
	 only_open/2,
	 hello/1,
	 hello/2,
	 hello/3,
	 close_session/1,
	 close_session/2,
	 kill_session/2,
	 kill_session/3,
	 send/2,
	 send/3,
	 send_rpc/2,
	 send_rpc/3,
	 lock/2,
	 lock/3,
	 unlock/2,
	 unlock/3,
	 get/2,
	 get/3,
	 get_config/3,
	 get_config/4,
	 edit_config/3,
	 edit_config/4,
	 edit_config/5,
	 delete_config/2,
	 delete_config/3,
	 copy_config/3,
	 copy_config/4,
	 action/2,
	 action/3,
	 create_subscription/2,
	 create_subscription/3,
	 get_event_streams/1,
	 get_event_streams/2,
	 get_event_streams/3,
	 get_capabilities/1,
	 get_capabilities/2,
	 get_session_id/1,
	 get_session_id/2]).

%% historic, no longer documented
-export([create_subscription/1,
	 create_subscription/4,
	 create_subscription/5,
	 create_subscription/6]).

%%----------------------------------------------------------------------
%% Exported types
%%----------------------------------------------------------------------
-export_type([client/0,
              handle/0,
              notification/0]).

%%----------------------------------------------------------------------
%% Internal exports
%%----------------------------------------------------------------------
%% ct_gen_conn callbacks
-export([init/3,
	 handle_msg/3,
	 handle_msg/2,
	 terminate/2,
	 close/1]).

%% ct_conn_log callback
-export([format_data/2]).

%%----------------------------------------------------------------------
%% Internal defines
%%----------------------------------------------------------------------
%% Name used for this application; expands to the module name.
-define(APPLICATION,?MODULE).
%% Stream name used by create_subscription when the caller gives none.
-define(DEFAULT_STREAM,"NETCONF").

%% Emit an error report through error_logger, tagged with the
%% connection name, the calling process, this module and the source
%% line. Report is appended as the tail of the report list, so it must
%% itself be a list.
-define(error(ConnName,Report),
	error_logger:error_report([{ct_connection,ConnName},
				   {client,self()},
				   {module,?MODULE},
				   {line,?LINE} |
				   Report])).

%% Guard tests used to tell apart the overloaded arguments of the
%% historic API clauses.
%% Any integer (including negative ones) or the atom infinity.
-define(is_timeout(T), (is_integer(T) orelse T==infinity)).
%% A simple_xml() element, the empty list, or a list whose first
%% element is a simple_xml() element (only the head is inspected).
-define(is_filter(F),
	(?is_simple_xml(F)
	 orelse (F==[])
	 orelse (is_list(F) andalso ?is_simple_xml(hd(F))))).
%% An atom, or a tuple whose first element is an atom.
-define(is_simple_xml(Xml),
	(is_atom(Xml) orelse (is_tuple(Xml) andalso is_atom(element(1,Xml))))).
%% A list whose first element is an integer (only the head is
%% inspected). When used in a guard, hd([]) fails the guard, so the
%% empty string does not satisfy this test.
-define(is_string(S), (is_list(S) andalso is_integer(hd(S)))).

%% Keys into the process dictionary.
-define(KEY(T), {?MODULE, T}).

%%----------------------------------------------------------------------
%% Records
%%----------------------------------------------------------------------
%% Client state
%% Callback state handed back to ct_gen_conn from init/3 and passed to
%% handle_msg/3 and terminate/2.
-record(state, {host,
		port,
		connection,      % #connection
		capabilities,
		session_id,
		msg_id = 1,
		hello_status,    % undefined | received | #pending{}
                                 % string() | {error, Reason}
		buf = false,     % binary() | list() | boolean()
		pending = [],    % [#pending]
		receivers = [] :: list() | pid()}).% notification destinations

%% Run-time client options.
%% `type' is one of connection | channel | connection_and_channel, as
%% set by connect/1,2, session/1-3 and open/1,2 respectively. `name' is
%% the KeyOrName the client was opened with, when there is one.
-record(options, {ssh = [], % Options for the ssh application
		  host,
		  port = ?DEFAULT_PORT,
		  timeout = ?DEFAULT_TIMEOUT,
                  receivers = [],
		  name,
                  type}).

%% Connection reference
-record(connection, {reference, % {CM,Ch}, or CM alone for a connection-only client
		     host,
		     port,
		     name,
                     type}).

%% Pending replies from server
-record(pending, {tref :: false | reference(),  % timer reference
		  msg_id,
                  op,
		  caller}).% pid which sent the request

%%----------------------------------------------------------------------
%% Type declarations
%%----------------------------------------------------------------------
%% A client is addressed either by the handle returned when it was
%% started, or by the server id / target name it was opened with.
-type client() :: handle() | server_id() | ct:target_name().
-opaque handle() :: pid().

%% Options accepted by connect/1,2, open/1,2 and only_open/1,2.
-type option() :: {host | ssh, host()}
                | {port, inet:port_number()}
                | {timeout, timeout()}
                | {capability, string() | [string()]}
                | {receiver, term()}
                | ssh:client_option().

%% Options accepted by session/2,3.
-type session_option() :: {timeout,timeout()}
                        | {receiver, term()}
                        | {capability, string() | [string()]}.

-type host() :: inet:hostname() | inet:ip_address().

-type notification() :: {notification, xml_attributes(), [simple_xml()]}.

-type stream_name() :: string().
-type streams() :: [{stream_name(),[stream_data()]}].
-type stream_data() :: {description,string()} |
		       {replaySupport,string()} |
		       {replayLogCreationTime,string()} |
		       {replayLogAgedTime,string()}.
%% See XML Schema for Event Notifications found in RFC5277 for further
%% detail about the data format for the string values.

-type error_reason() :: term().

-type server_id() :: atom().

%% XML in "simple" form: a tag with attributes and content, a tag with
%% content only, or a bare tag.
-type simple_xml() :: {xml_tag(), xml_attributes(), xml_content()} |
		      {xml_tag(), xml_content()} |
		      xml_tag().
-type xml_tag() :: atom().
-type xml_attributes() :: [{xml_attribute_tag(),xml_attribute_value()}].
-type xml_attribute_tag() :: atom().
-type xml_attribute_value() :: string().
-type xml_content() :: [simple_xml() | iolist()].
-type xpath() :: {xpath,string()}.

-type netconf_db() :: running | startup | candidate.
-type xs_datetime() :: string().
%% This date and time identifier has the same format as the XML type
%% dateTime and is compliant with RFC3339. The format is
%% "[-]CCYY-MM-DDThh:mm:ss[.s][Z|(+|-)hh:mm]"

%%----------------------------------------------------------------------
%% External interface functions
%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Open an SSH connection to a Netconf server
%% If the server options are specified in a configuration file, use
%% connect/2.

%% connect/1

%% Start a client of type `connection' from an explicit option list.
%% No configuration entry is consulted and no name is registered.
-spec connect(Options) -> Result when
      Options :: [option()],
      Result :: {ok, handle()} | {error, error_reason()}.
connect(Options) ->
    InitRec = #options{type = connection},
    connect(Options, InitRec, []).

%% connect/2

%% Start a client of type `connection' for a configured server.
%% The options are the merge (see make_opts/2) of the configuration
%% found under KeyOrName and ExtraOptions.
-spec connect(KeyOrName, ExtraOptions) -> Result when
      KeyOrName :: ct:key_or_name(),
      ExtraOptions :: [option()],
      Result :: {ok, handle()} | {error, error_reason()}.

connect(KeyOrName, ExtraOptions) ->
    Opts = make_opts(KeyOrName, ExtraOptions),
    InitRec = #options{name = KeyOrName, type = connection},
    connect(Opts, InitRec, [{name, KeyOrName}]).

%% connect/3

%% Turn the option list into an #options{} record and start the client.
%% An {error, _} from make_options/2 is returned unchanged; the client
%% is started with message forwarding switched off.
connect(Opts, InitRec, NameOpt) ->
    case make_options(Opts, InitRec) of
        {error, _} = Error ->
            Error;
        #options{} = Rec ->
            start(Rec, NameOpt, false)
    end.

%% make_opts/2

%% Combine the configuration stored under KeyOrName with ExtraOptions.
%% Both lists are sorted on the option key and merged with
%% lists:ukeymerge/3, whose first list is the configuration: for keys
%% present in both, the configured tuple is the one that is kept.
make_opts(KeyOrName, ExtraOptions) ->
    Extra = lists:keysort(1, ExtraOptions),
    Config = lists:keysort(1, ct:get_config(KeyOrName, [])),
    lists:ukeymerge(1, Config, Extra).

%%----------------------------------------------------------------------
%% Close the given SSH connection.
%% Stop the client behind Conn, but only if it answers the
%% get_ssh_connection request with {ok, _}; any other answer is
%% returned to the caller as is.
-spec disconnect(Conn) -> ok | {error,error_reason()} when
      Conn :: handle().
disconnect(Conn) ->
    case call(Conn, get_ssh_connection) of
        {ok, _SshConn} ->
            ct_gen_conn:stop(Conn);
        NotOk ->
            NotOk
    end.

%%----------------------------------------------------------------------
%% Open a netconf session as a channel on the given SSH connection,
%% and exchange `hello' messages.

%% session/1

%% Open a session on Conn with no options and no name.
-spec session(Conn) -> Result when
      Conn :: handle(),
      Result :: {ok, handle()} | {error, error_reason()}.

session(Conn) ->
    InitRec = #options{type = channel},
    session(Conn, [], InitRec, []).

%% session/2

%% Two call forms share this arity. When the second argument is a list
%% it is taken as session options for an unnamed session on Conn;
%% otherwise the arguments are (KeyOrName, Conn) and a named session is
%% opened with no options.
-spec session(Conn, Options) -> Result when
      Conn :: handle(),
      Options :: [session_option()],
      Result :: {ok, handle()} | {error, error_reason()};
             (KeyOrName, Conn) -> Result when
      KeyOrName :: ct:key_or_name(),
      Conn :: handle(),
      Result :: {ok, handle()} | {error, error_reason()}.

session(Conn, Options) when is_list(Options) ->
    InitRec = #options{type = channel},
    session(Conn, Options, InitRec, []);

session(KeyOrName, Conn) ->
    InitRec = #options{name = KeyOrName, type = channel},
    session(Conn, [], InitRec, [{name, KeyOrName}]).

%% session/3

%% Open a named session on Conn. The options are the merge (see
%% make_opts/2) of the configuration under KeyOrName and ExtraOptions.
-spec session(KeyOrName, Conn, Options) -> Result when
      Conn :: handle(),
      Options :: [session_option()],
      KeyOrName :: ct:key_or_name(),
      Result :: {ok, handle()} | {error, error_reason()}.

session(KeyOrName, Conn, ExtraOptions) ->
    Opts = make_opts(KeyOrName, ExtraOptions),
    InitRec = #options{name = KeyOrName, type = channel},
    session(Conn, Opts, InitRec, [{name, KeyOrName}]).

%% session/4

%% Run the four steps needed to open a session on an existing
%% connection: fetch the SSH connection from Conn, build the options
%% record, start the session client, and exchange hello messages.
%% Returns {ok, Client} when every step succeeds, otherwise the value
%% returned by the first step that did not give the expected result.
%%
%% Each step's result is wrapped as the improper list [T | Result] and
%% matched against the expected shape. A failed match raises
%% {badmatch, [T | Result]}; since T is a reference created for this
%% call, the catch clause only intercepts the badmatch errors raised by
%% these four matches and lets any other error propagate.
session(Conn, Opts, InitRec, NameOpt) ->
    T = make_ref(),
    try
        [_ | {ok, SshConn}] = [T | call(Conn, get_ssh_connection)],
        [_ | #options{} = Rec] = [T | make_session_options(Opts, InitRec)],
        [_ | {ok, Client} = Ok] = [T | start(SshConn, Rec, NameOpt, true)],
        [_ | ok] = [T | hello(Client, caps(Opts), Rec#options.timeout)],
        Ok
    catch
        %% T is already bound, so only our own tagged mismatches match.
        error: {badmatch, [T | Error]} ->
            Error
    end.

%% caps/1

%% Keep only the {capability, _} tuples of an option list, in their
%% original order; these are the options passed on to hello/3.
caps(Opts) ->
    [Opt || Opt <- Opts,
            is_tuple(Opt) andalso tuple_size(Opt) =:= 2
                andalso element(1, Opt) =:= capability].

%%----------------------------------------------------------------------
%% Open a netconf session and exchange 'hello' messages.
%% If the server options are specified in a configuration file, use
%% open/2.

%% open/1

%% Open a client of type `connection_and_channel' from an explicit
%% option list and exchange hello messages.
-spec open(Options) -> Result when
      Options :: [option()],
      Result :: {ok, handle()} | {error, error_reason()}.

open(Options) ->
    InitRec = #options{type = connection_and_channel},
    open(Options, InitRec, [], true).

%% Open a named client for a configured server and exchange hello
%% messages.
-spec open(KeyOrName, ExtraOption) -> Result when
      KeyOrName :: ct:key_or_name(),
      ExtraOption :: [option()],
      Result :: {ok, handle()} | {error, error_reason()}.

open(KeyOrName, ExtraOpts) ->
    SendHello = true,
    open(KeyOrName, ExtraOpts, SendHello).

%% open/3

%% Shared by open/2 and only_open/2: merge the configured and extra
%% options (see make_opts/2) and open a named client. Hello tells
%% open/4 whether the hello exchange is to take place.
open(KeyOrName, ExtraOptions, Hello) ->
    Opts = make_opts(KeyOrName, ExtraOptions),
    InitRec = #options{name = KeyOrName, type = connection_and_channel},
    open(Opts, InitRec, [{name, KeyOrName}], Hello).

%% open/4

%% Build the options record, start the client and, when Hello is true,
%% exchange hello messages. Returns {ok, Client} on success, otherwise
%% the result of the first step that did not give the expected value.
%%
%% Each step's result is placed second in a list headed by the
%% reference T and matched against the expected shape. A failed match
%% raises {badmatch, [T, Result | _]}, which the catch clause turns
%% into Result; T being unique to this call, other badmatch errors are
%% not intercepted.
open(Opts, InitRec, NameOpt, Hello) ->
    T = make_ref(),
    try
        [_, #options{} = Rec] = [T, make_options(Opts, InitRec)],
        %% The tail of the list is Hello and must be `true' for the match
        %% to succeed. With Hello == false and a successful start, the
        %% match fails and the catch clause returns {ok, Client} without
        %% any hello having been sent.
        [_, {ok, Client} = Ok | true] = [T, start(Rec, NameOpt, true) | Hello],
        [_, ok] = [T, hello(Client, caps(Opts), Rec#options.timeout)],
        Ok
    catch
        error: {badmatch, [T, Res | _]} ->
            Res
    end.

%% start/3

%% Check that the options name an endpoint and start the client on
%% {Host, Port}. A missing host is reported before a missing port.
start(#options{host = Host, port = Port} = Opts, NameOpt, Fwd) ->
    case {Host, Port} of
        {undefined, _} ->
            {error, no_host_address};
        {_, undefined} ->
            {error, no_port};
        Endpoint ->
            start(Endpoint, Opts, NameOpt, Fwd)
    end.

%% start/4

%% Start a ct_gen_conn process with this module as callback. The
%% connection is never reconnected or shared with an existing one; Fwd
%% becomes the forward_messages setting and NameOpt (either [] or
%% [{name, Name}]) is appended to the start options.
start(Ep, Opts, NameOpt, Fwd) ->
    GenConnOpts = [{reconnect, false},
                   {use_existing_connection, false},
                   {forward_messages, Fwd}
                   | NameOpt],
    ct_gen_conn:start(Ep, Opts, ?MODULE, GenConnOpts).

%%----------------------------------------------------------------------
%% Like open/1,2, but no 'hello' message is sent.

%% Same as open/1, except that open/4 is told not to exchange hello
%% messages.
-spec only_open(Options) -> Result when
      Options :: [option()],
      Result :: {ok, handle()} | {error, error_reason()}.

only_open(Options) ->
    InitRec = #options{type = connection_and_channel},
    open(Options, InitRec, [], false).

%% Same as open/2, except that no hello exchange is requested.
-spec only_open(KeyOrName, ExtraOptions) -> Result when
      KeyOrName :: ct:key_or_name(),
      ExtraOptions :: [option()],
      Result :: {ok, handle()} | {error, error_reason()}.

only_open(KeyOrName, ExtraOpts) ->
    SendHello = false,
    open(KeyOrName, ExtraOpts, SendHello).

%%----------------------------------------------------------------------
%% Send a 'hello' message.

%% hello/1

%% Send hello with no extra options and the default timeout.
-spec hello(Client) -> Result when
      Client :: handle(),
      Result :: ok | {error, error_reason()}.

hello(Client) ->
    NoOptions = [],
    hello(Client, NoOptions, ?DEFAULT_TIMEOUT).

%% hello/2

%% Send hello with no extra options and the given timeout.
-spec hello(Client, Timeout) -> Result when
      Client :: handle(),
      Timeout :: timeout(),
      Result :: ok | {error, error_reason()}.

hello(Client, Timeout) ->
    NoOptions = [],
    hello(Client, NoOptions, Timeout).

%% hello/3

%% Issue the {hello, Options, Timeout} request through call/2.
-spec hello(Client, Options, Timeout) -> Result when
      Client :: handle(),
      Options :: [{capability, [string()]}],
      Timeout :: timeout(),
      Result :: ok | {error, error_reason()}.

hello(Client, Options, Timeout) ->
    Request = {hello, Options, Timeout},
    call(Client, Request).


%%----------------------------------------------------------------------
%% Get the session id for the session specified by Client.
%% get_session_id/1 - same as get_session_id/2 with the default timeout.
-spec get_session_id(Client) -> Result when
      Client :: client(),
      Result :: pos_integer() | {error,error_reason()}.
get_session_id(Client) ->
    Timeout = ?DEFAULT_TIMEOUT,
    get_session_id(Client, Timeout).

%% get_session_id/2 - issue the get_session_id request through call/3.
-spec get_session_id(Client, Timeout) -> Result when
      Client :: client(),
      Timeout :: timeout(),
      Result :: pos_integer() | {error,error_reason()}.
get_session_id(Client, Timeout) ->
    Request = get_session_id,
    call(Client, Request, Timeout).

%%----------------------------------------------------------------------
%% Get the server side capabilities.
%% get_capabilities/1 - same as get_capabilities/2 with the default
%% timeout.
-spec get_capabilities(Client) -> Result when
      Client :: client(),
      Result :: [string()] | {error,error_reason()}.
get_capabilities(Client) ->
    Timeout = ?DEFAULT_TIMEOUT,
    get_capabilities(Client, Timeout).

%% get_capabilities/2 - issue the get_capabilities request through
%% call/3.
-spec get_capabilities(Client, Timeout) -> Result when
      Client :: client(),
      Timeout :: timeout(),
      Result :: [string()] | {error,error_reason()}.
get_capabilities(Client, Timeout) ->
    Request = get_capabilities,
    call(Client, Request, Timeout).

%%----------------------------------------------------------------------
%% Send an XML document to the server.
%% send/2 - same as send/3 with the default timeout.
-spec send(Client, SimpleXml) -> Result when
      Client :: client(),
      SimpleXml :: simple_xml(),
      Result :: simple_xml() | {error,error_reason()}.
send(Client, SimpleXml) ->
    Timeout = ?DEFAULT_TIMEOUT,
    send(Client, SimpleXml, Timeout).

%% send/3 - issue the {send, Timeout, SimpleXml} request through
%% call/2. Note that the timeout precedes the document in the request
%% tuple.
-spec send(Client, SimpleXml, Timeout) -> Result when
      Client :: client(),
      SimpleXml :: simple_xml(),
      Timeout :: timeout(),
      Result :: simple_xml() | {error,error_reason()}.
send(Client, SimpleXml, Timeout) ->
    Request = {send, Timeout, SimpleXml},
    call(Client, Request).

%%----------------------------------------------------------------------
%% Wrap the given XML document in a valid netconf 'rpc' request and
%% send to the server.
%% send_rpc/2 - same as send_rpc/3 with the default timeout.
-spec send_rpc(Client, SimpleXml) -> Result when
      Client :: client(),
      SimpleXml :: simple_xml(),
      Result :: [simple_xml()] | {error,error_reason()}.
send_rpc(Client, SimpleXml) ->
    Timeout = ?DEFAULT_TIMEOUT,
    send_rpc(Client, SimpleXml, Timeout).

%% send_rpc/3 - issue the {send_rpc, SimpleXml, Timeout} request
%% through call/2.
-spec send_rpc(Client, SimpleXml, Timeout) -> Result when
      Client :: client(),
      SimpleXml :: simple_xml(),
      Timeout :: timeout(),
      Result :: [simple_xml()] | {error,error_reason()}.
send_rpc(Client, SimpleXml, Timeout) ->
    Request = {send_rpc, SimpleXml, Timeout},
    call(Client, Request).

%%----------------------------------------------------------------------
%% Send a 'lock' request.
%% lock/2 - same as lock/3 with the default timeout.
-spec lock(Client, Target) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Result :: ok | {error,error_reason()}.
lock(Client, Target) ->
    Timeout = ?DEFAULT_TIMEOUT,
    lock(Client, Target, Timeout).

%% lock/3 - issue the `lock' rpc operation with [Target] as its
%% arguments.
-spec lock(Client, Target, Timeout) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Timeout :: timeout(),
      Result :: ok | {error,error_reason()}.
lock(Client, Target, Timeout) ->
    Request = {send_rpc_op, lock, [Target], Timeout},
    call(Client, Request).

%%----------------------------------------------------------------------
%% Send a 'unlock' request.
%% unlock/2 - same as unlock/3 with the default timeout.
-spec unlock(Client, Target) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Result :: ok | {error,error_reason()}.
unlock(Client, Target) ->
    Timeout = ?DEFAULT_TIMEOUT,
    unlock(Client, Target, Timeout).

%% unlock/3 - issue the `unlock' rpc operation with [Target] as its
%% arguments.
-spec unlock(Client, Target, Timeout) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Timeout :: timeout(),
      Result :: ok | {error,error_reason()}.
unlock(Client, Target, Timeout) ->
    Request = {send_rpc_op, unlock, [Target], Timeout},
    call(Client, Request).

%%----------------------------------------------------------------------
%% Send a 'get' request.
%% get/2 - same as get/3 with the default timeout.
-spec get(Client, Filter) -> Result when
      Client :: client(),
      Filter :: simple_xml() | xpath(),
      Result :: {ok,[simple_xml()]} | {error,error_reason()}.
get(Client, Filter) ->
    Timeout = ?DEFAULT_TIMEOUT,
    get(Client, Filter, Timeout).

%% get/3 - issue the `get' rpc operation with [Filter] as its
%% arguments.
-spec get(Client, Filter, Timeout) -> Result when
      Client :: client(),
      Filter :: simple_xml() | xpath(),
      Timeout :: timeout(),
      Result :: {ok,[simple_xml()]} | {error,error_reason()}.
get(Client, Filter, Timeout) ->
    Request = {send_rpc_op, get, [Filter], Timeout},
    call(Client, Request).

%%----------------------------------------------------------------------
%% Send a 'get-config' request.
%% get_config/3 - same as get_config/4 with the default timeout.
-spec get_config(Client, Source, Filter) -> Result when
      Client :: client(),
      Source :: netconf_db(),
      Filter :: simple_xml() | xpath(),
      Result :: {ok,[simple_xml()]} | {error,error_reason()}.
get_config(Client, Source, Filter) ->
    Timeout = ?DEFAULT_TIMEOUT,
    get_config(Client, Source, Filter, Timeout).

%% get_config/4 - issue the `get_config' rpc operation with
%% [Source, Filter] as its arguments.
-spec get_config(Client, Source, Filter, Timeout) -> Result when
      Client :: client(),
      Source :: netconf_db(),
      Filter :: simple_xml() | xpath(),
      Timeout :: timeout(),
      Result :: {ok,[simple_xml()]} | {error,error_reason()}.
get_config(Client, Source, Filter, Timeout) ->
    Request = {send_rpc_op, get_config, [Source, Filter], Timeout},
    call(Client, Request).

%%----------------------------------------------------------------------
%% Send a 'edit-config' request.
%% edit_config/3 - edit with the default timeout. The call goes through
%% the timeout clause of edit_config/4, so no optional parameters are
%% passed.
-spec edit_config(Client, Target, Config) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Config :: simple_xml() | [simple_xml()],
      Result :: ok | {error,error_reason()}.
edit_config(Client, Target, Config) ->
    Timeout = ?DEFAULT_TIMEOUT,
    edit_config(Client, Target, Config, Timeout).

%% edit_config/4 - two call forms share this arity. When the fourth
%% argument is an integer or `infinity' it is the timeout and no
%% optional parameters are passed; when it is a list it holds the
%% optional parameters and the default timeout is used. The timeout
%% test comes first, so a timeout is never mistaken for parameters.
%%
%% In both forms Config may be a single element or a list of elements,
%% since both forward to edit_config/5, which wraps a non-list Config
%% in a list. The spec of the timeout form states this as well.
-spec edit_config(Client, Target, Config, OptParams) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Config :: simple_xml() | [simple_xml()],
      OptParams :: [simple_xml()],
      Result :: ok | {error,error_reason()};
                 (Client, Target, Config, Timeout) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Config :: simple_xml() | [simple_xml()],
      Timeout :: timeout(),
      Result :: ok | {error,error_reason()}.
edit_config(Client, Target, Config, Timeout) when ?is_timeout(Timeout) ->
    edit_config(Client, Target, Config, [], Timeout);
edit_config(Client, Target, Config, OptParams) when is_list(OptParams) ->
    edit_config(Client, Target, Config, OptParams, ?DEFAULT_TIMEOUT).

%% edit_config/5 - issue the `edit_config' rpc operation with
%% [Target, Config, OptParams] as its arguments. A Config that is not
%% a list is first wrapped in a one-element list.
-spec edit_config(Client, Target, Config, OptParams, Timeout) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Config :: simple_xml() | [simple_xml()],
      OptParams :: [simple_xml()],
      Timeout :: timeout(),
      Result :: ok | {error,error_reason()}.
edit_config(Client, Target, Config, OptParams, Timeout) when is_list(Config) ->
    Request = {send_rpc_op, edit_config, [Target, Config, OptParams], Timeout},
    call(Client, Request);
edit_config(Client, Target, Config, OptParams, Timeout) ->
    edit_config(Client, Target, [Config], OptParams, Timeout).


%%----------------------------------------------------------------------
%% Send a 'delete-config' request.
%% delete_config/2 - same as delete_config/3 with the default timeout.
-spec delete_config(Client, Target) -> Result when
      Client :: client(),
      Target :: startup | candidate,
      Result :: ok | {error,error_reason()}.
delete_config(Client, Target) ->
    Timeout = ?DEFAULT_TIMEOUT,
    delete_config(Client, Target, Timeout).

%% delete_config/3 - issue the `delete_config' rpc operation with
%% [Target] as its arguments. Only `startup' and `candidate' are
%% accepted; any other target fails with function_clause.
-spec delete_config(Client, Target, Timeout) -> Result when
      Client :: client(),
      Target :: startup | candidate,
      Timeout :: timeout(),
      Result :: ok | {error,error_reason()}.
delete_config(Client, Target, Timeout)
  when Target =:= startup orelse Target =:= candidate ->
    Request = {send_rpc_op, delete_config, [Target], Timeout},
    call(Client, Request).

%%----------------------------------------------------------------------
%% Send a 'copy-config' request.
%% copy_config/3 - same as copy_config/4 with the default timeout.
%% The second argument is the target and the third the source, as in
%% the spec and in copy_config/4; the parameter names of the function
%% head are kept in that same order.
-spec copy_config(Client, Target, Source) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Source :: netconf_db(),
      Result :: ok | {error,error_reason()}.
copy_config(Client, Target, Source) ->
    copy_config(Client, Target, Source, ?DEFAULT_TIMEOUT).

%% copy_config/4 - issue the `copy_config' rpc operation with
%% [Target, Source] as its arguments.
-spec copy_config(Client, Target, Source, Timeout) -> Result when
      Client :: client(),
      Target :: netconf_db(),
      Source :: netconf_db(),
      Timeout :: timeout(),
      Result :: ok | {error,error_reason()}.
copy_config(Client, Target, Source, Timeout) ->
    Request = {send_rpc_op, copy_config, [Target, Source], Timeout},
    call(Client, Request).

%%----------------------------------------------------------------------
%% Execute an action.
%% action/2 - same as action/3 with the default timeout.
-spec action(Client, Action) -> Result when
      Client :: client(),
      Action :: simple_xml(),
      Result :: ok | {ok,[simple_xml()]} | {error,error_reason()}.
action(Client, Action) ->
    Timeout = ?DEFAULT_TIMEOUT,
    action(Client, Action, Timeout).

%% action/3 - issue the `action' rpc operation with [Action] as its
%% arguments.
-spec action(Client, Action, Timeout) -> Result when
      Client :: client(),
      Action :: simple_xml(),
      Timeout :: timeout(),
      Result :: ok | {ok,[simple_xml()]} | {error,error_reason()}.
action(Client, Action, Timeout) ->
    Request = {send_rpc_op, action, [Action], Timeout},
    call(Client, Request).

%%----------------------------------------------------------------------
%% Send a 'create-subscription' request
%% See RFC5277, NETCONF Event Notifications

%% create_subscription/2

%% The documented form takes a map of subscription values and uses the
%% default timeout. The remaining clauses keep the historic forms
%% working; the second argument is classified, in this order, as a
%% timeout, a stream name (string) or a filter.
-spec create_subscription(Client, Values) -> Result when
      Client :: client(),
      Values :: #{stream => Stream,
                  filter => Filter,
                  start => StartTime,
                  stop => StopTime},
      Stream :: stream_name(),
      Filter :: simple_xml() | [simple_xml()],
      StartTime :: xs_datetime(),
      StopTime :: xs_datetime(),
      Result :: ok | {error,error_reason()};
      %% historic, no longer documented
                         (Client, list() | timeout()) -> Result when
      Client :: client(),
      Result :: ok | {error,error_reason()}.

create_subscription(Client, #{} = Values) ->
    create_subscription(Client, Values, ?DEFAULT_TIMEOUT);

%% historic clauses
create_subscription(Client, Timeout) when ?is_timeout(Timeout) ->
    NoValues = #{},
    create_subscription(Client, NoValues, Timeout);
create_subscription(Client, Stream) when ?is_string(Stream) ->
    Values = #{stream => Stream},
    create_subscription(Client, Values, ?DEFAULT_TIMEOUT);
create_subscription(Client, Filter) when ?is_filter(Filter) ->
    Values = #{filter => Filter},
    create_subscription(Client, Values, ?DEFAULT_TIMEOUT).

%% The documented form takes a map of subscription values and a
%% timeout. Keys missing from the map default to ?DEFAULT_STREAM for
%% `stream' and to `undefined' for `filter', `start' and `stop'. The
%% request carries the calling process, self(), together with the
%% operation name.
%%
%% The historic clauses classify their two extra arguments by guard,
%% in the order they are written, and rebuild the equivalent map.
-spec create_subscription(Client, Values, Timeout) -> Result when
      Client :: client(),
      Values :: #{stream => Stream,
                  filter => Filter,
                  start => StartTime,
                  stop => StopTime},
      Stream :: stream_name(),
      Filter :: simple_xml() | [simple_xml()],
      StartTime :: xs_datetime(),
      StopTime :: xs_datetime(),
      Timeout :: timeout(),
      Result :: ok | {error,error_reason()};
      %% historic, no longer documented
                         (Client, list(), list() | timeout()) -> Result when
      Client :: client(),
      Result :: ok | {error,error_reason()}.

create_subscription(Client, #{} = Values, Timeout) ->
    Stream = maps:get(stream, Values, ?DEFAULT_STREAM),
    Filter = maps:get(filter, Values, undefined),
    Start = maps:get(start, Values, undefined),
    Stop = maps:get(stop, Values, undefined),
    Op = {create_subscription, self()},
    call(Client, {send_rpc_op, Op, [Stream, Filter, Start, Stop], Timeout});

%% historic clauses, arity 3
create_subscription(Client, Stream, Timeout)
  when ?is_string(Stream), ?is_timeout(Timeout) ->
    Values = #{stream => Stream},
    create_subscription(Client, Values, Timeout);
create_subscription(Client, StartTime, StopTime)
  when ?is_string(StartTime), ?is_string(StopTime) ->
    Values = #{start => StartTime, stop => StopTime},
    create_subscription(Client, Values, ?DEFAULT_TIMEOUT);
create_subscription(Client, Filter, Timeout)
  when ?is_filter(Filter), ?is_timeout(Timeout) ->
    Values = #{filter => Filter},
    create_subscription(Client, Values, Timeout);
create_subscription(Client, Stream, Filter)
  when ?is_string(Stream), ?is_filter(Filter) ->
    Values = #{stream => Stream, filter => Filter},
    create_subscription(Client, Values, ?DEFAULT_TIMEOUT).

%% historic clauses, arity 1,4-6
%% Subscribe with no explicit values and the default timeout.
create_subscription(Client) ->
    NoValues = #{},
    create_subscription(Client, NoValues, ?DEFAULT_TIMEOUT).
%% Historic forms with three extra arguments. The clauses are tried in
%% the order written; each rebuilds the equivalent values map and uses
%% either the given timeout or, when none is given, the default one.
create_subscription(Client, StartTime, StopTime, Timeout)
  when ?is_string(StartTime), ?is_string(StopTime), ?is_timeout(Timeout) ->
    create_subscription(Client,
                        #{start => StartTime, stop => StopTime},
                        Timeout);
create_subscription(Client, Stream, StartTime, StopTime)
  when ?is_string(Stream), ?is_string(StartTime), ?is_string(StopTime) ->
    Values = #{stream => Stream,
               start => StartTime,
               stop => StopTime},
    create_subscription(Client, Values, ?DEFAULT_TIMEOUT);
create_subscription(Client, Filter, StartTime, StopTime)
  when ?is_filter(Filter), ?is_string(StartTime), ?is_string(StopTime) ->
    Values = #{filter => Filter,
               start => StartTime,
               stop => StopTime},
    create_subscription(Client, Values, ?DEFAULT_TIMEOUT);
create_subscription(Client, Stream, Filter, Timeout)
  when ?is_string(Stream), ?is_filter(Filter), ?is_timeout(Timeout) ->
    create_subscription(Client,
                        #{stream => Stream, filter => Filter},
                        Timeout).
%% Historic forms with four extra arguments: either stream, start, stop
%% and a timeout, or stream, filter, start and stop with the default
%% timeout.
create_subscription(Client, Stream, StartTime, StopTime, Timeout)
  when ?is_string(Stream), ?is_string(StartTime), ?is_string(StopTime),
       ?is_timeout(Timeout) ->
    create_subscription(Client,
                        #{stream => Stream,
                          start => StartTime,
                          stop => StopTime},
                        Timeout);
create_subscription(Client, Stream, Filter, StartTime, StopTime)
  when ?is_string(Stream), ?is_filter(Filter), ?is_string(StartTime),
       ?is_string(StopTime) ->
    Values = #{stream => Stream,
               filter => Filter,
               start => StartTime,
               stop => StopTime},
    create_subscription(Client, Values, ?DEFAULT_TIMEOUT).
%% Historic form with every value given positionally. The arguments
%% are not checked here; they are placed in the values map as they are.
create_subscription(Client, Stream, Filter, StartTime, StopTime, Timeout) ->
    create_subscription(Client,
                        #{stream => Stream,
                          filter => Filter,
                          start => StartTime,
                          stop => StopTime},
                        Timeout).

%%----------------------------------------------------------------------
%% Send a request to get the given event streams
%% See RFC5277, NETCONF Event Notifications
%% get_event_streams/1 - request with an empty stream list and the
%% default timeout.
-spec get_event_streams(Client) -> Result when
      Client :: client(),
      Result :: {ok,streams()} | {error,error_reason()}.
get_event_streams(Client) ->
    NoStreams = [],
    get_event_streams(Client, NoStreams, ?DEFAULT_TIMEOUT).

%% get_event_streams/2 - the second argument is either a timeout
%% (integer or `infinity'), in which case the stream list is empty, or
%% a list of stream names, in which case the default timeout is used.
-spec get_event_streams(Client, Timeout) -> Result when
      Client :: client(),
      Timeout :: timeout(),
      Result :: {ok,streams()} | {error,error_reason()};
                       (Client, Streams) -> Result when
      Client :: client(),
      Streams :: [stream_name()],
      Result :: {ok,streams()} | {error,error_reason()}.
get_event_streams(Client, Timeout) when ?is_timeout(Timeout) ->
    NoStreams = [],
    get_event_streams(Client, NoStreams, Timeout);
get_event_streams(Client, Streams) when is_list(Streams) ->
    Timeout = ?DEFAULT_TIMEOUT,
    get_event_streams(Client, Streams, Timeout).

%% get_event_streams/3 - issue the {get_event_streams, Streams,
%% Timeout} request through call/2.
-spec get_event_streams(Client, Streams, Timeout) -> Result when
      Client :: client(),
      Streams :: [stream_name()],
      Timeout :: timeout(),
      Result :: {ok,streams()} | {error,error_reason()}.
get_event_streams(Client, Streams, Timeout) ->
    Request = {get_event_streams, Streams, Timeout},
    call(Client, Request).


%%----------------------------------------------------------------------
%% Send a 'close-session' request
%% close_session/1 - same as close_session/2 with the default timeout.
-spec close_session(Client) -> Result when
      Client :: client(),
      Result :: ok | {error,error_reason()}.
close_session(Client) ->
    Timeout = ?DEFAULT_TIMEOUT,
    close_session(Client, Timeout).

%% close_session/2 - issue the `close_session' rpc operation with no
%% arguments. Unlike the other operations this one goes through call/3
%% with `true' as third argument.
-spec close_session(Client, Timeout) -> Result when
      Client :: client(),
      Timeout :: timeout(),
      Result :: ok | {error,error_reason()}.
close_session(Client, Timeout) ->
    Request = {send_rpc_op, close_session, [], Timeout},
    call(Client, Request, true).


%%----------------------------------------------------------------------
%% Send a 'kill-session' request
%% kill_session/2 - same as kill_session/3 with the default timeout.
-spec kill_session(Client, SessionId) -> Result when
      Client :: client(),
      SessionId :: pos_integer(),
      Result :: ok | {error,error_reason()}.
kill_session(Client, SessionId) ->
    Timeout = ?DEFAULT_TIMEOUT,
    kill_session(Client, SessionId, Timeout).

%% kill_session/3 - issue the `kill_session' rpc operation with
%% [SessionId] as its arguments.
-spec kill_session(Client, SessionId, Timeout) -> Result when
      Client :: client(),
      SessionId :: pos_integer(),
      Timeout :: timeout(),
      Result :: ok | {error,error_reason()}.
kill_session(Client, SessionId, Timeout) ->
    Request = {send_rpc_op, kill_session, [SessionId], Timeout},
    call(Client, Request).


%%----------------------------------------------------------------------
%% Callback functions
%%----------------------------------------------------------------------

%% init/3

%% ct_gen_conn callback. The shape of the second argument and the
%% `type' option select what is set up:
%%  - {CM, {Host, Port}}: a channel is opened on the connection CM via
%%    ssh_channel/2;
%%  - {Host, Port} with type == connection: ssh_connect/1 is used and
%%    no receivers are stored in the state;
%%  - {Host, Port} otherwise: ssh_open/1 is used.
%% On success the result is {ok, ConnPid, State}; failures from the
%% helpers are returned as they are.
init(_KeyOrName, {CM, {Host, Port}}, Options) ->
    Conn0 = #connection{reference = CM, host = Host, port = Port},
    case ssh_channel(Conn0, Options) of
        {error, _} = Error ->
            Error;
        {ok, Conn} ->
            Receivers = Options#options.receivers,
            {ok, CM, #state{connection = Conn, receivers = Receivers}}
    end;
init(_KeyOrName, {_Host, _Port}, #options{type = connection} = Options) ->
    case ssh_connect(Options) of
        {ok, Conn} ->
            %% For a connection-only client the reference is the
            %% connection itself.
            {ok, Conn#connection.reference, #state{connection = Conn}};
        NotOk ->
            NotOk
    end;
init(_KeyOrName, {_Host, _Port}, Options) ->
    case ssh_open(Options) of
        {error, _} = Error ->
            Error;
        {ok, Conn} ->
            %% Here the reference is a pair whose first element is
            %% handed to ct_gen_conn.
            {ConnPid, _} = Conn#connection.reference,
            Receivers = Options#options.receivers,
            {ok, ConnPid, #state{connection = Conn, receivers = Receivers}}
    end.

%% terminate/2

%% Closes the connection held in the state with ssh_close/1. The
%% first argument is ignored and the result of ssh_close/1 is
%% discarded: the return value is always ok.
terminate(_, #state{connection=Connection}) ->
    ssh_close(Connection),
    ok.

%% handle_msg/3

%% handle_msg/3 handles requests that arrive together with a caller
%% (From). Clause order matters: hello and get_ssh_connection are
%% accepted regardless of session state, while every clause after the
%% session_id = undefined clause is only reached once a session id
%% has been stored in the state.

%% Send hello and return to the caller only after reception of the
%% server's hello.
%%
%% If hello_status is still undefined, a request timer is started and
%% the caller is parked in hello_status as a #pending{}; no reply is
%% returned from here. For any other hello_status, handle_capx/1
%% decides the result immediately. A failed send stops the process
%% with the error as reason.
handle_msg({hello, Options, Timeout},
           From,
	   #state{connection = Connection,
                  hello_status = HelloStatus}
           = State) ->
    case do_send(Connection, client_hello(Options)) of
	ok when HelloStatus == undefined -> %% server hello not yet received
            TRef = set_request_timer(Timeout, hello),
            {noreply, State#state{hello_status = #pending{tref = TRef,
                                                          caller = From}}};
        ok ->                               %% or yes: negotiate version
            handle_capx(State);
        Error ->
	    {stop, Error, State}
    end;

%% Reply with {ok,{CM,{Host,Port}}} taken from the connection record,
%% unless the connection reference is a 2-tuple, in which case the
%% reply is {error,not_an_ssh_connection}.
handle_msg(get_ssh_connection, _From, #state{connection=Connection}=State) ->
    Reply =
        case Connection#connection.reference of
            {_,_} -> {error,not_an_ssh_connection};
            CM -> {ok,{CM,{Connection#connection.host,
                           Connection#connection.port}}}
        end,
    {reply, Reply, State};

%% Request before server hello. Possible with only_open, since a
%% handle is then returned without waiting for the server.
handle_msg(_, _From, #state{session_id = undefined} = State) ->
    {reply, {error, waiting_for_hello}, State};

%% Reply with the capabilities stored in the state.
handle_msg(get_capabilities, _From, #state{capabilities = Caps} = State) ->
    {reply, Caps, State};

%% Reply with the session id stored in the state.
handle_msg(get_session_id, _From, #state{session_id = Id} = State) ->
    {reply, Id, State};

%% Send SimpleXml as given, without wrapping it in an rpc element. On
%% success the caller is queued in pending with only tref and caller
%% set, and the reply is deferred. A send error is replied at once.
handle_msg({send, Timeout, SimpleXml},
           From,
           #state{connection = Connection,
                  pending = Pending}
           = State) ->
    case do_send(Connection, SimpleXml) of
        ok ->
            TRef = set_request_timer(Timeout, send),
            {noreply, State#state{pending = [#pending{tref = TRef,
                                                      caller = From}
                                             | Pending]}};
        Error ->
            {reply, Error, State}
    end;

%% Send caller-supplied XML inside an rpc element; the operation is
%% recorded as undefined.
handle_msg({send_rpc, SimpleXml, Timeout}, From, State) ->
    do_send_rpc(undefined, SimpleXml, Timeout, From, State);

%% Encode a known operation with encode_rpc_operation/2 and send it
%% inside an rpc element, recording Op for decoding of the reply.
handle_msg({send_rpc_op, Op, Data, Timeout}, From, State) ->
    SimpleXml = encode_rpc_operation(Op,Data),
    do_send_rpc(Op, SimpleXml, Timeout, From, State);

%% Sent as a get operation whose filter selects the named streams;
%% get_event_streams is recorded as the operation.
handle_msg({get_event_streams=Op,Streams,Timeout}, From, State) ->
    Filter = {netconf,?NETMOD_NOTIF_NAMESPACE_ATTR,
             [{streams,[{stream,[{name,[Name]}]} || Name <- Streams]}]},
    SimpleXml = encode_rpc_operation(get,[Filter]),
    do_send_rpc(Op, SimpleXml, Timeout, From, State).

%% handle_msg/2 handles messages that arrive without a caller: data
%% and other messages from the ssh connection, and expired request
%% timers.

%% Data on the ssh channel: widen the flow-control window by the
%% number of bytes received, log the bytes and feed them to the
%% message parser.
handle_msg({ssh_cm, CM, {data, Ch, _Type, Data}}, State) ->
    ssh_connection:adjust_window(CM, Ch, byte_size(Data)),
    log(State#state.connection, recv, Data),
    handle_data(Data, State);

handle_msg({ssh_cm, _CM, _SshCloseMsg}, State) ->
    %% _SshCloseMsg can probably be one of
    %% {eof,Ch}
    %% {exit_status,Ch,Status}
    %% {exit_signal,Ch,ExitSignal,ErrorMsg,LanguageString}
    %% {signal,Ch,Signal}

    %% This might e.g. happen if the server terminates the connection,
    %% as in kill-session (or if ssh:close is called from somewhere
    %% unexpected).

    %%! Log this??
    %%! Currently the log will say that the client closed the
    %%! connection - due to terminate/2

    {stop, State};

%% The hello timer expired while the caller is still parked in
%% hello_status: answer the caller and stop.
handle_msg({timeout, TRef, hello},
           #state{hello_status = #pending{tref = TRef,
                                          caller = From}}
           = State) ->
    ct_gen_conn:return(From, {error, {hello_session_failed, timeout}}),
    {stop, State#state{hello_status = {error,timeout}}};

%% A request timer expired. If the request is still pending its caller
%% gets {error, timeout}; a timer with no matching pending request is
%% ignored. A timed-out close_session stops the process.
handle_msg({timeout, TRef, Op}, #state{pending = Pending} = State) ->
    case lists:keytake(TRef, #pending.tref, Pending) of
        {value, #pending{caller = From}, Rest} ->
            ct_gen_conn:return(From, {error, timeout}),
            %% Discard received bytes in hope that the server has sent
            %% an incomplete message. Otherwise this is doomed to
            %% leave the connection in an unusable state.
            %%
            %% The buffer is reset to the same "empty" marker that
            %% handle_more/2 installs after a complete message: true
            %% when 1.1 chunking has been negotiated, false otherwise.
            %% (Deriving the marker from is_binary/1 on the old buffer
            %% inverted it, since the 1.0 buffer is the binary one.)
            {if Op == close_session -> stop; true -> noreply end,
             State#state{pending = Rest,
                         buf = true == get(?KEY(chunk))}};
        false ->
            {noreply, State}
    end.

%% close/1

%% Called by ct_util_server to close registered connections before terminate.
%% Stop the client process behind Client. A client whose process is
%% already gone is reported as {error,already_closed}; a failed handle
%% lookup and any other result of ct_gen_conn:stop/1 are returned
%% unchanged.
close(Client) ->
    case get_handle(Client) of
        {ok, Pid} ->
            AlreadyGone = {error, {process_down, Pid, noproc}},
            case ct_gen_conn:stop(Pid) of
                AlreadyGone ->
                    {error, already_closed};
                StopResult ->
                    StopResult
            end;
        NoHandle ->
            NoHandle
    end.


%%----------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------
%% call/2,3,4
%%
%% Send a request to the client process and translate the outcome.
%% call/2 uses an infinite timeout and no WaitStop. call/3 accepts
%% either a timeout (integer or infinity) or the boolean WaitStop
%% flag as its third argument. call/4 takes both.
call(Client, Msg) ->
    call(Client, Msg, infinity, false).
call(Client, Msg, Timeout) when is_integer(Timeout); Timeout==infinity ->
    call(Client, Msg, Timeout, false);
call(Client, Msg, WaitStop) when is_boolean(WaitStop) ->
    call(Client, Msg, infinity, WaitStop).
%% With WaitStop set, a normal exit of the client process during the
%% call counts as success, and after any other answer the function
%% additionally waits (up to Timeout) for the process to go down.
call(Client, Msg, Timeout, WaitStop) ->
    case get_handle(Client) of
	{ok,Pid} ->
	    case ct_gen_conn:call(Pid,Msg,Timeout) of
		{error,{process_down,Pid,noproc}} ->
		    {error,no_such_client};
		{error,{process_down,Pid,normal}} when WaitStop ->
		    %% This will happen when server closes connection
		    %% before client received rpc-reply on
		    %% close-session.
		    ok;
		{error,{process_down,Pid,normal}} ->
		    {error,closed};
		{error,{process_down,Pid,Reason}} ->
		    {error,{closed,Reason}};
		Other when WaitStop ->
		    %% An answer was received. Monitor the process and
		    %% return the answer once it has terminated; an
		    %% abnormal exit or a timeout wraps the answer in
		    %% an error tuple instead.
		    MRef = erlang:monitor(process,Pid),
		    receive
			{'DOWN',MRef,process,Pid,Normal} when Normal==normal;
							      Normal==noproc ->
			    Other;
			{'DOWN',MRef,process,Pid,Reason} ->
			    {error,{{closed,Reason},Other}}
		    after Timeout ->
			    erlang:demonitor(MRef, [flush]),
			    {error,{timeout,Other}}
		    end;
		Other ->
		    Other
	    end;
	Error ->
	    Error
    end.

%% Resolve a client reference to {ok,Pid}. A pid is returned as is;
%% anything else is looked up with ct_util:get_connection/2 for this
%% module. A missing registration is reported as
%% {error,{no_connection_found,Client}}; other lookup results are
%% passed through.
get_handle(Pid) when is_pid(Pid) ->
    {ok, Pid};
get_handle(Name) ->
    Lookup = ct_util:get_connection(Name, ?MODULE),
    case Lookup of
        {ok, {Pid, _}} ->
            {ok, Pid};
        {error, no_registered_connection} ->
            {error, {no_connection_found, Name}};
        Other ->
            Other
    end.

%% make_options/2

%% Fold the option list Opts into the #options{} record Rec using
%% opt/2. The port field is reset to undefined before folding, so it
%% only ends up set if Opts contains a port option.
%% Returns the record, or {error,{invalid_option,T}} (see
%% make_options/3).
make_options(Opts, Rec) ->
    make_options(Opts, Rec#options{port = undefined}, fun opt/2).

%% opt/2
%%
%% Fold function for make_options/2: merge one option into the
%% #options{} record. ssh and host both set the host, capability
%% options are skipped here, receivers accumulate in reverse order,
%% and a timeout that is neither an integer nor infinity is thrown.
%% Everything else is collected for ssh to verify.

opt({ssh, Host}, Rec) ->
    Rec#options{host = Host};

opt({host, Host}, Rec) ->
    Rec#options{host = Host};

opt({port, Port}, Rec) ->
    Rec#options{port = Port};

opt({timeout, Tmo} = Opt, Rec) ->
    case is_integer(Tmo) orelse Tmo == infinity of
        true ->
            Rec#options{timeout = Tmo};
        false ->
            throw(Opt)
    end;

opt({capability, _}, Rec) ->
    Rec;

opt({receiver, Dest}, #options{receivers = Dests} = Rec) ->
    Rec#options{receivers = [Dest | Dests]};

opt(Opt, #options{ssh = SshOpts} = Rec) -> %% option verified by ssh
    Rec#options{ssh = [Opt | SshOpts]}.

%% make_session_options/2

%% Fold the option list Opts into the #options{} record Rec using
%% session_opt/2, which throws on any option other than receiver,
%% capability and a well-formed timeout.
%% Returns the record, or {error,{invalid_option,T}} (see
%% make_options/3).
make_session_options(Opts, Rec) ->
    make_options(Opts, Rec, fun session_opt/2).

%% session_opt/2
%%
%% Fold function for make_session_options/2. Only receiver,
%% capability and timeout (integer or infinity) are accepted; any
%% other term is thrown so that make_options/3 can report it.

session_opt({receiver, Dest}, #options{receivers = Dests} = Rec) ->
    Rec#options{receivers = [Dest | Dests]};

session_opt({capability, _}, Rec) ->
    Rec;

session_opt({timeout, Tmo}, Rec)
  when is_integer(Tmo) orelse Tmo == infinity ->
    Rec#options{timeout = Tmo};

session_opt(Invalid, _Rec) ->
    throw(Invalid).

%% make_options/3
%%
%% Fold Opts into Rec with F and insist that the result is an
%% #options{} record. A term thrown by F is reported as
%% {error, {invalid_option, Term}}.

make_options(Opts, Rec, F) ->
    try
        Folded = lists:foldl(F, Rec, Opts),
        #options{} = Folded
    catch
        throw:Rejected ->
            {error, {invalid_option, Rejected}}
    end.

%%%-----------------------------------------------------------------

%% Start a timer that sends {timeout, TRef, Op} to the calling process
%% after Tmo milliseconds and return TRef. With an infinite timeout no
%% timer is started and false is returned instead.
set_request_timer(Tmo, Op) when Tmo =/= infinity ->
    erlang:start_timer(Tmo, self(), Op);

set_request_timer(infinity, _Op) ->
    false.

%%%-----------------------------------------------------------------

%% Cancel a timer returned by set_request_timer/2. The value false
%% (no timer was started) is accepted and yields ok; otherwise the
%% result of erlang:cancel_timer/1 is returned.
cancel_request_timer(TRef) when TRef =/= false ->
    erlang:cancel_timer(TRef);

cancel_request_timer(false) ->
    ok.

%%%-----------------------------------------------------------------

%% client_hello/1
%%
%% Build the client's hello message from the capability options. The
%% default base capability is prepended only when the user-specified
%% capabilities contain no base capability. The base versions found
%% are stored in the process dictionary, where handle_capx/1 picks
%% them up once the server capabilities are known.

client_hello(Opts)
  when is_list(Opts) ->
    UserCaps = [{capability, cap(lists:flatten(Cs))}
                || {capability, Cs} <- Opts],
    Vsns = versions(UserCaps),
    put(?KEY(protocol_vsn), Vsns),
    BaseCaps = case Vsns of
                   [] ->
                       [{capability,
                         [?NETCONF_BASE_CAP, ?NETCONF_BASE_CAP_VSN]}];
                   _ ->
                       []
               end,
    {hello,
     ?NETCONF_NAMESPACE_ATTR,
     [{capabilities, BaseCaps ++ UserCaps}]}.

%% cap/1
%%
%% Expand the capability shorthand documented in RFC 6241, 10.4
%% NETCONF Capabilities URNs (but not in 8 Capabilities): a string
%% starting with ":base:" gets the netconf URN prefix, any other
%% string starting with ":" gets the capability URN prefix, and
%% everything else is kept as is. The result is a list of string
%% parts, not a flat string.

cap(Str) ->
    case Str of
        ":base:" ++ _ ->
            ["urn:ietf:params:netconf", Str];
        [$: | _] ->
            ["urn:ietf:params:netconf:capability", Str];
        _ ->
            [Str]
    end.

%% versions/1
%%
%% Extract base protocol versions from capability options: for every
%% capability part that starts with the base capability prefix, keep
%% what follows the prefix up to (not including) the first "?".

versions(Opts) ->
    UpToParams = fun(C) -> C /= $? end,
    [lists:takewhile(UpToParams, Suffix)
     || {capability, Parts} <- Opts,
        Part <- Parts,
        ?NETCONF_BASE_CAP ++ Suffix <- [lists:flatten(Part)]].

%% handle_capx/1
%%
%% Negotiate the base protocol version once both hellos are known.
%% Parameters are ignored as RFC 6241 (NETCONF 1.1) requires in 8.1
%% Capabilities Exchange, and whitespace is handled leniently since
%% RFC 6241 gives examples with significant trailing whitespace.
%%
%% On agreement the version is stored as hello_status, the chunk flag
%% in the process dictionary is set for anything but "1.0", and ok is
%% replied. Otherwise the process is stopped with an error.

handle_capx(#state{hello_status = received, capabilities = Caps} = S) ->
    Remote = [Vsn || ?NETCONF_BASE_CAP ++ Tail <- Caps,
                     [Vsn | _] <- [string:lexemes(Tail, "? \t\r\n")]],
    Local = erase(?KEY(protocol_vsn)),
    case {protocol_vsn(Local, Remote), Remote} of
        {false, []} ->
            {stop, {error, {incorrect_hello, no_base_capability_found}}, S};
        {false, _} ->
            Lowest = lists:min(Remote),
            {stop, {error, {incompatible_base_capability_vsn, Lowest}}, S};
        {Agreed, _} ->
            put(?KEY(chunk), Agreed /= "1.0"),
            {reply, ok, rebuf(Agreed, S#state{hello_status = Agreed})}
    end;

%% The server's hello was already found to be faulty: stop with that
%% error.
handle_capx(#state{hello_status = {error, _} = No} = S) ->
    {stop, No, S}.

%% rebuf/2
%%
%% For any negotiated version other than "1.0", wrap the current
%% message buffer in a list (followed by the integer 3) as used for
%% 1.1 chunking. For "1.0" the state is returned untouched.

rebuf(Vsn, #state{buf = Buffered} = S) when Vsn =/= "1.0" ->
    S#state{buf = [Buffered, 3]};

rebuf("1.0", S) ->
    S.

%% protocol_vsn/2
%%
%% Pick the greatest version string present in both lists, or false
%% if there is none. An empty local list stands for ["1.0"].

protocol_vsn(Local0, Remote) ->
    Local = case Local0 of
                [] -> ["1.0"];
                _ -> Local0
            end,
    Shared = [V || V <- Remote, lists:member(V, Local)],
    %% Any string compares greater than the atom false, so false only
    %% wins when nothing is shared.
    lists:max([false | Shared]).

%%%-----------------------------------------------------------------

%% Build the simple-XML content of an rpc element for operation Op
%% from its argument list. Optional parts (filter, startTime,
%% stopTime) are left out when their argument is undefined.
encode_rpc_operation(lock, [Target]) ->
    {lock, [{target, [Target]}]};
encode_rpc_operation(unlock, [Target]) ->
    {unlock, [{target, [Target]}]};
encode_rpc_operation(get, [Filter]) ->
    {get, filter(Filter)};
encode_rpc_operation(get_config, [Source, Filter]) ->
    {'get-config', [{source, [Source]} | filter(Filter)]};
encode_rpc_operation(edit_config, [Target, Config, OptParams]) ->
    {'edit-config',
     [{target, [Target]} | OptParams ++ [{config, Config}]]};
encode_rpc_operation(delete_config, [Target]) ->
    {'delete-config', [{target, [Target]}]};
encode_rpc_operation(copy_config, [Target, Source]) ->
    {'copy-config', [{target, [Target]}, {source, [Source]}]};
encode_rpc_operation(action, [Action]) ->
    {action, ?ACTION_NAMESPACE_ATTR, [{data, [Action]}]};
encode_rpc_operation(kill_session, [SessionId]) ->
    {'kill-session', [{'session-id', [integer_to_list(SessionId)]}]};
encode_rpc_operation(close_session, []) ->
    'close-session';
encode_rpc_operation({create_subscription, _},
                     [Stream, Filter, StartTime, StopTime]) ->
    Content = lists:append([[{stream, [Stream]}],
                            filter(Filter),
                            maybe_element(startTime, StartTime),
                            maybe_element(stopTime, StopTime)]),
    {'create-subscription', ?NETCONF_NOTIF_NAMESPACE_ATTR, Content}.

%% Build the optional filter element: nothing for undefined, an
%% xpath filter for {xpath, String}, and otherwise a subtree filter
%% whose content is the given list (a single non-list term is wrapped
%% in a list first).
filter(undefined) ->
    [];
filter({xpath, Select}) when ?is_string(Select) ->
    [{filter, [{type, "xpath"}, {select, Select}], []}];
filter(Filter) ->
    Subtree = if
                  is_list(Filter) -> Filter;
                  true -> [Filter]
              end,
    [{filter, [{type, "subtree"}], Subtree}].

%% Return a one-element list holding the element {Tag,[Value]}, or an
%% empty list when Value is undefined.
maybe_element(Tag, Value) ->
    case Value of
        undefined ->
            [];
        _ ->
            [{Tag, [Value]}]
    end.

%%%-----------------------------------------------------------------
%%% Send XML data to server
%% Wrap SimpleXml in an rpc element carrying the current message id
%% and send it. The message id in the state is incremented whether or
%% not the send succeeds. On success the caller is queued as pending
%% (with a request timer) and the reply is deferred; on failure the
%% error is replied immediately.
do_send_rpc(Op, SimpleXml, Timeout, Caller, #state{} = State) ->
    #state{connection = Connection,
           msg_id = MsgId,
           pending = Pending} = State,
    Attrs = [{'message-id', MsgId} | ?NETCONF_NAMESPACE_ATTR],
    Bumped = State#state{msg_id = MsgId + 1},
    case do_send(Connection, {rpc, Attrs, [SimpleXml]}) of
        ok ->
            Rec = #pending{tref = set_request_timer(Timeout, Op),
                           msg_id = MsgId,
                           op = Op,
                           caller = Caller},
            {noreply, Bumped#state{pending = [Rec | Pending]}};
        Error ->
            {reply, Error, Bumped}
    end.

%% Serialize the simple-XML term with to_xml/1, add message framing
%% with frame/1 and hand the result to ssh_send/2, whose result is
%% returned.
do_send(Connection, Simple) ->
    ssh_send(Connection, frame(to_xml(Simple))).

%% Export a simple-XML term with xmerl, using a fixed XML 1.0 / UTF-8
%% prolog, and convert the resulting character data to a binary.
to_xml(Simple) ->
    Prolog = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
    Chars = xmerl:export_simple([Simple],
                                xmerl_xml,
                                [#xmlAttribute{name = prolog,
                                               value = Prolog}]),
    unicode:characters_to_binary(Chars).

%% frame/1
%%
%% Add message framing to an encoded message. When the chunk flag in
%% the process dictionary is true the message is split with chunk/1
%% and terminated with the end-of-chunks marker; in every other case
%% the end tag is appended.

frame(Bin) ->
    Chunked = (true =:= get(?KEY(chunk))),
    if
        Chunked ->  %% 1.1 chunking
            [chunk(Bin) | "\n##\n"];
        true ->     %% 1.0 framing
            [Bin | ?END_TAG]
    end.

%% chunk/1
%%
%% Split a binary into chunks, each preceded by a "\n#<size>\n"
%% header. Chunk sizes are random (at most 1024 bytes) to exercise
%% the server.

chunk(Bin) when Bin =/= <<>> ->
    Len = min(rand:uniform(1024), size(Bin)),
    <<Part:Len/binary, Tail/binary>> = Bin,
    ["\n#", integer_to_list(Len), $\n, Part | chunk(Tail)];

chunk(<<>>) ->
    [].

%%%-----------------------------------------------------------------
%%% Parse and handle received XML data

%% Combine newly received bytes with the buffer in the state using
%% recv/2. A receive error is logged and stops the process; a
%% complete message is handed to handle_xml/2 and the remaining bytes
%% to handle_more/2; anything else is stored as the new buffer.
handle_data(Bin, #state{buf = Buffered} = State) ->
    Received = recv(Bin, Buffered),
    case Received of
        {error, Reason} ->
            ConnName = (State#state.connection)#connection.name,
            ?error(ConnName, [{receive_error, Reason},
                              {buffer, Buffered},
                              {bytes, Bin}]),
            {stop, State};
        {Msg, Remaining} ->
            handle_more(Remaining, handle_xml(Msg, State));
        NewBuf ->
            {noreply, State#state{buf = NewBuf}}
    end.

%% handle_more/2
%%
%% Continue with the bytes that followed a complete message, unless
%% handling that message asked for a stop. The buffer is first reset
%% to a boolean telling whether chunking is in effect.

handle_more(Rest, {noreply, State}) ->
    Chunked = (true == get(?KEY(chunk))),
    handle_data(Rest, State#state{buf = Chunked});

handle_more(_Rest, {stop, _} = Stop) ->
    Stop.

%% handle_xml/2
%%
%% Parse one received message and decode it. A parse failure is
%% logged and reported to a pending caller through handle_error/2;
%% the process keeps running.

handle_xml(Bytes, State) ->
    case parse(Bytes) of
        {fatal_error, _Loc, Reason, _EndTags, _EventState} ->
            ConnName = (State#state.connection)#connection.name,
            ?error(ConnName, [{parse_error, Reason},
                              {message, Bytes}]),
            {noreply, handle_error(Reason, State)};
        {ok, Simple, _Trailing} ->  %% ignore trailing bytes
            decode(Simple, State)
    end.

%% parse/1
%%
%% Run the xmerl SAX parser over Bytes. The parser is started on an
%% empty binary and pulls the actual input through the continuation
%% function cont/1, whose state is Bytes. sax_event/3 builds the
%% result starting from an empty event state.

parse(Bytes) ->
    xmerl_sax_parser:stream(<<>>, [{event_fun, fun sax_event/3},
                                   {event_state, []},
                                   {continuation_fun, fun cont/1},
                                   {continuation_state, Bytes}]).

%% cont/1
%%
%% Continuation function for the SAX parser: return the next piece of
%% input together with the new continuation state. The state is
%% either a list of binaries, consumed one at a time, or a single
%% binary that is returned whole. An exhausted list yields an empty
%% binary.

cont(State) ->
    case State of
        [] ->
            {<<>>, State};
        [Next | Remaining] ->
            {Next, Remaining};
        Bin ->
            {Bin, <<>>}
    end.

%% handle_error/2
%%
%% Report a parse failure to a waiting caller, if there is one. The
%% last element of the pending list is taken to be the oldest
%% request, on the assumption that the first request gets the first
%% answer. Its timer is cancelled and it is removed from the list.

handle_error(Reason, #state{pending = Pending} = State)
  when Pending =/= [] ->
    Oldest = lists:last(Pending),
    #pending{tref = TRef, caller = Caller} = Oldest,
    cancel_request_timer(TRef),
    ct_gen_conn:return(Caller,
                       {error, {failed_to_parse_received_data, Reason}}),
    State#state{pending = lists:delete(Oldest, Pending)};

handle_error(_Reason, #state{pending = []} = State) ->
    State.

%% Event function for the sax parser. It builds a simple XML structure.
%% Care is taken to keep namespace attributes and prefixes as in the original XML.
%%
%% The event state is a stack (list) with the element currently being
%% built at the head. Content of an element is accumulated in reverse
%% and put in order when the element ends.
sax_event(Event,_Loc,State) ->
    sax_event(Event,State).

sax_event({startPrefixMapping, Prefix, Uri},Acc) ->
    %% startPrefixMapping will always come immediately before the
    %% startElement where the namespace is defined.
    [{xmlns,{Prefix,Uri}}|Acc];
sax_event({startElement,_Uri,_Name,QN,Attrs},Acc) ->
    %% Pick out any namespace attributes inserted due to a
    %% startPrefixMapping event.The rest of Acc will then be only
    %% elements.
    {NsAttrs,NewAcc} = split_attrs_and_elements(Acc,[]),
    Tag = qn_to_tag(QN),
    [{Tag,NsAttrs ++ parse_attrs(Attrs),[]}|NewAcc];
%% End of a nested element: attach it, with its content in order, to
%% the content of its parent.
sax_event({endElement,_Uri,_Name,_QN},[{Name,Attrs,Cont},{Parent,PA,PC}|Acc]) ->
    [{Parent,PA,[{Name,Attrs,lists:reverse(Cont)}|PC]}|Acc];
%% End of document with only the root element left on the stack: the
%% root element, with its content in order, is the final result.
sax_event(endDocument,[{Tag,Attrs,Cont}]) ->
    {Tag,Attrs,lists:reverse(Cont)};
%% Character data is added to the content of the current element.
sax_event({characters,String},[{Name,Attrs,Cont}|Acc]) ->
    [{Name,Attrs,[String|Cont]}|Acc];
%% All other events (including endElement of the root element, which
%% has no parent on the stack) leave the state unchanged.
sax_event(_Event,State) ->
    State.

%% Move the leading {xmlns,{Prefix,Uri}} entries of the stack into
%% the attribute accumulator, converted to xmlns attributes. Returns
%% {Attrs, RemainingStack}.
split_attrs_and_elements(Stack, Attrs) ->
    case Stack of
        [{xmlns, {Prefix, Uri}} | Rest] ->
            split_attrs_and_elements(Rest, [{xmlnstag(Prefix), Uri} | Attrs]);
        _ ->
            {Attrs, Stack}
    end.

%% Name of the namespace attribute for a prefix: xmlns for the empty
%% prefix, otherwise the atom 'xmlns:Prefix'.
xmlnstag(Prefix) when Prefix =/= [] ->
    list_to_atom("xmlns:" ++ Prefix);
xmlnstag([]) ->
    xmlns.

%% Convert a qualified name {Prefix, Name} to a tag atom, keeping the
%% prefix (as 'Prefix:Name') when it is not empty.
qn_to_tag({Prefix, Name}) ->
    TagStr = case Prefix of
                 [] -> Name;
                 _ -> Prefix ++ ":" ++ Name
             end,
    list_to_atom(TagStr).

%% Convert attributes as delivered by the sax parser, i.e. tuples
%% {Uri, Prefix, Name, Value}, to {Key, Value} pairs where Key is the
%% atom Name or 'Prefix:Name'.
parse_attrs(Attrs) ->
    ToPair = fun({_Uri, [], Name, Value}) ->
                     {list_to_atom(Name), Value};
                ({_Uri, Prefix, Name, Value}) ->
                     {list_to_atom(Prefix ++ ":" ++ Name), Value}
             end,
    lists:map(ToPair, Attrs).


%%%-----------------------------------------------------------------

%% decode/2
%%
%% Decode parsed (incoming) XML. Dispatches on the local name of the
%% root element and normalizes the result of decode/3 to either
%% {noreply, State} or {stop, State}.

decode({Tag, _, _} = E, #state{} = State) ->
    LocalName = get_local_name_atom(Tag),
    case decode(LocalName, E, State) of
        {stop, #state{}} = Stop ->
            Stop;
        #state{} = NewState ->
            {noreply, NewState}
    end.

%% decode/3
%%
%% Handle one incoming message given the local name of its root
%% element. Returns the new state, or {stop, State}. The hello
%% clauses are tried in order and are told apart by hello_status.

decode('rpc-reply', {_, Attrs, _} = E, State) ->
    decode_rpc_reply(get_msg_id(Attrs), E, State);

%% Incoming hello, outgoing not yet sent.
%% The outcome (session id and capabilities, or the error) is only
%% recorded in the state here.
decode(hello, E, #state{hello_status = undefined} = State) ->
    case decode_hello(E) of
        {ok, SessionId, Capabilities} ->
            State#state{session_id = SessionId,
                        capabilities = Capabilities,
                        hello_status = received};
        {error, _Reason} = No ->
            State#state{hello_status = No}
    end;

%% Incoming hello, outgoing already sent: negotiate protocol version.
%% The caller parked in hello_status gets its answer from here, since
%% the original request returned without a reply.
decode(hello, E, #state{hello_status = #pending{tref = TRef,
                                                caller = From}}
                 = State) ->
    cancel_request_timer(TRef),
    case decode_hello(E) of
        {ok, SessionId, Capabilities} ->
            reply(From, handle_capx(State#state{session_id = SessionId,
                                                capabilities = Capabilities,
                                                hello_status = received}));
        {error, _Reason} = No ->
            ct_gen_conn:return(From, No),
            {stop, State#state{hello_status = No}}
    end;

%% Duplicate hello: ignore.
%% (Logged as an error; the state is left unchanged.)
decode(hello, E, #state{hello_status = Other} = State) ->
    ConnName = (State#state.connection)#connection.name,
    ?error(ConnName, [{got_unexpected_hello, E},
                      {hello_status, Other}]),
    State;

%% Notifications are passed on to the receivers, see notify/2.
decode(notification, E, State) ->
    notify(State, E),
    State;

%% Any other root element is treated as a possible answer to a
%% request made with send/2,3, see decode_send/3.
decode(Other, E, State) ->
    decode_send({got_unexpected_msg, Other}, E, State).

%% notify/2
%%
%% Deliver a notification element to the receivers in the state. With
%% no receivers the notification is logged as unexpected. The
%% receivers field holds either a single pid or a list of
%% destinations.

notify(#state{receivers = []} = State, E) ->
    ConnName = (State#state.connection)#connection.name,
    ?error(ConnName, [{got_unexpected_notification, E}]);

%% Sending can fail with an atom-valued destination, but it's up to
%% the user.
notify(#state{receivers = Dests}, E) ->
    Targets = case is_pid(Dests) of
                  true -> [Dests];
                  false -> Dests
              end,
    lists:foreach(fun(Dest) -> Dest ! E end, Targets).

%% reply/2
%%
%% Explicitly send a reply that can't be returned: Res goes to From,
%% and the {reply | stop, Res, State} tuple is reduced to what
%% decode/3 is expected to return, i.e. State or {stop, State}.

reply(From, {Tag, Res, State}) ->
    ct_gen_conn:return(From, Res),
    case Tag of
        stop ->
            {stop, State};
        reply ->
            State
    end.

%% get_msg_id/1
%%
%% Return the value of the message-id attribute as an integer, or
%% undefined if the attribute is missing.

get_msg_id(Attrs) ->
    case lists:keyfind('message-id', 1, Attrs) of
        false ->
            undefined;
        {_, Str} ->
            list_to_integer(Str)
    end.

%% decode_rpc_reply/3
%%
%% Match an rpc-reply against the pending requests using its message
%% id, answer the caller and return the new state (or {stop, State}
%% as decided by do_decode_rpc_reply/3).

%% No message id in the reply, but exactly one request is pending and
%% it has a message id: log a warning and assume the reply belongs to
%% that request.
decode_rpc_reply(undefined, E, #state{pending = [#pending{msg_id = MsgId}]}
                               = State)
  when MsgId /= undefined ->
    ConnName = (State#state.connection)#connection.name,
    ?error(ConnName, [{warning, rpc_reply_missing_msg_id},
                      {assuming, MsgId}]),
    decode_rpc_reply(MsgId, E, State);

%% No message id and no single candidate: log and drop the reply.
decode_rpc_reply(undefined, _, State) ->
    ConnName = (State#state.connection)#connection.name,
    ?error(ConnName, [{error, rpc_reply_missing_msg_id}]),
    State;

decode_rpc_reply(MsgId,
                 {_, Attrs, Content0}
                 = E,
                 #state{pending = Pending}
                 = State) ->
    case lists:keytake(MsgId, #pending.msg_id, Pending) of
        {value, Rec, Rest} ->
            #pending{tref = TRef, op = Op, caller = From}
                = Rec,
            cancel_request_timer(TRef),
            %% Namespace attributes of rpc-reply are pushed down to
            %% its children before the content is decoded.
            Content = forward_xmlns_attr(Attrs, Content0),
            {Reply, T} = do_decode_rpc_reply(Op,
                                             Content,
                                             State#state{pending = Rest}),
            ct_gen_conn:return(From, Reply),
            T;
        false ->  %% not a send_rpc or server has sent wrong id
            decode_send({got_unexpected_msg_id, MsgId}, E, State)
    end.

%% decode_send/3
%%
%% Result of send/2,3. Only handle one at a time there since all
%% pendings have msg_id = undefined: the element is returned to the
%% caller only if exactly one pending request has no message id.
%% Otherwise ErrorT is logged together with the pending requests and
%% the element is dropped.

decode_send(ErrorT, Elem, #state{pending = Pending} = State) ->
    Sends = [P || #pending{msg_id = undefined} = P <- Pending],
    case Sends of
        [#pending{tref = TRef, caller = From} = Rec] ->
            cancel_request_timer(TRef),
            ct_gen_conn:return(From, Elem),
            State#state{pending = lists:delete(Rec, Pending)};
        _ ->
            ConnName = (State#state.connection)#connection.name,
            ?error(ConnName, [ErrorT, {expecting, Pending}]),
            State
    end.

%% do_decode_rpc_reply/3
%%
%% Decode the content of an rpc-reply according to the operation that
%% was requested. Returns {Reply, T} where Reply is what the caller
%% gets and T is the new state or {stop, State}.

%% Operations answered with ok or rpc-error.
do_decode_rpc_reply(Op, Result, State)
  when Op == lock;
       Op == unlock;
       Op == edit_config;
       Op == delete_config;
       Op == copy_config;
       Op == kill_session ->
    {decode_ok(Result), State};

%% Operations answered with data (or ok, or rpc-error).
do_decode_rpc_reply(Op, Result, State)
  when Op == get;
       Op == get_config;
       Op == action ->
    {decode_data(Result), State};

%% A successful close-session stops the client process.
do_decode_rpc_reply(close_session, Result, State) ->
    case decode_ok(Result) of
        ok ->
            {ok, {stop, State}};
        Other ->
            {Other, State}
    end;

%% Only set a new destination if the receivers are empty or hold a
%% single pid, i.e. if no list of destinations is in place. This lets
%% more than one call to create_subscription order notifications,
%% each successful call replacing the pid.
do_decode_rpc_reply({create_subscription, Pid}, Result, #state{receivers = T}
                                                        = State) ->
    case decode_ok(Result) of
        ok when T == [];
                is_pid(T) ->
            {ok, State#state{receivers = Pid}};
        Other ->
            {Other, State}
    end;

%% The reply to get_event_streams is decoded as data and then
%% converted by decode_streams/1.
do_decode_rpc_reply(get_event_streams, Result, State) ->
    {decode_streams(decode_data(Result)), State};

%% No operation recorded: the content is returned undecoded.
do_decode_rpc_reply(undefined, Result, State) ->
    {Result, State}.



%% Decode the content of a reply that should consist of a single ok
%% element. Returns ok, {error, RpcErrorContent} for an rpc-error
%% element, and {error, {unexpected_rpc_reply, Content}} for anything
%% else.
decode_ok([{Tag, Attrs, Content}] = Reply) ->
    case get_local_name_atom(Tag) of
        ok ->
            ok;
        'rpc-error' ->
            {error, forward_xmlns_attr(Attrs, Content)};
        _Unexpected ->
            {error, {unexpected_rpc_reply, Reply}}
    end;
decode_ok(Unexpected) ->
    {error, {unexpected_rpc_reply, Unexpected}}.

%% Decode the content of a reply that should consist of a single data
%% element. Returns {ok, DataContent}, ok for an ok element,
%% {error, RpcErrorContent} for an rpc-error element, and
%% {error, {unexpected_rpc_reply, Content}} for anything else.
decode_data([{Tag, Attrs, Content}] = Reply) ->
    case get_local_name_atom(Tag) of
        ok ->
            %% when action has return type void
            ok;
        data ->
            %% Since content of data has nothing from the netconf
            %% namespace, we remove the parent's xmlns attribute here
            %% - just to make the result cleaner
            Inherited = remove_xmlnsattr_for_tag(Tag, Attrs),
            {ok, forward_xmlns_attr(Inherited, Content)};
        'rpc-error' ->
            {error, forward_xmlns_attr(Attrs, Content)};
        _Unexpected ->
            {error, {unexpected_rpc_reply, Reply}}
    end;
decode_data(Unexpected) ->
    {error, {unexpected_rpc_reply, Unexpected}}.

%% Split a tag atom at ":" into {Prefix, LocalName} strings. A tag
%% without prefix gets the empty prefix.
get_qualified_name(Tag) ->
    Parts = string:lexemes(atom_to_list(Tag), ":"),
    case Parts of
        [Prefix, Local] -> {Prefix, Local};
        [Local] -> {[], Local}
    end.

%% Return the local name of a tag as an atom, i.e. the tag with any
%% namespace prefix stripped.
get_local_name_atom(Tag) ->
    {_,TagStr} = get_qualified_name(Tag),
    list_to_atom(TagStr).


%% Remove the xmlns attr that points to the tag. I.e. if the tag has a
%% prefix, remove {'xmlns:prefix',_}, else remove default {xmlns,_}.
%% Only the first attribute with that key is removed; the others are
%% returned in their original order.
remove_xmlnsattr_for_tag(Tag,Attrs) ->
    {Prefix,_TagStr} = get_qualified_name(Tag),
    lists:keydelete(xmlnstag(Prefix), 1, Attrs).

%% Push the namespace attributes of a parent down to its child
%% elements. Each child gets the parent's xmlns attributes in front
%% of its own attributes, except for those whose key the child
%% already defines. Children that are not {Tag, Attrs, Content}
%% tuples are left out of the result.
forward_xmlns_attr(ParentAttrs, Children) ->
    Inherited = lists:filter(fun is_xmlns/1, ParentAttrs),
    [{Tag,
      [NsAttr || {Key, _} = NsAttr <- Inherited,
                 not lists:keymember(Key, 1, Attrs)] ++ Attrs,
      Content}
     || {Tag, Attrs, Content} <- Children].

%% True if the attribute is a namespace declaration, i.e. if its key
%% is xmlns or starts with "xmlns:".
is_xmlns({xmlns, _}) ->
    true;
is_xmlns({Key, _}) ->
    lists:prefix("xmlns:", atom_to_list(Key)).

%% Decode server hello to pick out session id and capabilities
%%
%% Returns {ok, SessionId, Capabilities} or {error, Reason}.
%%
%% Each step matches a pattern [Expected, _ | _] against a list
%% [Value, FailureTag | U], where U is a reference unique to this
%% call. When Value does not have the expected shape the match fails
%% with a badmatch error that carries the whole list, so the catch
%% clauses can tell which step failed from FailureTag, and the
%% reference keeps them from picking up a badmatch raised elsewhere.
decode_hello({hello, _Attrs, Hello}) ->
    U = make_ref(),
    try
        [{'session-id', _, [SessionId]}, _ | _]
            = [find('session-id', Hello), no_session_id_found | U],
        %% The catch turns a failing list_to_integer/1 into a value
        %% that does not match {ok, Id}.
        [{ok, Id}, _ | _]
            = [catch {ok, list_to_integer(SessionId)}, invalid_session_id | U],
        %% The session id must be positive.
        [true, _ | _]
            = [0 < Id, invalid_session_id | U],
        [{capabilities, _, Capabilities}, _ | _]
            = [find(capabilities, Hello), capabilities_not_found | U],
        %% decode_caps/3 returns a complete error tuple of its own;
        %% the tag false marks it to be returned unchanged.
        [{ok, Caps}, _ | _]
            = [decode_caps(Capabilities, [], false), false | U],
        {ok, Id, Caps}
    catch
        error: {badmatch, [Error, false | U]} ->
            Error;
        error: {badmatch, [_, Reason | U]} ->
            {error, {incorrect_hello, Reason}}
    end.

%% Return the first tuple in List whose first element is Key, or
%% false if there is none.
find(Key, List) ->
    lists:keyfind(Key, 1, List).

%% Collect the capability strings of a hello message, in their
%% original order. The third argument records whether a capability
%% starting with the base capability prefix has been seen; without
%% one the result is an error. Only capability elements with no
%% attributes and exactly one content item are accepted.
decode_caps([{capability, [], [?NETCONF_BASE_CAP ++ _ = Cap]} | Caps],
            Acc,
            _) ->
    decode_caps(Caps, [Cap|Acc], true);
decode_caps([{capability, [], [Cap]} | Caps], Acc, Base) ->
    decode_caps(Caps, [Cap|Acc], Base);
%% Anything else in the list is rejected.
decode_caps([H|_], _, _) ->
    {error, {unexpected_capability_element, H}};
decode_caps([], _, false) ->
    {error, {incorrect_hello, no_base_capability_found}};
decode_caps([], Acc, true) ->
    {ok, lists:reverse(Acc)}.


%% Return a list of {Name,Data}, where data is a {Tag,Value} list for each stream
%%
%% The function is applied at several levels: to the result of
%% decode_data/1 (an error is passed through, {ok,[Netconf]} is
%% unwrapped and the result wrapped in {ok,_} again), to the content
%% of the netconf element, and to the list of stream elements.
decode_streams({error,Reason}) ->
    {error,Reason};
decode_streams({ok,[{netconf,_,Streams}]}) ->
    {ok,decode_streams(Streams)};
decode_streams([{streams,_,Streams}]) ->
    decode_streams(Streams);
%% One stream: the name element gives Name; every other child with
%% exactly one content item contributes a {Tag,Value} pair.
decode_streams([{stream,_,Stream} | Streams]) ->
    {name,_,[Name]} = find(name, Stream),
    [{Name,[{Tag,Value} || {Tag,_,[Value]} <- Stream, Tag /= name]}
     | decode_streams(Streams)];
decode_streams([]) ->
    [].


%%%-----------------------------------------------------------------
%%% Logging

%% log/2,3
%%
%% Report an action on the connection, with optional data, through
%% error_logger:info_report/2 using a #conn_log{} header. The address
%% is {Host,Port}, extended with the second element of the connection
%% reference when that reference is a pair.
log(Connection, Action) ->
    log(Connection, Action, <<>>).
log(#connection{reference = Ref, host = Host, port = Port, name = Name},
    Action, Data) ->
    Address = case Ref of
                  {_, Ch} -> {Host, Port, Ch};
                  _ -> {Host, Port}
              end,
    Header = #conn_log{client = self(),
                       address = Address,
                       name = Name,
                       action = Action,
                       module = ?MODULE},
    error_logger:info_report(Header, Data).


%% Log callback - called from the error handler process
%%
%% Converts Data to a binary and formats it according to How (raw,
%% pretty or html), see do_format_data/2.
format_data(How,Data) ->
    %% Assuming that the data is encoded as UTF-8.  If it is not, then
    %% the printout might be wrong, but the format function will not
    %% crash!
    %% FIXME: should probably read encoding from the data and do
    %% unicode:characters_to_binary(Data,InEncoding,utf8) when calling
    %% log/3 instead of assuming utf8 in as done here!
    do_format_data(How,unicode:characters_to_binary(Data)).

%% Format logged data. Password elements are masked in all three
%% formats (directly for raw, through indent/1 for the others).
%%  raw    - the data as is, surrounded by newlines
%%  pretty - indented XML
%%  html   - indented XML with every "<" replaced by "&lt;"
do_format_data(raw,Data) ->
    io_lib:format("~n~ts~n",[hide_password(Data)]);
do_format_data(pretty,Data) ->
    maybe_io_lib_format(indent(Data));
do_format_data(html,Data) ->
    maybe_io_lib_format(html_format(Data)).

%% Surround the text with newlines, unless it is the empty binary, in
%% which case nothing at all is produced.
maybe_io_lib_format(Text) ->
    case Text of
        <<>> ->
            [];
        _ ->
            io_lib:format("~n~ts~n", [Text])
    end.

%%%-----------------------------------------------------------------
%%% Hide password elements from XML data
%% Replace the text content of every password element with five
%% asterisks. The start tag (including any attributes) and the end
%% tag are kept. Returns a binary.
hide_password(Bin) ->
    Pattern = <<"(<password[^>]*>)[^<]*(</password>)">>,
    Masked = <<"\\1*****\\2">>,
    re:replace(Bin, Pattern, Masked, [global, {return, binary}, unicode]).

%%%-----------------------------------------------------------------
%%% HTML formatting
%% Indent the XML with indent/1 and replace every "<" with "&lt;".
%% No other character is escaped.
html_format(Bin) ->
    binary:replace(indent(Bin),<<"<">>,<<"&lt;">>,[global]).

%%%-----------------------------------------------------------------
%%% Indentation of XML code
%% Mask passwords, strip whitespace between tags and indent the XML
%% in Bin. Returns a binary.
%%
%% Input may arrive in pieces. When a previous call ended in the
%% middle of a line, indent_line/3 or indent_line1/3 has stored the
%% unfinished line (reversed) and the indentation in the process
%% dictionary under part_of_line and indent. Both are erased here and
%% the unfinished line is put in front of the new data.
indent(Bin) ->
    String = normalize(hide_password(Bin)),
    IndentedString =
	case erase(part_of_line) of
	    undefined ->
		indent1(String,[]);
	    Part ->
		indent1(lists:reverse(Part)++String,erase(indent))
	end,
    unicode:characters_to_binary(IndentedString).

%% Normalize an XML document by removing every run of spaces, tabs,
%% carriage returns and newlines found between a ">" and the next
%% "<". The result is always a list, whether the input is a list or
%% a binary.
normalize(Bin) ->
    BetweenTags = <<">[ \r\n\t]+<">>,
    Options = [global, {return, list}, unicode],
    re:replace(Bin, BetweenTags, <<"><">>, Options).


%% Indent a normalized XML string. Indent1 is the current indentation
%% (a string of spaces). Each tag found is completed into a line by
%% indent_line/3 or indent_line1/3, which also return the remaining
%% input and the indentation to use next. Every line except the
%% prolog is preceded by a newline.
%%
%% When a helper returns {[],[],_} the input ended in the middle of a
%% line; the helper has then saved the partial line in the process
%% dictionary and nothing more is output.
indent1("<?"++Rest1,Indent1) ->
    %% Prolog
    {Line,Rest2,Indent2} = indent_line(Rest1,Indent1,[$?,$<]),
    Line++indent1(Rest2,Indent2);
indent1("</"++Rest1,Indent1) ->
    %% Stop tag
    case indent_line1(Rest1,Indent1,[$/,$<]) of
	{[],[],_} ->
	    [];
	{Line,Rest2,Indent2} ->
	    "\n"++Line++indent1(Rest2,Indent2)
    end;
indent1("<"++Rest1,Indent1) ->
    %% Start- or empty tag
    %% The tag is remembered in the process dictionary so that
    %% indent_line/3 can recognize an immediately following stop tag
    %% of the same element.
    put(tag,get_tag(Rest1)),
    case indent_line(Rest1,Indent1,[$<]) of
	{[],[],_} ->
	    [];
	{Line,Rest2,Indent2} ->
	    "\n"++Line++indent1(Rest2,Indent2)
    end;
%% Characters outside tags are copied unchanged.
indent1([H|T],Indent) ->
    [H|indent1(T,Indent)];
indent1([],_Indent) ->
    [].

%% Collect the characters of one output line that begins with a start
%% tag, an empty tag or the prolog. Line holds the characters seen so
%% far in reverse order. Returns {Line, Rest, NextIndent}: the
%% finished line with the current indentation in front (the prolog is
%% not indented), the remaining input and the indentation for the
%% next line. One indentation step is two spaces.
indent_line("?>"++Rest,Indent,Line) ->
    %% Prolog
    {lists:reverse(Line)++"?>",Rest,Indent};
indent_line("/></"++Rest,Indent,Line) ->
    %% Empty tag, and stop of parent tag -> one step out in indentation
    {Indent++lists:reverse(Line)++"/>","</"++Rest,Indent--"  "};
indent_line("/>"++Rest,Indent,Line) ->
    %% Empty tag, then probably next tag -> keep indentation
    {Indent++lists:reverse(Line)++"/>",Rest,Indent};
indent_line("></"++Rest,Indent,Line) ->
    %% The tag stored by indent1/2 tells whether the stop tag that
    %% follows closes the element just opened.
    LastTag = erase(tag),
    case get_tag(Rest) of
	LastTag ->
	    %% Start and stop tag, but no content
	    indent_line1(Rest,Indent,[$/,$<,$>|Line]);
	_ ->
	    %% Stop tag completed, and then stop tag of parent -> one step out
	    {Indent++lists:reverse(Line)++">","</"++Rest,Indent--"  "}
    end;
indent_line("><"++Rest,Indent,Line) ->
    %% Stop tag completed, and new tag coming -> keep indentation
    {Indent++lists:reverse(Line)++">","<"++Rest,"  "++Indent};
indent_line("</"++Rest,Indent,Line) ->
    %% Stop tag starting -> search for end of this tag
    indent_line1(Rest,Indent,[$/,$<|Line]);
indent_line([H|T],Indent,Line) ->
    indent_line(T,Indent,[H|Line]);
indent_line([],Indent,Line) ->
    %% The line is not complete - will be continued later
    %% (indent/1 picks up both values on its next call)
    put(part_of_line,Line),
    put(indent,Indent),
    {[],[],Indent}.

%% Scans up to the ">" that closes a stop tag.
%%
%% Takes the remaining input, the current indentation and the
%% characters of the line collected so far (reversed). Returns
%% {Line,Rest,NextIndent}. When the input ends before the closing ">",
%% the partial line and the indentation are stored in the process
%% dictionary (keys part_of_line and indent) and {[],[],Indent} is
%% returned.
indent_line1("></"++Tail,Pad,RevLine) ->
    %% Another stop tag follows, i.e. the parent ends -> one step out
    {Pad++lists:reverse(RevLine,">"),"</"++Tail,Pad--"  "};
indent_line1(">"++Tail,Pad,RevLine) ->
    %% Stop tag finished -> indentation unchanged
    {Pad++lists:reverse(RevLine,">"),Tail,Pad};
indent_line1([Char|Tail],Pad,RevLine) ->
    indent_line1(Tail,Pad,[Char|RevLine]);
indent_line1([],Pad,RevLine) ->
    %% Out of input in the middle of the line - resume later
    put(indent,Pad),
    put(part_of_line,RevLine),
    {[],[],Pad}.

%% Returns the element name found at the start of the given string,
%% i.e. all characters up to, but not including, the first whitespace
%% character, ">" or "/>".
%%
%% The name must stop at whitespace so that attributes are not made
%% part of it. Otherwise the name of a start tag with attributes
%% (e.g. `a x="1"') never equals the name of its stop tag (`a'), and
%% an element such as <a x="1"></a> is not recognised as a start and
%% stop tag pair without content.
%%
%% If the string ends before any terminator is seen, whatever has been
%% read so far is returned.
get_tag("/>"++_) ->
    [];
get_tag(">"++_) ->
    [];
get_tag([C|_]) when C =:= $\s; C =:= $\t; C =:= $\r; C =:= $\n ->
    %% Whitespace ends the name - attributes may follow
    [];
get_tag([H|T]) ->
    [H|get_tag(T)];
get_tag([]) ->
    %% The line is not complete - will be continued later.
    [].


%%%-----------------------------------------------------------------
%%% SSH stuff
%% Opens an SSH connection to the host and port given in the options.
%% User interaction is disabled and unknown hosts are accepted
%% silently; these two options are placed in front of the configured
%% SSH options.
%% Returns {ok,#connection{}} with the connection reference stored in
%% the record, or {error,{ssh,could_not_connect_to_server,Reason}}.
ssh_connect(#options{host=Host,timeout=Timeout,port=Port,
                     ssh=SshOpts,name=Name,type=Type}) ->
    ConnectOpts = [{user_interaction,false},
                   {silently_accept_hosts, true}|SshOpts],
    case ssh:connect(Host, Port, ConnectOpts, Timeout) of
        {error,Reason} ->
            {error,{ssh,could_not_connect_to_server,Reason}};
        {ok,ConnRef} ->
            Connection = #connection{reference = ConnRef,
                                     host = Host,
                                     port = Port,
                                     name = Name,
                                     type = Type},
            log(Connection,connect),
            {ok,Connection}
    end.

%% Opens a session channel on an existing SSH connection and starts
%% the "netconf" subsystem on it.
%%
%% On success the returned #connection{} has its reference replaced by
%% {CM,Ch} and its name and type taken from the options.
%%
%% Errors:
%%   {error,{ssh,could_not_open_channel,Reason}}
%%       the session channel could not be opened
%%   {error,{ssh,could_not_execute_netconf_subsystem}}
%%       the server refused the subsystem request
%%   {error,{ssh,could_not_execute_netconf_subsystem,Reason}}
%%       the subsystem request failed with {error,Reason}, e.g.
%%       timeout or closed
%%
%% The channel is closed again whenever the subsystem request does
%% not succeed.
ssh_channel(#connection{reference=CM}=Connection0,
            #options{timeout=Timeout,name=Name,type=Type}) ->
    case ssh_connection:session_channel(CM, Timeout) of
        {ok,Ch} ->
            case ssh_connection:subsystem(CM, Ch, "netconf", Timeout) of
                success ->
                    Connection = Connection0#connection{reference = {CM,Ch},
                                                       name = Name,
                                                       type = Type},
                    log(Connection,open),
                    {ok, Connection};
                failure ->
                    _ = ssh_connection:close(CM,Ch),
                    {error,{ssh,could_not_execute_netconf_subsystem}};
                {error,Reason} ->
                    %% Any error reason is reported, not only timeout:
                    %% the request can also fail with closed, which
                    %% must not crash the caller with a case_clause.
                    _ = ssh_connection:close(CM,Ch),
                    {error,{ssh,could_not_execute_netconf_subsystem,Reason}}
            end;
        {error, Reason} ->
            {error,{ssh,could_not_open_channel,Reason}}
    end.


%% Connects to the server and opens a netconf channel in one step.
%% If the connection is established but the channel cannot be opened,
%% the connection is closed again before the channel error is
%% returned. A failure to connect is returned unchanged.
ssh_open(Options) ->
    case ssh_connect(Options) of
        {ok,Conn} ->
            case ssh_channel(Conn,Options) of
                {ok,_} = Opened ->
                    Opened;
                ChannelError ->
                    ssh_close(Conn),
                    ChannelError
            end;
        ConnectError ->
            ConnectError
    end.

%% Sends Data on the channel of the given connection. The data is
%% logged only after it has been sent successfully.
%% Returns ok or {error,{ssh,failed_to_send_data,Reason}}.
ssh_send(#connection{reference = {CM,Ch}}=Connection, Data) ->
    case ssh_connection:send(CM, Ch, Data) of
        {error,Reason} ->
            {error,{ssh,failed_to_send_data,Reason}};
        ok ->
            log(Connection,send,Data),
            ok
    end.

%% Closes what the connection record refers to and always returns ok.
%%
%% With a {CM,Ch} reference the channel is closed and logged; if the
%% type is connection_and_channel the underlying connection is closed
%% as well. With any other reference the connection itself is closed
%% and the disconnect is logged.
ssh_close(#connection{reference = {CM,Ch}, type = Type} = Connection) ->
    _ = ssh_connection:close(CM,Ch),
    log(Connection,close),
    if
        Type =:= connection_and_channel ->
            ssh_close(Connection#connection{reference = CM});
        true ->
            ok
    end,
    ok;
ssh_close(#connection{reference = CM} = Connection) ->
    _ = ssh:close(CM),
    log(Connection,disconnect),
    ok.

%% ===========================================================================

%% recv/2
%%
%% Extracts incoming messages from received data, using NETCONF 1.1
%% chunking or NETCONF 1.0 end-of-message framing.
%%
%% The second argument is either a boolean selecting the initial state
%% (true for chunking, false for framing) or the buffered state left
%% from earlier data: a list [Buffered, Offset | Chunks] for chunking,
%% or a binary of buffered bytes for framing.

recv(Bin, true) ->
    recv(Bin, [<<>>, 3]);
recv(Bin, false) ->
    recv(Bin, <<>>);

%% 1.1 chunking: prepend the buffered bytes and continue chunk parsing.
recv(Bin, [Buffered, Offset | Chunks]) ->
    chunk(<<Buffered/binary, Bin/binary>>, Chunks, Offset);

%% 1.0 framing: the buffered bytes have been scanned before without
%% finding ]]>]]>, so only their last 5 bytes can hold the start of a
%% terminator that is completed by the new data. Scanning resumes
%% from there.
recv(Bin, Buffered) when is_binary(Buffered) ->
    ScanFrom = max(0, byte_size(Buffered) - 5),
    frame(<<Buffered/binary, Bin/binary>>, ScanFrom).

%% frame/2
%%
%% Looks for the ]]>]]> end-of-message sequence in Bin, starting at
%% byte offset Start. When found, returns {Msg, Rest}: the bytes before
%% the sequence with leading whitespace trimmed, and the bytes after
%% it. When not found, returns Bin unchanged so that it can be
%% buffered.
%%
%% Matching is done on bytes, not on decoded characters. This is
%% unambiguous because every byte of a multi-byte UTF-8 character has
%% its high-order bit set, while the end-of-message sequence is ASCII.

frame(Bin, Start) ->
    Scope = {Start, byte_size(Bin) - Start},
    case binary:match(Bin, pattern(), [{scope, Scope}]) of
        nomatch ->
            Bin;
        {Pos, 6} ->
            <<Msg:Pos/binary, _:6/binary, Rest/binary>> = Bin,
            {trim(Msg), Rest}
    end.

%% pattern/0
%%
%% Returns the compiled search pattern for the ]]>]]> end-of-message
%% sequence. The pattern is compiled on first use and kept in the
%% process dictionary under ?KEY(pattern).

pattern() ->
    Key = ?KEY(pattern),
    Cached = get(Key),
    if
        Cached =:= undefined ->
            Compiled = binary:compile_pattern(<<"]]>]]>">>),
            put(Key, Compiled),
            Compiled;
        true ->
            Cached
    end.

%% trim/1
%%
%% Strips leading space, tab, carriage return and newline bytes.
%%
%% Whitespace in front of an XML declaration is an error, but the RFCs
%% are unclear about what may follow a ]]>]]> delimiter (a single $\n
%% seems typical), so be lenient and remove any of " \t\r\n",
%% regardless of NETCONF version.

trim(<<$\n, Rest/binary>>) ->
    trim(Rest);
trim(<<$\r, Rest/binary>>) ->
    trim(Rest);
trim(<<$\t, Rest/binary>>) ->
    trim(Rest);
trim(<<$\s, Rest/binary>>) ->
    trim(Rest);

trim(Bin) ->
    Bin.

%% chunk/3
%%
%% Parses NETCONF 1.1 chunked data: a sequence of "\n#<size>\n<data>"
%% chunks terminated by "\n##\n".
%%
%% Arguments: the bytes to parse, the accumulated state (chunk data in
%% reverse order, headed by the expected chunk size while chunk data
%% is being read), and a final argument described below.
%%
%% The final argument is either 0 to indicate that a specified number
%% of bytes of chunk data should be consumed, or at least 3 to
%% indicate an offset at which to look for a newline following a chunk
%% size.
%%
%% Returns one of:
%%   {Chunks, Rest}      - end-of-chunks was found; Chunks are the data
%%                         chunks in received order, Rest the bytes
%%                         following the end-of-chunks marker
%%   [Bin, Len | Chunks] - more data is needed; the list holds the
%%                         unparsed bytes and the state, in the shape
%%                         matched by the chunking clause of recv/2
%%   {error, Reason}     - the data is not valid chunked framing
%%
%% The order of the clauses below is significant.

%% Accumulating chunk-data ...
chunk(Bin, [Sz | Chunks] = L, 0) ->
    case Bin of
        <<Chunk:Sz/binary, Rest/binary>> ->
            chunk(Rest, acc(Chunk, Chunks), 3);  %% complete chunk ...
        _ ->
            [Bin, 0 | L]                         %% ... or not
    end;

%% ... or a header.

%% Both the end-of-chunks marker and the shortest possible header are
%% 4 bytes long, so nothing can be decided with fewer bytes: buffer
%% them. The match asserts that this only happens in the initial
%% header state (Len = 3).
chunk(Bin, Chunks, Len)
  when size(Bin) < 4 ->
    [Bin, 3 = Len | Chunks];

%% End of chunks.
chunk(<<"\n##\n", Rest/binary>>, Chunks, _) ->
    case Chunks of
        [] ->
            {error, "end-of-chunks unexpected"}; %% must be at least one
        Bins ->
            {lists:reverse(Bins), Rest}
    end;

%% Matching each of the 10 newline possibilities is faster than
%% searching. Head is the text between "\n#" and the next newline at
%% that position; acc/3 validates it as a chunk size.
chunk(<<"\n#", Head:1/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);
chunk(<<"\n#", Head:2/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);
chunk(<<"\n#", Head:3/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);
chunk(<<"\n#", Head:4/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);
chunk(<<"\n#", Head:5/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);
chunk(<<"\n#", Head:6/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);
chunk(<<"\n#", Head:7/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);
chunk(<<"\n#", Head:8/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);
chunk(<<"\n#", Head:9/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);
chunk(<<"\n#", Head:10/binary, $\n, Rest/binary>>, Chunks, _) ->
    acc(Head, Rest, Chunks);

%% None of the clauses above found a newline after 1 to 10 bytes, and
%% at least 11 bytes follow "\n#": the size field is too long.
chunk(<<"\n#", Bin:11/binary, _/binary>>, _, _) ->
    {error, {"chunk-size too long", Bin}}; %% 32-bits = max 10 digits

%% A header has started but its terminating newline has not arrived
%% yet: buffer the bytes and wait for more data.
chunk(<<"\n#", _/binary>> = Bin, Chunks, _) ->
    [Bin, size(Bin) | Chunks];

%% The data does not start with "\n#". Leading whitespace is skipped
%% (see drop/1); anything else in front of a header is an error.
chunk(Bin, Chunks, 3 = Len) ->
    case drop(Bin) of
        <<>> ->
            [Bin, Len | Chunks];
        <<"\n#", _/binary>> = B ->
            chunk(B, Chunks, Len);
        _ ->
            {error, {"not a chunk", Bin}}
    end.

%% drop/1
%%
%% Skips leading space, tab, carriage return and newline bytes, but
%% stops as soon as the remaining data starts with "\n#", so that the
%% newline introducing a chunk header is kept.

drop(<<"\n#", _/binary>> = Header) ->
    Header;

drop(<<Byte, Tail/binary>>)
  when Byte =:= $\s;
       Byte =:= $\t;
       Byte =:= $\r;
       Byte =:= $\n ->
    drop(Tail);

drop(Other) ->
    Other.

%% acc/2
%%
%% Adds a chunk of data to the (reversed) list of chunks. The first
%% chunk of a message has its leading whitespace trimmed and is left
%% out entirely if nothing remains.

acc(Chunk, []) ->
    case trim(Chunk) of
        <<>> ->
            [];
        Trimmed ->
            [Trimmed]
    end;

acc(Chunk, Chunks) ->
    [Chunk | Chunks].

%% acc/3
%%
%% Handles a parsed chunk header. Head is the chunk-size text. If it
%% is a valid size, parsing continues with that many bytes of chunk
%% data expected from Rest; otherwise the error is returned.

acc(Head, Rest, Chunks) ->
    case chunk_size(Head) of
        Sz when is_integer(Sz) ->
            chunk(Rest, [Sz | Chunks], 0);
        {error, _Reason} = Error ->
            Error
    end.

%% chunk_size/1
%%
%% Converts the chunk-size text of a chunk header to an integer.
%% The first byte must be greater than $0, so a leading zero is
%% rejected, and the value must fit in 32 bits.
%% Returns the size, or {error, {Description, Value}}.

chunk_size(<<First, _/binary>> = Digits) ->
    try
        true = First > $0,
        binary_to_integer(Digits)
    of
        N when N bsr 32 > 0 ->
            {error, {"chunk-size too large", N}};
        N ->
            N
    catch
        error: _ ->
            {error, {"chunk-size invalid", Digits}}
    end.

%%----------------------------------------------------------------------
%% END OF MODULE
%%----------------------------------------------------------------------