How to Create an Automatic Code Comment Generator using Deep Learning!
A tutorial for automatically generating code comments using Deep Learning.
- About
- Collecting, preparing and exploring the data
- Defining your model
- Evaluate your model
- Conclusion
- CodeSearchNet citation
In this post, you will create a Deep Learning model that, given a piece of code, automatically generates a comment describing (hopefully 🤞) what that piece of code does. This post focuses on Java code; however, the same approach should apply to other programming languages such as Python or JavaScript.
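To make the task concrete, here is a hypothetical (completely made-up) example of the kind of input/output pair the model will learn from: a Java method as the input and a short natural-language description as the target.
# Hypothetical training pair (not taken from the dataset), just to illustrate the task
example_code = "public int add(int a, int b) { return a + b; }"  # input: a Java method
example_comment = "Returns the sum of two integers."             # target: its comment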
Collecting, preparing and exploring the data
You will be using the CodeSearchNet Challenge dataset from GitHub, as it provides a large collection of clean code in multiple programming languages. Their repo has a really nice example of how to download and read in the data, which you'll use to get started.
! wget https://s3.amazonaws.com/code-search-net/CodeSearchNet/v2/java.zip
! unzip java.zip
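Before running the rest of the code, you'll need a handful of imports and a path pointing at the unzipped data. The set of imports below is a minimal sketch inferred from the cells that follow (pandas, fastai v1, SentencePiece, etc.), and the value of path is an assumption, so adjust it to wherever you unzipped java.zip.
# Minimal setup sketch; the exact imports and the path value are assumptions based on the cells below
import re
import math
from math import exp
from collections import Counter
from functools import partial
from pathlib import Path
from statistics import mean, median, stdev

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sentencepiece as sp
import torch
import torch.nn as nn
import torch.nn.functional as F

from fastai.text import *  # Learner, TextList, SPProcessor, SortishSampler, progress_bar, etc.

path = Path(".")  # assumption: the directory you unzipped java.zip into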
The jsonl_list_to_dataframe method comes directly from the CodeSearchNet Challenge example code, and get_dfs is just a helper that reads the data into the correct training, validation, and testing splits. Let's see what your data looks like :D!
def jsonl_list_to_dataframe(file_list, columns=None):
"""Load a list of jsonl.gz files into a pandas DataFrame."""
return pd.concat([pd.read_json(f,
orient='records',
compression='gzip',
lines=True)[columns]
for f in file_list], sort=False)
def get_dfs(path):
"""Grabs the different data splits and converts them into dataframes"""
dfs = []
for split in ["train", "valid", "test"]:
files = sorted((path/split).glob("**/*.gz"))
df = jsonl_list_to_dataframe(files, ["code", "docstring"])
dfs.append(df)
return dfs
df_trn, df_val, df_tst = get_dfs(path/"java/final/jsonl")
df_trn.head()
You are going to use only a small subset of the data so that you can train your model in a reasonable amount of time. However, if you want to use more (or less) data, just change the sample fraction below.
sample = 0.2
df_trn = df_trn.sample(frac = sample)
df_val = df_val.sample(frac = sample)
df_tst = df_tst.sample(frac = sample)
Awesome! Now that you have the data, there are a few other preprocessing steps you need to perform. First, you are going to remove any non-English comments (approximated here by keeping only ASCII text). Next, you will also remove the JavaDoc tags, i.e., any line containing an @ symbol, along with any text inside curly braces, as that will significantly lessen the amount of learning your model has to do. This also works out well since the JavaDoc syntax can usually be autogenerated from the method's signature.
# From https://stackoverflow.com/a/27084708/5768407
def isASCII(s):
try:
s.encode(encoding='utf-8').decode('ascii')
except UnicodeDecodeError:
return False
else:
return True
df_trn = df_trn[df_trn['docstring'].apply(lambda x: isASCII(x))]
df_val = df_val[df_val['docstring'].apply(lambda x: isASCII(x))]
df_tst = df_tst[df_tst['docstring'].apply(lambda x: isASCII(x))]
def filter_jdocs(df):
methods = []
comments = []
for i, row in progress_bar(list(df.iterrows())):
comment = row["docstring"]
# Remove {} text in comments from https://stackoverflow.com/questions/14596884/remove-text-between-and-in-python/14598135
        comment = re.sub(r"([\{\[]).*?([\)\}])", '', comment)
cleaned = []
for line in comment.split('\n'):
if "@" in line: break
cleaned.append(line)
comments.append('\n'.join(cleaned))
methods.append(row["code"])
new_df = pd.DataFrame(zip(methods, comments), columns = ["code", "docstring"])
return new_df
df_trn = filter_jdocs(df_trn);
df_val = filter_jdocs(df_val);
df_tst = filter_jdocs(df_tst);
Now you are going to remove any empty or duplicate comments from your datasets.
df_trn = df_trn[~(df_trn['docstring'] == '')]
df_val = df_val[~(df_val['docstring'] == '')]
df_tst = df_tst[~(df_tst['docstring'] == '')]
df_trn = df_trn[~df_trn['docstring'].duplicated()]
df_val = df_val[~df_val['docstring'].duplicated()]
df_tst = df_tst[~df_tst['docstring'].duplicated()]
Not bad, still leaves you with plenty of data to learn with!
len(df_trn), len(df_val), len(df_tst)
Exploring your data!
As a good machine learning practitioner, you need to be extremely careful with your data. This includes checking for biases and duplicates, and describing the data that you have. Not doing so is setting yourself up for disaster. I have personally experienced such a travesty when working on one of my own research projects, where I forgot to check for duplicates before splitting my data. Sadly for me and all my restless nights working on the project, the data was full of duplicates, so my test set was contaminated with data points from my training set, which led to inflated evaluation metrics :(.
Always explore your data!
You'll be doing some basic descriptive statistics for this Exploratory Data Analysis (EDA), which just means calculating some means, medians, and standard deviations for different views of your data. The first view you will explore is the tokens that make up your code and comments. To split your data into these tokens you will use something called Byte Pair Encoding (BPE), which has shown great results for tokenizing both natural language and code, as shown in Karampatsis and Sutton's paper "Maybe Deep Neural Networks are the Best Choice for Modeling Source Code."
A great resource for learning more about how Byte Pair Encoding works is this blog post by Akashdeep Singh Jaswal and this YouTube video by Christopher Manning. Specifically, you will be using the awesome SentencePiece library from Google.
def df_to_txt_file(df, output, col):
"""Converts a dataframe and converts it into a text file that SentencePiece can use to train a BPE model"""
with open(output/'text.txt', 'w') as f:
f.write('\n'.join(list(df[col])))
return output/'text.txt'
def gen_sp_model(df, output, tokenizer_name, col):
"""Trains a SentencePiece BPE model from a pandas dataframe"""
fname = df_to_txt_file(df, output, col)
sp.SentencePieceTrainer.train(f'--input={fname} --model_prefix={output / tokenizer_name} --hard_vocab_limit=false')
To use Byte Pair Encoding, you have to train the tokenizer on your data. However, there is no need to train your BPE model on all of your data, so you will just train it on a subset (10%) of your training set. You train the BPE model on the training set only, so as not to perform any inadvertent data snooping by biasing your BPE model toward the more common words in your validation or testing sets. This also helps show that you are indeed solving the out-of-vocabulary problem, because you will most likely encounter words in your testing set that were not in your training set.
p_bpe = 0.1
method_tokenizer = "method_bpe"
gen_sp_model(df_trn.sample(frac = p_bpe), path, method_tokenizer, col = "code")
comment_tokenizer = "comment_bpe"
gen_sp_model(df_trn.sample(frac = p_bpe), path, comment_tokenizer, col = "docstring")
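The cells below use SentencePieceProcessor objects named method_spm and comment_spm to tokenize with the models you just trained. Loading them isn't shown above, so here is a minimal sketch, assuming gen_sp_model wrote the .model files next to your data:
# Load the trained BPE models so you can tokenize code and comments
method_spm = sp.SentencePieceProcessor()
method_spm.Load(str(path / (method_tokenizer + ".model")))
comment_spm = sp.SentencePieceProcessor()
comment_spm.Load(str(path / (comment_tokenizer + ".model")))

# Quick sanity check: BPE splits unseen identifiers into smaller subword pieces
print(method_spm.EncodeAsPieces("public static void main(String[] args)"))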
Now that you have the ability to tokenize your text, let us explore! First you will generate the frequency of each of your tokens and, while you are at it, collect how long your methods are via the common software metric Lines of Code (LOC).
def get_counter_and_lens(df, spm, col):
toks = []
locs = []
for i, row in progress_bar(list(df.iterrows())):
toks.extend(spm.EncodeAsPieces(row[col]))
locs.append(len(row[col].split('\n')))
cnt = Counter()
for tok in progress_bar(toks):
cnt[tok] += 1
return list(map(len, toks)), cnt, locs
code_lens, code_cnt, locs = get_counter_and_lens(df_trn, method_spm, 'code')
comment_lens, comment_cnt, _ = get_counter_and_lens(df_trn, comment_spm, 'docstring')
def plot_counts(counts, top_k = 30):
labels, values = zip(*counts.most_common()[:top_k])
indexes = np.arange(len(labels))
width = 1
plt.figure(num=None, figsize=(22, 4), dpi=60, facecolor='w', edgecolor='k')
plt.bar(indexes, values, width)
plt.xticks(indexes + width * 0.5, labels)
plt.show()
plot_counts(code_cnt, top_k = 30)
plot_counts(comment_cnt, top_k = 30)
Plotting your frequencies as a bar chart, you start to see a nice picture of your data. Not that surprising, but the most common token happens to be the period, along with other common syntactic tokens like curly braces and keywords like if and return.
def plot_hist(lens, n_bins = 50):
n, bins, patches = plt.hist(lens, n_bins, facecolor='blue', alpha=0.9)
plt.show()
print(mean(code_lens), median(code_lens), stdev(code_lens))
plot_hist(code_lens)
print(mean(locs), median(locs), stdev(locs))
plot_hist(locs)
print(mean(comment_lens), median(comment_lens), stdev(comment_lens))
plot_hist(comment_lens)
As you can see, there are HTML elements left over, with < and > occurring quite often in your comments dataset, which may make it harder for your model to learn to generate the comments that contain those elements. Luckily, it won't really affect your model's accuracy, but exploring your data like this does let you see how your data may be influencing your model.
TODO For You: Perform some further cleaning steps to remove HTML and any other cleaning you deem necessary and see how your performance changes.
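As a starting point for that TODO, here is one possible (untested) sketch of a cleaning step that strips anything that looks like an HTML tag from the comments with a simple regex; you would apply it before training your tokenizer and model:
def strip_html(text):
    """Remove anything that looks like an HTML tag, e.g. <p> or <code>"""
    return re.sub(r"<[^>]+>", "", text)

df_trn['docstring'] = df_trn['docstring'].apply(strip_html)
df_val['docstring'] = df_val['docstring'].apply(strip_html)
df_tst['docstring'] = df_tst['docstring'].apply(strip_html)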
Loading the data using FastAI
Now that you have the data processed and cleaned, you need a way to get it into the format that FastAI uses. To do that you will use some code from Rachel Thomas' awesome course on NLP, which lets you create a Sequence to Sequence DataBunch (since you are going from the sequence of code tokens to the sequence of the code's docstring tokens). A DataBunch is just the format FastAI uses for loading the data into memory for training and evaluation.
def seq2seq_collate(samples, pad_idx=1, pad_first=True, backwards=False):
"Function that collect samples and adds padding. Flips token order if needed"
samples = to_data(samples)
max_len_x,max_len_y = max([len(s[0]) for s in samples]),max([len(s[1]) for s in samples])
res_x = torch.zeros(len(samples), max_len_x).long() + pad_idx
res_y = torch.zeros(len(samples), max_len_y).long() + pad_idx
if backwards: pad_first = not pad_first
for i,s in enumerate(samples):
if pad_first:
res_x[i,-len(s[0]):],res_y[i,-len(s[1]):] = LongTensor(s[0]),LongTensor(s[1])
else:
res_x[i, :len(s[0])],res_y[i, :len(s[1])] = LongTensor(s[0]),LongTensor(s[1])
if backwards: res_x,res_y = res_x.flip(1),res_y.flip(1)
return res_x, res_y
class Seq2SeqDataBunch(TextDataBunch):
"Create a `TextDataBunch` suitable for training an RNN classifier."
@classmethod
def create(cls, train_ds, valid_ds, test_ds=None, path='.', bs=32, val_bs=None, pad_idx=1,
dl_tfms=None, pad_first=False, device=None, no_check=False, backwards=False, **dl_kwargs):
"Function that transform the `datasets` in a `DataBunch` for classification. Passes `**dl_kwargs` on to `DataLoader()`"
datasets = cls._init_ds(train_ds, valid_ds, test_ds)
val_bs = ifnone(val_bs, bs)
collate_fn = partial(seq2seq_collate, pad_idx=pad_idx, pad_first=pad_first, backwards=backwards)
train_sampler = SortishSampler(datasets[0].x, key=lambda t: len(datasets[0][t][0].data), bs=bs//2)
train_dl = DataLoader(datasets[0], batch_size=bs, sampler=train_sampler, drop_last=True, **dl_kwargs)
dataloaders = [train_dl]
for ds in datasets[1:]:
lengths = [len(t) for t in ds.x.items]
sampler = SortSampler(ds.x, key=lengths.__getitem__)
dataloaders.append(DataLoader(ds, batch_size=val_bs, sampler=sampler, **dl_kwargs))
return cls(*dataloaders, path=path, device=device, collate_fn=collate_fn, no_check=no_check)
class Seq2SeqTextList(TextList):
_bunch = Seq2SeqDataBunch
_label_cls = TextList
Here is where you tell FastAI to use your trained BPE models for tokenizing your data. FastAI's tokenizers will also do some additional processing of your text, such as lowercasing all words, removing repetitions, etc. You can find a full list of the processing FastAI applies here.
method_processor = SPProcessor(
sp_model = path / (method_tokenizer + ".model"),
sp_vocab = path / (method_tokenizer + ".vocab"),
include_eos = True)
comment_processor = SPProcessor(
sp_model = path / (comment_tokenizer + ".model"),
sp_vocab = path / (comment_tokenizer + ".vocab"),
include_eos = True)
Now that you have your BPE models, you will generate the DataBunches suitable for your task, which will be the Seq2Seq DataBunch. You will also filter out sequences that are too long, so that everything fits onto a Google Colab GPU and training doesn't take too long.
def gen_dbs(df_trn, df_val, df_tst, method_processor, comment_processor, bs = 96, max_seq = 128):
is_valid = [False] * len(df_trn) + [True] * len(df_val)
df_merged = pd.concat([df_trn, df_val])
df_merged = pd.DataFrame(zip(df_merged["code"].to_list(), df_merged["docstring"].to_list(), is_valid),
columns = ["code", "docstring", "valid"]
)
db_trn = (Seq2SeqTextList
.from_df(df_merged, path = path, cols='code', processor = method_processor)
.split_from_df(col='valid')
.label_from_df(cols='docstring', label_cls=TextList, processor = comment_processor)
.filter_by_func(lambda x, y: len(x) > max_seq or len(y) > max_seq)
.databunch(bs = bs))
db_tst = (Seq2SeqTextList
.from_df(df_tst, path = path, cols='code', processor = method_processor)
.split_by_rand_pct(valid_pct = 0.01)
.label_from_df(cols='docstring', label_cls=TextList, processor = comment_processor)
.filter_by_func(lambda x, y: len(x) > max_seq or len(y) > max_seq)
.databunch(bs = 16))
return db_trn, db_tst
db_trn, db_tst = gen_dbs(df_trn, df_val, df_tst, method_processor, comment_processor, bs = 96, max_seq = 128)
db_trn.show_batch()
def shift_tfm(b):
    "Shift the comment tokens right so the decoder predicts each next token from the previous ones (teacher forcing)"
    x,y = b
    # Prepend the pad/BOS index (1), then drop the last token for the decoder input and the first for the target
    y = F.pad(y, (1, 0), value=1)
    return [x,y[:,:-1]], y[:,1:]
# Add the necessary shift transformation for training your Transformer model
db_trn.add_tfm(shift_tfm)
db_tst.add_tfm(shift_tfm)
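To see what the shift is doing, here is a tiny toy example (made-up token ids, not real data): the decoder input is the comment shifted right with the padding index 1 prepended, and the target is the original comment, so at every position the model predicts the next comment token.
# Toy example of shift_tfm (made-up token ids)
x = torch.tensor([[5, 6, 7]])           # "code" tokens
y = torch.tensor([[10, 11, 12, 13]])    # "comment" tokens
(inp_code, inp_comment), target = shift_tfm((x, y))
# inp_comment -> tensor([[ 1, 10, 11, 12]])  (shifted right, pad/BOS prepended)
# target      -> tensor([[10, 11, 12, 13]])  (what the model must predict)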
Defining your model
In this example, you will be using the Transformer architecture developed by Vaswani et al. in "Attention Is All You Need". If you want a better understanding of this model, I highly suggest The Annotated Transformer blog post and the NLP course by Rachel Thomas, from which this model code is copied.
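As a quick refresher before the code, the core operation inside each encoder and decoder block is scaled dot-product attention, computed in parallel across several heads. In the notation of the MultiHeadAttention class below, the scaling dimension is d_head:

$$\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^\top}{\sqrt{d_{head}}}\right)V$$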
class PositionalEncoding(nn.Module):
"Encode the position with a sinusoid."
def __init__(self, d):
super().__init__()
self.register_buffer('freq', 1 / (10000 ** (torch.arange(0., d, 2.)/d)))
def forward(self, pos):
inp = torch.ger(pos, self.freq)
enc = torch.cat([inp.sin(), inp.cos()], dim=-1)
return enc
class TransformerEmbedding(nn.Module):
"Embedding + positional encoding + dropout"
def __init__(self, vocab_sz, emb_sz, inp_p=0.):
super().__init__()
self.emb_sz = emb_sz
self.embed = embedding(vocab_sz, emb_sz)
self.pos_enc = PositionalEncoding(emb_sz)
self.drop = nn.Dropout(inp_p)
def forward(self, inp):
pos = torch.arange(0, inp.size(1), device=inp.device).float()
return self.drop(self.embed(inp) * math.sqrt(self.emb_sz) + self.pos_enc(pos))
def feed_forward(d_model, d_ff, ff_p=0., double_drop=True):
layers = [nn.Linear(d_model, d_ff), nn.ReLU()]
if double_drop: layers.append(nn.Dropout(ff_p))
return SequentialEx(*layers, nn.Linear(d_ff, d_model), nn.Dropout(ff_p), MergeLayer(), nn.LayerNorm(d_model))
class MultiHeadAttention(nn.Module):
def __init__(self, n_heads, d_model, d_head=None, p=0., bias=True, scale=True):
super().__init__()
d_head = ifnone(d_head, d_model//n_heads)
self.n_heads,self.d_head,self.scale = n_heads,d_head,scale
self.q_wgt,self.k_wgt,self.v_wgt = [nn.Linear(
d_model, n_heads * d_head, bias=bias) for o in range(3)]
self.out = nn.Linear(n_heads * d_head, d_model, bias=bias)
self.drop_att,self.drop_res = nn.Dropout(p),nn.Dropout(p)
self.ln = nn.LayerNorm(d_model)
def forward(self, q, kv, mask=None):
return self.ln(q + self.drop_res(self.out(self._apply_attention(q, kv, mask=mask))))
def create_attn_mat(self, x, layer, bs):
return layer(x).view(bs, x.size(1), self.n_heads, self.d_head
).permute(0, 2, 1, 3)
def _apply_attention(self, q, kv, mask=None):
bs,seq_len = q.size(0),q.size(1)
wq,wk,wv = map(lambda o: self.create_attn_mat(*o,bs),
zip((q,kv,kv),(self.q_wgt,self.k_wgt,self.v_wgt)))
attn_score = wq @ wk.transpose(2,3)
if self.scale: attn_score /= math.sqrt(self.d_head)
if mask is not None:
attn_score = attn_score.float().masked_fill(mask, -float('inf')).type_as(attn_score)
attn_prob = self.drop_att(F.softmax(attn_score, dim=-1))
attn_vec = attn_prob @ wv
return attn_vec.permute(0, 2, 1, 3).contiguous().view(bs, seq_len, -1)
def get_output_mask(inp, pad_idx=1):
return torch.triu(inp.new_ones(inp.size(1),inp.size(1)), diagonal=1)[None,None].byte()
class EncoderBlock(nn.Module):
"Encoder block of a Transformer model."
#Can't use Sequential directly cause more than one input...
def __init__(self, n_heads, d_model, d_head, d_inner, p=0., bias=True, scale=True, double_drop=True):
super().__init__()
self.mha = MultiHeadAttention(n_heads, d_model, d_head, p=p, bias=bias, scale=scale)
self.ff = feed_forward(d_model, d_inner, ff_p=p, double_drop=double_drop)
def forward(self, x, mask=None): return self.ff(self.mha(x, x, mask=mask))
class DecoderBlock(nn.Module):
"Decoder block of a Transformer model."
#Can't use Sequential directly cause more than one input...
def __init__(self, n_heads, d_model, d_head, d_inner, p=0., bias=True, scale=True, double_drop=True):
super().__init__()
self.mha1 = MultiHeadAttention(n_heads, d_model, d_head, p=p, bias=bias, scale=scale)
self.mha2 = MultiHeadAttention(n_heads, d_model, d_head, p=p, bias=bias, scale=scale)
self.ff = feed_forward(d_model, d_inner, ff_p=p, double_drop=double_drop)
def forward(self, x, enc, mask_out=None): return self.ff(self.mha2(self.mha1(x, x, mask_out), enc))
class Transformer(Module):
def __init__(self, inp_vsz, out_vsz, n_layers=6, n_heads=8, d_model=256, d_head=32,
d_inner=1024, p=0.1, bias=True, scale=True, double_drop=True, pad_idx=1):
self.enc_emb = TransformerEmbedding(inp_vsz, d_model, p)
self.dec_emb = TransformerEmbedding(out_vsz, d_model, 0.)
args = (n_heads, d_model, d_head, d_inner, p, bias, scale, double_drop)
self.encoder = nn.ModuleList([EncoderBlock(*args) for _ in range(n_layers)])
self.decoder = nn.ModuleList([DecoderBlock(*args) for _ in range(n_layers)])
self.out = nn.Linear(d_model, out_vsz)
self.out.weight = self.dec_emb.embed.weight
self.pad_idx = pad_idx
def forward(self, inp, out):
mask_out = get_output_mask(out, self.pad_idx)
enc,out = self.enc_emb(inp),self.dec_emb(out)
enc = compose(self.encoder)(enc)
out = compose(self.decoder)(out, enc, mask_out)
return self.out(out)
To evaluate your model you will be using the commonly used BLEU score, which measures how closely your model's generated comment matches the real comment of a method. (This code is also copied from the NLP course by Rachel Thomas.)
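For reference, the corpus-level BLEU computed below is the geometric mean of the modified n-gram precisions $p_1, \dots, p_4$, multiplied by a brevity penalty that kicks in when the predictions are shorter than their references:

$$\text{BLEU} = \min\left(1,\; \exp\left(1 - \frac{\text{len}_{target}}{\text{len}_{pred}}\right)\right) \cdot \left(p_1\, p_2\, p_3\, p_4\right)^{1/4}$$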
class NGram():
def __init__(self, ngram, max_n=5000): self.ngram,self.max_n = ngram,max_n
def __eq__(self, other):
if len(self.ngram) != len(other.ngram): return False
return np.all(np.array(self.ngram) == np.array(other.ngram))
def __hash__(self): return int(sum([o * self.max_n**i for i,o in enumerate(self.ngram)]))
def get_grams(x, n, max_n=5000):
return x if n==1 else [NGram(x[i:i+n], max_n=max_n) for i in range(len(x)-n+1)]
def get_correct_ngrams(pred, targ, n, max_n=5000):
pred_grams,targ_grams = get_grams(pred, n, max_n=max_n),get_grams(targ, n, max_n=max_n)
pred_cnt,targ_cnt = Counter(pred_grams),Counter(targ_grams)
return sum([min(c, targ_cnt[g]) for g,c in pred_cnt.items()]),len(pred_grams)
class CorpusBLEU(Callback):
def __init__(self, vocab_sz):
self.vocab_sz = vocab_sz
self.name = 'bleu'
def on_epoch_begin(self, **kwargs):
self.pred_len,self.targ_len,self.corrects,self.counts = 0,0,[0]*4,[0]*4
def on_batch_end(self, last_output, last_target, **kwargs):
last_output = last_output.argmax(dim=-1)
for pred,targ in zip(last_output.cpu().numpy(),last_target.cpu().numpy()):
self.pred_len += len(pred)
self.targ_len += len(targ)
for i in range(4):
c,t = get_correct_ngrams(pred, targ, i+1, max_n=self.vocab_sz)
self.corrects[i] += c
self.counts[i] += t
def on_epoch_end(self, last_metrics, **kwargs):
precs = [c/t for c,t in zip(self.corrects,self.counts)]
len_penalty = exp(1 - self.targ_len/self.pred_len) if self.pred_len < self.targ_len else 1
bleu = len_penalty * ((precs[0]*precs[1]*precs[2]*precs[3]) ** 0.25)
return add_metrics(last_metrics, bleu)
n_x_vocab, n_y_vocab = len(db_trn.train_ds.x.vocab.itos), len(db_trn.train_ds.y.vocab.itos)
model = Transformer(n_x_vocab, n_y_vocab, d_model=256)
learn = Learner(db_trn, model, metrics=[accuracy, CorpusBLEU(n_y_vocab)], loss_func = CrossEntropyFlat())
Now you are going to use the awesome Learning Rate Finder provided by FastAI, which is based on Leslie N. Smith's paper "Cyclical Learning Rates for Training Neural Networks". This way you don't have to do a bunch of hyperparameter searching to find a good learning rate.
learn.lr_find()
learn.recorder.plot(suggestion = True)
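Once you have picked a learning rate from the plot, you can train the model with FastAI's one-cycle policy. The epoch count and learning rate below are just placeholder assumptions; use the value suggested by your own lr_find plot.
# Sketch: train with the one-cycle policy using a learning rate read off the plot above
learn.fit_one_cycle(8, 1e-4)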