HEX
Server: Apache
System: Linux opal14.opalstack.com 3.10.0-1160.108.1.el7.x86_64 #1 SMP Thu Jan 25 16:17:31 UTC 2024 x86_64
User: curbgloabal_opal (1234)
PHP: 8.1.29
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //usr/lib/erlang/lib/dialyzer-5.0.1/src/dialyzer_coordinator.erl
%% -*- erlang-indent-level: 2 -*-
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% Original author: Stavros Aronis <aronisstav@gmail.com>
%%
%% Purpose: Spawn and coordinate parallel jobs.

-module(dialyzer_coordinator).

%%% Export for dialyzer main process
-export([parallel_job/4]).

%%% Exports for all workers
-export([request_activation/1, job_done/3]).

%%% Exports for the typesig and dataflow analysis workers
-export([wait_for_success_typings/2]).

%%% Exports for the compilation workers
-export([get_next_label/2]).

-export_type([coordinator/0, mode/0, init_data/0, result/0, job/0]).

%%--------------------------------------------------------------------

-type collector()  :: pid().
-type regulator()  :: pid().
-type scc_to_pid() :: ets:tid() | 'none'.

-opaque coordinator() :: {collector(), regulator(), scc_to_pid()}.
-type timing() :: dialyzer_timing:timing_server().

-type scc()     :: [mfa_or_funlbl()].
-type mode()    :: 'typesig' | 'dataflow' | 'compile' | 'warnings' |
                   'contract_remote_types' | 'record_remote_types'.

-type compile_job()  :: file:filename().
-type typesig_job()  :: scc().
-type dataflow_job() :: module().
-type warnings_job() :: module().
-type contract_remote_types_job() :: module().
-type record_remote_types_job() :: module().

-type job() :: compile_job() | typesig_job() | dataflow_job() |
               warnings_job() | contract_remote_types_job() |
               record_remote_types_job().

-type compile_init_data()  :: dialyzer_analysis_callgraph:compile_init_data().
-type typesig_init_data()  :: dialyzer_succ_typings:typesig_init_data().
-type dataflow_init_data() :: dialyzer_succ_typings:dataflow_init_data().
-type warnings_init_data() :: dialyzer_succ_typings:warnings_init_data().
-type contract_remote_types_init_data() ::
                      dialyzer_contracts:contract_remote_types_init_data().
-type record_remote_types_init_data() ::
                      dialyzer_utils:record_remote_types_init_data().

-type compile_result()  :: dialyzer_analysis_callgraph:compile_result().
-type typesig_result()  :: [mfa_or_funlbl()].
-type dataflow_result() :: [mfa_or_funlbl()].
-type warnings_result() :: [dial_warning()].
-type contract_remote_types_result() ::
        dialyzer_contracts:contract_remote_types_result().
-type record_remote_types_result() ::
        dialyzer_utils:record_remote_types_result().

-type init_data() :: compile_init_data() | typesig_init_data() |
		     dataflow_init_data() | warnings_init_data() |
                     contract_remote_types_init_data() |
                     record_remote_types_init_data().

-type result() :: compile_result() | typesig_result() |
		  dataflow_result() | warnings_result() |
                  contract_remote_types_result() |
                  record_remote_types_result().

-type job_result() :: dialyzer_analysis_callgraph:one_file_mid_error() |
                      dialyzer_analysis_callgraph:one_file_result_ok() |
                      typesig_result() | dataflow_result() |
                      warnings_result() | contract_remote_types_result() |
                      record_remote_types_result().

-record(state, {mode           :: mode(),
		active     = 0 :: integer(),
		result         :: result(),
		next_label = 0 :: integer(),
                jobs           :: [job()],
                job_fun        :: fun(),
		init_data      :: init_data(),
		regulator      :: regulator(),
		scc_to_pid     :: scc_to_pid()
	       }).

-include("dialyzer.hrl").

%%--------------------------------------------------------------------
%% API functions for the main dialyzer process.

-spec parallel_job('compile', [compile_job()], compile_init_data(), timing()) ->
		      {compile_result(), integer()};
		  ('typesig', [typesig_job()], typesig_init_data(), timing()) ->
		      typesig_result();
		  ('dataflow', [dataflow_job()], dataflow_init_data(),
		   timing()) -> dataflow_result();
		  ('warnings', [warnings_job()], warnings_init_data(),
		   timing()) -> warnings_result();
                  ('contract_remote_types', [contract_remote_types_job()],
                   contract_remote_types_init_data(), timing()) ->
                      contract_remote_types_result();
                  ('record_remote_types', [record_remote_types_job()],
                   record_remote_types_init_data(), timing()) ->
                      record_remote_types_result().

parallel_job(Mode, Jobs, InitData, Timing) ->
  State = spawn_jobs(Mode, Jobs, InitData, Timing),
  collect_result(State).

%%--------------------------------------------------------------------
%% API functions for workers (dialyzer_worker).

-spec request_activation(coordinator()) -> ok.

request_activation({_Collector, Regulator, _SCCtoPid}) ->
  Regulator ! {req, self()},
  wait_activation().

-spec job_done(job(), job_result(), coordinator()) -> ok.

job_done(Job, Result, {Collector, Regulator, _SCCtoPid}) ->
  Regulator ! done,
  Collector ! {done, Job, Result},
  ok.

-spec get_next_label(integer(), coordinator()) -> integer().

%% For the 'compile' worker.
get_next_label(EstimatedSize, {Collector, _Regulator, _SCCtoPid}) ->
  Collector ! {next_label_request, EstimatedSize, self()},
  receive
    {next_label_reply, NextLabel} -> NextLabel
  end.

