Spaces:
Sleeping
Sleeping
File size: 20,151 Bytes
cb32008 5c156e2 cb32008 5c156e2 cb32008 5c156e2 cb32008 5c156e2 cb32008 5c156e2 cb32008 a7e1301 cb32008 a7e1301 cb32008 af6f438 5c156e2 cb32008 5c156e2 af6f438 cb32008 5c156e2 cb32008 5c156e2 af6f438 cb32008 5c156e2 cb32008 5c156e2 af6f438 cb32008 5c156e2 cb32008 5c156e2 af6f438 cb32008 5c156e2 cb32008 5c156e2 af6f438 cb32008 a7e1301 cb32008 a7e1301 af6f438 a7e1301 af6f438 cb32008 a7e1301 af6f438 cb32008 a7e1301 af6f438 a7e1301 af6f438 cb32008 a7e1301 af6f438 cb32008 a7e1301 cb32008 a7e1301 cb32008 a7e1301 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 af6f438 cb32008 6c63d28 cb32008 6c63d28 cb32008 e390221 cb32008 6c63d28 cb32008 6c63d28 cb32008 6c63d28 e390221 cb32008 6c63d28 e390221 6c63d28 e390221 cb32008 6c63d28 e390221 6c63d28 e390221 cb32008 e390221 cb32008 5c156e2 e390221 5c156e2 e390221 5c156e2 6c63d28 cb32008 6c63d28 cb32008 6c63d28 cb32008 6c63d28 cb32008 5c156e2 cb32008 5c156e2 cb32008 |
|
# noise_scenarios.py - 노이즈 시나리오 생성 엔진
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import random
def generate_typhoon_scenario(df, intensity=1.0):
"""
태풍 시나리오: 급격한 기압 하강 + 강풍 + 조위 상승
intensity: 0.5(약함) ~ 2.0(매우 강함)
df는 이미 144개로 슬라이싱된 상태
"""
print(f"🌀 태풍 시나리오 생성 (강도: {intensity}, 데이터: {len(df)}개)")
df_noisy = df.copy()
n_points = len(df) # 144개
# 태풍 시나리오 설정 (144개 기준)
if n_points >= 72:
# 충분한 데이터가 있으면 중간-후반에 태풍 배치
typhoon_center = int(n_points * 0.7) # 70% 지점 (약 100번째)
typhoon_duration = min(int(24 * intensity), n_points // 3) # 최대 2시간
else:
# 적은 데이터면 중간에 배치
typhoon_center = n_points // 2
typhoon_duration = min(int(12 * intensity), n_points // 2) # 최대 1시간
start_idx = max(0, typhoon_center - typhoon_duration // 2)
end_idx = min(n_points, typhoon_center + typhoon_duration // 2)
print(f" 🌪️ 태풍 구간: {start_idx}-{end_idx} ({end_idx-start_idx}개 포인트)")
for i in range(start_idx, end_idx):
# 안전한 인덱스 체크
if i >= len(df_noisy):
break
# 태풍 중심으로부터의 거리 (0~1)
if typhoon_duration <= 2:
distance_from_center = 0 # 매우 짧은 경우 균등 적용
else:
distance_from_center = abs(i - typhoon_center) / max(1, typhoon_duration // 2)
typhoon_strength = (1 - distance_from_center) * intensity
# DataFrame 인덱스 리셋 (안전한 접근을 위해)
actual_idx = df_noisy.index[i] if i < len(df_noisy) else df_noisy.index[-1]
# 1. 기압 급강하 (중심에서 최대 -80hPa) - 더 강하게
if 'air_pres' in df_noisy.columns:
pressure_drop = -80 * typhoon_strength * (1 + np.random.normal(0, 0.3))
df_noisy.at[actual_idx, 'air_pres'] += pressure_drop
# 2. 강풍 (최대 40m/s) - 더 강하게
if 'wind_speed' in df_noisy.columns:
wind_boost = 35 * typhoon_strength * (1 + np.random.normal(0, 0.4))
df_noisy.at[actual_idx, 'wind_speed'] += wind_boost
# 3. 풍향 변화 (태풍 회전) - 더 극적으로
if 'wind_dir' in df_noisy.columns:
wind_dir_change = 270 * typhoon_strength * np.sin(distance_from_center * np.pi * 2)
current_dir = df_noisy.at[actual_idx, 'wind_dir']
df_noisy.at[actual_idx, 'wind_dir'] = (current_dir + wind_dir_change) % 360
# 4. 폭풍 해일로 인한 조위 상승 (최대 +150cm) - 더 강하게
if 'tide_level' in df_noisy.columns:
storm_surge = 120 * typhoon_strength * (1 + np.random.normal(0, 0.5))
df_noisy.at[actual_idx, 'tide_level'] += storm_surge
# 5. 기온 변화 (구름으로 인한 온도 하강) - 더 강하게
if 'air_temp' in df_noisy.columns:
temp_drop = -12 * typhoon_strength * (1 + np.random.normal(0, 0.4))
df_noisy.at[actual_idx, 'air_temp'] += temp_drop
return df_noisy
def generate_sensor_malfunction_scenario(df, intensity=1.0):
"""
센서 오작동 시나리오: 랜덤한 극값 + 스파이크 노이즈
intensity: 0.5(약함) ~ 2.0(심각함)
"""
print(f"📡 센서 오작동 시나리오 생성 (강도: {intensity})")
df_noisy = df.copy()
n_points = len(df)
# 오작동 포인트 수 (전체의 5-20%)
malfunction_count = int(n_points * 0.05 * intensity)
malfunction_indices = random.sample(range(n_points), malfunction_count)
for idx in malfunction_indices:
# 안전한 인덱스 체크
if idx >= len(df_noisy):
continue
# 랜덤하게 한 개 컬럼 선택하여 오작동
numeric_cols = df_noisy.select_dtypes(include=[np.number]).columns.tolist()
if 'date' in numeric_cols:
numeric_cols.remove('date')
if not numeric_cols:
continue
malfunction_col = random.choice(numeric_cols)
# 오작동 유형 랜덤 선택
malfunction_type = random.choice(['spike', 'stuck', 'drift', 'outlier'])
# 안전한 인덱스 접근
actual_idx = df_noisy.index[idx] if idx < len(df_noisy) else df_noisy.index[-1]
if malfunction_type == 'spike':
# 급격한 스파이크
spike_magnitude = intensity * 5 * random.choice([-1, 1])
original_val = df_noisy.at[actual_idx, malfunction_col]
df_noisy.at[actual_idx, malfunction_col] = original_val * (1 + spike_magnitude)
elif malfunction_type == 'stuck':
# 값이 고정됨 (5-10개 포인트)
stuck_duration = random.randint(3, int(8 * intensity))
stuck_value = df_noisy.at[actual_idx, malfunction_col]
end_idx = min(n_points - 1, idx + stuck_duration)
for i in range(idx, end_idx + 1):
if i < len(df_noisy):
df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)] = stuck_value
elif malfunction_type == 'drift':
# 점진적 드리프트
drift_duration = random.randint(10, int(30 * intensity))
drift_magnitude = intensity * 2 * random.choice([-1, 1])
end_idx = min(n_points - 1, idx + drift_duration)
for i in range(idx, end_idx + 1):
if i < len(df_noisy):
drift_factor = (i - idx) / max(1, drift_duration) * drift_magnitude
original_val = df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)]
df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)] = original_val * (1 + drift_factor)
elif malfunction_type == 'outlier':
# 극값 아웃라이어
col_std = df_noisy[malfunction_col].std()
col_mean = df_noisy[malfunction_col].mean()
outlier_val = col_mean + random.choice([-1, 1]) * col_std * 5 * intensity
df_noisy.at[actual_idx, malfunction_col] = outlier_val
return df_noisy
def generate_burst_missing_scenario(df, intensity=1.0):
"""
연속 결측치 시나리오: 센서 완전 실패
intensity: 0.5(짧은 결측) ~ 2.0(긴 결측)
"""
print(f"❌ 연속 결측치 시나리오 생성 (강도: {intensity})")
df_noisy = df.copy()
n_points = len(df)
# 결측 구간 수 (1-3개)
num_missing_blocks = random.randint(1, 3)
for _ in range(num_missing_blocks):
# 결측 구간 길이 (30분 ~ 6시간), 하지만 전체 길이를 초과하지 않음
max_duration = min(int(72 * intensity), n_points // 2)
missing_duration = random.randint(int(6 * intensity), max(int(6 * intensity) + 1, max_duration))
start_idx = random.randint(0, max(1, n_points - missing_duration))
end_idx = min(n_points, start_idx + missing_duration)
# 결측시킬 컬럼들 랜덤 선택 (tide_level 포함)
numeric_cols = df_noisy.select_dtypes(include=[np.number]).columns.tolist()
if 'date' in numeric_cols:
numeric_cols.remove('date')
# 강도에 따라 결측 컬럼 수 결정
missing_cols_count = random.randint(1, min(len(numeric_cols), int(3 * intensity)))
missing_cols = random.sample(numeric_cols, missing_cols_count)
print(f" 📍 {start_idx}-{end_idx} 구간에서 {missing_cols} 결측 처리")
# 해당 구간을 NaN으로 설정
for col in missing_cols:
df_noisy.loc[start_idx:end_idx, col] = np.nan
return df_noisy
def generate_extreme_weather_scenario(df, intensity=1.0):
"""
극한 기상 시나리오: 폭염, 한파, 폭설 등
intensity: 0.5(보통) ~ 2.0(극한)
"""
print(f"🌡️ 극한 기상 시나리오 생성 (강도: {intensity})")
df_noisy = df.copy()
n_points = len(df)
# 극한 기상 유형 선택
weather_type = random.choice(['heatwave', 'coldwave', 'highpressure', 'lowpressure'])
# 영향 구간 (2-8시간), 하지만 전체 길이를 초과하지 않음
max_duration = min(int(96 * intensity), n_points // 2)
duration = random.randint(int(24 * intensity), max(int(24 * intensity) + 1, max_duration))
start_idx = random.randint(0, max(1, n_points - duration))
end_idx = min(n_points, start_idx + duration)
for i in range(start_idx, end_idx):
# 안전한 인덱스 체크
if i >= len(df_noisy):
break
actual_idx = df_noisy.index[i] if i < len(df_noisy) else df_noisy.index[-1]
progress = (i - start_idx) / max(1, duration) # 0~1
effect_strength = np.sin(progress * np.pi) * intensity # 중간에 최대
if weather_type == 'heatwave':
# 폭염: 고온 + 저기압 + 약한 바람
if 'air_temp' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_temp'] += 15 * effect_strength
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] -= 10 * effect_strength
if 'wind_speed' in df_noisy.columns:
current_wind = df_noisy.at[actual_idx, 'wind_speed']
df_noisy.at[actual_idx, 'wind_speed'] = current_wind * (1 - 0.5 * effect_strength)
if 'tide_level' in df_noisy.columns:
# 열팽창으로 미세한 해수면 상승
df_noisy.at[actual_idx, 'tide_level'] += 5 * effect_strength
elif weather_type == 'coldwave':
# 한파: 저온 + 고기압 + 강한 바람
if 'air_temp' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_temp'] -= 20 * effect_strength
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] += 20 * effect_strength
if 'wind_speed' in df_noisy.columns:
df_noisy.at[actual_idx, 'wind_speed'] += 10 * effect_strength
if 'tide_level' in df_noisy.columns:
# 해수 수축으로 미세한 해수면 하강
df_noisy.at[actual_idx, 'tide_level'] -= 3 * effect_strength
elif weather_type == 'highpressure':
# 고기압: 맑은 날씨, 약한 바람
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] += 25 * effect_strength
if 'wind_speed' in df_noisy.columns:
current_wind = df_noisy.at[actual_idx, 'wind_speed']
df_noisy.at[actual_idx, 'wind_speed'] = current_wind * (1 - 0.7 * effect_strength)
elif weather_type == 'lowpressure':
# 저기압: 흐린 날씨, 강한 바람
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] -= 20 * effect_strength
if 'wind_speed' in df_noisy.columns:
df_noisy.at[actual_idx, 'wind_speed'] += 8 * effect_strength
if 'tide_level' in df_noisy.columns:
# 저기압으로 인한 조위 상승
df_noisy.at[actual_idx, 'tide_level'] += 15 * effect_strength
return df_noisy
def create_noise_comparison_plot(df_original, df_noisy, scenario_name):
"""
원본 vs 노이즈 데이터 비교 시각화 (개선된 버전)
"""
fig = make_subplots(
rows=3, cols=2,
subplot_titles=['🌊 조위 (tide_level)', '🌬️ 기압 (air_pres)',
'💨 풍속 (wind_speed)', '🌡️ 기온 (air_temp)',
'🧭 풍향 (wind_dir)', '📊 조위 전체 비교'],
vertical_spacing=0.10,
horizontal_spacing=0.08
)
# 안전한 DataFrame 정렬 - 같은 길이와 인덱스로 맞추기
df_orig_slice = df_original.tail(len(df_noisy)).reset_index(drop=True)
df_noisy_reset = df_noisy.reset_index(drop=True)
# 최소 길이로 맞추기
min_len = min(len(df_orig_slice), len(df_noisy_reset))
df_orig_slice = df_orig_slice[:min_len]
df_noisy_reset = df_noisy_reset[:min_len]
# 시간축 (공통 길이 사용)
time_axis = list(range(len(df_orig_slice)))
# 색상 설정 (더 선명하게)
original_color = '#2E86AB' # 진한 파랑
noise_color = '#F24236' # 진한 빨강
# 각 변수별 비교 플롯
variables = ['tide_level', 'air_pres', 'wind_speed', 'air_temp', 'wind_dir']
positions = [(1,1), (1,2), (2,1), (2,2), (3,1)]
var_units = ['cm', 'hPa', 'm/s', '°C', '°']
for var, (row, col), unit in zip(variables, positions, var_units):
if var in df_orig_slice.columns and var in df_noisy_reset.columns:
try:
# 원본 데이터 (실선, 두껍게)
fig.add_trace(
go.Scatter(
x=time_axis, y=df_orig_slice[var],
name=f'🔵 원본',
line=dict(color=original_color, width=3),
showlegend=(row==1 and col==1),
hovertemplate=f'원본 {var}: %{{y:.1f}}{unit}<br>시점: %{{x}}<extra></extra>'
),
row=row, col=col
)
# 노이즈 데이터 (점선, 두껍게)
fig.add_trace(
go.Scatter(
x=time_axis, y=df_noisy_reset[var],
name=f'🔴 노이즈',
line=dict(color=noise_color, width=2.5, dash='dash'),
showlegend=(row==1 and col==1),
hovertemplate=f'노이즈 {var}: %{{y:.1f}}{unit}<br>시점: %{{x}}<extra></extra>'
),
row=row, col=col
)
# Y축 범위 자동 조정 (변화를 더 명확히 보기 위해) - 안전하게
orig_vals = df_orig_slice[var].dropna()
noisy_vals = df_noisy_reset[var].dropna()
if len(orig_vals) > 0 and len(noisy_vals) > 0:
original_range = orig_vals.max() - orig_vals.min()
noisy_range = noisy_vals.max() - noisy_vals.min()
# 더 큰 범위를 기준으로 패딩 적용
total_range = max(original_range, noisy_range)
y_center = (orig_vals.mean() + noisy_vals.mean()) / 2
if total_range > 0:
y_min = y_center - total_range * 0.6
y_max = y_center + total_range * 0.6
fig.update_yaxes(range=[y_min, y_max], row=row, col=col)
except Exception as e:
print(f"변수 {var} 플롯 생성 오류: {e}")
continue
# 전체 비교 (tide_level 중심) - 더 크고 선명하게
if 'tide_level' in df_orig_slice.columns and 'tide_level' in df_noisy_reset.columns:
try:
fig.add_trace(
go.Scatter(
x=time_axis, y=df_orig_slice['tide_level'],
name='🔵 원본 조위',
line=dict(color=original_color, width=4),
showlegend=True,
hovertemplate='원본 조위: %{y:.1f}cm<br>시점: %{x}<extra></extra>'
),
row=3, col=2
)
fig.add_trace(
go.Scatter(
x=time_axis, y=df_noisy_reset['tide_level'],
name='🔴 노이즈 조위',
line=dict(color=noise_color, width=3, dash='dash'),
showlegend=True,
hovertemplate='노이즈 조위: %{y:.1f}cm<br>시점: %{x}<extra></extra>'
),
row=3, col=2
)
except Exception as e:
print(f"전체 조위 비교 플롯 생성 오류: {e}")
# 노이즈 시나리오 구간 하이라이트 추가
def add_scenario_highlight(scenario_name):
if scenario_name == 'typhoon':
# 태풍 구간 계산 (정렬된 DataFrame 기준)
n_points = len(df_orig_slice)
if n_points >= 72:
typhoon_center = int(n_points * 0.7) # 70% 지점
typhoon_duration = min(24, n_points // 3) # 최대 2시간
else:
typhoon_center = n_points // 2
typhoon_duration = min(12, n_points // 2) # 최대 1시간
start_idx = max(0, typhoon_center - typhoon_duration // 2)
end_idx = min(n_points, typhoon_center + typhoon_duration // 2)
print(f" 📍 시각화 태풍 구간: {start_idx}-{end_idx}")
# 모든 서브플롯에 배경 영역 추가
try:
for row in range(1, 4):
for col in range(1, 3):
fig.add_vrect(
x0=start_idx, x1=end_idx,
fillcolor="rgba(255,0,0,0.15)",
layer="below", line_width=0,
annotation_text="🌀 태풍 구간",
annotation_position="top left",
row=row, col=col
)
except Exception as e:
print(f"태풍 구간 하이라이트 오류: {e}")
add_scenario_highlight(scenario_name)
# 레이아웃 개선
fig.update_layout(
title={
'text': f"🌪️ 노이즈 시나리오: {scenario_name}",
'x': 0.5,
'font': {'size': 20, 'color': '#2E86AB'}
},
height=900,
showlegend=True,
legend=dict(
x=0.02, # 왼쪽으로 이동
y=0.98, # 위쪽으로 이동
bgcolor='rgba(255,255,255,0.8)',
bordercolor='gray',
borderwidth=1,
font=dict(size=12)
),
plot_bgcolor='rgba(248,249,250,0.8)',
paper_bgcolor='white'
)
# X축 레이블 개선
fig.update_xaxes(title_text="시간 순서", showgrid=True, gridcolor='lightgray')
fig.update_yaxes(showgrid=True, gridcolor='lightgray')
return fig
def apply_noise_scenario(df, scenario_type, intensity=1.0):
"""
선택된 노이즈 시나리오 적용 (144개 슬라이싱 후)
"""
scenario_functions = {
'typhoon': generate_typhoon_scenario,
'sensor_malfunction': generate_sensor_malfunction_scenario,
'burst_missing': generate_burst_missing_scenario,
'extreme_weather': generate_extreme_weather_scenario
}
if scenario_type not in scenario_functions:
raise ValueError(f"Unknown scenario type: {scenario_type}")
print(f"\n🌪️ {scenario_type} 시나리오 적용 중...")
# 1. 먼저 마지막 144개로 슬라이싱 (실제 모델 입력과 동일)
print(f"📊 원본 데이터: {len(df)}행")
df_sliced = df.tail(144).copy()
print(f"✂️ 슬라이싱 후: {len(df_sliced)}행 (마지막 144개)")
# 2. 슬라이싱된 데이터에 노이즈 적용
df_noisy = scenario_functions[scenario_type](df_sliced, intensity)
# 3. 비교 시각화 생성 (슬라이싱된 원본 vs 노이즈)
plot = create_noise_comparison_plot(df_sliced, df_noisy, scenario_type)
return df_noisy, plot |