Skip to content

Commit e147d14

Browse files
Arpan-KreetiArp-G
authored andcommitted
add changes to observer genserver
minor fix
1 parent 5bfd9e4 commit e147d14

File tree

2 files changed

+161
-2
lines changed

2 files changed

+161
-2
lines changed

apps/csv2sql/lib/csv2sql/import_validator.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ defmodule Csv2sql.ImportValidator do
2727
validated_csv_directory =
2828
Application.get_env(:csv2sql, Csv2sql.MainServer)[:validated_csv_directory]
2929

30-
imported_csv_directory =
30+
src =
3131
if Application.get_env(:csv2sql, Csv2sql.Worker)[:set_insert_data],
3232
do: Application.get_env(:csv2sql, Csv2sql.MainServer)[:imported_csv_directory],
3333
else: Application.get_env(:csv2sql, Csv2sql.MainServer)[:source_csv_directory]
3434

3535
File.rename!(
36-
"#{imported_csv_directory}/#{file}.csv",
36+
"#{src}/#{file}.csv",
3737
"#{validated_csv_directory}/#{file}.csv"
3838
)
3939

apps/csv2sql/lib/csv2sql/observer.ex

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
defmodule Csv2sql.Observer do
2+
use GenServer
3+
4+
@status_list [:pending, :infer_schema, :insert_schema, :insert_data, :finish]
5+
@stage_list [:waiting, :working, :validation, :finish]
6+
7+
def get_stats do
8+
try do
9+
GenServer.call(__MODULE__, :get_stats)
10+
catch
11+
_, _ -> nil
12+
end
13+
end
14+
15+
def get_stage do
16+
GenServer.call(__MODULE__, :get_stage)
17+
end
18+
19+
def next_file() do
20+
GenServer.call(__MODULE__, :next_file)
21+
end
22+
23+
def start_link(_) do
24+
GenServer.start_link(__MODULE__, :no_args, name: __MODULE__)
25+
end
26+
27+
def update_file_status(file, new_status) do
28+
GenServer.cast(__MODULE__, {:update_status, file, new_status})
29+
end
30+
31+
def change_stage(new_stage) do
32+
GenServer.cast(__MODULE__, {:change_stage, new_stage})
33+
end
34+
35+
def update_active_worker_count(worker_count) do
36+
GenServer.cast(__MODULE__, {:update_active_worker_count, worker_count})
37+
end
38+
39+
def init(_) do
40+
{files_map, files_to_process} = get_file_list()
41+
42+
{:ok,
43+
%{
44+
start_time: DateTime.utc_now(),
45+
file_list: files_map,
46+
files_to_process: files_to_process,
47+
stage: :working,
48+
active_worker_count: Application.get_env(:csv2sql, Csv2sql.MainServer)[:worker_count]
49+
}}
50+
end
51+
52+
def handle_call(
53+
:next_file,
54+
_from,
55+
%{files_to_process: files_to_process, file_list: files_map} = state
56+
) do
57+
{path, rest} = List.pop_at(files_to_process, -1)
58+
59+
row_count =
60+
if path do
61+
%Csv2sql.File{row_count: row_count} = files_map[path]
62+
row_count
63+
else
64+
0
65+
end
66+
67+
{:reply, {path, row_count}, Map.put(state, :files_to_process, rest)}
68+
end
69+
70+
def handle_call(
71+
:get_stats,
72+
_from,
73+
state
74+
) do
75+
{:reply, state, state}
76+
end
77+
78+
def handle_call(:get_stage, _from, %{stage: stage} = state) do
79+
{:reply, stage, state}
80+
end
81+
82+
def handle_cast({:change_stage, new_stage}, state) when new_stage in @stage_list do
83+
{:noreply, Map.put(state, :stage, new_stage)}
84+
end
85+
86+
def handle_cast({:update_active_worker_count, worker_count}, state) do
87+
{:noreply, Map.put(state, :active_worker_count, worker_count)}
88+
end
89+
90+
def handle_cast(
91+
{:update_status, file, status},
92+
state
93+
)
94+
when status in @status_list do
95+
file_struct = state.file_list[file]
96+
97+
new_status =
98+
case {file_struct.status, status} do
99+
{{:insert_data, progress}, :insert_data} ->
100+
current_progress =
101+
progress + Application.get_env(:csv2sql, Csv2sql.Repo)[:insertion_chunk_size]
102+
103+
current_progress =
104+
if current_progress >= file_struct.row_count,
105+
do: :finish,
106+
else: current_progress
107+
108+
if current_progress == :finish, do: :finish, else: {:insert_data, current_progress}
109+
110+
{_, :insert_data} ->
111+
{:insert_data, 0}
112+
113+
_ ->
114+
status
115+
end
116+
117+
file_struct = %Csv2sql.File{state.file_list[file] | status: new_status}
118+
119+
{
120+
:noreply,
121+
Map.put(
122+
state,
123+
:file_list,
124+
Map.put(state.file_list, file, file_struct)
125+
)
126+
}
127+
end
128+
129+
def get_file_list() do
130+
source_dir = Application.get_env(:csv2sql, Csv2sql.MainServer)[:source_csv_directory]
131+
132+
source_dir
133+
|> File.ls!()
134+
|> Enum.reject(fn file ->
135+
extension =
136+
file
137+
|> String.slice(-4..-1)
138+
|> String.downcase()
139+
140+
extension != ".csv"
141+
end)
142+
|> Enum.reduce({%{}, []}, fn file, {file_map, file_list} ->
143+
path = "#{source_dir}/#{file}"
144+
145+
%{size: size} = File.stat!(path)
146+
147+
file_struct = %Csv2sql.File{
148+
name: String.slice(file, 0..-5),
149+
path: path,
150+
raw_size: size,
151+
humanised_size: Sizeable.filesize(size),
152+
row_count: Csv2sql.ImportValidator.get_count_from_csv(path),
153+
status: :pending
154+
}
155+
156+
{Map.put(file_map, path, file_struct), file_list ++ [path]}
157+
end)
158+
end
159+
end

0 commit comments

Comments
 (0)