-spec wait_for_success_typings([scc() | module()], coordinator()) ->
        'ok'.

%% Helper for 'sigtype' and 'dataflow' workers.
wait_for_success_typings(SCCs, {_Collector, _Regulator, SCCtoPid}) ->
  F = fun(SCC) ->
          %% The SCCs that SCC depends on have always been started.
          try ets:lookup_element(SCCtoPid, SCC, 2) of
            Pid when is_pid(Pid) ->
              Ref = erlang:monitor(process, Pid),
              receive
                {'DOWN', Ref, process, Pid, _Info} ->
                  ok
              end
          catch
            _:_ ->
              %% Already finished.
              ok
          end
      end,
  lists:foreach(F, SCCs).


%%--------------------------------------------------------------------
%% Local functions.

spawn_jobs(Mode, Jobs, InitData, Timing) ->
  Collector = self(),
  Regulator = spawn_regulator(),

  SCCtoPid =
    if
      Mode =:= 'typesig'; Mode =:= 'dataflow' ->
        ets:new(scc_to_pid, [{read_concurrency, true}]);
      true ->
        none
    end,

  Coordinator = {Collector, Regulator, SCCtoPid},

  JobFun = job_fun(SCCtoPid, Mode, InitData, Coordinator),

  %% Limit the number of processes we start in order to save memory.
  MaxNumberOfInitJobs = 20 * dialyzer_utils:parallelism(),
  RestJobs = launch_jobs(Jobs, JobFun, MaxNumberOfInitJobs),

  Unit =
    case Mode of
      'typesig'  -> "SCCs";
      _          -> "modules"
    end,
  JobCount = length(Jobs),
  dialyzer_timing:send_size_info(Timing, JobCount, Unit),

  InitResult =
    case Mode of
      'compile' -> dialyzer_analysis_callgraph:compile_init_result();
      _ -> []
    end,

  #state{mode = Mode, active = JobCount, result = InitResult,
         next_label = 0, job_fun = JobFun, jobs = RestJobs,
         init_data = InitData, regulator = Regulator, scc_to_pid = SCCtoPid}.

launch_jobs(Jobs, _JobFun, 0) ->
  Jobs;
launch_jobs([Job|Jobs], JobFun, N) ->
  JobFun(Job),
  launch_jobs(Jobs, JobFun, N - 1);
launch_jobs([], _JobFun, _) ->
  [].

job_fun(none, Mode, InitData, Coordinator) ->
  fun(Job) ->
      _ = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
      ok
  end;
job_fun(SCCtoPid, Mode, InitData, Coordinator) ->
  fun(Job) ->
      Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
      true = ets:insert(SCCtoPid, {Job, Pid}),
      ok
  end.

collect_result(#state{mode = Mode, active = Active, result = Result,
		      next_label = NextLabel, init_data = InitData,
                      jobs = JobsLeft, job_fun = JobFun,
		      regulator = Regulator, scc_to_pid = SCCtoPid} = State) ->
  receive
    {next_label_request, Estimation, Pid} ->
      Pid ! {next_label_reply, NextLabel},
      collect_result(State#state{next_label = NextLabel + Estimation});
    {done, Job, Data} ->
      NewResult = update_result(Mode, InitData, Job, Data, Result),
      case Active of
	1 ->
          %% This was the last running job. Clean up and return the result.
	  kill_regulator(Regulator),
	  case Mode of
	    'compile' ->
	      {NewResult, NextLabel};
	    _ ->
              if
                SCCtoPid =:= none -> ok;
                true -> ets:delete(SCCtoPid)
              end,
	      NewResult
	  end;
	N ->
          if
            SCCtoPid =:= none -> ok;
            true -> true = ets:delete(SCCtoPid, Job)
          end,
          NewJobsLeft =
            case JobsLeft of
              [] -> [];
              [NewJob|JobsLeft1] ->
                JobFun(NewJob),
                JobsLeft1
            end,
          NewState = State#state{result = NewResult,
                                 jobs = NewJobsLeft,
                                 active = N - 1},
          collect_result(NewState)
      end
  end.

update_result(Mode, InitData, Job, Data, Result) ->
  if
    Mode =:= 'compile' ->
      dialyzer_analysis_callgraph:add_to_result(Job, Data, Result,
						InitData);
    Mode =:= 'typesig'; Mode =:= 'dataflow' ->
      dialyzer_succ_typings:add_to_result(Data, Result, InitData);
    true ->
      Data ++ Result
  end.

%%--------------------------------------------------------------------
%% The regulator server
%%
%% The regulator limits the number of simultaneous running jobs to the
%% number of schedulers. Note that there are usually many more worker
%% processes started, but they are only allowed to do light work (such
%% as monitoring other processes) when they have not been activated.

-spec wait_activation() -> ok.

wait_activation() ->
  receive activate -> ok end.

activate_pid(Pid) ->
  Pid ! activate.

spawn_regulator() ->
  InitTickets = dialyzer_utils:parallelism(),
  spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end).

regulator_loop(Tickets, Queue) ->
  receive
    {req, Pid} ->
      case Tickets of
	0 ->
	  regulator_loop(0, queue:in(Pid, Queue));
	N ->
	  activate_pid(Pid),
	  regulator_loop(N-1, Queue)
      end;
    done ->
      case queue:out(Queue) of
        {empty, NewQueue} ->
          regulator_loop(Tickets + 1, NewQueue);
        {{value, Pid}, NewQueue} ->
          activate_pid(Pid),
          regulator_loop(Tickets, NewQueue)
      end;
    stop -> ok
  end.

kill_regulator(Regulator) ->
  Regulator ! stop.