Spaces:
Sleeping
Sleeping
File size: 20,151 Bytes
cb32008 5c156e2 cb32008 5c156e2 cb32008 5c156e2 cb32008 5c156e2 cb32008 5c156e2 cb32008 a7e1301 cb32008 a7e1301 cb32008 af6f438 5c156e2 cb32008 5c156e2 af6f438 cb32008 5c156e2 cb32008 5c156e2 af6f438 cb32008 5c156e2 cb32008 5c156e2 af6f438 cb32008 5c156e2 cb32008 5c156e2 af6f438 cb32008 5c156e2 cb32008 5c156e2 af6f438 cb32008 a7e1301 cb32008 a7e1301 af6f438 a7e1301 af6f438 cb32008 a7e1301 af6f438 cb32008 a7e1301 af6f438 a7e1301 af6f438 cb32008 a7e1301 af6f438 cb32008 a7e1301 cb32008 a7e1301 cb32008 a7e1301 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 6c63d28 cb32008 6c63d28 cb32008 e390221 cb32008 6c63d28 cb32008 6c63d28 cb32008 6c63d28 e390221 cb32008 6c63d28 e390221 6c63d28 e390221 cb32008 6c63d28 e390221 6c63d28 e390221 cb32008 e390221 cb32008 5c156e2 e390221 5c156e2 e390221 5c156e2 6c63d28 cb32008 6c63d28 cb32008 6c63d28 cb32008 6c63d28 cb32008 5c156e2 cb32008 5c156e2 cb32008 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 |
# noise_scenarios.py - 노이즈 시나리오 생성 엔진
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import random
def generate_typhoon_scenario(df, intensity=1.0):
"""
태풍 시나리오: 급격한 기압 하강 + 강풍 + 조위 상승
intensity: 0.5(약함) ~ 2.0(매우 강함)
df는 이미 144개로 슬라이싱된 상태
"""
print(f"🌀 태풍 시나리오 생성 (강도: {intensity}, 데이터: {len(df)}개)")
df_noisy = df.copy()
n_points = len(df) # 144개
# 태풍 시나리오 설정 (144개 기준)
if n_points >= 72:
# 충분한 데이터가 있으면 중간-후반에 태풍 배치
typhoon_center = int(n_points * 0.7) # 70% 지점 (약 100번째)
typhoon_duration = min(int(24 * intensity), n_points // 3) # 최대 2시간
else:
# 적은 데이터면 중간에 배치
typhoon_center = n_points // 2
typhoon_duration = min(int(12 * intensity), n_points // 2) # 최대 1시간
start_idx = max(0, typhoon_center - typhoon_duration // 2)
end_idx = min(n_points, typhoon_center + typhoon_duration // 2)
print(f" 🌪️ 태풍 구간: {start_idx}-{end_idx} ({end_idx-start_idx}개 포인트)")
for i in range(start_idx, end_idx):
# 안전한 인덱스 체크
if i >= len(df_noisy):
break
# 태풍 중심으로부터의 거리 (0~1)
if typhoon_duration <= 2:
distance_from_center = 0 # 매우 짧은 경우 균등 적용
else:
distance_from_center = abs(i - typhoon_center) / max(1, typhoon_duration // 2)
typhoon_strength = (1 - distance_from_center) * intensity
# DataFrame 인덱스 리셋 (안전한 접근을 위해)
actual_idx = df_noisy.index[i] if i < len(df_noisy) else df_noisy.index[-1]
# 1. 기압 급강하 (중심에서 최대 -80hPa) - 더 강하게
if 'air_pres' in df_noisy.columns:
pressure_drop = -80 * typhoon_strength * (1 + np.random.normal(0, 0.3))
df_noisy.at[actual_idx, 'air_pres'] += pressure_drop
# 2. 강풍 (최대 40m/s) - 더 강하게
if 'wind_speed' in df_noisy.columns:
wind_boost = 35 * typhoon_strength * (1 + np.random.normal(0, 0.4))
df_noisy.at[actual_idx, 'wind_speed'] += wind_boost
# 3. 풍향 변화 (태풍 회전) - 더 극적으로
if 'wind_dir' in df_noisy.columns:
wind_dir_change = 270 * typhoon_strength * np.sin(distance_from_center * np.pi * 2)
current_dir = df_noisy.at[actual_idx, 'wind_dir']
df_noisy.at[actual_idx, 'wind_dir'] = (current_dir + wind_dir_change) % 360
# 4. 폭풍 해일로 인한 조위 상승 (최대 +150cm) - 더 강하게
if 'tide_level' in df_noisy.columns:
storm_surge = 120 * typhoon_strength * (1 + np.random.normal(0, 0.5))
df_noisy.at[actual_idx, 'tide_level'] += storm_surge
# 5. 기온 변화 (구름으로 인한 온도 하강) - 더 강하게
if 'air_temp' in df_noisy.columns:
temp_drop = -12 * typhoon_strength * (1 + np.random.normal(0, 0.4))
df_noisy.at[actual_idx, 'air_temp'] += temp_drop
return df_noisy
def generate_sensor_malfunction_scenario(df, intensity=1.0):
"""
센서 오작동 시나리오: 랜덤한 극값 + 스파이크 노이즈
intensity: 0.5(약함) ~ 2.0(심각함)
"""
print(f"📡 센서 오작동 시나리오 생성 (강도: {intensity})")
df_noisy = df.copy()
n_points = len(df)
# 오작동 포인트 수 (전체의 5-20%)
malfunction_count = int(n_points * 0.05 * intensity)
malfunction_indices = random.sample(range(n_points), malfunction_count)
for idx in malfunction_indices:
# 안전한 인덱스 체크
if idx >= len(df_noisy):
continue
# 랜덤하게 한 개 컬럼 선택하여 오작동
numeric_cols = df_noisy.select_dtypes(include=[np.number]).columns.tolist()
if 'date' in numeric_cols:
numeric_cols.remove('date')
if not numeric_cols:
continue
malfunction_col = random.choice(numeric_cols)
# 오작동 유형 랜덤 선택
malfunction_type = random.choice(['spike', 'stuck', 'drift', 'outlier'])
# 안전한 인덱스 접근
actual_idx = df_noisy.index[idx] if idx < len(df_noisy) else df_noisy.index[-1]
if malfunction_type == 'spike':
# 급격한 스파이크
spike_magnitude = intensity * 5 * random.choice([-1, 1])
original_val = df_noisy.at[actual_idx, malfunction_col]
df_noisy.at[actual_idx, malfunction_col] = original_val * (1 + spike_magnitude)
elif malfunction_type == 'stuck':
# 값이 고정됨 (5-10개 포인트)
stuck_duration = random.randint(3, int(8 * intensity))
stuck_value = df_noisy.at[actual_idx, malfunction_col]
end_idx = min(n_points - 1, idx + stuck_duration)
for i in range(idx, end_idx + 1):
if i < len(df_noisy):
df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)] = stuck_value
elif malfunction_type == 'drift':
# 점진적 드리프트
drift_duration = random.randint(10, int(30 * intensity))
drift_magnitude = intensity * 2 * random.choice([-1, 1])
end_idx = min(n_points - 1, idx + drift_duration)
for i in range(idx, end_idx + 1):
if i < len(df_noisy):
drift_factor = (i - idx) / max(1, drift_duration) * drift_magnitude
original_val = df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)]
df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)] = original_val * (1 + drift_factor)
elif malfunction_type == 'outlier':
# 극값 아웃라이어
col_std = df_noisy[malfunction_col].std()
col_mean = df_noisy[malfunction_col].mean()
outlier_val = col_mean + random.choice([-1, 1]) * col_std * 5 * intensity
df_noisy.at[actual_idx, malfunction_col] = outlier_val
return df_noisy
def generate_burst_missing_scenario(df, intensity=1.0):
"""
연속 결측치 시나리오: 센서 완전 실패
intensity: 0.5(짧은 결측) ~ 2.0(긴 결측)
"""
print(f"❌ 연속 결측치 시나리오 생성 (강도: {intensity})")
df_noisy = df.copy()
n_points = len(df)
# 결측 구간 수 (1-3개)
num_missing_blocks = random.randint(1, 3)
for _ in range(num_missing_blocks):
# 결측 구간 길이 (30분 ~ 6시간), 하지만 전체 길이를 초과하지 않음
max_duration = min(int(72 * intensity), n_points // 2)
missing_duration = random.randint(int(6 * intensity), max(int(6 * intensity) + 1, max_duration))
start_idx = random.randint(0, max(1, n_points - missing_duration))
end_idx = min(n_points, start_idx + missing_duration)
# 결측시킬 컬럼들 랜덤 선택 (tide_level 포함)
numeric_cols = df_noisy.select_dtypes(include=[np.number]).columns.tolist()
if 'date' in numeric_cols:
numeric_cols.remove('date')
# 강도에 따라 결측 컬럼 수 결정
missing_cols_count = random.randint(1, min(len(numeric_cols), int(3 * intensity)))
missing_cols = random.sample(numeric_cols, missing_cols_count)
print(f" 📍 {start_idx}-{end_idx} 구간에서 {missing_cols} 결측 처리")
# 해당 구간을 NaN으로 설정
for col in missing_cols:
df_noisy.loc[start_idx:end_idx, col] = np.nan
return df_noisy
def generate_extreme_weather_scenario(df, intensity=1.0):
"""
극한 기상 시나리오: 폭염, 한파, 폭설 등
intensity: 0.5(보통) ~ 2.0(극한)
"""
print(f"🌡️ 극한 기상 시나리오 생성 (강도: {intensity})")
df_noisy = df.copy()
n_points = len(df)
# 극한 기상 유형 선택
weather_type = random.choice(['heatwave', 'coldwave', 'highpressure', 'lowpressure'])
# 영향 구간 (2-8시간), 하지만 전체 길이를 초과하지 않음
max_duration = min(int(96 * intensity), n_points // 2)
duration = random.randint(int(24 * intensity), max(int(24 * intensity) + 1, max_duration))
start_idx = random.randint(0, max(1, n_points - duration))
end_idx = min(n_points, start_idx + duration)
for i in range(start_idx, end_idx):
# 안전한 인덱스 체크
if i >= len(df_noisy):
break
actual_idx = df_noisy.index[i] if i < len(df_noisy) else df_noisy.index[-1]
progress = (i - start_idx) / max(1, duration) # 0~1
effect_strength = np.sin(progress * np.pi) * intensity # 중간에 최대
if weather_type == 'heatwave':
# 폭염: 고온 + 저기압 + 약한 바람
if 'air_temp' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_temp'] += 15 * effect_strength
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] -= 10 * effect_strength
if 'wind_speed' in df_noisy.columns:
current_wind = df_noisy.at[actual_idx, 'wind_speed']
df_noisy.at[actual_idx, 'wind_speed'] = current_wind * (1 - 0.5 * effect_strength)
if 'tide_level' in df_noisy.columns:
# 열팽창으로 미세한 해수면 상승
df_noisy.at[actual_idx, 'tide_level'] += 5 * effect_strength
elif weather_type == 'coldwave':
# 한파: 저온 + 고기압 + 강한 바람
if 'air_temp' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_temp'] -= 20 * effect_strength
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] += 20 * effect_strength
if 'wind_speed' in df_noisy.columns:
df_noisy.at[actual_idx, 'wind_speed'] += 10 * effect_strength
if 'tide_level' in df_noisy.columns:
# 해수 수축으로 미세한 해수면 하강
df_noisy.at[actual_idx, 'tide_level'] -= 3 * effect_strength
elif weather_type == 'highpressure':
# 고기압: 맑은 날씨, 약한 바람
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] += 25 * effect_strength
if 'wind_speed' in df_noisy.columns:
current_wind = df_noisy.at[actual_idx, 'wind_speed']
df_noisy.at[actual_idx, 'wind_speed'] = current_wind * (1 - 0.7 * effect_strength)
elif weather_type == 'lowpressure':
# 저기압: 흐린 날씨, 강한 바람
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] -= 20 * effect_strength
if 'wind_speed' in df_noisy.columns:
df_noisy.at[actual_idx, 'wind_speed'] += 8 * effect_strength
if 'tide_level' in df_noisy.columns:
# 저기압으로 인한 조위 상승
df_noisy.at[actual_idx, 'tide_level'] += 15 * effect_strength
return df_noisy
def create_noise_comparison_plot(df_original, df_noisy, scenario_name):
"""
원본 vs 노이즈 데이터 비교 시각화 (개선된 버전)
"""
fig = make_subplots(
rows=3, cols=2,
subplot_titles=['🌊 조위 (tide_level)', '🌬️ 기압 (air_pres)',
'💨 풍속 (wind_speed)', '🌡️ 기온 (air_temp)',
'🧭 풍향 (wind_dir)', '📊 조위 전체 비교'],
vertical_spacing=0.10,
horizontal_spacing=0.08
)
# 안전한 DataFrame 정렬 - 같은 길이와 인덱스로 맞추기
df_orig_slice = df_original.tail(len(df_noisy)).reset_index(drop=True)
df_noisy_reset = df_noisy.reset_index(drop=True)
# 최소 길이로 맞추기
min_len = min(len(df_orig_slice), len(df_noisy_reset))
df_orig_slice = df_orig_slice[:min_len]
df_noisy_reset = df_noisy_reset[:min_len]
# 시간축 (공통 길이 사용)
time_axis = list(range(len(df_orig_slice)))
# 색상 설정 (더 선명하게)
original_color = '#2E86AB' # 진한 파랑
noise_color = '#F24236' # 진한 빨강
# 각 변수별 비교 플롯
variables = ['tide_level', 'air_pres', 'wind_speed', 'air_temp', 'wind_dir']
positions = [(1,1), (1,2), (2,1), (2,2), (3,1)]
var_units = ['cm', 'hPa', 'm/s', '°C', '°']
for var, (row, col), unit in zip(variables, positions, var_units):
if var in df_orig_slice.columns and var in df_noisy_reset.columns:
try:
# 원본 데이터 (실선, 두껍게)
fig.add_trace(
go.Scatter(
x=time_axis, y=df_orig_slice[var],
name=f'🔵 원본',
line=dict(color=original_color, width=3),
showlegend=(row==1 and col==1),
hovertemplate=f'원본 {var}: %{{y:.1f}}{unit}<br>시점: %{{x}}<extra></extra>'
),
row=row, col=col
)
# 노이즈 데이터 (점선, 두껍게)
fig.add_trace(
go.Scatter(
x=time_axis, y=df_noisy_reset[var],
name=f'🔴 노이즈',
line=dict(color=noise_color, width=2.5, dash='dash'),
showlegend=(row==1 and col==1),
hovertemplate=f'노이즈 {var}: %{{y:.1f}}{unit}<br>시점: %{{x}}<extra></extra>'
),
row=row, col=col
)
# Y축 범위 자동 조정 (변화를 더 명확히 보기 위해) - 안전하게
orig_vals = df_orig_slice[var].dropna()
noisy_vals = df_noisy_reset[var].dropna()
if len(orig_vals) > 0 and len(noisy_vals) > 0:
original_range = orig_vals.max() - orig_vals.min()
noisy_range = noisy_vals.max() - noisy_vals.min()
# 더 큰 범위를 기준으로 패딩 적용
total_range = max(original_range, noisy_range)
y_center = (orig_vals.mean() + noisy_vals.mean()) / 2
if total_range > 0:
y_min = y_center - total_range * 0.6
y_max = y_center + total_range * 0.6
fig.update_yaxes(range=[y_min, y_max], row=row, col=col)
except Exception as e:
print(f"변수 {var} 플롯 생성 오류: {e}")
continue
# 전체 비교 (tide_level 중심) - 더 크고 선명하게
if 'tide_level' in df_orig_slice.columns and 'tide_level' in df_noisy_reset.columns:
try:
fig.add_trace(
go.Scatter(
x=time_axis, y=df_orig_slice['tide_level'],
name='🔵 원본 조위',
line=dict(color=original_color, width=4),
showlegend=True,
hovertemplate='원본 조위: %{y:.1f}cm<br>시점: %{x}<extra></extra>'
),
row=3, col=2
)
fig.add_trace(
go.Scatter(
x=time_axis, y=df_noisy_reset['tide_level'],
name='🔴 노이즈 조위',
line=dict(color=noise_color, width=3, dash='dash'),
showlegend=True,
hovertemplate='노이즈 조위: %{y:.1f}cm<br>시점: %{x}<extra></extra>'
),
row=3, col=2
)
except Exception as e:
print(f"전체 조위 비교 플롯 생성 오류: {e}")
# 노이즈 시나리오 구간 하이라이트 추가
def add_scenario_highlight(scenario_name):
if scenario_name == 'typhoon':
# 태풍 구간 계산 (정렬된 DataFrame 기준)
n_points = len(df_orig_slice)
if n_points >= 72:
typhoon_center = int(n_points * 0.7) # 70% 지점
typhoon_duration = min(24, n_points // 3) # 최대 2시간
else:
typhoon_center = n_points // 2
typhoon_duration = min(12, n_points // 2) # 최대 1시간
start_idx = max(0, typhoon_center - typhoon_duration // 2)
end_idx = min(n_points, typhoon_center + typhoon_duration // 2)
print(f" 📍 시각화 태풍 구간: {start_idx}-{end_idx}")
# 모든 서브플롯에 배경 영역 추가
try:
for row in range(1, 4):
for col in range(1, 3):
fig.add_vrect(
x0=start_idx, x1=end_idx,
fillcolor="rgba(255,0,0,0.15)",
layer="below", line_width=0,
annotation_text="🌀 태풍 구간",
annotation_position="top left",
row=row, col=col
)
except Exception as e:
print(f"태풍 구간 하이라이트 오류: {e}")
add_scenario_highlight(scenario_name)
# 레이아웃 개선
fig.update_layout(
title={
'text': f"🌪️ 노이즈 시나리오: {scenario_name}",
'x': 0.5,
'font': {'size': 20, 'color': '#2E86AB'}
},
height=900,
showlegend=True,
legend=dict(
x=0.02, # 왼쪽으로 이동
y=0.98, # 위쪽으로 이동
bgcolor='rgba(255,255,255,0.8)',
bordercolor='gray',
borderwidth=1,
font=dict(size=12)
),
plot_bgcolor='rgba(248,249,250,0.8)',
paper_bgcolor='white'
)
# X축 레이블 개선
fig.update_xaxes(title_text="시간 순서", showgrid=True, gridcolor='lightgray')
fig.update_yaxes(showgrid=True, gridcolor='lightgray')
return fig
def apply_noise_scenario(df, scenario_type, intensity=1.0):
"""
선택된 노이즈 시나리오 적용 (144개 슬라이싱 후)
"""
scenario_functions = {
'typhoon': generate_typhoon_scenario,
'sensor_malfunction': generate_sensor_malfunction_scenario,
'burst_missing': generate_burst_missing_scenario,
'extreme_weather': generate_extreme_weather_scenario
}
if scenario_type not in scenario_functions:
raise ValueError(f"Unknown scenario type: {scenario_type}")
print(f"\n🌪️ {scenario_type} 시나리오 적용 중...")
# 1. 먼저 마지막 144개로 슬라이싱 (실제 모델 입력과 동일)
print(f"📊 원본 데이터: {len(df)}행")
df_sliced = df.tail(144).copy()
print(f"✂️ 슬라이싱 후: {len(df_sliced)}행 (마지막 144개)")
# 2. 슬라이싱된 데이터에 노이즈 적용
df_noisy = scenario_functions[scenario_type](df_sliced, intensity)
# 3. 비교 시각화 생성 (슬라이싱된 원본 vs 노이즈)
plot = create_noise_comparison_plot(df_sliced, df_noisy, scenario_type)
return df_noisy, plot |