|
| 1 | +defmodule Csv2sql.Observer do |
| 2 | + use GenServer |
| 3 | + |
| 4 | + @status_list [:pending, :infer_schema, :insert_schema, :insert_data, :finish] |
| 5 | + @stage_list [:waiting, :working, :validation, :finish] |
| 6 | + |
| 7 | + def get_stats do |
| 8 | + try do |
| 9 | + GenServer.call(__MODULE__, :get_stats) |
| 10 | + catch |
| 11 | + _, _ -> nil |
| 12 | + end |
| 13 | + end |
| 14 | + |
| 15 | + def get_stage do |
| 16 | + GenServer.call(__MODULE__, :get_stage) |
| 17 | + end |
| 18 | + |
| 19 | + def next_file() do |
| 20 | + GenServer.call(__MODULE__, :next_file) |
| 21 | + end |
| 22 | + |
| 23 | + def start_link(_) do |
| 24 | + GenServer.start_link(__MODULE__, :no_args, name: __MODULE__) |
| 25 | + end |
| 26 | + |
| 27 | + def update_file_status(file, new_status) do |
| 28 | + GenServer.cast(__MODULE__, {:update_status, file, new_status}) |
| 29 | + end |
| 30 | + |
| 31 | + def change_stage(new_stage) do |
| 32 | + GenServer.cast(__MODULE__, {:change_stage, new_stage}) |
| 33 | + end |
| 34 | + |
| 35 | + def update_active_worker_count(worker_count) do |
| 36 | + GenServer.cast(__MODULE__, {:update_active_worker_count, worker_count}) |
| 37 | + end |
| 38 | + |
| 39 | + def init(_) do |
| 40 | + {files_map, files_to_process} = get_file_list() |
| 41 | + |
| 42 | + {:ok, |
| 43 | + %{ |
| 44 | + start_time: DateTime.utc_now(), |
| 45 | + file_list: files_map, |
| 46 | + files_to_process: files_to_process, |
| 47 | + stage: :working, |
| 48 | + active_worker_count: Application.get_env(:csv2sql, Csv2sql.MainServer)[:worker_count] |
| 49 | + }} |
| 50 | + end |
| 51 | + |
| 52 | + def handle_call( |
| 53 | + :next_file, |
| 54 | + _from, |
| 55 | + %{files_to_process: files_to_process, file_list: files_map} = state |
| 56 | + ) do |
| 57 | + {path, rest} = List.pop_at(files_to_process, -1) |
| 58 | + |
| 59 | + row_count = |
| 60 | + if path do |
| 61 | + %Csv2sql.File{row_count: row_count} = files_map[path] |
| 62 | + row_count |
| 63 | + else |
| 64 | + 0 |
| 65 | + end |
| 66 | + |
| 67 | + {:reply, {path, row_count}, Map.put(state, :files_to_process, rest)} |
| 68 | + end |
| 69 | + |
| 70 | + def handle_call( |
| 71 | + :get_stats, |
| 72 | + _from, |
| 73 | + state |
| 74 | + ) do |
| 75 | + {:reply, state, state} |
| 76 | + end |
| 77 | + |
| 78 | + def handle_call(:get_stage, _from, %{stage: stage} = state) do |
| 79 | + {:reply, stage, state} |
| 80 | + end |
| 81 | + |
| 82 | + def handle_cast({:change_stage, new_stage}, state) when new_stage in @stage_list do |
| 83 | + {:noreply, Map.put(state, :stage, new_stage)} |
| 84 | + end |
| 85 | + |
| 86 | + def handle_cast({:update_active_worker_count, worker_count}, state) do |
| 87 | + {:noreply, Map.put(state, :active_worker_count, worker_count)} |
| 88 | + end |
| 89 | + |
| 90 | + def handle_cast( |
| 91 | + {:update_status, file, status}, |
| 92 | + state |
| 93 | + ) |
| 94 | + when status in @status_list do |
| 95 | + file_struct = state.file_list[file] |
| 96 | + |
| 97 | + new_status = |
| 98 | + case {file_struct.status, status} do |
| 99 | + {{:insert_data, progress}, :insert_data} -> |
| 100 | + current_progress = |
| 101 | + progress + Application.get_env(:csv2sql, Csv2sql.Repo)[:insertion_chunk_size] |
| 102 | + |
| 103 | + current_progress = |
| 104 | + if current_progress >= file_struct.row_count, |
| 105 | + do: :finish, |
| 106 | + else: current_progress |
| 107 | + |
| 108 | + if current_progress == :finish, do: :finish, else: {:insert_data, current_progress} |
| 109 | + |
| 110 | + {_, :insert_data} -> |
| 111 | + {:insert_data, 0} |
| 112 | + |
| 113 | + _ -> |
| 114 | + status |
| 115 | + end |
| 116 | + |
| 117 | + file_struct = %Csv2sql.File{state.file_list[file] | status: new_status} |
| 118 | + |
| 119 | + { |
| 120 | + :noreply, |
| 121 | + Map.put( |
| 122 | + state, |
| 123 | + :file_list, |
| 124 | + Map.put(state.file_list, file, file_struct) |
| 125 | + ) |
| 126 | + } |
| 127 | + end |
| 128 | + |
| 129 | + def get_file_list() do |
| 130 | + source_dir = Application.get_env(:csv2sql, Csv2sql.MainServer)[:source_csv_directory] |
| 131 | + |
| 132 | + source_dir |
| 133 | + |> File.ls!() |
| 134 | + |> Enum.reject(fn file -> |
| 135 | + extension = |
| 136 | + file |
| 137 | + |> String.slice(-4..-1) |
| 138 | + |> String.downcase() |
| 139 | + |
| 140 | + extension != ".csv" |
| 141 | + end) |
| 142 | + |> Enum.reduce({%{}, []}, fn file, {file_map, file_list} -> |
| 143 | + path = "#{source_dir}/#{file}" |
| 144 | + |
| 145 | + %{size: size} = File.stat!(path) |
| 146 | + |
| 147 | + file_struct = %Csv2sql.File{ |
| 148 | + name: String.slice(file, 0..-5), |
| 149 | + path: path, |
| 150 | + raw_size: size, |
| 151 | + humanised_size: Sizeable.filesize(size), |
| 152 | + row_count: Csv2sql.ImportValidator.get_count_from_csv(path), |
| 153 | + status: :pending |
| 154 | + } |
| 155 | + |
| 156 | + {Map.put(file_map, path, file_struct), file_list ++ [path]} |
| 157 | + end) |
| 158 | + end |
| 159 | +end |
0 commit comments